From 822681bf74cf877562ab40eeb641f68745b0b28d Mon Sep 17 00:00:00 2001 From: mountain Date: Wed, 5 Jul 2023 22:26:57 +0900 Subject: [PATCH] =?UTF-8?q?close=20func=EB=A5=BC=20=EB=B0=96=EC=9C=BC?= =?UTF-8?q?=EB=A1=9C=20=EC=98=AE=EA=B9=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- wshandler/room.go | 16 +++--- wshandler/wshandler.go | 122 +++++++++++++++-------------------------- 2 files changed, 53 insertions(+), 85 deletions(-) diff --git a/wshandler/room.go b/wshandler/room.go index a443e32..f74fb45 100644 --- a/wshandler/room.go +++ b/wshandler/room.go @@ -8,16 +8,16 @@ import ( ) type room struct { - inChan chan *Richconn - outChan chan *Richconn + inChan chan *wsconn + outChan chan *wsconn messageChan chan *UpstreamMessage name string } func makeRoom(name string) *room { return &room{ - inChan: make(chan *Richconn, 10), - outChan: make(chan *Richconn, 10), + inChan: make(chan *wsconn, 10), + outChan: make(chan *wsconn, 10), messageChan: make(chan *UpstreamMessage, 100), name: name, } @@ -27,17 +27,17 @@ func (r *room) broadcast(msg *UpstreamMessage) { r.messageChan <- msg } -func (r *room) in(conn *Richconn) { +func (r *room) in(conn *wsconn) { r.inChan <- conn } -func (r *room) out(conn *Richconn) { +func (r *room) out(conn *wsconn) { r.outChan <- conn } func (r *room) start(ctx context.Context) { go func(ctx context.Context) { - conns := make(map[string]*Richconn) + conns := make(map[string]*wsconn) normal := false for !normal { normal = r.loop(ctx, &conns) @@ -45,7 +45,7 @@ func (r *room) start(ctx context.Context) { }(ctx) } -func (r *room) loop(ctx context.Context, conns *map[string]*Richconn) (normalEnd bool) { +func (r *room) loop(ctx context.Context, conns *map[string]*wsconn) (normalEnd bool) { defer func() { s := recover() if s != nil { diff --git a/wshandler/wshandler.go b/wshandler/wshandler.go index 977ac58..6836b77 100644 --- a/wshandler/wshandler.go +++ b/wshandler/wshandler.go @@ -21,11 +21,10 @@ import ( var noSessionFlag = flagx.Bool("nosession", false, "nosession=[true|false]") -type Richconn struct { +type wsconn struct { *websocket.Conn - closeFuncLock sync.Mutex - alias string - onClose map[string]func() + alias string + accid primitive.ObjectID } type UpstreamMessage struct { @@ -43,8 +42,8 @@ type DownstreamMessage struct { type CommandType string const ( - CommandType_JoinChannel = CommandType("join_channel") - CommandType_LeaveChannel = CommandType("leave_channel") + CommandType_JoinRoom = CommandType("join_room") + CommandType_LeaveRoom = CommandType("leave_room") ) type CommandMessage struct { @@ -52,53 +51,6 @@ type CommandMessage struct { Args []string } -func (rc *Richconn) RegistOnCloseFunc(name string, f func()) { - rc.closeFuncLock.Lock() - defer rc.closeFuncLock.Unlock() - - if rc.onClose == nil { - f() - return - } - rc.onClose[name] = f -} - -func (rc *Richconn) HasOnCloseFunc(name string) bool { - rc.closeFuncLock.Lock() - defer rc.closeFuncLock.Unlock() - - if rc.onClose == nil { - return false - } - - _, ok := rc.onClose[name] - return ok -} - -func (rc *Richconn) UnregistOnCloseFunc(name string) (out func()) { - rc.closeFuncLock.Lock() - defer rc.closeFuncLock.Unlock() - - if rc.onClose == nil { - return - } - out = rc.onClose[name] - delete(rc.onClose, name) - return -} - -func (rc *Richconn) Closed() { - rc.closeFuncLock.Lock() - defer rc.closeFuncLock.Unlock() - for _, f := range rc.onClose { - f() - } -} - -func (rc *Richconn) WriteBytes(data []byte) error { - return rc.WriteMessage(websocket.TextMessage, data) -} - type subhandler struct { authCache *gocommon.AuthCollection redisMsgChanName string @@ -106,8 +58,10 @@ type subhandler struct { redisSync *redis.Client connsLock sync.Mutex connectedAlias map[string]bool - connInOutChan chan *Richconn + connInOutChan chan *wsconn deliveryChan chan any + + callReceiver func(primitive.ObjectID, string, io.Reader) } // WebsocketHandler : @@ -120,7 +74,7 @@ type wsConfig struct { SyncPipeline string `json:"ws_sync_pipeline"` } -func NewWebsocketHandler(authglobal gocommon.AuthCollectionGlobal) (wsh *WebsocketHandler) { +func NewWebsocketHandler[T any](authglobal gocommon.AuthCollectionGlobal, receiver func(primitive.ObjectID, string, *T)) (wsh *WebsocketHandler) { var config wsConfig gocommon.LoadConfig(&config) @@ -129,6 +83,21 @@ func NewWebsocketHandler(authglobal gocommon.AuthCollectionGlobal) (wsh *Websock panic(err) } + decoder := func(r io.Reader) *T { + if r == nil { + // 접속이 끊겼을 때. + return nil + } + var m T + dec := json.NewDecoder(r) + if err := dec.Decode(&m); err != nil { + logger.Println(err) + } + + // decoding 실패하더라도 빈 *T를 내보냄 + return &m + } + authCaches := make(map[string]*subhandler) for _, region := range authglobal.Regions() { sh := &subhandler{ @@ -137,8 +106,11 @@ func NewWebsocketHandler(authglobal gocommon.AuthCollectionGlobal) (wsh *Websock redisCmdChanName: fmt.Sprintf("_wsh_cmd_%s", region), redisSync: redisSync, connectedAlias: make(map[string]bool), - connInOutChan: make(chan *Richconn), + connInOutChan: make(chan *wsconn), deliveryChan: make(chan any, 1000), + callReceiver: func(accid primitive.ObjectID, alias string, r io.Reader) { + receiver(accid, alias, decoder(r)) + }, } authCaches[region] = sh @@ -205,6 +177,7 @@ func (sh *subhandler) mainLoop(ctx context.Context) { } }() + // redis channel에서 유저가 보낸 메시지를 읽는 go rountine go func() { var pubsub *redis.PubSub for { @@ -241,7 +214,7 @@ func (sh *subhandler) mainLoop(ctx context.Context) { } }() - entireConns := make(map[string]*Richconn) + entireConns := make(map[string]*wsconn) rooms := make(map[string]*room) findRoom := func(name string, create bool) *room { room := rooms[name] @@ -253,6 +226,7 @@ func (sh *subhandler) mainLoop(ctx context.Context) { return room } + // 유저에게서 온 메세지, 소켓 연결/해체 처리 for { select { case usermsg := <-sh.deliveryChan: @@ -270,7 +244,7 @@ func (sh *subhandler) mainLoop(ctx context.Context) { } case *CommandMessage: - if usermsg.Cmd == CommandType_JoinChannel && len(usermsg.Args) == 2 { + if usermsg.Cmd == CommandType_JoinRoom && len(usermsg.Args) == 2 { alias := usermsg.Args[0] roomName := usermsg.Args[1] @@ -278,7 +252,7 @@ func (sh *subhandler) mainLoop(ctx context.Context) { if conn != nil { findRoom(roomName, true).in(conn) } - } else if usermsg.Cmd == CommandType_JoinChannel && len(usermsg.Args) == 2 { + } else if usermsg.Cmd == CommandType_JoinRoom && len(usermsg.Args) == 2 { alias := usermsg.Args[0] roomName := usermsg.Args[1] @@ -301,7 +275,7 @@ func (sh *subhandler) mainLoop(ctx context.Context) { for _, room := range rooms { room.out(c) } - c.Closed() + sh.callReceiver(c.accid, c.alias, nil) } else { sh.setConnected(c.alias, true) entireConns[c.alias] = c @@ -311,18 +285,16 @@ func (sh *subhandler) mainLoop(ctx context.Context) { } func upgrade_core(sh *subhandler, conn *websocket.Conn, accid primitive.ObjectID, alias string) { - newconn := sh.makeRichConn(alias, conn) + newconn := &wsconn{ + Conn: conn, + alias: alias, + accid: accid, + } sh.connInOutChan <- newconn - go func(c *Richconn, accid primitive.ObjectID, deliveryChan chan<- any) { + go func(c *wsconn, accid primitive.ObjectID, deliveryChan chan<- any) { for { messageType, r, err := c.NextReader() - - // 웹소켓에서 직접 메시지를 받지 않는다. - if r != nil { - io.Copy(io.Discard, r) - } - if err != nil { c.Close() break @@ -331,6 +303,11 @@ func upgrade_core(sh *subhandler, conn *websocket.Conn, accid primitive.ObjectID if messageType == websocket.CloseMessage { break } + + if messageType == websocket.TextMessage { + // 유저가 직접 보낸 메시지 + sh.callReceiver(accid, c.alias, r) + } } c.Conn = nil @@ -432,12 +409,3 @@ func (sh *subhandler) upgrade(w http.ResponseWriter, r *http.Request) { upgrade_core(sh, conn, accid, alias) } - -func (sh *subhandler) makeRichConn(alias string, conn *websocket.Conn) *Richconn { - rc := Richconn{ - Conn: conn, - alias: alias, - onClose: make(map[string]func()), - } - return &rc -}