diff --git a/wshandler/room.go b/wshandler/room.go index 35fb313..db71571 100644 --- a/wshandler/room.go +++ b/wshandler/room.go @@ -13,14 +13,17 @@ type room struct { outChan chan *wsconn messageChan chan *UpstreamMessage name string + destroyChan chan<- string } -func makeRoom(name string) *room { +// 만약 destroyChan가 nil이면 room이 비어도 파괴되지 않는다. 영구 유지되는 room +func makeRoom(name string, destroyChan chan<- string) *room { return &room{ inChan: make(chan *wsconn, 10), outChan: make(chan *wsconn, 10), messageChan: make(chan *UpstreamMessage, 100), name: name, + destroyChan: destroyChan, } } @@ -28,12 +31,14 @@ func (r *room) broadcast(msg *UpstreamMessage) { r.messageChan <- msg } -func (r *room) in(conn *wsconn) { +func (r *room) in(conn *wsconn) *room { r.inChan <- conn + return r } -func (r *room) out(conn *wsconn) { +func (r *room) out(conn *wsconn) *room { r.outChan <- conn + return r } func (r *room) start(ctx context.Context) { @@ -66,6 +71,10 @@ func (r *room) loop(ctx context.Context, conns *map[string]*wsconn) (normalEnd b case conn := <-r.outChan: delete((*conns), conn.sender.Accid.Hex()) + if len(*conns) == 0 && r.destroyChan != nil { + r.destroyChan <- r.name + return true + } case msg := <-r.messageChan: ds := DownstreamMessage{ diff --git a/wshandler/wshandler.go b/wshandler/wshandler.go index e4427c0..086ad06 100644 --- a/wshandler/wshandler.go +++ b/wshandler/wshandler.go @@ -9,7 +9,6 @@ import ( "net/http" "strings" "sync" - "time" "go.mongodb.org/mongo-driver/bson/primitive" "repositories.action2quare.com/ayo/gocommon" @@ -24,7 +23,22 @@ var noSessionFlag = flagx.Bool("nosession", false, "nosession=[true|false]") type wsconn struct { *websocket.Conn - sender *Sender + sender *Sender + joinedRooms []*room +} + +func (conn *wsconn) popRoom(r *room) int { + for i, jr := range conn.joinedRooms { + if jr == r { + conn.joinedRooms = append(conn.joinedRooms[:i], conn.joinedRooms[i+1:]...) + break + } + } + return len(conn.joinedRooms) +} + +func (conn *wsconn) pushRoom(r *room) { + conn.joinedRooms = append(conn.joinedRooms, r) } type UpstreamMessage struct { @@ -44,9 +58,8 @@ type DownstreamMessage struct { type commandType string const ( - commandType_JoinRoom = commandType("join_room") - commandType_LeaveRoom = commandType("leave_room") - commandType_WriteControl = commandType("write_control") + commandType_JoinRoom = commandType("join_room") + commandType_LeaveRoom = commandType("leave_room") ) type commandMessage struct { @@ -209,20 +222,6 @@ func (ws *WebsocketHandler) SendUpstreamMessage(region string, msg *UpstreamMess } } -func (ws *WebsocketHandler) SendCloseMessage(region string, target string, text string) { - sh := ws.authCaches[region] - if sh != nil { - sh.localDeliveryChan <- &commandMessage{ - Cmd: commandType_WriteControl, - Args: []any{ - target, - int(websocket.CloseMessage), - websocket.FormatCloseMessage(websocket.CloseNormalClosure, text), - }, - } - } -} - func (ws *WebsocketHandler) EnterRoom(region string, room string, accid primitive.ObjectID) { sh := ws.authCaches[region] if sh != nil { @@ -290,10 +289,11 @@ func (sh *subhandler) mainLoop(ctx context.Context) { entireConns := make(map[string]*wsconn) rooms := make(map[string]*room) + roomDestroyChan := make(chan string, 1000) findRoom := func(name string, create bool) *room { room := rooms[name] if room == nil && create { - room = makeRoom(name) + room = makeRoom(name, roomDestroyChan) rooms[name] = room room.start(ctx) } @@ -303,6 +303,9 @@ func (sh *subhandler) mainLoop(ctx context.Context) { // 유저에게서 온 메세지, 소켓 연결/해체 처리 for { select { + case destroyedRoom := <-roomDestroyChan: + delete(rooms, destroyedRoom) + case usermsg := <-sh.localDeliveryChan: // 로컬에 connection이 있는지 먼저 확인해 보기 위한 채널 // 없으면 publish한다. @@ -334,7 +337,7 @@ func (sh *subhandler) mainLoop(ctx context.Context) { accid := usermsg.Args[1].(primitive.ObjectID) conn := entireConns[accid.Hex()] if conn != nil { - findRoom(roomName, true).in(conn) + conn.pushRoom(findRoom(roomName, true).in(conn)) break } } else if usermsg.Cmd == commandType_LeaveRoom && len(usermsg.Args) == 2 { @@ -343,17 +346,12 @@ func (sh *subhandler) mainLoop(ctx context.Context) { conn := entireConns[accid.Hex()] if conn != nil { if room := findRoom(roomName, false); room != nil { - room.out(conn) + if conn.popRoom(room.out(conn)) == 0 { + conn.Close() + } break } } - } else if usermsg.Cmd == commandType_WriteControl && len(usermsg.Args) == 2 { - accid := usermsg.Args[0].(string) - conn := entireConns[accid] - if conn != nil { - conn.WriteControl(usermsg.Args[1].(int), usermsg.Args[2].([]byte), time.Time{}) - break - } } // 위에서 break 안걸리면 나한테 없으므로 publish를 해야 함. 그러면 다른 호스트가 deliveryChan으로 받는다 @@ -391,7 +389,7 @@ func (sh *subhandler) mainLoop(ctx context.Context) { accid := usermsg.Args[1].(primitive.ObjectID) conn := entireConns[accid.Hex()] if conn != nil { - findRoom(roomName, true).in(conn) + conn.pushRoom(findRoom(roomName, true).in(conn)) } } else if usermsg.Cmd == commandType_LeaveRoom && len(usermsg.Args) == 2 { roomName := usermsg.Args[0].(string) @@ -399,7 +397,9 @@ func (sh *subhandler) mainLoop(ctx context.Context) { conn := entireConns[accid.Hex()] if conn != nil { if room := findRoom(roomName, false); room != nil { - room.out(conn) + if conn.popRoom(room.out(conn)) == 0 { + conn.Close() + } } } } @@ -411,9 +411,10 @@ func (sh *subhandler) mainLoop(ctx context.Context) { case c := <-sh.connInOutChan: if c.Conn == nil { delete(entireConns, c.sender.Accid.Hex()) - for _, room := range rooms { + for _, room := range c.joinedRooms { room.out(c) } + c.joinedRooms = nil sh.callReceiver(c.sender, Disconnected, nil) } else { entireConns[c.sender.Accid.Hex()] = c