diff --git a/wshandler/room.go b/wshandler/room.go index db71571..786793d 100644 --- a/wshandler/room.go +++ b/wshandler/room.go @@ -14,16 +14,18 @@ type room struct { messageChan chan *UpstreamMessage name string destroyChan chan<- string + sendMsgChan chan<- send_msg_queue_elem } // 만약 destroyChan가 nil이면 room이 비어도 파괴되지 않는다. 영구 유지되는 room -func makeRoom(name string, destroyChan chan<- string) *room { +func makeRoom(name string, destroyChan chan<- string, sendMsgChan chan<- send_msg_queue_elem) *room { return &room{ inChan: make(chan *wsconn, 10), outChan: make(chan *wsconn, 10), messageChan: make(chan *UpstreamMessage, 100), name: name, destroyChan: destroyChan, + sendMsgChan: sendMsgChan, } } @@ -85,7 +87,11 @@ func (r *room) loop(ctx context.Context, conns *map[string]*wsconn) (normalEnd b bt, _ := json.Marshal(ds) for _, conn := range *conns { - conn.Conn.WriteMessage(websocket.TextMessage, bt) + r.sendMsgChan <- send_msg_queue_elem{ + to: conn, + mt: websocket.TextMessage, + msg: bt, + } } } } diff --git a/wshandler/wshandler.go b/wshandler/wshandler.go index 5ff3be0..4ffc7db 100644 --- a/wshandler/wshandler.go +++ b/wshandler/wshandler.go @@ -93,6 +93,12 @@ type EventReceiver interface { OnRoomDestroyed(region, name string) } +type send_msg_queue_elem struct { + to *wsconn + mt int + msg []byte +} + type subhandler struct { redisMsgChanName string redisCmdChanName string @@ -100,6 +106,7 @@ type subhandler struct { connInOutChan chan *wsconn deliveryChan chan any localDeliveryChan chan any + sendMsgChan chan send_msg_queue_elem callReceiver EventReceiver connWaitGroup sync.WaitGroup region string @@ -151,6 +158,23 @@ func NewWebsocketHandler() (*WebsocketHandler, error) { return nil, err } + sendchan := make(chan send_msg_queue_elem, 1000) + go func() { + sender := func(elem *send_msg_queue_elem) { + defer func() { + r := recover() + if r != nil { + logger.Println(r) + } + }() + elem.to.WriteMessage(elem.mt, elem.msg) + } + + for elem := range sendchan { + sender(&elem) + } + }() + sh := &subhandler{ redisMsgChanName: fmt.Sprintf("_wsh_msg_%s", region), redisCmdChanName: fmt.Sprintf("_wsh_cmd_%s", region), @@ -158,6 +182,7 @@ func NewWebsocketHandler() (*WebsocketHandler, error) { connInOutChan: make(chan *wsconn), deliveryChan: make(chan any, 1000), localDeliveryChan: make(chan any, 100), + sendMsgChan: sendchan, region: region, } @@ -336,7 +361,7 @@ func (sh *subhandler) mainLoop(ctx context.Context) { findRoom := func(name string, create bool) *room { room := rooms[name] if room == nil && create { - room = makeRoom(name, roomDestroyChan) + room = makeRoom(name, roomDestroyChan, sh.sendMsgChan) rooms[name] = room room.start(ctx) go sh.callReceiver.OnRoomCreated(sh.region, name) @@ -370,8 +395,11 @@ func (sh *subhandler) mainLoop(ctx context.Context) { Body: usermsg.Body, Tag: usermsg.Tag, }) - - conn.WriteMessage(websocket.TextMessage, ds) + sh.sendMsgChan <- send_msg_queue_elem{ + to: conn, + mt: websocket.TextMessage, + msg: ds, + } break } } @@ -439,7 +467,11 @@ func (sh *subhandler) mainLoop(ctx context.Context) { Body: usermsg.Body, Tag: usermsg.Tag, }) - conn.WriteMessage(websocket.TextMessage, ds) + sh.sendMsgChan <- send_msg_queue_elem{ + to: conn, + mt: websocket.TextMessage, + msg: ds, + } } }