From 90d0fd319d8c1ab0d18d8d22f04788ea62868f34 Mon Sep 17 00:00:00 2001 From: mountain Date: Thu, 27 Jul 2023 17:45:51 +0900 Subject: [PATCH] =?UTF-8?q?=EC=B1=84=ED=8C=85=20=EC=B1=84=EB=84=90=20?= =?UTF-8?q?=EC=9E=85=EC=9E=A5=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config_template.json | 19 +++- core/config.json | 1 - core/group.go | 2 +- core/group_chat.go | 206 ++++++++++++++++++++++++++++++++++++++++++- core/group_party.go | 32 +++---- core/tavern.go | 9 +- core/tavern_test.go | 38 +++++++- 7 files changed, 280 insertions(+), 27 deletions(-) delete mode 100644 core/config.json diff --git a/config_template.json b/config_template.json index 602ecc9..174f78a 100644 --- a/config_template.json +++ b/config_template.json @@ -31,7 +31,24 @@ "invite_ttl": 30 }, "chat" : { - "transient" : true + "default_capacity" : 1000, + "channels" : { + "bazzar-1" : { + "name" : "FText(bazzar-1)" + }, + "bazzar-2" : { + "name" : "FText(bazzar-2)" + }, + "bazzar-3" : { + "name" : "FText(bazzar-3)" + }, + "bazzar-4" : { + "name" : "FText(bazzar-4)" + }, + "bazzar-5" : { + "name" : "FText(bazzar-5)" + } + } } } } \ No newline at end of file diff --git a/core/config.json b/core/config.json deleted file mode 100644 index 9e26dfe..0000000 --- a/core/config.json +++ /dev/null @@ -1 +0,0 @@ -{} \ No newline at end of file diff --git a/core/group.go b/core/group.go index 6de00eb..39a0200 100644 --- a/core/group.go +++ b/core/group.go @@ -51,5 +51,5 @@ func (afc *apiFuncsContainer) call(fn string, w http.ResponseWriter, r *http.Req type configDocument map[string]any type group interface { Initialize(*subTavern, configDocument) error - ClientMessageReceved(*wshandler.Sender, wshandler.WebSocketMessageType, any) + ClientMessageReceived(*wshandler.Sender, wshandler.WebSocketMessageType, any) } diff --git a/core/group_chat.go b/core/group_chat.go index d938bbe..f659eaa 100644 --- a/core/group_chat.go +++ b/core/group_chat.go @@ -1,8 +1,12 @@ package core import ( + "encoding/json" + "net/http" "reflect" + "repositories.action2quare.com/ayo/gocommon" + "repositories.action2quare.com/ayo/gocommon/logger" "repositories.action2quare.com/ayo/gocommon/wshandler" ) @@ -10,13 +14,209 @@ func init() { groupTypeContainer()["chat"] = reflect.TypeOf(&groupChat{}) } -type groupChat struct { +type channelID = string +type chatConfig struct { + DefaultCapacity int64 `json:"default_capacity"` + Channels map[string]map[string]any `json:"channels"` } -func (gc *groupChat) Initialize(*subTavern, configDocument) error { +type groupChat struct { + chatConfig + rh *gocommon.RedisonHandler + incSizeScript string + decSizeScript string + enterRoom func(channelID, accountID) + leaveRoom func(channelID, accountID) + sendUpstreamMessage func(msg *wshandler.UpstreamMessage) +} + +var increaseSizeScript = ` + local cap = redis.call("HGET", KEYS[1], "capacity") + local newseq = redis.call("HINCRBY", KEYS[1], "seq", 1) + + local newsize = redis.call("HINCRBY", KEYS[1], "size", 1) + if tonumber(cap) < newsize then + redis.call("HINCRBY", KEYS[1], "size", -1) + return {err = "channel is full"} + end + + redis.call("HSET", "_m_"..KEYS[1], KEYS[2], ARGV[1]) + return {newsize, newseq} +` +var decreaseSizeScript = ` + local newseq = redis.call("HINCRBY", KEYS[1], "seq", 1) + local newsize = redis.call("HINCRBY", KEYS[1], "size", -1) + redis.call("HDEL", "_m_"..KEYS[1], KEYS[2]) + + return {newsize, newseq} +` + +func (gc *groupChat) Initialize(sub *subTavern, cfg configDocument) error { + incScript, err := sub.redisClient.ScriptLoad(sub.redisClient.Context(), increaseSizeScript).Result() + if err != nil { + return err + } + + decScript, err := sub.redisClient.ScriptLoad(sub.redisClient.Context(), decreaseSizeScript).Result() + if err != nil { + return err + } + // newsize, err := sub.redisClient.EvalSha(sub.redisClient.Context(), incScript, []string{"myhash", "alias"}, "accid").Result() + // if err != nil { + // return err + // } + // logger.Println(newsize.([]any)) + // newsize, err = sub.redisClient.EvalSha(sub.redisClient.Context(), decScript, []string{"myhash", "alias"}).Result() + // if err != nil { + // return err + // } + // logger.Println(newsize.([]any)) + + rem, _ := json.Marshal(cfg) + if err = json.Unmarshal(rem, &gc.chatConfig); err != nil { + return err + } + + gc.enterRoom = func(chanid channelID, accid accountID) { + sub.wsh.EnterRoom(sub.region, string(chanid), accid) + } + gc.leaveRoom = func(chanid channelID, accid accountID) { + sub.wsh.LeaveRoom(sub.region, string(chanid), accid) + } + gc.sendUpstreamMessage = func(msg *wshandler.UpstreamMessage) { + sub.wsh.SendUpstreamMessage(sub.region, msg) + } + + gc.rh = gocommon.NewRedisonHandler(sub.redisClient.Context(), sub.redisClient) + gc.incSizeScript = incScript + gc.decSizeScript = decScript + + sub.apiFuncs.registApiFunction("CreateChattingChannel", gc.CreateChattingChannel) + sub.apiFuncs.registApiFunction("FetchChattingChannels", gc.FetchChattingChannels) + + for name, cfg := range gc.chatConfig.Channels { + if _, ok := cfg["capacity"]; !ok { + cfg["capacity"] = gc.chatConfig.DefaultCapacity + } + cfg["key"] = name + + _, err := gc.rh.HMSet(gc.rh.Context(), name, cfg).Result() + if err != nil { + return err + } + } + return nil } -func (gc *groupChat) ClientMessageReceved(*wshandler.Sender, wshandler.WebSocketMessageType, any) { +func (gc *groupChat) ClientMessageReceived(sender *wshandler.Sender, mt wshandler.WebSocketMessageType, message any) { + if mt == wshandler.Disconnected { + rooms := message.([]string) + for _, chanid := range rooms { + sizeseq, err := gc.rh.EvalSha(gc.rh.Context(), gc.decSizeScript, []string{chanid, sender.Alias}).Result() + if err == nil { + gc.sendUpstreamMessage(&wshandler.UpstreamMessage{ + Target: "#" + chanid, + Body: map[string]any{"size": sizeseq.([]any)[0], "seq": sizeseq.([]any)[1]}, + Tag: []string{"ChattingChannelProperties"}, + }) + } + } + } else if mt == wshandler.BinaryMessage { + commandline := message.([]any) + cmd := commandline[0].(string) + args := commandline[1:] + switch cmd { + case "EnterChattingChannel": + chanid := args[0].(string) + sizeseq, err := gc.rh.EvalSha(gc.rh.Context(), gc.incSizeScript, []string{chanid, sender.Alias}, sender.Accid.Hex()).Result() + if err == nil { + gc.enterRoom(chanid, sender.Accid) + gc.sendUpstreamMessage(&wshandler.UpstreamMessage{ + Target: "#" + chanid, + Body: map[string]any{"size": sizeseq.([]any)[0], "seq": sizeseq.([]any)[1]}, + Tag: []string{"ChattingChannelProperties"}, + }) + } else { + // 입장 실패 알림 + gc.sendUpstreamMessage(&wshandler.UpstreamMessage{ + Target: "@" + sender.Accid.Hex(), + Body: map[string]string{"id": chanid, "err": err.Error()}, + Tag: []string{"EnterChattingChannelFailed"}, + }) + } + case "LeaveChattingChannel": + chanid := args[0].(string) + gc.leaveRoom(chanid, sender.Accid) + sizeseq, err := gc.rh.EvalSha(gc.rh.Context(), gc.decSizeScript, []string{chanid, sender.Alias}).Result() + if err == nil { + gc.sendUpstreamMessage(&wshandler.UpstreamMessage{ + Target: "#" + chanid, + Body: map[string]any{"size": sizeseq.([]any)[0], "seq": sizeseq.([]any)[1]}, + Tag: []string{"ChattingChannelProperties"}, + }) + } + + case "TextMessage": + chanid := args[0].(string) + msg := args[1].(string) + gc.sendUpstreamMessage(&wshandler.UpstreamMessage{ + Target: "#" + chanid, + Body: map[string]any{"sender": sender.Alias, "msg": msg}, + Tag: []string{"TextMessage"}, + }) + } + } +} + +func (gc *groupChat) CreateChattingChannel(w http.ResponseWriter, r *http.Request) { + chanstr, _ := gocommon.ReadStringFormValue(r.Form, "name") + if len(chanstr) == 0 { + logger.Println("CreateChattingChannel failed. name is missing") + w.WriteHeader(http.StatusBadRequest) + return + } + + capacity, _ := gocommon.ReadIntegerFormValue(r.Form, "capacity") + if capacity == 0 { + capacity = gc.chatConfig.DefaultCapacity + } + + _, err := gc.rh.HSetNX(gc.rh.Context(), chanstr, "capacity", capacity).Result() + if err != nil { + logger.Println("CreateChattingChannel failed. HSetNX returns err :", err) + w.WriteHeader(http.StatusInternalServerError) + return + } +} + +func (gc *groupChat) FetchChattingChannels(w http.ResponseWriter, r *http.Request) { + prefix, _ := gocommon.ReadStringFormValue(r.Form, "prefix") + if len(prefix) == 0 { + logger.Println("FetchChattingChannel failed. prefix is missing") + w.WriteHeader(http.StatusBadRequest) + return + } + + keys, err := gc.rh.Keys(gc.rh.Context(), prefix+"*").Result() + if err != nil { + logger.Println("FetchChattingChannel failed. Keys return err :", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + var channels []map[string]string + for _, key := range keys { + onechan, err := gc.rh.HGetAll(gc.rh.Context(), key).Result() + if err != nil { + logger.Println("FetchChattingChannel failed. HGetAll return err :", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + channels = append(channels, onechan) + } + + enc := json.NewEncoder(w) + enc.Encode(channels) } diff --git a/core/group_party.go b/core/group_party.go index ec493fd..e088081 100644 --- a/core/group_party.go +++ b/core/group_party.go @@ -221,8 +221,8 @@ type partyConfig struct { type groupParty struct { partyConfig sendUpstreamMessage func(*wshandler.UpstreamMessage) - sendEnterRoomMessage func(groupID, accountID) - sendLeaveRoomMessage func(groupID, accountID) + enterRoom func(groupID, accountID) + leaveRoom func(groupID, accountID) rh *gocommon.RedisonHandler } @@ -237,10 +237,10 @@ func (gp *groupParty) Initialize(sub *subTavern, cfg configDocument) error { gp.sendUpstreamMessage = func(msg *wshandler.UpstreamMessage) { sub.wsh.SendUpstreamMessage(sub.region, msg) } - gp.sendEnterRoomMessage = func(gid groupID, accid accountID) { + gp.enterRoom = func(gid groupID, accid accountID) { sub.wsh.EnterRoom(sub.region, gid.Hex(), accid) } - gp.sendLeaveRoomMessage = func(gid groupID, accid accountID) { + gp.leaveRoom = func(gid groupID, accid accountID) { sub.wsh.LeaveRoom(sub.region, gid.Hex(), accid) } @@ -309,7 +309,7 @@ func (gp *groupParty) JoinParty(w http.ResponseWriter, r *http.Request) { Tag: []string{"MemberDocFull"}, }) - gp.sendEnterRoomMessage(gid, mid) + gp.enterRoom(gid, mid) // 새 멤버에 그룹 전체를 알림 gp.sendUpstreamMessage(&wshandler.UpstreamMessage{ @@ -398,7 +398,7 @@ func (gp *groupParty) InviteToParty(w http.ResponseWriter, r *http.Request) { return } // 내가 wshandler room에 입장 - gp.sendEnterRoomMessage(gid, mid) + gp.enterRoom(gid, mid) gp.sendUpstreamMessage(&wshandler.UpstreamMessage{ Target: "@" + mid.Hex(), @@ -469,7 +469,7 @@ func (gp *groupParty) AcceptPartyInvitation(w http.ResponseWriter, r *http.Reque Tag: []string{"MemberDocFull"}, }) - gp.sendEnterRoomMessage(gid, mid) + gp.enterRoom(gid, mid) // 새 멤버에 그룹 전체를 알림 gp.sendUpstreamMessage(&wshandler.UpstreamMessage{ @@ -503,8 +503,13 @@ func (gp *groupParty) QueryPartyMemberState(w http.ResponseWriter, r *http.Reque return } - state, _ := gp.rh.HGet(gp.rh.Context(), mid, "party_state").Result() - w.Write([]byte(state)) + states, _ := gp.rh.HMGet(gp.rh.Context(), mid, "party_state", "_ts").Result() + if states[0] != nil && len(states[0].(string)) > 0 { + w.Write([]byte(states[0].(string))) + } else if states[1] != nil && len(states[1].(string)) > 0 { + w.Write([]byte("connected")) + } + } // LeaveParty : 그룹에서 나감 or 내보냄 @@ -541,7 +546,7 @@ func (gp *groupParty) LeaveParty(w http.ResponseWriter, r *http.Request) { Body: bson.M{"gid": gid}, Tag: []string{"GroupDocFull", gid.Hex()}, }) - gp.sendLeaveRoomMessage(gid, mid) + gp.leaveRoom(gid, mid) } func (gp *groupParty) updateMemberDocument(gid groupID, mid accountID, doc bson.M) error { @@ -731,15 +736,12 @@ func (gp *groupParty) memberDisconnected(room string, mid primitive.ObjectID) { } } -func (gp *groupParty) ClientMessageReceved(sender *wshandler.Sender, mt wshandler.WebSocketMessageType, message any) { - if mt == wshandler.Connected { - gp.rh.HSet(gp.rh.Context(), sender.Accid.Hex(), "party_state", "connected").Result() - } else if mt == wshandler.Disconnected { +func (gp *groupParty) ClientMessageReceived(sender *wshandler.Sender, mt wshandler.WebSocketMessageType, message any) { + if mt == wshandler.Disconnected { rooms := message.([]string) for _, roomname := range rooms { gp.memberDisconnected(roomname, sender.Accid) } - gp.rh.HDel(gp.rh.Context(), sender.Accid.Hex(), "party_state").Result() } else if mt == wshandler.BinaryMessage { commandline := message.([]any) cmd := commandline[0].(string) diff --git a/core/tavern.go b/core/tavern.go index 556fa70..1f4ae3a 100644 --- a/core/tavern.go +++ b/core/tavern.go @@ -204,16 +204,15 @@ func (sub *subTavern) OnClientMessageReceived(sender *wshandler.Sender, messageT if messageType == wshandler.Connected { logger.Println("OnClientMessageReceived : connected ", sender.Accid.Hex()) sub.redisClient.HSet(sub.redisClient.Context(), sender.Accid.Hex(), "_ts", time.Now().UTC().Unix()).Result() - for _, gt := range sub.groups { - gt.ClientMessageReceved(sender, messageType, nil) + gt.ClientMessageReceived(sender, messageType, nil) } } else if messageType == wshandler.Disconnected { var rooms []string dec := json.NewDecoder(body) if err := dec.Decode(&rooms); err == nil { for _, gt := range sub.groups { - gt.ClientMessageReceved(sender, messageType, rooms) + gt.ClientMessageReceived(sender, messageType, rooms) } } sub.redisClient.Del(sub.redisClient.Context(), sender.Accid.Hex()).Result() @@ -233,7 +232,7 @@ func (sub *subTavern) OnClientMessageReceived(sender *wshandler.Sender, messageT default: for _, gt := range sub.groups { - gt.ClientMessageReceved(sender, messageType, commandline) + gt.ClientMessageReceived(sender, messageType, commandline) } } } @@ -250,7 +249,7 @@ func (sub *subTavern) OnRoomCreated(region, name string) { func (sub *subTavern) OnRoomDestroyed(region, name string) { _, err := sub.redisClient.Expire(context.Background(), name, 3600*time.Second).Result() if err != nil { - logger.Println("OnRoomDestroyed Persist failed :", err) + logger.Println("OnRoomDestroyed Expire failed :", err) } } diff --git a/core/tavern_test.go b/core/tavern_test.go index 663360c..2401990 100644 --- a/core/tavern_test.go +++ b/core/tavern_test.go @@ -3,6 +3,7 @@ package core import ( "context" + "encoding/binary" "fmt" "testing" "time" @@ -30,11 +31,46 @@ func TestPubSub(t *testing.T) { msg, err := pubsub.ReceiveMessage(context.Background()) fmt.Println(msg.Payload, err) } +func makeHash(chanName string, index uint32) string { + for len(chanName) < 12 { + chanName += chanName + } + left := chanName[:6] + right := chanName[len(chanName)-6:] + base := []byte(left + right) + for i := 0; i < 12; i++ { + base[i] += base[12-i-1] + } + + bs := make([]byte, 4) + binary.LittleEndian.PutUint32(bs, index) + for i, c := range bs { + base[i] ^= c + } + var gid primitive.ObjectID + copy(gid[:], base) + + return gid.Hex() +} + +func TestNameHash(t *testing.T) { + for i := 0; i < 10; i++ { + makeHash("Urud", uint32(i)) + fmt.Printf("Urud:%d - %s\n", i, makeHash("Urud", uint32(i))) + makeHash("Sheldon", uint32(i)) + fmt.Printf("Sheldon:%d - %s\n", i, makeHash("Sheldon", uint32(i))) + } +} func TestReJSON(t *testing.T) { rc := redis.NewClient(&redis.Options{Addr: "192.168.8.94:6380"}) rh := gocommon.NewRedisonHandler(context.Background(), rc) + success, err := rc.HSetNX(context.Background(), "setnxtest", "cap", 100).Result() + fmt.Println(success, err) + success, err = rc.HSetNX(context.Background(), "setnxtest", "cap", 100).Result() + fmt.Println(success, err) + testDoc := map[string]any{ "members": map[string]any{ "mid2": map[string]any{ @@ -64,7 +100,7 @@ func TestReJSON(t *testing.T) { logger.Println(rh.JSONGetInt64("jsontest", "$.members..exp")) logger.Println(rh.JSONObjKeys("jsontest", "$.members")) - err := rh.JSONMSet("jsontest", map[string]any{ + err = rh.JSONMSet("jsontest", map[string]any{ "$.members.mid1.key": "newval", "$.members.mid2.key": "newval", })