package core import ( "encoding/json" "fmt" "io" "net/http" "reflect" "strings" "time" "github.com/go-redis/redis/v8" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "repositories.action2quare.com/ayo/gocommon" "repositories.action2quare.com/ayo/gocommon/logger" "repositories.action2quare.com/ayo/gocommon/wshandler" ) func init() { groupTypeContainer()["chat"] = reflect.TypeOf(&groupChat{}) } type channelID = string type channelConfig struct { Capacity int64 `json:"capacity"` Size int64 `json:"size"` Key string `json:"key"` emptyJson string } type chatConfig struct { DefaultCapacity int64 `json:"default_capacity"` Channels map[string]channelConfig `json:"channels"` } type groupChat struct { chatConfig rh *gocommon.RedisonHandler enterRoom func(channelID, accountID) leaveRoom func(channelID, accountID) sendUpstreamMessage func(msg *wshandler.UpstreamMessage) } var accidHex func(primitive.ObjectID) string var accidstrHex func(string) string func (gc *groupChat) Initialize(sub *subTavern, cfg configDocument) error { rem, _ := json.Marshal(cfg) if err := json.Unmarshal(rem, &gc.chatConfig); err != nil { return err } gc.enterRoom = func(chanid channelID, accid accountID) { sub.wsh.EnterRoom(sub.region, string(chanid), accid) } gc.leaveRoom = func(chanid channelID, accid accountID) { sub.wsh.LeaveRoom(sub.region, string(chanid), accid) } gc.sendUpstreamMessage = func(msg *wshandler.UpstreamMessage) { sub.wsh.SendUpstreamMessage(sub.region, msg) } gc.rh = sub.redison sub.apiFuncs.registApiFunction("FetchChattingChannels", gc.FetchChattingChannels) sub.apiFuncs.registApiFunction("BroadcastMessageOnChannel", gc.BroadcastMessageOnChannel) sub.apiFuncs.registApiFunction("QueryPlayerChattingChannel", gc.QueryPlayerChattingChannel) sub.apiFuncs.registApiFunction("SendMessageOnChannel", gc.SendMessageOnChannel) for name, cfg := range gc.chatConfig.Channels { if cfg.Capacity == 0 { cfg.Capacity = gc.chatConfig.DefaultCapacity } cfg.Key = name cfg.Size = 0 jm, _ := json.Marshal(cfg) cfg.emptyJson = fmt.Sprintf("[%s]", string(jm)) _, err := gc.rh.JSONSet(name, "$", cfg) if *devflag && err != nil { gc.rh.Del(gc.rh.Context(), name).Result() _, err = gc.rh.JSONSet(name, "$", cfg) } if err != nil { return err } } ts := fmt.Sprintf("%x-", time.Now().Unix()) if *devflag { accidHex = func(accid primitive.ObjectID) string { return ts + accid.Hex() } accidstrHex = func(accid string) string { return ts + accid } } else { accidHex = func(accid primitive.ObjectID) string { return accid.Hex() } accidstrHex = func(accid string) string { return accid } } return nil } func (gc *groupChat) ClientMessageReceived(sender *wshandler.Sender, mt wshandler.WebSocketMessageType, message any) { if mt == wshandler.Disconnected { if _, err := gc.rh.Del(gc.rh.Context(), accidHex(sender.Accid)).Result(); err != nil { logger.Println(err) } } else if mt == wshandler.BinaryMessage { commandline := message.([]any) cmd := commandline[0].(string) args := commandline[1:] switch cmd { case "EnterPublicChannel": chanid := args[0].(string) if cfg, ok := gc.chatConfig.Channels[chanid]; ok { size, err := gc.rh.JSONGetInt64(chanid, "$.size") if err != nil || len(size) == 0 { logger.Println("JSONGetInt64 failed :", chanid, err) } else if size[0] < cfg.Capacity { // 입장 newsize, err := gc.rh.JSONNumIncrBy(chanid, "$.size", 1) if err == nil { gc.enterRoom(chanid, sender.Accid) sender.RegistDisconnectedCallback(chanid, func() { size, err := gc.rh.JSONNumIncrBy(chanid, "$.size", -1) if err == nil { gc.sendUpstreamMessage(&wshandler.UpstreamMessage{ Target: "#" + chanid, Body: map[string]any{"size": size}, Tag: []string{"ChattingChannelProperties"}, }) } }) gc.rh.HSet(gc.rh.Context(), accidHex(sender.Accid), "cc_pub", chanid) gc.sendUpstreamMessage(&wshandler.UpstreamMessage{ Target: "#" + chanid, Body: map[string]any{"size": newsize[0]}, Tag: []string{"ChattingChannelProperties"}, }) } } else { // 풀방 logger.Println("chatting channel is full :", chanid, size) } } else { logger.Println("chatting channel not valid :", chanid) } case "LeavePublicChannel": chanid := args[0].(string) gc.rh.HDel(gc.rh.Context(), accidHex(sender.Accid), "cc_pub") gc.leaveRoom(chanid, sender.Accid) if f := sender.PopDisconnectedCallback(chanid); f != nil { f() } case "TextMessage": chanid := args[0].(string) msg := args[1].(string) gc.sendUpstreamMessage(&wshandler.UpstreamMessage{ Target: "#" + chanid, Body: map[string]any{"sender": sender.Alias, "msg": msg}, Tag: []string{"TextMessage"}, }) case "EnterPrivateChannel": typename := args[0].(string) channel := args[1].(string) var reason string if len(args) > 2 { reason = args[2].(string) } if len(reason) > 0 { // 수락 ok, err := gc.rh.HSetNX(gc.rh.Context(), accidHex(sender.Accid), "cc_"+typename, channel).Result() if err != nil || !ok { // 이미 다른 private channel 참여 중 logger.Println("EnterPrivateChannel failed. HSetNX return err :", err, sender.Accid.Hex(), typename, channel) return } gc.enterRoom(channel, sender.Accid) sender.RegistDisconnectedCallback(channel, func() { gc.rh.JSONDel(channel, "$."+sender.Accid.Hex()) cnt, _ := gc.rh.HDel(gc.rh.Context(), accidHex(sender.Accid), "cc_"+typename).Result() if cnt > 0 { gc.sendUpstreamMessage(&wshandler.UpstreamMessage{ Target: "#" + channel, Body: map[string]any{"sender": sender.Alias, "typename": typename}, Tag: []string{"LeavePrivateChannel"}, }) } }) } gc.sendUpstreamMessage(&wshandler.UpstreamMessage{ Target: "#" + channel, Body: map[string]any{ "sender": sender.Alias, "msg": reason, "typename": typename, }, Tag: []string{"EnterPrivateChannel"}, }) case "LeavePrivateChannel": channel := args[1].(string) gc.leaveRoom(channel, sender.Accid) if f := sender.PopDisconnectedCallback(channel); f != nil { f() } } } } func (gc *groupChat) FetchChattingChannels(w http.ResponseWriter, r *http.Request) { prefix, _ := gocommon.ReadStringFormValue(r.Form, "prefix") if len(prefix) == 0 { logger.Println("FetchChattingChannel failed. prefix is missing") w.WriteHeader(http.StatusBadRequest) return } var rows []string for name, cfg := range gc.chatConfig.Channels { if len(prefix) > 0 { if !strings.HasPrefix(name, prefix) { continue } } onechan, err := gc.rh.JSONGet(name, "$") if err != nil && err != redis.Nil { logger.Println("FetchChattingChannel failed. HGetAll return err :", err) w.WriteHeader(http.StatusInternalServerError) return } if err == redis.Nil || onechan == nil { rows = append(rows, cfg.emptyJson) } else { rows = append(rows, onechan.(string)) } } if len(rows) == 0 { w.Write([]byte("[]")) } else if len(rows) == 1 { w.Write([]byte(rows[0])) } else { first := rows[0] w.Write([]byte(first[:len(first)-1])) for i := 1; i < len(rows); i++ { mid := rows[i] w.Write([]byte(",")) w.Write([]byte(mid[1 : len(mid)-1])) } w.Write([]byte("]")) } } func (gc *groupChat) QueryPlayerChattingChannel(w http.ResponseWriter, r *http.Request) { accid, _ := gocommon.ReadStringFormValue(r.Form, "accid") typename, _ := gocommon.ReadStringFormValue(r.Form, "typename") var fields []string if len(typename) == 0 { fields = []string{"cc_pub"} } else { fields = []string{"cc_pub", "cc_" + typename} } chans, err := gc.rh.HMGet(gc.rh.Context(), accidstrHex(accid), fields...).Result() if err != nil { w.WriteHeader(http.StatusInternalServerError) return } output := make(map[string]string) if len(chans) > 0 && chans[0] != nil { output["public"] = chans[0].(string) } if len(chans) > 1 && chans[1] != nil { output[typename] = chans[1].(string) } enc := json.NewEncoder(w) enc.Encode(output) } func (gc *groupChat) SendMessageOnChannel(w http.ResponseWriter, r *http.Request) { channel, _ := gocommon.ReadStringFormValue(r.Form, "channel") target, _ := gocommon.ReadStringFormValue(r.Form, "target") tag, _ := gocommon.ReadStringFormValue(r.Form, "tag") if len(channel) == 0 || len(target) == 0 || len(tag) == 0 { logger.Println("SendMessageOnChannel failed. channel or target or tag is empty") w.WriteHeader(http.StatusBadRequest) return } var doc map[string]any bt, err := io.ReadAll(r.Body) if err != nil { logger.Println("SendMessageOnChannel failed. io.ReadAll returns err :", err) w.WriteHeader(http.StatusInternalServerError) return } if len(bt) > 0 { if err := bson.Unmarshal(bt, &doc); err != nil { logger.Println("SendMessageOnChannel failed. decode returns err :", err) w.WriteHeader(http.StatusBadRequest) return } } gc.sendUpstreamMessage(&wshandler.UpstreamMessage{ Target: fmt.Sprintf("%s@%s", target, channel), Body: doc, Tag: []string{tag}, }) } func (gc *groupChat) BroadcastMessageOnChannel(w http.ResponseWriter, r *http.Request) { nickname, _ := gocommon.ReadStringFormValue(r.Form, "nickname") channel, _ := gocommon.ReadStringFormValue(r.Form, "channel") tag, _ := gocommon.ReadStringFormValue(r.Form, "tag") text, _ := io.ReadAll(r.Body) if len(tag) > 0 { var doc map[string]any if err := bson.Unmarshal(text, &doc); err == nil { gc.sendUpstreamMessage(&wshandler.UpstreamMessage{ Target: "#" + channel, Body: doc, Tag: []string{tag}, }) } else { logger.Println("BroadcastMessageOnChannel failed :", err) } } else { gc.sendUpstreamMessage(&wshandler.UpstreamMessage{ Target: "#" + channel, Body: map[string]any{"sender": nickname, "msg": string(text)}, Tag: []string{"TextMessage"}, }) } }