local channel로 먼저 message를 보내봄

This commit is contained in:
2023-07-06 17:30:57 +09:00
parent 20803c67ed
commit ba746d03fa

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
@ -67,14 +66,15 @@ const (
type WebSocketMessageReceiver func(accid primitive.ObjectID, alias string, messageType WebSocketMessageType, body io.Reader) type WebSocketMessageReceiver func(accid primitive.ObjectID, alias string, messageType WebSocketMessageType, body io.Reader)
type subhandler struct { type subhandler struct {
authCache *gocommon.AuthCollection authCache *gocommon.AuthCollection
redisMsgChanName string redisMsgChanName string
redisCmdChanName string redisCmdChanName string
redisSync *redis.Client redisSync *redis.Client
connInOutChan chan *wsconn connInOutChan chan *wsconn
deliveryChan chan any deliveryChan chan any
callReceiver WebSocketMessageReceiver localDeliveryChan chan any
connWaitGroup sync.WaitGroup callReceiver WebSocketMessageReceiver
connWaitGroup sync.WaitGroup
} }
// WebsocketHandler : // WebsocketHandler :
@ -115,12 +115,13 @@ func NewWebsocketHandler(authglobal gocommon.AuthCollectionGlobal) (wsh *Websock
authCaches := make(map[string]*subhandler) authCaches := make(map[string]*subhandler)
for _, region := range authglobal.Regions() { for _, region := range authglobal.Regions() {
sh := &subhandler{ sh := &subhandler{
authCache: authglobal.Get(region), authCache: authglobal.Get(region),
redisMsgChanName: fmt.Sprintf("_wsh_msg_%s", region), redisMsgChanName: fmt.Sprintf("_wsh_msg_%s", region),
redisCmdChanName: fmt.Sprintf("_wsh_cmd_%s", region), redisCmdChanName: fmt.Sprintf("_wsh_cmd_%s", region),
redisSync: redisSync, redisSync: redisSync,
connInOutChan: make(chan *wsconn), connInOutChan: make(chan *wsconn),
deliveryChan: make(chan any, 1000), deliveryChan: make(chan any, 1000),
localDeliveryChan: make(chan any, 100),
} }
authCaches[region] = sh authCaches[region] = sh
@ -169,21 +170,11 @@ func (ws *WebsocketHandler) SetState(region string, accid primitive.ObjectID, st
ws.RedisSync.SetArgs(context.Background(), accid.Hex(), state, redis.SetArgs{Mode: "XX"}).Result() ws.RedisSync.SetArgs(context.Background(), accid.Hex(), state, redis.SetArgs{Mode: "XX"}).Result()
} }
var errNoRegion = errors.New("region is not valid") func (ws *WebsocketHandler) SendUpstreamMessage(region string, msg *UpstreamMessage) {
func (ws *WebsocketHandler) SendUpstreamMessage(region string, msg *UpstreamMessage) error {
sh := ws.authCaches[region] sh := ws.authCaches[region]
if sh == nil { if sh != nil {
return errNoRegion sh.localDeliveryChan <- msg
} }
bt, err := json.Marshal(msg)
if err != nil {
return err
}
_, err = sh.redisSync.Publish(context.Background(), sh.redisMsgChanName, bt).Result()
return err
} }
func (sh *subhandler) mainLoop(ctx context.Context) { func (sh *subhandler) mainLoop(ctx context.Context) {
@ -246,6 +237,53 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
// 유저에게서 온 메세지, 소켓 연결/해체 처리 // 유저에게서 온 메세지, 소켓 연결/해체 처리
for { for {
select { select {
case usermsg := <-sh.localDeliveryChan:
// 로컬에 connection이 있는지 먼저 확인해 보기 위한 채널
// 없으면 publish한다.
switch usermsg := usermsg.(type) {
case *UpstreamMessage:
target := usermsg.Target
if target[0] == '@' {
conn := entireConns[target[1:]]
if conn != nil {
// 이 경우 아니면 publish 해야 함
conn.WriteMessage(websocket.TextMessage, usermsg.Body)
break
}
}
if bt, err := json.Marshal(usermsg); err == nil {
sh.redisSync.Publish(context.Background(), sh.redisMsgChanName, bt).Result()
}
case *CommandMessage:
if usermsg.Cmd == CommandType_JoinRoom && len(usermsg.Args) == 2 {
alias := usermsg.Args[0]
roomName := usermsg.Args[1]
conn := entireConns[alias]
if conn != nil {
findRoom(roomName, true).in(conn)
break
}
} else if usermsg.Cmd == CommandType_LeaveRoom && len(usermsg.Args) == 2 {
alias := usermsg.Args[0]
roomName := usermsg.Args[1]
conn := entireConns[alias]
if conn != nil {
if room := findRoom(roomName, false); room != nil {
room.out(conn)
break
}
}
}
// 위에서 break 안걸리면 나한테 없으므로 publish를 해야 함. 그러면 다른 호스트가 deliveryChan으로 받는다
if bt, err := json.Marshal(usermsg); err == nil {
sh.redisSync.Publish(context.Background(), sh.redisCmdChanName, bt).Result()
}
}
case usermsg := <-sh.deliveryChan: case usermsg := <-sh.deliveryChan:
switch usermsg := usermsg.(type) { switch usermsg := usermsg.(type) {
case *UpstreamMessage: case *UpstreamMessage:
@ -257,7 +295,6 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
room.broadcast(usermsg) room.broadcast(usermsg)
} }
} else if target[0] == '@' { } else if target[0] == '@' {
// TODO : 특정 유저에게만
conn := entireConns[target[1:]] conn := entireConns[target[1:]]
if conn != nil { if conn != nil {
conn.WriteMessage(websocket.TextMessage, usermsg.Body) conn.WriteMessage(websocket.TextMessage, usermsg.Body)
@ -273,7 +310,7 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
if conn != nil { if conn != nil {
findRoom(roomName, true).in(conn) findRoom(roomName, true).in(conn)
} }
} else if usermsg.Cmd == CommandType_JoinRoom && len(usermsg.Args) == 2 { } else if usermsg.Cmd == CommandType_LeaveRoom && len(usermsg.Args) == 2 {
alias := usermsg.Args[0] alias := usermsg.Args[0]
roomName := usermsg.Args[1] roomName := usermsg.Args[1]