MessageReceiver signature 숮어

This commit is contained in:
2023-07-11 09:36:21 +09:00
parent a842845685
commit 3bb985d0b6
2 changed files with 43 additions and 29 deletions

View File

@ -62,10 +62,10 @@ func (r *room) loop(ctx context.Context, conns *map[string]*wsconn) (normalEnd b
return true return true
case conn := <-r.inChan: case conn := <-r.inChan:
(*conns)[conn.alias] = conn (*conns)[conn.sender.Accid.Hex()] = conn
case conn := <-r.outChan: case conn := <-r.outChan:
delete((*conns), conn.alias) delete((*conns), conn.sender.Accid.Hex())
case msg := <-r.messageChan: case msg := <-r.messageChan:
for _, conn := range *conns { for _, conn := range *conns {

View File

@ -24,8 +24,7 @@ var noSessionFlag = flagx.Bool("nosession", false, "nosession=[true|false]")
type wsconn struct { type wsconn struct {
*websocket.Conn *websocket.Conn
alias string sender *Sender
accid primitive.ObjectID
} }
type UpstreamMessage struct { type UpstreamMessage struct {
@ -65,7 +64,13 @@ const (
Disconnected = WebSocketMessageType(101) Disconnected = WebSocketMessageType(101)
) )
type WebSocketMessageReceiver func(accid primitive.ObjectID, alias string, messageType WebSocketMessageType, body io.Reader) type Sender struct {
Region string
Accid primitive.ObjectID
Alias string
}
type WebSocketMessageReceiver func(sender *Sender, messageType WebSocketMessageType, body io.Reader)
type subhandler struct { type subhandler struct {
authCache *gocommon.AuthCollection authCache *gocommon.AuthCollection
@ -77,6 +82,7 @@ type subhandler struct {
localDeliveryChan chan any localDeliveryChan chan any
callReceiver WebSocketMessageReceiver callReceiver WebSocketMessageReceiver
connWaitGroup sync.WaitGroup connWaitGroup sync.WaitGroup
region string
} }
// WebsocketHandler : // WebsocketHandler :
@ -124,6 +130,7 @@ func NewWebsocketHandler(authglobal gocommon.AuthCollectionGlobal) (wsh *Websock
connInOutChan: make(chan *wsconn), connInOutChan: make(chan *wsconn),
deliveryChan: make(chan any, 1000), deliveryChan: make(chan any, 1000),
localDeliveryChan: make(chan any, 100), localDeliveryChan: make(chan any, 100),
region: region,
} }
authCaches[region] = sh authCaches[region] = sh
@ -144,13 +151,13 @@ func (ws *WebsocketHandler) Start(ctx context.Context) {
for region, sh := range ws.authCaches { for region, sh := range ws.authCaches {
chain := ws.receiverChain[region] chain := ws.receiverChain[region]
if len(chain) == 0 { if len(chain) == 0 {
sh.callReceiver = func(accid primitive.ObjectID, alias string, messageType WebSocketMessageType, body io.Reader) {} sh.callReceiver = func(sender *Sender, messageType WebSocketMessageType, body io.Reader) {}
} else if len(chain) == 1 { } else if len(chain) == 1 {
sh.callReceiver = chain[0] sh.callReceiver = chain[0]
} else { } else {
sh.callReceiver = func(accid primitive.ObjectID, alias string, messageType WebSocketMessageType, body io.Reader) { sh.callReceiver = func(sender *Sender, messageType WebSocketMessageType, body io.Reader) {
for _, r := range chain { for _, r := range chain {
r(accid, alias, messageType, body) r(sender, messageType, body)
} }
} }
} }
@ -281,7 +288,8 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
case *UpstreamMessage: case *UpstreamMessage:
target := usermsg.Target target := usermsg.Target
if target[0] == '@' { if target[0] == '@' {
conn := entireConns[target[1:]] accid := target[1:]
conn := entireConns[accid]
if conn != nil { if conn != nil {
// 이 경우 아니면 publish 해야 함 // 이 경우 아니면 publish 해야 함
conn.WriteMessage(websocket.TextMessage, usermsg.Body) conn.WriteMessage(websocket.TextMessage, usermsg.Body)
@ -294,19 +302,19 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
case *CommandMessage: case *CommandMessage:
if usermsg.Cmd == CommandType_JoinRoom && len(usermsg.Args) == 2 { if usermsg.Cmd == CommandType_JoinRoom && len(usermsg.Args) == 2 {
alias := usermsg.Args[0].(string) accid := usermsg.Args[0].(string)
roomName := usermsg.Args[1].(string) roomName := usermsg.Args[1].(string)
conn := entireConns[alias] conn := entireConns[accid]
if conn != nil { if conn != nil {
findRoom(roomName, true).in(conn) findRoom(roomName, true).in(conn)
break break
} }
} else if usermsg.Cmd == CommandType_LeaveRoom && len(usermsg.Args) == 2 { } else if usermsg.Cmd == CommandType_LeaveRoom && len(usermsg.Args) == 2 {
alias := usermsg.Args[0].(string) accid := usermsg.Args[0].(string)
roomName := usermsg.Args[1].(string) roomName := usermsg.Args[1].(string)
conn := entireConns[alias] conn := entireConns[accid]
if conn != nil { if conn != nil {
if room := findRoom(roomName, false); room != nil { if room := findRoom(roomName, false); room != nil {
room.out(conn) room.out(conn)
@ -314,10 +322,11 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
} }
} }
} else if usermsg.Cmd == CommandType_WriteControl && len(usermsg.Args) == 2 { } else if usermsg.Cmd == CommandType_WriteControl && len(usermsg.Args) == 2 {
alias := usermsg.Args[0].(string) accid := usermsg.Args[0].(string)
conn := entireConns[alias] conn := entireConns[accid]
if conn != nil { if conn != nil {
conn.WriteControl(usermsg.Args[1].(int), usermsg.Args[2].([]byte), time.Time{}) conn.WriteControl(usermsg.Args[1].(int), usermsg.Args[2].([]byte), time.Time{})
break
} }
} }
@ -338,7 +347,8 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
room.broadcast(usermsg) room.broadcast(usermsg)
} }
} else if target[0] == '@' { } else if target[0] == '@' {
conn := entireConns[target[1:]] accid := target[1:]
conn := entireConns[accid]
if conn != nil { if conn != nil {
conn.WriteMessage(websocket.TextMessage, usermsg.Body) conn.WriteMessage(websocket.TextMessage, usermsg.Body)
} }
@ -346,18 +356,18 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
case *CommandMessage: case *CommandMessage:
if usermsg.Cmd == CommandType_JoinRoom && len(usermsg.Args) == 2 { if usermsg.Cmd == CommandType_JoinRoom && len(usermsg.Args) == 2 {
alias := usermsg.Args[0].(string) accid := usermsg.Args[0].(string)
roomName := usermsg.Args[1].(string) roomName := usermsg.Args[1].(string)
conn := entireConns[alias] conn := entireConns[accid]
if conn != nil { if conn != nil {
findRoom(roomName, true).in(conn) findRoom(roomName, true).in(conn)
} }
} else if usermsg.Cmd == CommandType_LeaveRoom && len(usermsg.Args) == 2 { } else if usermsg.Cmd == CommandType_LeaveRoom && len(usermsg.Args) == 2 {
alias := usermsg.Args[0].(string) accid := usermsg.Args[0].(string)
roomName := usermsg.Args[1].(string) roomName := usermsg.Args[1].(string)
conn := entireConns[alias] conn := entireConns[accid]
if conn != nil { if conn != nil {
if room := findRoom(roomName, false); room != nil { if room := findRoom(roomName, false); room != nil {
room.out(conn) room.out(conn)
@ -371,13 +381,14 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
case c := <-sh.connInOutChan: case c := <-sh.connInOutChan:
if c.Conn == nil { if c.Conn == nil {
delete(entireConns, c.alias) delete(entireConns, c.sender.Accid.Hex())
for _, room := range rooms { for _, room := range rooms {
room.out(c) room.out(c)
} }
sh.callReceiver(c.accid, c.alias, Connected, nil) sh.callReceiver(c.sender, Disconnected, nil)
} else { } else {
entireConns[c.alias] = c entireConns[c.sender.Accid.Hex()] = c
sh.callReceiver(c.sender, Connected, nil)
} }
} }
} }
@ -386,8 +397,11 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
func upgrade_core(sh *subhandler, conn *websocket.Conn, accid primitive.ObjectID, alias string) { func upgrade_core(sh *subhandler, conn *websocket.Conn, accid primitive.ObjectID, alias string) {
newconn := &wsconn{ newconn := &wsconn{
Conn: conn, Conn: conn,
alias: alias, sender: &Sender{
accid: accid, Region: sh.region,
Alias: alias,
Accid: accid,
},
} }
sh.connInOutChan <- newconn sh.connInOutChan <- newconn
@ -402,15 +416,15 @@ func upgrade_core(sh *subhandler, conn *websocket.Conn, accid primitive.ObjectID
} }
if messageType == websocket.CloseMessage { if messageType == websocket.CloseMessage {
sh.callReceiver(accid, c.alias, CloseMessage, r) sh.callReceiver(c.sender, CloseMessage, r)
break break
} }
if messageType == websocket.TextMessage { if messageType == websocket.TextMessage {
// 유저가 직접 보낸 메시지 // 유저가 직접 보낸 메시지
sh.callReceiver(accid, c.alias, TextMessage, r) sh.callReceiver(c.sender, TextMessage, r)
} else if messageType == websocket.BinaryMessage { } else if messageType == websocket.BinaryMessage {
sh.callReceiver(accid, c.alias, BinaryMessage, r) sh.callReceiver(c.sender, BinaryMessage, r)
} }
} }
sh.redisSync.Del(context.Background(), accid.Hex()) sh.redisSync.Del(context.Background(), accid.Hex())