From ba746d03fab656a00920698afe44536cbc3fa1b2 Mon Sep 17 00:00:00 2001 From: mountain Date: Thu, 6 Jul 2023 17:30:57 +0900 Subject: [PATCH] =?UTF-8?q?local=20channel=EB=A1=9C=20=EB=A8=BC=EC=A0=80?= =?UTF-8?q?=20message=EB=A5=BC=20=EB=B3=B4=EB=82=B4=EB=B4=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- wshandler/wshandler.go | 97 +++++++++++++++++++++++++++++------------- 1 file changed, 67 insertions(+), 30 deletions(-) diff --git a/wshandler/wshandler.go b/wshandler/wshandler.go index e606797..b62193b 100644 --- a/wshandler/wshandler.go +++ b/wshandler/wshandler.go @@ -4,7 +4,6 @@ import ( "context" "encoding/hex" "encoding/json" - "errors" "fmt" "io" "net/http" @@ -67,14 +66,15 @@ const ( type WebSocketMessageReceiver func(accid primitive.ObjectID, alias string, messageType WebSocketMessageType, body io.Reader) type subhandler struct { - authCache *gocommon.AuthCollection - redisMsgChanName string - redisCmdChanName string - redisSync *redis.Client - connInOutChan chan *wsconn - deliveryChan chan any - callReceiver WebSocketMessageReceiver - connWaitGroup sync.WaitGroup + authCache *gocommon.AuthCollection + redisMsgChanName string + redisCmdChanName string + redisSync *redis.Client + connInOutChan chan *wsconn + deliveryChan chan any + localDeliveryChan chan any + callReceiver WebSocketMessageReceiver + connWaitGroup sync.WaitGroup } // WebsocketHandler : @@ -115,12 +115,13 @@ func NewWebsocketHandler(authglobal gocommon.AuthCollectionGlobal) (wsh *Websock authCaches := make(map[string]*subhandler) for _, region := range authglobal.Regions() { sh := &subhandler{ - authCache: authglobal.Get(region), - redisMsgChanName: fmt.Sprintf("_wsh_msg_%s", region), - redisCmdChanName: fmt.Sprintf("_wsh_cmd_%s", region), - redisSync: redisSync, - connInOutChan: make(chan *wsconn), - deliveryChan: make(chan any, 1000), + authCache: authglobal.Get(region), + redisMsgChanName: fmt.Sprintf("_wsh_msg_%s", region), + redisCmdChanName: fmt.Sprintf("_wsh_cmd_%s", region), + redisSync: redisSync, + connInOutChan: make(chan *wsconn), + deliveryChan: make(chan any, 1000), + localDeliveryChan: make(chan any, 100), } authCaches[region] = sh @@ -169,21 +170,11 @@ func (ws *WebsocketHandler) SetState(region string, accid primitive.ObjectID, st ws.RedisSync.SetArgs(context.Background(), accid.Hex(), state, redis.SetArgs{Mode: "XX"}).Result() } -var errNoRegion = errors.New("region is not valid") - -func (ws *WebsocketHandler) SendUpstreamMessage(region string, msg *UpstreamMessage) error { +func (ws *WebsocketHandler) SendUpstreamMessage(region string, msg *UpstreamMessage) { sh := ws.authCaches[region] - if sh == nil { - return errNoRegion + if sh != nil { + sh.localDeliveryChan <- msg } - - bt, err := json.Marshal(msg) - if err != nil { - return err - } - - _, err = sh.redisSync.Publish(context.Background(), sh.redisMsgChanName, bt).Result() - return err } func (sh *subhandler) mainLoop(ctx context.Context) { @@ -246,6 +237,53 @@ func (sh *subhandler) mainLoop(ctx context.Context) { // 유저에게서 온 메세지, 소켓 연결/해체 처리 for { select { + case usermsg := <-sh.localDeliveryChan: + // 로컬에 connection이 있는지 먼저 확인해 보기 위한 채널 + // 없으면 publish한다. + switch usermsg := usermsg.(type) { + case *UpstreamMessage: + target := usermsg.Target + if target[0] == '@' { + conn := entireConns[target[1:]] + if conn != nil { + // 이 경우 아니면 publish 해야 함 + conn.WriteMessage(websocket.TextMessage, usermsg.Body) + break + } + } + if bt, err := json.Marshal(usermsg); err == nil { + sh.redisSync.Publish(context.Background(), sh.redisMsgChanName, bt).Result() + } + + case *CommandMessage: + if usermsg.Cmd == CommandType_JoinRoom && len(usermsg.Args) == 2 { + alias := usermsg.Args[0] + roomName := usermsg.Args[1] + + conn := entireConns[alias] + if conn != nil { + findRoom(roomName, true).in(conn) + break + } + } else if usermsg.Cmd == CommandType_LeaveRoom && len(usermsg.Args) == 2 { + alias := usermsg.Args[0] + roomName := usermsg.Args[1] + + conn := entireConns[alias] + if conn != nil { + if room := findRoom(roomName, false); room != nil { + room.out(conn) + break + } + } + } + + // 위에서 break 안걸리면 나한테 없으므로 publish를 해야 함. 그러면 다른 호스트가 deliveryChan으로 받는다 + if bt, err := json.Marshal(usermsg); err == nil { + sh.redisSync.Publish(context.Background(), sh.redisCmdChanName, bt).Result() + } + } + case usermsg := <-sh.deliveryChan: switch usermsg := usermsg.(type) { case *UpstreamMessage: @@ -257,7 +295,6 @@ func (sh *subhandler) mainLoop(ctx context.Context) { room.broadcast(usermsg) } } else if target[0] == '@' { - // TODO : 특정 유저에게만 conn := entireConns[target[1:]] if conn != nil { conn.WriteMessage(websocket.TextMessage, usermsg.Body) @@ -273,7 +310,7 @@ func (sh *subhandler) mainLoop(ctx context.Context) { if conn != nil { findRoom(roomName, true).in(conn) } - } else if usermsg.Cmd == CommandType_JoinRoom && len(usermsg.Args) == 2 { + } else if usermsg.Cmd == CommandType_LeaveRoom && len(usermsg.Args) == 2 { alias := usermsg.Args[0] roomName := usermsg.Args[1]