chat document update

This commit is contained in:
2023-08-12 15:26:32 +09:00
parent 5e953d6131
commit a08353a920
4 changed files with 312 additions and 123 deletions

View File

@ -4,6 +4,7 @@ import (
"net/http"
"reflect"
"repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/wshandler"
)
@ -45,6 +46,8 @@ func (afc *apiFuncsContainer) call(fn string, w http.ResponseWriter, r *http.Req
f := afc.normfuncs[fn]
if f != nil {
f(w, r)
} else {
logger.Println("api func is missing :", fn)
}
}

View File

@ -2,10 +2,14 @@ package core
import (
"encoding/json"
"fmt"
"io"
"net/http"
"reflect"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/wshandler"
@ -24,61 +28,17 @@ type chatConfig struct {
type groupChat struct {
chatConfig
rh *gocommon.RedisonHandler
incSizeScript string
decSizeScript string
enterRoom func(channelID, accountID)
leaveRoom func(channelID, accountID)
sendUpstreamMessage func(msg *wshandler.UpstreamMessage)
}
var increaseSizeScript = `
local cap = redis.call("HGET", KEYS[1], "capacity")
local newseq = redis.call("HINCRBY", KEYS[1], "seq", 1)
local newsize = redis.call("HINCRBY", KEYS[1], "size", 1)
if tonumber(cap) < newsize then
redis.call("HINCRBY", KEYS[1], "size", -1)
return {err = "channel is full"}
end
redis.call("HSET", "_m_"..KEYS[1], KEYS[2], ARGV[1])
return {newsize, newseq}
`
var decreaseSizeScript = `
local exists = redis.call("EXISTS", "_m_"..KEYS[1])
if exists == 0 then
return {err = "not target"}
end
local newseq = redis.call("HINCRBY", KEYS[1], "seq", 1)
local newsize = redis.call("HINCRBY", KEYS[1], "size", -1)
redis.call("HDEL", "_m_"..KEYS[1], KEYS[2])
return {newsize, newseq}
`
var accidHex func(primitive.ObjectID) string
var accidstrHex func(string) string
func (gc *groupChat) Initialize(sub *subTavern, cfg configDocument) error {
incScript, err := sub.redisClient.ScriptLoad(sub.redisClient.Context(), increaseSizeScript).Result()
if err != nil {
return err
}
decScript, err := sub.redisClient.ScriptLoad(sub.redisClient.Context(), decreaseSizeScript).Result()
if err != nil {
return err
}
// newsize, err := sub.redisClient.EvalSha(sub.redisClient.Context(), incScript, []string{"myhash", "alias"}, "accid").Result()
// if err != nil {
// return err
// }
// logger.Println(newsize.([]any))
// newsize, err = sub.redisClient.EvalSha(sub.redisClient.Context(), decScript, []string{"myhash", "alias"}).Result()
// if err != nil {
// return err
// }
// logger.Println(newsize.([]any))
rem, _ := json.Marshal(cfg)
if err = json.Unmarshal(rem, &gc.chatConfig); err != nil {
if err := json.Unmarshal(rem, &gc.chatConfig); err != nil {
return err
}
@ -92,26 +52,49 @@ func (gc *groupChat) Initialize(sub *subTavern, cfg configDocument) error {
sub.wsh.SendUpstreamMessage(sub.region, msg)
}
gc.rh = gocommon.NewRedisonHandler(sub.redisClient.Context(), sub.redisClient)
gc.incSizeScript = incScript
gc.decSizeScript = decScript
gc.rh = sub.redison
sub.apiFuncs.registApiFunction("CreateChattingChannel", gc.CreateChattingChannel)
sub.apiFuncs.registApiFunction("FetchChattingChannels", gc.FetchChattingChannels)
sub.apiFuncs.registApiFunction("BroadcastMessageOnChannel", gc.BroadcastMessageOnChannel)
sub.apiFuncs.registApiFunction("QueryPlayerChattingChannel", gc.QueryPlayerChattingChannel)
sub.apiFuncs.registApiFunction("SendMessageOnChannel", gc.SendMessageOnChannel)
sub.apiFuncs.registApiFunction("UpdateChannelDocument", gc.UpdateChannelDocument)
for name, cfg := range gc.chatConfig.Channels {
if _, ok := cfg["capacity"]; !ok {
cfg["capacity"] = gc.chatConfig.DefaultCapacity
} else {
cfg["capacity"] = int64(cfg["capacity"].(float64))
}
cfg["key"] = name
cfg["size"] = int32(0)
_, err := gc.rh.JSONSet(name, "$", cfg)
if *devflag && err != nil {
gc.rh.Del(gc.rh.Context(), name).Result()
_, err = gc.rh.JSONSet(name, "$", cfg)
}
_, err := gc.rh.HMSet(gc.rh.Context(), name, cfg).Result()
if err != nil {
return err
}
}
ts := fmt.Sprintf("%x-", time.Now().Unix())
if *devflag {
accidHex = func(accid primitive.ObjectID) string {
return ts + accid.Hex()
}
accidstrHex = func(accid string) string {
return ts + accid
}
} else {
accidHex = func(accid primitive.ObjectID) string {
return accid.Hex()
}
accidstrHex = func(accid string) string {
return accid
}
}
return nil
}
@ -119,47 +102,65 @@ func (gc *groupChat) ClientMessageReceived(sender *wshandler.Sender, mt wshandle
if mt == wshandler.Disconnected {
rooms := message.([]string)
for _, chanid := range rooms {
sizeseq, err := gc.rh.EvalSha(gc.rh.Context(), gc.decSizeScript, []string{chanid, sender.Alias}).Result()
if err == nil {
if _, ok := gc.chatConfig.Channels[chanid]; !ok {
continue
}
size, err := gc.rh.JSONNumIncrBy(chanid, "$.size", -1)
if err != nil {
logger.Println("JSONMGet failed :", err)
continue
}
gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + chanid,
Body: map[string]any{"size": sizeseq.([]any)[0], "seq": sizeseq.([]any)[1]},
Body: map[string]any{"size": size},
Tag: []string{"ChattingChannelProperties"},
})
}
if _, err := gc.rh.Del(gc.rh.Context(), accidHex(sender.Accid)).Result(); err != nil {
logger.Println(err)
}
} else if mt == wshandler.BinaryMessage {
commandline := message.([]any)
cmd := commandline[0].(string)
args := commandline[1:]
switch cmd {
case "EnterChattingChannel":
case "EnterPublicChannel":
chanid := args[0].(string)
sizeseq, err := gc.rh.EvalSha(gc.rh.Context(), gc.incSizeScript, []string{chanid, sender.Alias}, sender.Accid.Hex()).Result()
if cfg, ok := gc.chatConfig.Channels[chanid]; ok {
size, err := gc.rh.JSONGetInt64(chanid, "$.size")
if err != nil || len(size) == 0 {
logger.Println("JSONGetInt64 failed :", chanid, err)
} else if size[0] < cfg["capacity"].(int64) {
// 입장
newsize, err := gc.rh.JSONNumIncrBy(chanid, "$.size", 1)
if err == nil {
gc.enterRoom(chanid, sender.Accid)
gc.rh.HSet(gc.rh.Context(), accidHex(sender.Accid), "cc_pub", chanid)
gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + chanid,
Body: map[string]any{"size": sizeseq.([]any)[0], "seq": sizeseq.([]any)[1]},
Body: map[string]any{"size": newsize[0]},
Tag: []string{"ChattingChannelProperties"},
})
}
} else {
// 입장 실패 알림
gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "@" + sender.Accid.Hex(),
Body: map[string]string{"id": chanid, "err": err.Error()},
Tag: []string{"EnterChattingChannelFailed"},
})
// 풀방
logger.Println("chatting channel is full :", chanid, size)
}
} else {
logger.Println("chatting channel not valid :", chanid)
}
case "LeaveChattingChannel":
case "LeavePublicChannel":
chanid := args[0].(string)
gc.rh.HDel(gc.rh.Context(), accidHex(sender.Accid), "cc_pub")
gc.leaveRoom(chanid, sender.Accid)
sizeseq, err := gc.rh.EvalSha(gc.rh.Context(), gc.decSizeScript, []string{chanid, sender.Alias}).Result()
newsize, err := gc.rh.JSONNumIncrBy(chanid, "$.size", 1)
if err == nil {
gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + chanid,
Body: map[string]any{"size": sizeseq.([]any)[0], "seq": sizeseq.([]any)[1]},
Body: map[string]any{"size": newsize[0]},
Tag: []string{"ChattingChannelProperties"},
})
}
@ -172,28 +173,49 @@ func (gc *groupChat) ClientMessageReceived(sender *wshandler.Sender, mt wshandle
Body: map[string]any{"sender": sender.Alias, "msg": msg},
Tag: []string{"TextMessage"},
})
}
}
case "EnterPrivateChannel":
typename := args[0].(string)
channel := args[1].(string)
var reason string
if len(args) > 2 {
reason = args[2].(string)
}
func (gc *groupChat) CreateChattingChannel(w http.ResponseWriter, r *http.Request) {
chanstr, _ := gocommon.ReadStringFormValue(r.Form, "name")
if len(chanstr) == 0 {
logger.Println("CreateChattingChannel failed. name is missing")
w.WriteHeader(http.StatusBadRequest)
if len(reason) > 0 {
// 수락
ok, err := gc.rh.HSetNX(gc.rh.Context(), accidHex(sender.Accid), "cc_"+typename, channel).Result()
if err != nil || !ok {
// 이미 다른 private channel 참여 중
logger.Println("EnterPrivateChannel failed. HSetNX return err :", err, sender.Accid.Hex(), typename, channel)
return
}
capacity, _ := gocommon.ReadIntegerFormValue(r.Form, "capacity")
if capacity == 0 {
capacity = gc.chatConfig.DefaultCapacity
gc.enterRoom(channel, sender.Accid)
}
_, err := gc.rh.HSetNX(gc.rh.Context(), chanstr, "capacity", capacity).Result()
if err != nil {
logger.Println("CreateChattingChannel failed. HSetNX returns err :", err)
w.WriteHeader(http.StatusInternalServerError)
return
gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + channel,
Body: map[string]any{
"sender": sender.Alias,
"msg": reason,
"typename": typename,
},
Tag: []string{"EnterPrivateChannel"},
})
case "LeavePrivateChannel":
typename := args[0].(string)
channel := args[1].(string)
cnt, _ := gc.rh.HDel(gc.rh.Context(), accidHex(sender.Accid), "cc_"+typename).Result()
if cnt > 0 {
gc.leaveRoom(channel, sender.Accid)
}
gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + channel,
Body: map[string]any{"sender": sender.Alias, "typename": typename},
Tag: []string{"LeavePrivateChannel"},
})
}
}
}
@ -212,29 +234,170 @@ func (gc *groupChat) FetchChattingChannels(w http.ResponseWriter, r *http.Reques
return
}
var channels []map[string]string
var rows []string
for _, key := range keys {
onechan, err := gc.rh.HGetAll(gc.rh.Context(), key).Result()
onechan, err := gc.rh.JSONGet(key, "$")
if err != nil {
logger.Println("FetchChattingChannel failed. HGetAll return err :", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
channels = append(channels, onechan)
row := onechan.(string)
rows = append(rows, row)
}
if len(rows) == 0 {
w.Write([]byte("[]"))
} else if len(rows) == 1 {
w.Write([]byte(rows[0]))
} else {
first := rows[0]
w.Write([]byte(first[:len(first)-1]))
for i := 1; i < len(rows); i++ {
mid := rows[i]
w.Write([]byte(","))
w.Write([]byte(mid[1 : len(mid)-1]))
}
w.Write([]byte("]"))
}
}
func (gc *groupChat) QueryPlayerChattingChannel(w http.ResponseWriter, r *http.Request) {
accid, _ := gocommon.ReadStringFormValue(r.Form, "accid")
typename, _ := gocommon.ReadStringFormValue(r.Form, "typename")
var fields []string
if len(typename) == 0 {
fields = []string{"cc_pub"}
} else {
fields = []string{"cc_pub", "cc_" + typename}
}
chans, err := gc.rh.HMGet(gc.rh.Context(), accidstrHex(accid), fields...).Result()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
output := make(map[string]string)
if len(chans) > 0 && chans[0] != nil {
output["public"] = chans[0].(string)
}
if len(chans) > 1 && chans[1] != nil {
output[typename] = chans[1].(string)
}
enc := json.NewEncoder(w)
enc.Encode(channels)
enc.Encode(output)
}
func (gc *groupChat) UpdateChannelDocument(w http.ResponseWriter, r *http.Request) {
channel, _ := gocommon.ReadStringFormValue(r.Form, "channel")
key, _ := gocommon.ReadStringFormValue(r.Form, "key")
ret, _ := gocommon.ReadStringFormValue(r.Form, "return")
if len(channel) == 0 || len(key) == 0 {
w.WriteHeader(http.StatusBadRequest)
return
}
bt, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
var doc map[string]any
if err := bson.Unmarshal(bt, &doc); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
if ret == "before" {
before, err2 := gc.rh.JSONGet(channel, "$")
if err2 != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
defer func() {
if err == nil {
w.Write([]byte(before.(string)))
}
}()
} else if ret == "after" {
defer func() {
if err == nil {
after, err2 := gc.rh.JSONGet(channel, "$")
if err2 != nil {
w.WriteHeader(http.StatusInternalServerError)
} else {
w.Write([]byte(after.(string)))
}
}
}()
}
_, err = gc.rh.JSONSet(channel, "$."+key, doc)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
}
func (gc *groupChat) SendMessageOnChannel(w http.ResponseWriter, r *http.Request) {
channel, _ := gocommon.ReadStringFormValue(r.Form, "channel")
target, _ := gocommon.ReadStringFormValue(r.Form, "target")
tag, _ := gocommon.ReadStringFormValue(r.Form, "tag")
if len(channel) == 0 || len(target) == 0 || len(tag) == 0 {
logger.Println("SendMessageOnChannel failed. channel or target or tag is empty")
w.WriteHeader(http.StatusBadRequest)
return
}
var doc map[string]any
bt, err := io.ReadAll(r.Body)
if err != nil {
logger.Println("SendMessageOnChannel failed. io.ReadAll returns err :", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
if len(bt) > 0 {
if err := bson.Unmarshal(bt, &doc); err != nil {
logger.Println("SendMessageOnChannel failed. decode returns err :", err)
w.WriteHeader(http.StatusBadRequest)
return
}
}
gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: fmt.Sprintf("%s@%s", target, channel),
Body: doc,
Tag: []string{tag},
})
}
func (gc *groupChat) BroadcastMessageOnChannel(w http.ResponseWriter, r *http.Request) {
nickname, _ := gocommon.ReadStringFormValue(r.Form, "nickname")
channel, _ := gocommon.ReadStringFormValue(r.Form, "channel")
tag, _ := gocommon.ReadStringFormValue(r.Form, "tag")
text, _ := io.ReadAll(r.Body)
if len(tag) > 0 {
var doc map[string]any
if err := bson.Unmarshal(text, &doc); err == nil {
gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + channel,
Body: doc,
Tag: []string{tag},
})
} else {
logger.Println("BroadcastMessageOnChannel failed :", err)
}
} else {
gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + channel,
Body: map[string]any{"sender": nickname, "msg": string(text)},
Tag: []string{"TextMessage"},
})
}
}

View File

@ -262,7 +262,7 @@ func (gp *groupParty) Initialize(sub *subTavern, cfg configDocument) error {
return err
}
gp.rh = gocommon.NewRedisonHandler(sub.redisClient.Context(), sub.redisClient)
gp.rh = sub.redison
gp.sendUpstreamMessage = func(msg *wshandler.UpstreamMessage) {
sub.wsh.SendUpstreamMessage(sub.region, msg)
}
@ -342,7 +342,7 @@ func (gp *groupParty) JoinParty(w http.ResponseWriter, r *http.Request) {
// 새 멤버에 그룹 전체를 알림
gp.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "@" + mid.Hex(),
Target: mid.Hex(),
Body: gd.loadFull(),
Tag: []string{"GroupDocFull"},
})
@ -405,7 +405,7 @@ func (gp *groupParty) InviteToParty(w http.ResponseWriter, r *http.Request) {
// 이미 초대 중이다.
// inviter한테 알려줘야 한다.
gp.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "@" + mid.Hex(),
Target: mid.Hex(),
Body: reqdoc.Invitee,
Tag: []string{"InvitationFail"},
})
@ -430,7 +430,7 @@ func (gp *groupParty) InviteToParty(w http.ResponseWriter, r *http.Request) {
gp.enterRoom(gid, mid)
gp.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "@" + mid.Hex(),
Target: mid.Hex(),
Body: gd,
Tag: []string{"GroupDocFull"},
})
@ -445,7 +445,7 @@ func (gp *groupParty) InviteToParty(w http.ResponseWriter, r *http.Request) {
// invitee에게 알림
gp.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "@" + targetid.Hex(),
Target: targetid.Hex(),
Body: Invitation{
GroupID: gid,
TicketID: gd.tid(targetid),
@ -501,7 +501,7 @@ func (gp *groupParty) AcceptPartyInvitation(w http.ResponseWriter, r *http.Reque
// 새 멤버에 그룹 전체를 알림
gp.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "@" + mid.Hex(),
Target: mid.Hex(),
Body: gd.loadFull(),
Tag: []string{"GroupDocFull"},
})
@ -595,7 +595,7 @@ func (gp *groupParty) LeaveParty(w http.ResponseWriter, r *http.Request) {
// mid한테는 빈 GroupDocFull을 보낸다. 그러면 지워짐
gp.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "@" + mid.Hex(),
Target: mid.Hex(),
Body: bson.M{"gid": gid},
Tag: []string{"GroupDocFull", gid.Hex()},
})

View File

@ -14,6 +14,7 @@ import (
"github.com/go-redis/redis/v8"
"repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/flagx"
"repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/wshandler"
@ -21,6 +22,8 @@ import (
"go.mongodb.org/mongo-driver/bson/bsonrw"
)
var devflag = flagx.Bool("dev", false, "")
const (
defaultMaxMemory = 32 << 10 // 32 KB
)
@ -65,7 +68,7 @@ func readBsonDoc(r io.Reader, src any) error {
type TavernConfig struct {
gocommon.RegionStorageConfig `json:",inline"`
Group2 map[string]configDocument `json:"tavern_group_types"`
Group map[string]configDocument `json:"tavern_group_types"`
MaingateApiToken string `json:"maingate_api_token"`
RedisURL string `json:"tavern_redis_url"`
macAddr string
@ -80,7 +83,7 @@ type Tavern struct {
type subTavern struct {
mongoClient gocommon.MongoClient
redisClient *redis.Client
redison *gocommon.RedisonHandler
wsh *wshandler.WebsocketHandler
region string
groups map[string]group
@ -147,10 +150,12 @@ func (tv *Tavern) prepare(ctx context.Context) error {
return err
}
redison := gocommon.NewRedisonHandler(redisClient.Context(), redisClient)
sub := &subTavern{
wsh: tv.wsh,
mongoClient: dbconn,
redisClient: redisClient,
redison: redison,
region: region,
apiFuncs: &apiFuncsContainer{
normfuncs: make(map[string]apiFuncType),
@ -159,7 +164,7 @@ func (tv *Tavern) prepare(ctx context.Context) error {
}
groups := make(map[string]group)
for typename, cfg := range config.Group2 {
for typename, cfg := range config.Group {
gtype, ok := groupTypeContainer()[typename]
if !ok {
return fmt.Errorf("%s group type is not valid", typename)
@ -203,7 +208,7 @@ func (tv *Tavern) RegisterHandlers(ctx context.Context, serveMux *http.ServeMux,
func (sub *subTavern) OnClientMessageReceived(sender *wshandler.Sender, messageType wshandler.WebSocketMessageType, body io.Reader) {
if messageType == wshandler.Connected {
logger.Println("OnClientMessageReceived : connected ", sender.Accid.Hex())
sub.redisClient.HSet(sub.redisClient.Context(), sender.Accid.Hex(), "_ts", time.Now().UTC().Unix()).Result()
sub.redison.HSet(sub.redison.Context(), sender.Accid.Hex(), "_ts", time.Now().UTC().Unix()).Result()
for _, gt := range sub.groups {
gt.ClientMessageReceived(sender, messageType, nil)
}
@ -215,7 +220,7 @@ func (sub *subTavern) OnClientMessageReceived(sender *wshandler.Sender, messageT
gt.ClientMessageReceived(sender, messageType, rooms)
}
}
sub.redisClient.Del(sub.redisClient.Context(), sender.Accid.Hex()).Result()
sub.redison.Del(sub.redison.Context(), sender.Accid.Hex()).Result()
logger.Println("OnClientMessageReceived : disconnected ", sender.Accid.Hex())
} else if messageType == wshandler.BinaryMessage {
var commandline []any
@ -240,11 +245,29 @@ func (sub *subTavern) OnClientMessageReceived(sender *wshandler.Sender, messageT
}
func (sub *subTavern) OnRoomCreated(region, name string) {
created, err := sub.redison.JSONSet(name, "$", map[string]any{
"_refcnt": 1,
}, gocommon.RedisonSetOptionNX)
if err != nil && !errors.Is(err, redis.Nil) {
logger.Println("OnRoomCreated JSONSet failed :", err)
return
}
if !created {
_, err = sub.redison.JSONNumIncrBy(name, "$._refcnt", 1)
if err != nil {
logger.Println("OnRoomCreated JSONSet failed :", err)
return
}
}
}
func (sub *subTavern) OnRoomDestroyed(region, name string) {
cnt, err := sub.redison.JSONNumIncrBy(name, "$._refcnt", -1)
if err != nil || len(cnt) == 0 {
logger.Println("OnRoomDestroyed JSONNumIncrBy failed :", err)
} else if cnt[0] == 0 {
sub.redison.Del(sub.redison.Context(), name)
}
}
func (sub *subTavern) api(w http.ResponseWriter, r *http.Request) {