diff --git a/go.mod b/go.mod index a3b172b..40f1afa 100644 --- a/go.mod +++ b/go.mod @@ -2,8 +2,6 @@ module repositories.action2quare.com/ayo/gocommon go 1.20 -replace repositories.action2quare.com/ayo/gocommon => ./ - require ( github.com/go-redis/redis/v8 v8.11.5 github.com/golang-jwt/jwt v3.2.2+incompatible diff --git a/redis.go b/redis.go index 92a7fc0..bc7e86f 100644 --- a/redis.go +++ b/redis.go @@ -195,6 +195,17 @@ func (rh *RedisonHandler) JSONGetString(key, path string) ([]string, error) { return respToArray[string](rh.JSONResp(key, path)) } +func (rh *RedisonHandler) JSONGetDocuments(key, path string) ([]map[string]any, error) { + resp, err := rh.JSONGet(key, path) + if err != nil && err != redis.Nil { + return nil, err + } + + var objs []map[string]any + err = json.Unmarshal([]byte(resp.(string)), &objs) + return objs, err +} + func (rh *RedisonHandler) JSONGetInt64(key, path string) ([]int64, error) { return respToArray[int64](rh.JSONResp(key, path)) } diff --git a/server.go b/server.go index 82251fc..eeb9b60 100644 --- a/server.go +++ b/server.go @@ -618,11 +618,11 @@ func MakeHttpRequestForLogging(r *http.Request) *http.Request { } type apiFuncType func(http.ResponseWriter, *http.Request) -type HttpApiReceiver struct { +type HttpApiHandler struct { methods map[string]apiFuncType } -func MakeHttpApiReceiver[T any](receiver *T, receiverName string) HttpApiReceiver { +func MakeHttpApiHandler[T any](receiver *T, receiverName string) HttpApiHandler { methods := make(map[string]apiFuncType) tp := reflect.TypeOf(receiver) @@ -662,7 +662,7 @@ func MakeHttpApiReceiver[T any](receiver *T, receiverName string) HttpApiReceive } } - return HttpApiReceiver{ + return HttpApiHandler{ methods: methods, } } @@ -671,7 +671,7 @@ type HttpApiHandlerContainer struct { methods map[string]apiFuncType } -func (hc *HttpApiHandlerContainer) RegistReceiver(receiver HttpApiReceiver) { +func (hc *HttpApiHandlerContainer) RegisterApiHandler(receiver HttpApiHandler) { if hc.methods == nil { hc.methods = make(map[string]apiFuncType) } diff --git a/session/common.go b/session/common.go index e352115..8693428 100644 --- a/session/common.go +++ b/session/common.go @@ -10,12 +10,6 @@ import ( "time" "go.mongodb.org/mongo-driver/bson/primitive" - "repositories.action2quare.com/ayo/gocommon" -) - -const ( - communication_channel_name_prefix = "_sess_comm_chan_name" - session_collection_name = gocommon.CollectionName("session") ) type Authorization struct { diff --git a/session/consumer_common.go b/session/consumer_common.go index 0169d6c..3fc261a 100644 --- a/session/consumer_common.go +++ b/session/consumer_common.go @@ -33,13 +33,6 @@ func (c *consumer_common[T]) add_internal(sk storagekey, si T) { delete(c.stages[1].deleted, sk) } -func (c *consumer_common[T]) add(sk storagekey, si T) { - c.lock.Lock() - defer c.lock.Unlock() - - c.add_internal(sk, si) -} - func (c *consumer_common[T]) delete_internal(sk storagekey) { delete(c.stages[0].cache, sk) c.stages[0].deleted[sk] = true diff --git a/session/impl_mongo.go b/session/impl_mongo.go index 096681b..f9713c0 100644 --- a/session/impl_mongo.go +++ b/session/impl_mongo.go @@ -12,6 +12,10 @@ import ( "repositories.action2quare.com/ayo/gocommon/logger" ) +const ( + session_collection_name = gocommon.CollectionName("session") +) + type provider_mongo struct { mongoClient gocommon.MongoClient } diff --git a/session/impl_redis.go b/session/impl_redis.go index 770e1bd..3604167 100644 --- a/session/impl_redis.go +++ b/session/impl_redis.go @@ -2,6 +2,7 @@ package session import ( "context" + "fmt" "time" "github.com/go-redis/redis/v8" @@ -11,6 +12,10 @@ import ( "repositories.action2quare.com/ayo/gocommon/logger" ) +const ( + communication_channel_name_prefix = "_sess_comm_chan_name" +) + type sessionRedis struct { *Authorization expireAt time.Time @@ -18,7 +23,6 @@ type sessionRedis struct { type provider_redis struct { redisClient *redis.Client - updateChannel string deleteChannel string ttl time.Duration ctx context.Context @@ -32,8 +36,7 @@ func newProviderWithRedis(ctx context.Context, redisUrl string, ttl time.Duratio return &provider_redis{ redisClient: redisClient, - updateChannel: communication_channel_name_prefix + "_u", - deleteChannel: communication_channel_name_prefix + "_d", + deleteChannel: fmt.Sprintf("%s_%d", communication_channel_name_prefix, redisClient.Options().DB), ttl: ttl, ctx: ctx, }, nil @@ -51,7 +54,6 @@ func (p *provider_redis) New(input *Authorization) (string, error) { return "", err } - _, err = p.redisClient.Publish(p.ctx, p.updateChannel, string(sk)).Result() return string(storagekey_to_publickey(sk)), err } @@ -122,9 +124,8 @@ func newConsumerWithRedis(ctx context.Context, redisUrl string, ttl time.Duratio redisClient: redisClient, } - updateChannel := communication_channel_name_prefix + "_u" deleteChannel := communication_channel_name_prefix + "_d" - sub := redisClient.Subscribe(ctx, updateChannel, deleteChannel) + sub := redisClient.Subscribe(ctx, deleteChannel) go func() { stageswitch := time.Now().Add(ttl) @@ -151,20 +152,6 @@ func newConsumerWithRedis(ctx context.Context, redisUrl string, ttl time.Duratio } switch msg.Channel { - case updateChannel: - sk := storagekey(msg.Payload) - raw, err := redisClient.Get(ctx, string(sk)).Result() - if err != nil { - logger.Println(err) - } else if len(raw) > 0 { - var si Authorization - if bson.Unmarshal([]byte(raw), &si) == nil { - consumer.add(sk, &sessionRedis{ - Authorization: &si, - expireAt: time.Now().Add(consumer.ttl), - }) - } - } case deleteChannel: sk := storagekey(msg.Payload) consumer.delete(sk) diff --git a/wshandler/api_handler.go b/wshandler/api_handler.go new file mode 100644 index 0000000..f977f4b --- /dev/null +++ b/wshandler/api_handler.go @@ -0,0 +1,139 @@ +package wshandler + +import ( + "encoding/json" + "io" + "reflect" + "unsafe" + + "repositories.action2quare.com/ayo/gocommon/logger" +) + +const ( + ClientConnected = "ClientConnected" + ClientDisconnected = "ClientDisconnected" +) + +type apiFuncType func(ApiCallContext) + +type WebsocketApiHandler struct { + methods map[string]apiFuncType + connfunc apiFuncType + disconnfunc apiFuncType +} + +type ApiCallContext struct { + CallBy *Sender + Arguments []any +} + +func MakeWebsocketApiHandler[T any](receiver *T, receiverName string) WebsocketApiHandler { + methods := make(map[string]apiFuncType) + + tp := reflect.TypeOf(receiver) + if len(receiverName) == 0 { + receiverName = tp.Elem().Name() + } + + var connfunc apiFuncType + var disconnfunc apiFuncType + + for i := 0; i < tp.NumMethod(); i++ { + method := tp.Method(i) + if method.Type.NumIn() != 2 { + continue + } + + if method.Type.In(0) != tp { + continue + } + + if method.Type.In(1) != reflect.TypeOf((*ApiCallContext)(nil)).Elem() { + continue + } + + funcptr := method.Func.Pointer() + p1 := unsafe.Pointer(&funcptr) + p2 := unsafe.Pointer(&p1) + testfunc := (*func(*T, ApiCallContext))(p2) + + if method.Name == ClientConnected { + connfunc = func(ctx ApiCallContext) { + (*testfunc)(receiver, ctx) + } + } else if method.Name == ClientDisconnected { + disconnfunc = func(ctx ApiCallContext) { + (*testfunc)(receiver, ctx) + } + } else { + methods[receiverName+"."+method.Name] = func(ctx ApiCallContext) { + (*testfunc)(receiver, ctx) + } + } + } + + return WebsocketApiHandler{ + methods: methods, + connfunc: connfunc, + disconnfunc: disconnfunc, + } +} + +type WebsocketApiBroker struct { + methods map[string]apiFuncType + connFuncs []apiFuncType + disconnFuncs []apiFuncType +} + +func (hc *WebsocketApiBroker) AddHandler(receiver WebsocketApiHandler) { + if hc.methods == nil { + hc.methods = make(map[string]apiFuncType) + } + + for k, v := range receiver.methods { + logger.Println("http api registered :", k) + hc.methods[k] = v + } + + if receiver.connfunc != nil { + hc.connFuncs = append(hc.connFuncs, receiver.connfunc) + } + + if receiver.disconnfunc != nil { + // disconnfunc은 역순 + hc.disconnFuncs = append([]apiFuncType{receiver.disconnfunc}, hc.disconnFuncs...) + } +} + +func (hc *WebsocketApiBroker) Call(callby *Sender, funcname string, r io.Reader) { + if funcname == ClientConnected { + for _, v := range hc.connFuncs { + v(ApiCallContext{ + CallBy: callby, + Arguments: nil, + }) + } + } else if funcname == ClientDisconnected { + for _, v := range hc.disconnFuncs { + v(ApiCallContext{ + CallBy: callby, + Arguments: nil, + }) + } + } else if found := hc.methods[funcname]; found != nil { + var args []any + if r != nil { + dec := json.NewDecoder(r) + if err := dec.Decode(&args); err != nil { + logger.Println("WebsocketApiBroker.Call failed. decode returns err :", err) + } + } + + found(ApiCallContext{ + CallBy: callby, + Arguments: args, + }) + } else { + logger.Println("api is not found :", funcname) + } +} diff --git a/wshandler/api_handler_test.go b/wshandler/api_handler_test.go new file mode 100644 index 0000000..fb0d04d --- /dev/null +++ b/wshandler/api_handler_test.go @@ -0,0 +1,30 @@ +// package main ... +package wshandler + +import ( + "fmt" + "testing" +) + +type TestReceiver struct { +} + +func (tr *TestReceiver) Func1([]any) { + +} + +func (tr *TestReceiver) Func2(args []any) { + fmt.Println(args...) +} + +func TestExpTable(t *testing.T) { + // src := []any{"a", 1, false} + // payload, _ := json.Marshal(src) + + // tr := new(TestReceiver) + // receiver := MakeWebsocketApiHandler(tr, "test") + + // var con WebsocketApiBroker + // con.AddHandler(receiver) + +} diff --git a/wshandler/wshandler.go b/wshandler/wshandler.go index 950da01..f6c9a7f 100644 --- a/wshandler/wshandler.go +++ b/wshandler/wshandler.go @@ -90,30 +90,11 @@ const ( CloseMessage = WebSocketMessageType(websocket.CloseMessage) PingMessage = WebSocketMessageType(websocket.PingMessage) PongMessage = WebSocketMessageType(websocket.PongMessage) - Connected = WebSocketMessageType(100) - Disconnected = WebSocketMessageType(101) ) type Sender struct { - Accid primitive.ObjectID - Alias string - disconnectedCallbacks map[string]func() -} - -func (s *Sender) RegistDisconnectedCallback(name string, f func()) (old func()) { - if s.disconnectedCallbacks == nil { - s.disconnectedCallbacks = make(map[string]func()) - } - - old = s.disconnectedCallbacks[name] - s.disconnectedCallbacks[name] = f - return -} - -func (s *Sender) PopDisconnectedCallback(name string) func() { - old := s.disconnectedCallbacks[name] - delete(s.disconnectedCallbacks, name) - return old + Accid primitive.ObjectID + Alias string } type EventReceiver interface { @@ -136,10 +117,10 @@ type WebsocketHandler struct { deliveryChan chan any localDeliveryChan chan any sendMsgChan chan send_msg_queue_elem - callReceiver EventReceiver - connWaitGroup sync.WaitGroup - receiverChain []EventReceiver - sessionConsumer session.Consumer + + wsApiBroker WebsocketApiBroker + connWaitGroup sync.WaitGroup + sessionConsumer session.Consumer } type wsConfig struct { @@ -156,13 +137,13 @@ func init() { gob.Register([]any{}) } -func NewWebsocketHandler(consumer session.Consumer) (*WebsocketHandler, error) { +func NewWebsocketHandler(consumer session.Consumer, redisUrl string) (*WebsocketHandler, error) { var config wsConfig if err := gocommon.LoadConfig(&config); err != nil { return nil, err } - redisSync, err := gocommon.NewRedisClient(config.Redis["wshandler"]) + redisSync, err := gocommon.NewRedisClient(redisUrl) if err != nil { return nil, err } @@ -196,49 +177,11 @@ func NewWebsocketHandler(consumer session.Consumer) (*WebsocketHandler, error) { }, nil } -func (ws *WebsocketHandler) RegisterReceiver(receiver EventReceiver) { - ws.receiverChain = append(ws.receiverChain, receiver) -} - -type nilReceiver struct{} - -func (r *nilReceiver) OnClientMessageReceived(sender *Sender, messageType WebSocketMessageType, body io.Reader) { -} -func (r *nilReceiver) OnRoomCreated(name string) {} -func (r *nilReceiver) OnRoomDestroyed(name string) {} - -type chainReceiver struct { - chain []EventReceiver -} - -func (r *chainReceiver) OnClientMessageReceived(sender *Sender, messageType WebSocketMessageType, body io.Reader) { - for _, cr := range r.chain { - cr.OnClientMessageReceived(sender, messageType, body) - } -} - -func (r *chainReceiver) OnRoomCreated(name string) { - for _, cr := range r.chain { - cr.OnRoomCreated(name) - } -} - -func (r *chainReceiver) OnRoomDestroyed(name string) { - for _, cr := range r.chain { - cr.OnRoomDestroyed(name) - } +func (ws *WebsocketHandler) RegisterApiHandler(handler WebsocketApiHandler) { + ws.wsApiBroker.AddHandler(handler) } func (ws *WebsocketHandler) Start(ctx context.Context) { - chain := ws.receiverChain - if len(chain) == 0 { - ws.callReceiver = &nilReceiver{} - } else if len(chain) == 1 { - ws.callReceiver = chain[0] - } else { - ws.callReceiver = &chainReceiver{chain: ws.receiverChain} - } - ws.connWaitGroup.Add(1) go ws.mainLoop(ctx) } @@ -334,19 +277,14 @@ func (ws *WebsocketHandler) mainLoop(ctx context.Context) { room = makeRoom(name, roomDestroyChan, ws.sendMsgChan) rooms[name] = room room.start(ctx) - go ws.callReceiver.OnRoomCreated(name) + //go ws.callReceiver.OnRoomCreated(name) } return room } defer func() { for _, conn := range entireConns { - var roomnames []string - for _, room := range conn.joinedRooms { - roomnames = append(roomnames, room.name) - } - bt, _ := json.Marshal(roomnames) - ws.callReceiver.OnClientMessageReceived(conn.sender, Disconnected, bytes.NewBuffer(bt)) + ws.wsApiBroker.Call(conn.sender, ClientDisconnected, nil) conn.Close() } }() @@ -442,7 +380,7 @@ func (ws *WebsocketHandler) mainLoop(ctx context.Context) { case destroyedRoom := <-roomDestroyChan: delete(rooms, destroyedRoom) - go ws.callReceiver.OnRoomDestroyed(destroyedRoom) + //go ws.callReceiver.OnRoomDestroyed(destroyedRoom) case usermsg := <-ws.localDeliveryChan: // 로컬에 connection이 있는지 먼저 확인해 보기 위한 채널 @@ -509,18 +447,14 @@ func (ws *WebsocketHandler) mainLoop(ctx context.Context) { case c := <-ws.connInOutChan: if c.Conn == nil { delete(entireConns, c.sender.Accid.Hex()) - var roomnames []string for _, room := range c.joinedRooms { - roomnames = append(roomnames, room.name) room.out(c) } c.joinedRooms = nil - - bt, _ := json.Marshal(roomnames) - go ws.callReceiver.OnClientMessageReceived(c.sender, Disconnected, bytes.NewBuffer(bt)) + go ws.wsApiBroker.Call(c.sender, ClientDisconnected, nil) } else { entireConns[c.sender.Accid.Hex()] = c - go ws.callReceiver.OnClientMessageReceived(c.sender, Connected, nil) + go ws.wsApiBroker.Call(c.sender, ClientConnected, nil) } } } @@ -549,16 +483,12 @@ func upgrade_core(ws *WebsocketHandler, conn *websocket.Conn, accid primitive.Ob break } - if messageType == websocket.TextMessage { - // 유저가 직접 보낸 메시지 - ws.callReceiver.OnClientMessageReceived(c.sender, TextMessage, r) - } else if messageType == websocket.BinaryMessage { - ws.callReceiver.OnClientMessageReceived(c.sender, BinaryMessage, r) - } - } - if c.sender.disconnectedCallbacks != nil { - for _, f := range c.sender.disconnectedCallbacks { - f() + if messageType == websocket.BinaryMessage { + var size [1]byte + r.Read(size[:]) + cmd := make([]byte, size[0]) + r.Read(cmd) + ws.wsApiBroker.Call(newconn.sender, string(cmd), r) } } ws.connWaitGroup.Done()