337 lines
8.2 KiB
Go
337 lines
8.2 KiB
Go
|
package ws
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
"github.com/gin-gonic/gin"
|
||
|
"github.com/gorilla/websocket"
|
||
|
"go-admin/tools"
|
||
|
"go-admin/tools/app"
|
||
|
"log"
|
||
|
"net/http"
|
||
|
"sync"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
// Manager 所有 websocket 信息
|
||
|
type Manager struct {
|
||
|
Group map[string]map[string]*Client
|
||
|
groupCount, clientCount uint
|
||
|
Lock sync.Mutex
|
||
|
Register, UnRegister chan *Client
|
||
|
Message chan *MessageData
|
||
|
GroupMessage chan *GroupMessageData
|
||
|
BroadCastMessage chan *BroadCastMessageData
|
||
|
}
|
||
|
|
||
|
// Client 单个 websocket 信息
|
||
|
type Client struct {
|
||
|
Id, Group string
|
||
|
Context context.Context
|
||
|
CancelFunc context.CancelFunc
|
||
|
Socket *websocket.Conn
|
||
|
Message chan []byte
|
||
|
}
|
||
|
|
||
|
// messageData 单个发送数据信息
|
||
|
type MessageData struct {
|
||
|
Id, Group string
|
||
|
Context context.Context
|
||
|
Message []byte
|
||
|
}
|
||
|
|
||
|
// groupMessageData 组广播数据信息
|
||
|
type GroupMessageData struct {
|
||
|
Group string
|
||
|
Message []byte
|
||
|
}
|
||
|
|
||
|
// 广播发送数据信息
|
||
|
type BroadCastMessageData struct {
|
||
|
Message []byte
|
||
|
}
|
||
|
|
||
|
// 读信息,从 websocket 连接直接读取数据
|
||
|
func (c *Client) Read(cxt context.Context) {
|
||
|
defer func(cxt context.Context) {
|
||
|
WebsocketManager.UnRegister <- c
|
||
|
log.Printf("client [%s] disconnect", c.Id)
|
||
|
if err := c.Socket.Close(); err != nil {
|
||
|
log.Printf("client [%s] disconnect err: %s", c.Id, err)
|
||
|
}
|
||
|
}(cxt)
|
||
|
|
||
|
for {
|
||
|
if cxt.Err() != nil {
|
||
|
break
|
||
|
}
|
||
|
messageType, message, err := c.Socket.ReadMessage()
|
||
|
if err != nil || messageType == websocket.CloseMessage {
|
||
|
break
|
||
|
}
|
||
|
log.Printf("client [%s] receive message: %s", c.Id, string(message))
|
||
|
c.Message <- message
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// 写信息,从 channel 变量 Send 中读取数据写入 websocket 连接
|
||
|
func (c *Client) Write(cxt context.Context) {
|
||
|
defer func(cxt context.Context) {
|
||
|
log.Printf("client [%s] disconnect", c.Id)
|
||
|
if err := c.Socket.Close(); err != nil {
|
||
|
log.Printf("client [%s] disconnect err: %s", c.Id, err)
|
||
|
}
|
||
|
}(cxt)
|
||
|
|
||
|
for {
|
||
|
if cxt.Err() != nil {
|
||
|
break
|
||
|
}
|
||
|
select {
|
||
|
case message, ok := <-c.Message:
|
||
|
if !ok {
|
||
|
_ = c.Socket.WriteMessage(websocket.CloseMessage, []byte{})
|
||
|
return
|
||
|
}
|
||
|
log.Printf("client [%s] write message: %s", c.Id, string(message))
|
||
|
err := c.Socket.WriteMessage(websocket.TextMessage, message)
|
||
|
if err != nil {
|
||
|
log.Printf("client [%s] writemessage err: %s", c.Id, err)
|
||
|
}
|
||
|
case _ = <-c.Context.Done():
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// 启动 websocket 管理器
|
||
|
func (manager *Manager) Start() {
|
||
|
log.Printf("websocket manage start")
|
||
|
for {
|
||
|
select {
|
||
|
// 注册
|
||
|
case client := <-manager.Register:
|
||
|
log.Printf("client [%s] connect", client.Id)
|
||
|
log.Printf("register client [%s] to group [%s]", client.Id, client.Group)
|
||
|
|
||
|
manager.Lock.Lock()
|
||
|
if manager.Group[client.Group] == nil {
|
||
|
manager.Group[client.Group] = make(map[string]*Client)
|
||
|
manager.groupCount += 1
|
||
|
}
|
||
|
manager.Group[client.Group][client.Id] = client
|
||
|
manager.clientCount += 1
|
||
|
manager.Lock.Unlock()
|
||
|
|
||
|
// 注销
|
||
|
case client := <-manager.UnRegister:
|
||
|
log.Printf("unregister client [%s] from group [%s]", client.Id, client.Group)
|
||
|
manager.Lock.Lock()
|
||
|
if mGroup, ok := manager.Group[client.Group]; ok {
|
||
|
if mClient, ok := mGroup[client.Id]; ok {
|
||
|
close(mClient.Message)
|
||
|
delete(mGroup, client.Id)
|
||
|
manager.clientCount -= 1
|
||
|
if len(mGroup) == 0 {
|
||
|
//log.Printf("delete empty group [%s]", client.Group)
|
||
|
delete(manager.Group, client.Group)
|
||
|
manager.groupCount -= 1
|
||
|
}
|
||
|
mClient.CancelFunc()
|
||
|
}
|
||
|
}
|
||
|
manager.Lock.Unlock()
|
||
|
|
||
|
// 发送广播数据到某个组的 channel 变量 Send 中
|
||
|
//case data := <-manager.boardCast:
|
||
|
// if groupMap, ok := manager.wsGroup[data.GroupId]; ok {
|
||
|
// for _, conn := range groupMap {
|
||
|
// conn.Send <- data.Data
|
||
|
// }
|
||
|
// }
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// 处理单个 client 发送数据
|
||
|
func (manager *Manager) SendService() {
|
||
|
for {
|
||
|
select {
|
||
|
case data := <-manager.Message:
|
||
|
if groupMap, ok := manager.Group[data.Group]; ok {
|
||
|
if conn, ok := groupMap[data.Id]; ok {
|
||
|
conn.Message <- data.Message
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// 处理 group 广播数据
|
||
|
func (manager *Manager) SendGroupService() {
|
||
|
for {
|
||
|
select {
|
||
|
// 发送广播数据到某个组的 channel 变量 Send 中
|
||
|
case data := <-manager.GroupMessage:
|
||
|
if groupMap, ok := manager.Group[data.Group]; ok {
|
||
|
for _, conn := range groupMap {
|
||
|
conn.Message <- data.Message
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// 处理广播数据
|
||
|
func (manager *Manager) SendAllService() {
|
||
|
for {
|
||
|
select {
|
||
|
case data := <-manager.BroadCastMessage:
|
||
|
for _, v := range manager.Group {
|
||
|
for _, conn := range v {
|
||
|
conn.Message <- data.Message
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// 向指定的 client 发送数据
|
||
|
func (manager *Manager) Send(cxt context.Context, id string, group string, message []byte) {
|
||
|
data := &MessageData{
|
||
|
Id: id,
|
||
|
Context: cxt,
|
||
|
Group: group,
|
||
|
Message: message,
|
||
|
}
|
||
|
manager.Message <- data
|
||
|
}
|
||
|
|
||
|
// 向指定的 Group 广播
|
||
|
func (manager *Manager) SendGroup(group string, message []byte) {
|
||
|
data := &GroupMessageData{
|
||
|
Group: group,
|
||
|
Message: message,
|
||
|
}
|
||
|
manager.GroupMessage <- data
|
||
|
}
|
||
|
|
||
|
// 广播
|
||
|
func (manager *Manager) SendAll(message []byte) {
|
||
|
data := &BroadCastMessageData{
|
||
|
Message: message,
|
||
|
}
|
||
|
manager.BroadCastMessage <- data
|
||
|
}
|
||
|
|
||
|
// 注册
|
||
|
func (manager *Manager) RegisterClient(client *Client) {
|
||
|
manager.Register <- client
|
||
|
}
|
||
|
|
||
|
// 注销
|
||
|
func (manager *Manager) UnRegisterClient(client *Client) {
|
||
|
manager.UnRegister <- client
|
||
|
}
|
||
|
|
||
|
// 当前组个数
|
||
|
func (manager *Manager) LenGroup() uint {
|
||
|
return manager.groupCount
|
||
|
}
|
||
|
|
||
|
// 当前连接个数
|
||
|
func (manager *Manager) LenClient() uint {
|
||
|
return manager.clientCount
|
||
|
}
|
||
|
|
||
|
// 获取 wsManager 管理器信息
|
||
|
func (manager *Manager) Info() map[string]interface{} {
|
||
|
managerInfo := make(map[string]interface{})
|
||
|
managerInfo["groupLen"] = manager.LenGroup()
|
||
|
managerInfo["clientLen"] = manager.LenClient()
|
||
|
managerInfo["chanRegisterLen"] = len(manager.Register)
|
||
|
managerInfo["chanUnregisterLen"] = len(manager.UnRegister)
|
||
|
managerInfo["chanMessageLen"] = len(manager.Message)
|
||
|
managerInfo["chanGroupMessageLen"] = len(manager.GroupMessage)
|
||
|
managerInfo["chanBroadCastMessageLen"] = len(manager.BroadCastMessage)
|
||
|
return managerInfo
|
||
|
}
|
||
|
|
||
|
// 初始化 wsManager 管理器
|
||
|
var WebsocketManager = Manager{
|
||
|
Group: make(map[string]map[string]*Client),
|
||
|
Register: make(chan *Client, 128),
|
||
|
UnRegister: make(chan *Client, 128),
|
||
|
GroupMessage: make(chan *GroupMessageData, 128),
|
||
|
Message: make(chan *MessageData, 128),
|
||
|
BroadCastMessage: make(chan *BroadCastMessageData, 128),
|
||
|
groupCount: 0,
|
||
|
clientCount: 0,
|
||
|
}
|
||
|
|
||
|
// gin 处理 websocket handler
|
||
|
func (manager *Manager) WsClient(c *gin.Context) {
|
||
|
|
||
|
ctx, cancel := context.WithCancel(context.Background())
|
||
|
|
||
|
upGrader := websocket.Upgrader{
|
||
|
// cross origin domain
|
||
|
CheckOrigin: func(r *http.Request) bool {
|
||
|
return true
|
||
|
},
|
||
|
// 处理 Sec-WebSocket-Protocol Header
|
||
|
Subprotocols: []string{c.GetHeader("Sec-WebSocket-Protocol")},
|
||
|
}
|
||
|
|
||
|
conn, err := upGrader.Upgrade(c.Writer, c.Request, nil)
|
||
|
if err != nil {
|
||
|
log.Printf("websocket connect error: %s", c.Param("channel"))
|
||
|
return
|
||
|
}
|
||
|
|
||
|
fmt.Println("token: ", c.Query("token"))
|
||
|
|
||
|
client := &Client{
|
||
|
Id: c.Param("id"),
|
||
|
Group: c.Param("channel"),
|
||
|
Context: ctx,
|
||
|
CancelFunc: cancel,
|
||
|
Socket: conn,
|
||
|
Message: make(chan []byte, 1024),
|
||
|
}
|
||
|
|
||
|
manager.RegisterClient(client)
|
||
|
go client.Read(ctx)
|
||
|
go client.Write(ctx)
|
||
|
time.Sleep(time.Second * 15)
|
||
|
|
||
|
tools.FileMonitoringById(ctx, "temp/logs/job/db-20200820.log", c.Param("id"), c.Param("channel"), SendOne)
|
||
|
}
|
||
|
|
||
|
func (manager *Manager) UnWsClient(c *gin.Context) {
|
||
|
id := c.Param("id")
|
||
|
group := c.Param("channel")
|
||
|
WsLogout(id, group)
|
||
|
app.OK(c, "ws close success", "success")
|
||
|
}
|
||
|
|
||
|
func SendGroup(msg []byte) {
|
||
|
WebsocketManager.SendGroup("leffss", []byte("{\"code\":200,\"data\":"+string(msg)+"}"))
|
||
|
fmt.Println(WebsocketManager.Info())
|
||
|
}
|
||
|
|
||
|
func SendAll(msg []byte) {
|
||
|
WebsocketManager.SendAll([]byte("{\"code\":200,\"data\":" + string(msg) + "}"))
|
||
|
fmt.Println(WebsocketManager.Info())
|
||
|
}
|
||
|
|
||
|
func SendOne(ctx context.Context, id string, group string, msg []byte) {
|
||
|
WebsocketManager.Send(ctx, id, group, []byte("{\"code\":200,\"data\":"+string(msg)+"}"))
|
||
|
fmt.Println(WebsocketManager.Info())
|
||
|
}
|
||
|
|
||
|
func WsLogout(id string, group string) {
|
||
|
WebsocketManager.UnRegisterClient(&Client{Id: id, Group: group})
|
||
|
fmt.Println(WebsocketManager.Info())
|
||
|
}
|