From 8dded8b90738851b2a30ac5c2cef852cdf039790 Mon Sep 17 00:00:00 2001 From: mountain Date: Wed, 19 Jul 2023 09:37:40 +0900 Subject: [PATCH] Squashed commit of the following: MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit commit 8e1b232d57801c12edb94d02038b663d19c67b9b Author: mountain Date: Wed Jul 19 09:37:02 2023 +0900 InMemory 그룹을 redis로 변경 commit 01da5bb3a4b2c1857ce73d5722add85d347c976d Author: mountain Date: Tue Jul 18 01:31:39 2023 +0900 body를 marshaling하고 클라이언트에서 flatten함 commit ba61a11659eb790d98d41ce61d38dd001ccfa56c Author: mountain Date: Mon Jul 17 17:47:07 2023 +0900 gob 등록 commit 67cca13326aa517ba0de396b8d5bce736c1c86d8 Author: mountain Date: Sun Jul 16 18:41:24 2023 +0900 모듈 업데이트 commit 272c696c59652452da45e2704eabebc1e5f587a8 Author: mountain Date: Sun Jul 16 17:29:21 2023 +0900 json value 다시 되돌림 commit aa568ec3fa0016b4a1a43b29c578720ff65883c1 Author: mountain Date: Sun Jul 16 17:26:19 2023 +0900 SetOption 타입 변경 commit b9c4d8b21bbe47733f3bc061973743de06ce3b64 Author: mountain Date: Sun Jul 16 17:15:08 2023 +0900 objvalue marshalling 수정 commit 99834c146182cf51cbe4a24915f14c91208be994 Author: mountain Date: Sun Jul 16 17:01:06 2023 +0900 objlen 수정 commit 592112219ec5333031b56949d85e91ce5e46bf20 Author: mountain Date: Sun Jul 16 16:38:05 2023 +0900 gocommon 업데이트 commit 62485b6d54e7f8394272b1b80778f677b0fc5996 Author: mountain Date: Sun Jul 16 15:36:20 2023 +0900 redis json 마이그레이션 완료 commit d36dd13bb7b5c70825cae3ca5ea4d3cb62506ab6 Author: mountain Date: Sun Jul 16 02:51:41 2023 +0900 redis stack 사용 --- config_template.json | 111 ++-- core/apiimpl.go | 272 +-------- core/config.json | 1 + core/group.go | 14 +- core/group_memory.go | 1247 +++++++++++------------------------------- core/richconn.go | 97 ---- core/tavern.go | 104 ++-- core/tavern_test.go | 77 +++ go.mod | 4 +- go.sum | 54 +- main.go | 3 +- 11 files changed, 565 insertions(+), 1419 deletions(-) create mode 100644 core/config.json delete mode 100644 core/richconn.go create mode 100644 core/tavern_test.go diff --git a/config_template.json b/config_template.json index 261b84c..ef23c98 100644 --- a/config_template.json +++ b/config_template.json @@ -1,65 +1,72 @@ { - "region_storage" : { - "default" : { - "mongo" : "mongodb://192.168.8.94:27017/?replicaSet=repl01&retrywrites=false", - "redis" : { - "url" : "redis://192.168.8.94:6379", - "offset" : { - "cache" : 0, - "session" : 1, - "ranking" : 2 - } + "region_storage": { + "default": { + "mongo": "mongodb://192.168.8.94:27017/?replicaSet=repl01&retrywrites=false", + "redis": { + "cache": "redis://192.168.8.94:6380/0", + "session": "redis://192.168.8.94:6380/1", + "ranking": "redis://192.168.8.94:6380/2", + "wshandler": "redis://192.168.8.94:6380/3", + "tavern": "redis://192.168.8.94:6380/4" } }, - "dev" : { - "mongo" : "mongodb://192.168.8.94:27017/?replicaSet=repl01&retrywrites=false", - "redis" : { - "url" : "redis://192.168.8.94:6379", - "offset" : { - "cache" : 0, - "session" : 1, - "ranking" : 2 - } + "dev": { + "mongo": "mongodb://192.168.8.94:27017/?replicaSet=repl01&retrywrites=false", + "redis": { + "cache": "redis://192.168.8.94:6380/4", + "session": "redis://192.168.8.94:6380/5", + "ranking": "redis://192.168.8.94:6380/6", + "wshandler": "redis://192.168.8.94:6380/7", + "tavern": "redis://192.168.8.94:6380/8" } } }, - "maingate_mongodb_url" : "mongodb://192.168.8.94:27017/?replicaSet=repl01&retrywrites=false", - "maingate_service_url" : "http://localhost/maingate", - "maingate_api_token" : "63d08aa34f0162622c11284b", - - "tavern_service_url" : "http://localhost/tavern", - "tavern_group_types" : { - "subjugate" : { - "text_search_field" : ["name"], - "unique_index" : ["name,_id", "_id,members", "name,hidden"], - "search_index" : ["rules"], - "member_index" : ["_gid,candidate,luts","_gid,luts","_gid,expiring"], - "invite_ttl" : 30, - "candidate_ttl" : 3600, - "invitee_exlusive" : true, - "invitee_is_member" : true, - "max_member" : 4 + "maingate_mongodb_url": "mongodb://192.168.8.94:27017/?replicaSet=repl01&retrywrites=false", + "maingate_service_url": "http://localhost/maingate", + "maingate_api_token": "63d08aa34f0162622c11284b", + "tavern_service_url": "http://localhost/tavern", + "tavern_group_types": { + "subjugate": { + "text_search_field": [ + "name" + ], + "unique_index": [ + "name,_id", + "_id,members", + "name,hidden" + ], + "search_index": [ + "rules" + ], + "member_index": [ + "_gid,candidate,luts", + "_gid,luts", + "_gid,expiring" + ], + "invite_ttl": 30, + "candidate_ttl": 3600, + "invitee_exlusive": true, + "invitee_is_member": true, + "max_member": 4 }, - "lobby" : { - "max_member" : 3, - "invitee_exlusive" : true, - "invitee_is_member" : true, - "transient" : true, - "invite_ttl" : 30 + "lobby": { + "max_member": 3, + "invitee_exlusive": true, + "invitee_is_member": true, + "transient": true, + "invite_ttl": 30 } }, - - "ws_sync_pipeline" : "redis://192.168.8.94:6379/3", - - "services" : { - "kingdom" : { - "개발중" : { - "url" :"http://localhost/warehouse/dev", - "development" : true + + "services": { + "kingdom": { + "개발중": { + "url": "http://localhost/warehouse/dev", + "development": true }, - "개인서버" : { - "url" : "http://localhost/warehouse/private", - "development" : false + "개인서버": { + "url": "http://localhost/warehouse/private", + "development": false } } } diff --git a/core/apiimpl.go b/core/apiimpl.go index 260fab2..b784438 100644 --- a/core/apiimpl.go +++ b/core/apiimpl.go @@ -1,15 +1,11 @@ package core import ( - "context" - "encoding/json" - "io" "net/http" common "repositories.action2quare.com/ayo/gocommon" "repositories.action2quare.com/ayo/gocommon/logger" - "github.com/go-redis/redis/v8" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" ) @@ -83,11 +79,11 @@ func (sub *subTavern) JoinGroup(w http.ResponseWriter, r *http.Request) { if candidate, ok := common.ReadBoolFormValue(r.Form, "candidate"); ok && candidate { err = group.Candidate(gidobj, midobj, doc) } else { - tidobj, err = group.Join(gidobj, midobj, tidobj, doc) + err = group.Join(gidobj, midobj, doc) } if err == nil { - json.NewEncoder(w).Encode(map[string]string{ + writeBsonDoc(w, map[string]string{ "gid": gidobj.Hex(), "tid": tidobj.Hex(), }) @@ -131,14 +127,14 @@ func (sub *subTavern) Invite(w http.ResponseWriter, r *http.Request) { Invitee bson.M `bson:"invitee"` } if err := readBsonDoc(r.Body, &reqdoc); err != nil { - logger.Error("Invite failed. readBsonDoc returns err :", err) + logger.Println("Invite failed. readBsonDoc returns err :", err) w.WriteHeader(http.StatusInternalServerError) return } result, err := group.Invite(gid, mid, reqdoc.Inviter, reqdoc.Invitee) if err != nil { - logger.Error("Invite failed. group.Invite returns err :", err) + logger.Println("Invite failed. group.Invite returns err :", err) w.WriteHeader(http.StatusInternalServerError) return } @@ -184,12 +180,6 @@ func (sub *subTavern) AcceptInvitation(w http.ResponseWriter, r *http.Request) { gid, _ := common.ReadObjectIDFormValue(r.Form, "gid") mid, _ := common.ReadObjectIDFormValue(r.Form, "mid") - tid, ok := common.ReadObjectIDFormValue(r.Form, "tid") - if !ok { - logger.Println("CancelInvitation failed. form value 'tid' is missing") - w.WriteHeader(http.StatusBadRequest) - return - } var member bson.M if err := readBsonDoc(r.Body, &member); err != nil { @@ -198,14 +188,12 @@ func (sub *subTavern) AcceptInvitation(w http.ResponseWriter, r *http.Request) { return } - gidbytes, err := group.AcceptInvitation(gid, mid, tid, member) + err := group.AcceptInvitation(gid, mid, member) if err != nil { - logger.Error("AcceptInvitation failed. group.AcceptInvitation returns err :", err) + logger.Println("AcceptInvitation failed. group.AcceptInvitation returns err :", err) w.WriteHeader(http.StatusInternalServerError) return } - - w.Write([]byte(gidbytes.Hex())) } func (sub *subTavern) DenyInvitation(w http.ResponseWriter, r *http.Request) { @@ -270,49 +258,6 @@ func (sub *subTavern) QueryInvitations(w http.ResponseWriter, r *http.Request) { } } -func (sub *subTavern) QueryOnlineGroup(w http.ResponseWriter, r *http.Request) { - typename, _ := common.ReadStringFormValue(r.Form, "type") - group := sub.groups[typename] - if group == nil { - logger.Println("QueryOnlineGroup failed. group type is missing :", r.Form) - w.WriteHeader(http.StatusBadRequest) - return - } - - var cmd *redis.StringSliceCmd - scoreStart, _ := common.ReadStringFormValue(r.Form, "score_start") - scoreStop, _ := common.ReadStringFormValue(r.Form, "score_stop") - - if len(scoreStart) > 0 || len(scoreStop) > 0 { - if len(scoreStart) == 0 { - scoreStart = "-inf" - } - if len(scoreStop) == 0 { - scoreStop = "+inf" - } - cmd = sub.wsh.RedisSync.ZRangeArgs(context.Background(), redis.ZRangeArgs{ - Key: onlineGroupQueryKey(typename), - ByScore: true, - Start: scoreStart, - Stop: scoreStop, - Rev: true, - Count: 1, - }) - } else { - // 아무거나 - cmd = sub.wsh.RedisSync.ZRandMember(context.Background(), onlineGroupQueryKey(typename), 1, false) - } - - result, err := cmd.Result() - if err != nil { - logger.Error("QueryOnlineGroup failed. redid.ZRandMember returns err :", err) - w.WriteHeader(http.StatusInternalServerError) - return - } - - writeBsonDoc(w, bson.M{"r": result}) -} - func (sub *subTavern) SearchGroup(w http.ResponseWriter, r *http.Request) { typename, _ := common.ReadStringFormValue(r.Form, "type") group := sub.groups[typename] @@ -343,7 +288,7 @@ func (sub *subTavern) SearchGroup(w http.ResponseWriter, r *http.Request) { } if err := writeBsonArr(w, result); err != nil { - logger.Error("json marshal failed :", err) + logger.Error("bson marshal failed :", err) w.WriteHeader(http.StatusInternalServerError) return } @@ -420,96 +365,7 @@ func (sub *subTavern) QueryGroup(w http.ResponseWriter, r *http.Request) { } if err := writeBsonDoc(w, result); err != nil { - logger.Error("json marshal failed :", err) - w.WriteHeader(http.StatusInternalServerError) - return - } -} - -// QueryGroupMembers : 그룹내 멤버 조회 -// - type : 그룹 타입 -// - 그룹 타입에 맞는 키(주로 _id) -// - projection : select할 필드. ,로 구분 -func (sub *subTavern) QueryGroupMembers(w http.ResponseWriter, r *http.Request) { - typename, _ := common.ReadStringFormValue(r.Form, "type") - group := sub.groups[typename] - if group == nil { - logger.Println("QueryGroupMembers failed. group type is missing :", r.Form) - w.WriteHeader(http.StatusBadRequest) - return - } - - gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid") - if !ok { - logger.Println("QueryGroupMembers failed. _id is missing :", r.Form) - w.WriteHeader(http.StatusBadRequest) - return - } - - midobj, _ := common.ReadObjectIDFormValue(r.Form, "mid") - var after primitive.Timestamp - if ts, ok := common.ReadStringFormValue(r.Form, "after"); ok && ts != "0.0" { - after = common.DotStringToTimestamp(ts) - } - projection, _ := common.ReadStringFormValue(r.Form, "projection") - - result, err := group.QueryMembers(gidobj, midobj, projection, after) - if err != nil { - logger.Error("QueryGroupMembers failed. FindAll err :", err) - w.WriteHeader(http.StatusInternalServerError) - return - } - - if result == nil { - return - } - - if err := writeBsonDoc(w, result); err != nil { - logger.Error("QueryGroupMembers failed. writeBsonArr err :", err) - w.WriteHeader(http.StatusInternalServerError) - return - } -} - -func (sub *subTavern) QueryGroupMember(w http.ResponseWriter, r *http.Request) { - typename, _ := common.ReadStringFormValue(r.Form, "type") - group := sub.groups[typename] - if group == nil { - logger.Println("QueryGroupMember failed. group type is missing :", r.Form) - w.WriteHeader(http.StatusBadRequest) - return - } - - gid, ok := common.ReadObjectIDFormValue(r.Form, "gid") - if !ok { - logger.Println("QueryGroupMember failed. gid is missing :", r.Form) - w.WriteHeader(http.StatusBadRequest) - return - } - - mid, midok := common.ReadObjectIDFormValue(r.Form, "mid") - tid, tidok := common.ReadObjectIDFormValue(r.Form, "tid") - if !midok && !tidok { - // 둘 중 하나는 있어야지 - logger.Println("QueryGroupMember failed. tid and mid are both missing") - w.WriteHeader(http.StatusBadRequest) - return - } - - projection, _ := common.ReadStringFormValue(r.Form, "projection") - result, err := group.QueryMember(gid, mid, tid, projection) - if err != nil { - logger.Println("QueryGroupMember failed. group.QueryMember returns err :", err) - w.WriteHeader(http.StatusBadRequest) - return - } - - if result == nil { - return - } - - if err := writeBsonDoc(w, result); err != nil { - logger.Error("QueryGroupMember failed. writeBsonDoc err :", err) + logger.Error("bson marshal failed :", err) w.WriteHeader(http.StatusInternalServerError) return } @@ -535,15 +391,14 @@ func (sub *subTavern) LeaveGroup(w http.ResponseWriter, r *http.Request) { return } mid, midok := common.ReadObjectIDFormValue(r.Form, "mid") - tid, tidok := common.ReadObjectIDFormValue(r.Form, "tid") - if !midok && !tidok { - // 둘 중 하나는 있어야지 - logger.Println("LeaveGroup failed. tid and mid are both missing") + + if !midok { + logger.Println("LeaveGroup failed. mid is missing") w.WriteHeader(http.StatusBadRequest) return } - if err := group.Leave(gid, mid, tid); err != nil { + if err := group.Leave(gid, mid); err != nil { // 둘 중 하나는 있어야지 logger.Println("LeaveGroup failed. group.Leave returns err :", err) w.WriteHeader(http.StatusBadRequest) @@ -626,125 +481,46 @@ func (sub *subTavern) UpdateGroupDocument(w http.ResponseWriter, r *http.Request return } - body, err := io.ReadAll(r.Body) - if err != nil { + var frag bson.M + if err := readBsonDoc(r.Body, &frag); err != nil { logger.Error("UpdateGroupDocument failed. readBsonDoc err :", err) w.WriteHeader(http.StatusBadRequest) return } - if err := group.UpdateGroupDocument(gid, body); err != nil { + if err := group.UpdateGroupDocument(gid, frag); err != nil { logger.Error("UpdateGroupDocument failed. group.UpdateGroupDocument returns err :", err) w.WriteHeader(http.StatusBadRequest) return } } -func (sub *subTavern) PauseGroupMember(w http.ResponseWriter, r *http.Request) { +func (sub *subTavern) QueryGroupMembers(w http.ResponseWriter, r *http.Request) { typename, _ := common.ReadStringFormValue(r.Form, "type") group := sub.groups[typename] if group == nil { - logger.Println("DismissGroup failed. type is missing") - w.WriteHeader(http.StatusBadRequest) - return - } - - midobj, ok := common.ReadObjectIDFormValue(r.Form, "mid") - if !ok { - logger.Println("UpdateMemberDocument failed. member_id is missing") - w.WriteHeader(http.StatusBadRequest) - return - } - - gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid") - if !ok { - logger.Println("UpdateMemberDocument failed. _id is missing") - w.WriteHeader(http.StatusBadRequest) - return - } - - group.PauseMember(gidobj, midobj) -} - -func (sub *subTavern) DropPausedMember(w http.ResponseWriter, r *http.Request) { - typename, _ := common.ReadStringFormValue(r.Form, "type") - group := sub.groups[typename] - if group == nil { - logger.Println("DropDeadMember failed. type is missing") + logger.Println("QueryGroupMembers failed. type is missing") w.WriteHeader(http.StatusBadRequest) return } gid, ok := common.ReadObjectIDFormValue(r.Form, "gid") if !ok { - logger.Println("DropDeadMember failed. gid is missing") + logger.Println("QueryGroupMembers failed. gid is missing") w.WriteHeader(http.StatusBadRequest) return } - mid, ok := common.ReadObjectIDFormValue(r.Form, "mid") - if !ok { - logger.Println("DropDeadMember failed. mid is missing") + members, err := group.QueryGroupMembers(gid) + if err != nil { + logger.Error("QueryGroupMembers failed. group.QueryGroupMembers returns err :", err) w.WriteHeader(http.StatusBadRequest) return } - if err := group.DropPausedMember(gid, mid); err != nil { - logger.Error("DropDeadMember failed. group.DropDeadMember returns err :", err) - w.WriteHeader(http.StatusBadRequest) + if err := writeBsonDoc(w, members); err != nil { + logger.Error("QueryGroupMembers failed. writeBsonDoc return err :", err) + w.WriteHeader(http.StatusInternalServerError) return } } - -// func (sub *subTavern) deliveryMessageHandler(deliveryChan <-chan wshandler.DeliveryMessage) { -// defer func() { -// r := recover() -// if r != nil { -// logger.Error(r) -// } -// }() - -// redisSync := sub.wsh.RedisSync -// for msg := range deliveryChan { -// mid := msg.Alias -// if msg.Body != nil { -// buffer := msg.Body - -// var channame string -// for i, ch := range buffer { -// if ch == 0 { -// channame = string(buffer[:i]) -// buffer = buffer[i+1:] -// break -// } -// } - -// if len(channame) == 0 { -// continue -// } - -// buffer = append(mid[:], buffer...) -// _, err := redisSync.Publish(context.Background(), channame, buffer).Result() -// if err != nil { -// logger.Error(err) -// } -// } - -// if len(msg.Command) > 0 { -// switch msg.Command { -// case "pause": -// gidtype := msg.Conn.GetTag("gid") -// if len(gidtype) > 0 { -// tokens := strings.SplitN(gidtype, "@", 2) -// gidobj, _ := primitive.ObjectIDFromHex(tokens[0]) -// gtype := tokens[1] -// group := sub.groups[gtype] -// if group != nil { -// group.PauseMember(gidobj, msg.Alias, msg.Conn) -// } -// } -// } -// } -// } -// logger.Println("delivery chan fin") -// } diff --git a/core/config.json b/core/config.json new file mode 100644 index 0000000..9e26dfe --- /dev/null +++ b/core/config.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/core/group.go b/core/group.go index dc1cf2e..f5c3119 100644 --- a/core/group.go +++ b/core/group.go @@ -25,22 +25,18 @@ type groupConfig struct { type group interface { Create(form url.Values, doc bson.M) (primitive.ObjectID, error) Candidate(groupID primitive.ObjectID, memberID primitive.ObjectID, doc bson.M) error - Join(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, doc bson.M) (newTicketID primitive.ObjectID, err error) - FindTicketID(groupID primitive.ObjectID, memberID primitive.ObjectID) primitive.ObjectID + Join(groupID primitive.ObjectID, memberID primitive.ObjectID, doc bson.M) error Invite(groupID primitive.ObjectID, memberID primitive.ObjectID, inviterDoc bson.M, inviteeDoc bson.M) (string, error) CancelInvitation(groupID primitive.ObjectID, ticketID primitive.ObjectID) error - AcceptInvitation(groupID primitive.ObjectID, mid primitive.ObjectID, ticketID primitive.ObjectID, member bson.M) (primitive.ObjectID, error) + AcceptInvitation(groupID primitive.ObjectID, mid primitive.ObjectID, member bson.M) error DenyInvitation(groupID primitive.ObjectID, mid primitive.ObjectID, ticketID primitive.ObjectID) error QueryInvitations(memberID primitive.ObjectID, after primitive.Timestamp) ([]bson.M, error) Exist(groupID primitive.ObjectID, filter bson.M) (bool, error) FindAll(filter bson.M, projection string, after primitive.Timestamp) ([]bson.M, error) FindOne(groupID primitive.ObjectID, projection string) (bson.M, error) - QueryMembers(groupID primitive.ObjectID, requesterID primitive.ObjectID, projection string, after primitive.Timestamp) (map[string]bson.M, error) - QueryMember(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, projection string) (bson.M, error) - Leave(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID) error - DropPausedMember(groupID primitive.ObjectID, memberID primitive.ObjectID) error - PauseMember(groupID primitive.ObjectID, memberID primitive.ObjectID) error + Leave(groupID primitive.ObjectID, memberID primitive.ObjectID) error UpdateMemberDocument(groupID primitive.ObjectID, memberID primitive.ObjectID, doc bson.M) error Dismiss(groupID primitive.ObjectID) error - UpdateGroupDocument(groupID primitive.ObjectID, body []byte) error + UpdateGroupDocument(groupID primitive.ObjectID, doc bson.M) error + QueryGroupMembers(groupID primitive.ObjectID) (bson.M, error) } diff --git a/core/group_memory.go b/core/group_memory.go index b3fa700..dc60261 100644 --- a/core/group_memory.go +++ b/core/group_memory.go @@ -2,24 +2,18 @@ package core import ( "context" - "crypto/md5" "encoding/gob" - "encoding/hex" "encoding/json" "errors" "fmt" "net/url" - "os" - "strings" - "sync" "time" - "repositories.action2quare.com/ayo/gocommon/flagx" + "github.com/go-redis/redis/v8" + "repositories.action2quare.com/ayo/gocommon" "repositories.action2quare.com/ayo/gocommon/logger" - "repositories.action2quare.com/ayo/gocommon/rpc" "repositories.action2quare.com/ayo/gocommon/wshandler" - "github.com/go-redis/redis/v8" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" ) @@ -28,440 +22,243 @@ type accountID = primitive.ObjectID type ticketID = primitive.ObjectID type groupID = primitive.ObjectID -type Invitation struct { - GroupID groupID `json:"gid"` - TicketID ticketID `json:"tid"` - Inviter bson.M `json:"inviter"` - ExpireAtUTC int64 `json:"expire_at_utc"` +func init() { + gob.Register(memberDoc{}) + gob.Register(groupDoc{}) + gob.Register(Invitation{}) + gob.Register(InvitationFail{}) } -type memberDocCommon struct { - Body bson.M - Invite bool - InviteExpire time.Time - JoinTime int64 +func makeTid(gid groupID, in accountID) string { + var out primitive.ObjectID + for i := range in { + out[12-i-1] = gid[i] ^ in[12-i-1] + } + return out.Hex() +} + +type Invitation struct { + GroupID groupID `json:"_gid"` + TicketID string `json:"_tid"` + Inviter bson.M `json:"_inviter"` // memberDoc.Body + ExpireAtUTC int64 `json:"_expire_at_utc"` } // 플레이어한테 공유하는 멤버 정보 -type PublicMemberDoc struct { - memberDocCommon `json:",inline"` - Tid ticketID +type memberDoc struct { + Body bson.M `json:"_body"` + Invite bool `json:"_invite"` + InviteExpire int64 `json:"_invite_exp"` } -type FullGroupDoc struct { - Gid groupID - AllMembers []*PublicMemberDoc `json:",omitempty"` - Body GroupDocBody `json:",omitempty"` -} - -type GroupDocBody bson.M type InvitationFail bson.M -type memberDoc struct { - memberDocCommon `json:",inline"` - - // underscore keys in Hidden - Hidden bson.M - rconn *connection - Mid accountID -} - type groupDoc struct { - sync.Mutex + Members map[string]any `json:"_members"` + InCharge string `json:"_incharge"` + Gid string `json:"_gid"` - Body GroupDocBody - - InCharge accountID - tickets map[ticketID]*memberDoc - createTime time.Time + rh *gocommon.RedisonHandler + id groupID } -func init() { - gob.Register(PublicMemberDoc{}) -} - -func (gd *groupDoc) updateBodyWithBson(src []byte) ([]byte, error) { - gd.Lock() - defer gd.Unlock() - - err := bson.Unmarshal(src, &gd.Body) +func (gd *groupDoc) loadMemberFull(tid string) (bson.M, error) { + full, err := gd.rh.JSONGet(gd.strid(), "$._members."+tid) if err != nil { return nil, err } - return bson.Marshal(gd.Body) + bt := []byte(full.(string)) + bt = bt[1 : len(bt)-1] + + var doc bson.M + if err = json.Unmarshal(bt, &doc); err != nil { + return nil, err + } + + return doc, nil } -func (gd *groupDoc) updateBodyWithJson(src []byte) GroupDocBody { - gd.Lock() - defer gd.Unlock() - - err := json.Unmarshal(src, &gd.Body) - if err != nil { - return nil - } - - return gd.Body -} - -func (gd *groupDoc) updateBodyBsonToJson(bsonSrc []byte) GroupDocBody { - gd.Lock() - defer gd.Unlock() - - err := bson.Unmarshal(bsonSrc, &gd.Body) - if err != nil { - return nil - } - - return gd.Body -} - -func (gd *groupDoc) updateBody(bsonSrc []byte) error { - gd.Lock() - defer gd.Unlock() - - return bson.Unmarshal(bsonSrc, &gd.Body) -} - -func (gd *groupDoc) addInvite(inviteeDoc bson.M, rconn *connection, ttl time.Duration, max int) (ticketID, *memberDoc) { - gd.Lock() - defer gd.Unlock() - - mid := inviteeDoc["_mid"].(accountID) - body := inviteeDoc["body"].(bson.M) - - // 초대 가능한 빈 자리가 있나 - now := time.Now().UTC() - if len(gd.tickets) < max { - tid := primitive.NewObjectID() - newdoc := &memberDoc{ - memberDocCommon: memberDocCommon{ - Body: body, - Invite: true, - InviteExpire: now.Add(ttl), - }, - rconn: rconn, - Mid: mid, - } - gd.tickets[tid] = newdoc - return tid, newdoc - } - - for oldtid, mem := range gd.tickets { - if !mem.Invite { - continue - } - if mem.InviteExpire.Before(now) { - delete(gd.tickets, oldtid) - tid := primitive.NewObjectID() - newdoc := &memberDoc{ - memberDocCommon: memberDocCommon{ - Body: body, - Invite: true, - InviteExpire: now.Add(ttl), - }, - rconn: rconn, - Mid: mid, - } - gd.tickets[tid] = newdoc - return tid, newdoc - } - } - - return primitive.NilObjectID, nil -} - -func seperateHidden(in bson.M) (public bson.M, hidden bson.M) { - for k, v := range in { - if k[0] == '_' { - if hidden == nil { - hidden = make(bson.M) - } - hidden[k] = v - } - } - - for k := range hidden { - delete(in, k) - } - return in, hidden -} - -func (gd *groupDoc) addInCharge(mid accountID, rconn *connection, doc bson.M) (ticketID, *memberDoc) { - gd.Lock() - defer gd.Unlock() - - if !gd.InCharge.IsZero() { - return primitive.NilObjectID, nil - } - - gd.InCharge = mid - newtid := primitive.NewObjectID() - doc, hidden := seperateHidden(doc) - newdoc := &memberDoc{ - memberDocCommon: memberDocCommon{ - Body: doc, - Invite: false, - JoinTime: time.Now().UTC().Unix(), - }, - rconn: rconn, - Mid: mid, - Hidden: hidden, - } - - gd.tickets[newtid] = newdoc - if gd.Body == nil { - gd.Body = GroupDocBody(make(bson.M)) - } - gd.Body["incharge"] = newtid.Hex() - return newtid, newdoc -} - -func (gd *groupDoc) addMember(mid accountID, tid *ticketID, doc bson.M) (*memberDoc, bool) { - gd.Lock() - defer gd.Unlock() - - var memdoc *memberDoc - isNew := false - if tid.IsZero() { - for oldtid, d := range gd.tickets { - if d.Mid == mid { - memdoc = d - *tid = oldtid - isNew = true - break - } +func (gd *groupDoc) loadFull() (doc bson.M) { + // 새 멤버에 그룹 전체를 알림 + full, err := gd.rh.JSONGet(gd.strid(), "$") + if err == nil { + bt := []byte(full.(string)) + bt = bt[1 : len(bt)-1] + err = json.Unmarshal(bt, &doc) + if err != nil { + logger.Println("loadFull err :", err) } } else { - var ok bool - memdoc, ok = gd.tickets[*tid] - if !ok { - // 티켓이 업네? - return nil, false - } - - if memdoc.Mid != mid { - // 내 티켓이 아니네? - return nil, false - } + logger.Println("loadFull err :", err) } - - doc, hidden := seperateHidden(doc) - if memdoc != nil { - memdoc.Body = doc - memdoc.Hidden = hidden - - if memdoc.Invite { - isNew = true - memdoc.Invite = false - } - - if memdoc.JoinTime == 0 { - memdoc.JoinTime = time.Now().UTC().Unix() - } - } - return memdoc, isNew + return } -func (gd *groupDoc) removeMember(mid accountID, tid *ticketID) { - gd.Lock() - defer gd.Unlock() +func (gd *groupDoc) strid() string { + if len(gd.Gid) == 0 { + gd.Gid = gd.id.Hex() + } + return gd.Gid +} - if tid.IsZero() { - for t, mem := range gd.tickets { - if mem.Mid == mid { - *tid = t - delete(gd.tickets, t) - return - } +func (gd *groupDoc) tid(in accountID) string { + return makeTid(gd.id, in) +} + +func (gd *groupDoc) mid(tid string) accountID { + tidobj, _ := primitive.ObjectIDFromHex(tid) + var out primitive.ObjectID + for i := range tidobj { + out[12-i-1] = gd.id[i] ^ tidobj[12-i-1] + } + return out +} + +func (gd *groupDoc) addInvite(inviteeDoc bson.M, ttl time.Duration, max int) (*memberDoc, error) { + targetmid := inviteeDoc["_mid"].(accountID) + targetbody := inviteeDoc["body"].(bson.M) + + // 초대 가능한 빈 자리가 있나 + tids, err := gd.rh.JSONObjKeys(gd.strid(), "$._members") + if err != nil { + return nil, err + } + + now := time.Now().UTC() + createNewDoc := func() *memberDoc { + return &memberDoc{ + Body: targetbody, + Invite: true, + InviteExpire: now.Add(ttl).Unix(), } } - delete(gd.tickets, *tid) - - if gd.InCharge == mid { - gd.InCharge = primitive.NilObjectID - } -} - -func (gd *groupDoc) ticket(mid accountID) ticketID { - gd.Lock() - defer gd.Unlock() - - if mid.IsZero() { - return primitive.NilObjectID + newtid := gd.tid(targetmid) + if len(tids) < max { + // 빈자리를 찾았다. + newdoc := createNewDoc() + _, err := gd.rh.JSONSet(gd.strid(), "$._members."+newtid, newdoc) + return newdoc, err } - for t, m := range gd.tickets { - if m.Mid == mid { - return t - } + expires, err := gd.rh.JSONGetInt64(gd.strid(), "$._members.._invite_exp") + if err != nil { + return nil, err } - return primitive.NilObjectID -} -func (gd *groupDoc) member(tid ticketID) *memberDoc { - gd.Lock() - defer gd.Unlock() - - return gd.tickets[tid] -} - -func (gd *groupDoc) memberByAccount(mid accountID) (ticketID, *memberDoc) { - gd.Lock() - defer gd.Unlock() - - for tid, doc := range gd.tickets { - if doc.Mid == mid { - return tid, doc - } - } - return primitive.NilObjectID, nil -} - -func (gd *groupDoc) modifyMemberDocument(mid accountID, tid *ticketID, cb func(b *memberDoc)) *memberDoc { - gd.Lock() - defer gd.Unlock() - - if tid.IsZero() { - for t, mem := range gd.tickets { - if mem.Mid == mid { - *tid = t - break - } + var delpaths []string + for i, expire := range expires { + if expire < now.Unix() { + // 만료된 초대가 있네? 지우자 + delpaths = append(delpaths, "$._members."+tids[i]) } } - if tid.IsZero() { - return nil + if len(delpaths) == 0 { + // 빈자리가 없다 + return nil, nil } - if mem := gd.tickets[*tid]; mem != nil { - cb(mem) - return mem + if err := gd.rh.JSONMDel(gd.strid(), delpaths); err != nil { + return nil, err } - return nil + newdoc := createNewDoc() + _, err = gd.rh.JSONSet(gd.strid(), "$._members."+newtid, newdoc) + return newdoc, err } -func (gd *groupDoc) overwriteMemberDocument(mid accountID, tid *ticketID, raw []byte) *memberDoc { - gd.Lock() - defer gd.Unlock() +func (gd *groupDoc) addMember(mid accountID, doc bson.M) (bson.M, error) { + tid := gd.tid(mid) + prefix := "$._members." + tid - if tid.IsZero() { - for t, mem := range gd.tickets { - if mem.Mid == mid { - *tid = t - json.Unmarshal(raw, &mem.Body) - return mem - } - } + if _, err := gd.rh.JSONMerge(gd.strid(), prefix+"._body", doc, gocommon.RedisonSetOptionXX); err != nil { + return nil, err } - if mem := gd.tickets[*tid]; mem != nil { - var newbody primitive.M - json.Unmarshal(raw, &newbody) - mem.Body = newbody - return mem + if err := gd.rh.JSONMDel(gd.strid(), []string{prefix + "._invite", prefix + "._invite_exp"}); err != nil { + return nil, err } - return nil + return gd.loadMemberFull(tid) } -func (gd *groupDoc) iterateMembers(cb func(ticketID, *memberDoc)) { - gd.Lock() - defer gd.Unlock() - - for k, v := range gd.tickets { - cb(k, v) - } +func (gd *groupDoc) removeMember(mid accountID) error { + _, err := gd.rh.JSONDel(gd.strid(), "$._members."+gd.tid(mid)) + return err } -func (gd *groupDoc) serializeFull(gid groupID) FullGroupDoc { - gd.Lock() - defer gd.Unlock() - - var output []*PublicMemberDoc - for k, v := range gd.tickets { - if v.Invite { - // 아직 초대 중인 대상. 패스 - continue - } - - output = append(output, &PublicMemberDoc{ - memberDocCommon: v.memberDocCommon, - Tid: k, - }) +func (gd *groupDoc) getMembers() (map[string]any, error) { + res, err := gd.rh.JSONGet(gd.strid(), "$._members") + if err != nil { + return nil, err } - return FullGroupDoc{ - Gid: gid, - AllMembers: output, - Body: gd.Body, + var temp []map[string]any + err = json.Unmarshal([]byte(res.(string)), &temp) + if err != nil { + return nil, err } -} -type groupContainer struct { - sync.Mutex - groupDocs map[groupID]*groupDoc + out := make(map[string]any) + for k, v := range temp[0] { + body := v.(map[string]any)["_body"] + out[gd.mid(k).Hex()] = body + } + + return out, nil } type groupInMemory struct { *groupConfig - groupDocSync func(groupID, []byte) error - memberSync func(groupID, accountID, ticketID, *memberDoc, bool) error - rpcCall func([]byte) error - hasConn func(accountID) *connection sendUpstreamMessage func(*wshandler.UpstreamMessage) sendEnterRoomMessage func(groupID, accountID) sendLeaveRoomMessage func(groupID, accountID) - groups groupContainer + rh *gocommon.RedisonHandler } -func (gc *groupContainer) add(id groupID, doc *groupDoc) { - gc.Lock() - defer gc.Unlock() +func (gm *groupInMemory) createGroup(newid groupID, charge accountID, chargeDoc bson.M) (*groupDoc, error) { + tid := makeTid(newid, charge) - gc.groupDocs[id] = doc -} + gd := &groupDoc{ + Members: map[string]any{ + tid: &memberDoc{ + Body: chargeDoc, + Invite: false, + InviteExpire: 0, + }, + }, + InCharge: tid, -func (gc *groupContainer) createWithID(newid groupID, base bson.M) (groupID, *groupDoc) { - gc.Lock() - defer gc.Unlock() - - if _, ok := gc.groupDocs[newid]; ok { - return primitive.NilObjectID, nil + rh: gm.rh, + id: newid, } - newdoc := newGroupDoc(base) - gc.groupDocs[newid] = newdoc - - return newid, newdoc -} - -func (gc *groupContainer) delete(gid groupID) { - gc.Lock() - defer gc.Unlock() - - delete(gc.groupDocs, gid) -} - -func (gc *groupContainer) find(id groupID) *groupDoc { - gc.Lock() - defer gc.Unlock() - - if found, ok := gc.groupDocs[id]; ok { - return found + _, err := gm.rh.JSONSet(gd.strid(), "$", gd, gocommon.RedisonSetOptionNX) + if err != nil { + return nil, err } - return nil + return gd, nil } -func newGroupDoc(base bson.M) *groupDoc { +func (gm *groupInMemory) find(id groupID) (*groupDoc, error) { + if id.IsZero() { + return nil, nil + } + + _, err := gm.rh.JSONObjLen(id.Hex(), "$") + if err == redis.Nil { + return nil, nil + } + if err != nil { + return nil, err + } + return &groupDoc{ - Body: GroupDocBody(base), - createTime: time.Now().UTC(), - tickets: make(map[ticketID]*memberDoc), - } + rh: gm.rh, + id: id, + }, nil } func (gm *groupInMemory) Create(form url.Values, base bson.M) (groupID, error) { @@ -474,114 +271,44 @@ func (gm *groupInMemory) Candidate(gid groupID, mid accountID, doc bson.M) error } var errGroupNotExist = errors.New("group does not exist") -var errNoEmptySlot = errors.New("no more seat in group") -func (gm *groupInMemory) Join(gid groupID, mid accountID, tid ticketID, doc bson.M) (ticketID, error) { - group := gm.groups.find(gid) - if group == nil { - // 그룹이 없다. 실패 - return primitive.NilObjectID, errGroupNotExist +func (gm *groupInMemory) Join(gid groupID, mid accountID, doc bson.M) error { + gd, err := gm.find(gid) + if err != nil { + return err } - // 내 정보 업데이트할 때에도 사용됨 - // 굳이 InCharge가 있는 호스트가 아니어도 가능 - if memdoc, isNew := group.addMember(mid, &tid, doc); memdoc != nil { - gm.memberSync(gid, mid, tid, memdoc, isNew) - } - - return tid, nil -} - -func (gm *groupInMemory) FindTicketID(gid groupID, mid groupID) ticketID { - return primitive.NilObjectID -} - -// func sendTypedMessageDirect[T any](rconn *wshandler.Richconn, msg T) { -// bt, _ := json.Marshal(makeTypeMessage(msg)) -// rconn.WriteBytes(bt) -// } - -// func sendTypedMessage[T any](gm *groupInMemory, target accountID, msg T) { -// bt, _ := json.Marshal(makeTypeMessage(msg)) -// gm.SendMessage(target, bt) -// } - -// func (gm *groupInMemory) SendMessage(target accountID, msg []byte) { -// rconn := gm.hasConn(target) -// if rconn != nil { -// rconn.WriteBytes(msg) -// } else { -// gm.rpc(target).call(target, msg) -// } -// } - -// func multicast(conns []*wshandler.Richconn, raw []byte) { -// for _, rconn := range conns { -// rconn.WriteBytes(raw) -// } -// } - -// func broadcastTypedMessage[T any](gm *groupInMemory, gid groupID, msg T) { -// if gd := gm.groups.find(gid); gd != nil { -// bt, _ := json.Marshal(makeTypeMessage(msg)) -// go multicast(gd.conns(false), bt) -// } -// } - -var errInviteeDocMidMissing = errors.New("inviteeDoc must have '_mid' field") - -func (gm *groupInMemory) InviteImplement(gid groupID, mid accountID, inviteeDoc bson.M, inviterDoc bson.M) error { - targetid := inviteeDoc["_mid"].(accountID) - - // invitee에게 알림 - // invitee의 rconn이 종료될 때 그룹에 반영해야 하므로 rconn을 찾자 - rconn := gm.hasConn(targetid) - if rconn == nil { - return rpc.Make(gm).To(targetid).Call(gid, mid, inviteeDoc, inviterDoc) - } - - gd := gm.groups.find(gid) if gd == nil { + // 그룹이 없다. 실패 return errGroupNotExist } - if rconn.hasOnCloseFunc("member_remove_invite") { - // 이미 초대 중이다. - // inviter한테 알려줘야 한다. + // 내 정보 업데이트할 때에도 사용됨 + if memdoc, err := gd.addMember(mid, doc); err == nil { + // 기존 유저에게 새 유저 알림 + gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ + Target: "#" + gid.Hex(), + Body: map[string]any{ + gd.tid(mid): memdoc, + }, + Tag: []string{"MemberDocFull"}, + }) + + gm.sendEnterRoomMessage(gid, mid) + + // 새 멤버에 그룹 전체를 알림 gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ Target: "@" + mid.Hex(), - Body: inviteeDoc, - Tag: []string{"InvitationFail"}, + Body: gd.loadFull(), + Tag: []string{"GroupDocFull"}, }) - return nil } - tid, newdoc := gd.addInvite(inviteeDoc, rconn, time.Duration(gm.InviteExpire)*time.Second, gm.MaxMember) - if newdoc == nil { - return errNoEmptySlot - } - - rconn.registOnCloseFunc("member_remove_invite", func() { - gd.removeMember(targetid, &tid) - gm.memberSync(gid, targetid, tid, nil, false) - }) - - gm.memberSync(gid, targetid, tid, newdoc, false) - gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ - Target: "@" + targetid.Hex(), - Body: Invitation{ - GroupID: gid, - TicketID: tid, - Inviter: inviterDoc, - ExpireAtUTC: newdoc.InviteExpire.Unix(), - }, - Tag: []string{"Invitation"}, - }) - - return nil + return err } -var errAlreayMember = errors.New("this target is already member") +var errInviteeDocMidMissing = errors.New("inviteeDoc must have '_mid' field") +var errAlreadyInvited = errors.New("this target is already invited by someone or me") func (gm *groupInMemory) Invite(gid groupID, mid accountID, inviterDoc bson.M, inviteeDoc bson.M) (string, error) { targetid, ok := inviteeDoc["_mid"].(accountID) @@ -589,111 +316,132 @@ func (gm *groupInMemory) Invite(gid groupID, mid accountID, inviterDoc bson.M, i return "", errInviteeDocMidMissing } - if !gid.IsZero() { - if gd := gm.groups.find(gid); gd != nil { - if gd.InCharge != mid { - // 이러면 안된다. - // 초대는 InCharge만 할 수 있음 - return "", nil - } + // targetid에 초대한 mid가 들어있다. + already, err := gm.rh.Get(context.Background(), targetid.Hex()).Result() + if err != nil && err != redis.Nil { + return "", err + } + + if len(already) > 0 { + if already != mid.Hex() { + // 이미 초대 중이다. + // inviter한테 알려줘야 한다. + gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ + Target: "@" + mid.Hex(), + Body: inviteeDoc, + Tag: []string{"InvitationFail"}, + }) } + return "", errAlreadyInvited } - // gid는 미리 만들어 놔야함. - // 초대하는 클라이언트가 아직 group을 소유하지 않고 있을 수 있다. - // mid의 rconn이 이 호스트에 없더라도 gid는 이 request를 보낸 클라이언트가 받아야 하기 떄문 - if gid.IsZero() { - gid = primitive.NewObjectID() + gd, err := gm.find(gid) + if err != nil { + return "", err } - rconn := gm.hasConn(mid) - if rconn == nil { - // mid가 있는 곳에서 처리를 해야 접속 끊겼을 때 콜백을 먼저 등록할 수 있다. - // 콜백이 rconn에 먼저 등록되지 않으면 좀비 group이 생길 가능성이 생긴다. - return gid.Hex(), rpc.Make(gm).To(targetid).Call(gid, mid, inviteeDoc, inviterDoc) - } - - // 이제 여기는 mid가 InCharge이면서 rconn이 존재 - gd := gm.groups.find(gid) if gd == nil { - _, gd = gm.groups.createWithID(gid, bson.M{}) - tid, newdoc := gd.addInCharge(mid, rconn, inviterDoc) - rconn.registOnCloseFunc("member_remove", func() { - // 내가 InCharge이므로 접속이 종료될 때 그룹을 해체한다. - gm.groupDocSync(gid, nil) - }) - bt, err := bson.Marshal(gd.Body) + gd, err = gm.createGroup(gid, mid, inviterDoc) if err != nil { return "", err } - gm.groupDocSync(gid, bt) - gm.memberSync(gid, mid, tid, newdoc, true) // 내가 wshandler room에 입장 gm.sendEnterRoomMessage(gid, mid) - } else { - // targetid가 이미 멤버인지 미리 확인 가능 - if !gd.ticket(targetid).IsZero() { - // 이미 멤버네 - return "", errAlreayMember - } + + gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ + Target: "@" + mid.Hex(), + Body: gd, + Tag: []string{"GroupDocFull"}, + }) } - return gid.Hex(), gm.InviteImplement(gid, mid, inviteeDoc, inviterDoc) + newdoc, err := gd.addInvite(inviteeDoc, time.Duration(gm.InviteExpire+1)*time.Second, gm.MaxMember) + if err != nil { + return "", err + } + + // 초대 중 표시 + _, err = gm.rh.SetNX(context.Background(), targetid.Hex(), mid.Hex(), time.Duration(gm.InviteExpire)*time.Second).Result() + if err != nil { + return "", err + } + + // invitee에게 알림 + gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ + Target: "@" + targetid.Hex(), + Body: Invitation{ + GroupID: gid, + TicketID: gd.tid(targetid), + Inviter: inviterDoc, + ExpireAtUTC: newdoc.InviteExpire, + }, + Tag: []string{"Invitation"}, + }) + + return gd.strid(), nil } func (gm *groupInMemory) CancelInvitation(gid groupID, tid ticketID) error { return nil } -func (gm *groupInMemory) AcceptInvitation(gid groupID, mid accountID, tid ticketID, member bson.M) (groupID, error) { - gd := gm.groups.find(gid) - if gd == nil { - return primitive.NilObjectID, errGroupNotExist +var errInvitationExpired = errors.New("invitation is already expired") + +func (gm *groupInMemory) AcceptInvitation(gid groupID, mid accountID, member bson.M) error { + cnt, err := gm.rh.Del(context.Background(), mid.Hex()).Result() + if err != nil { + return err + } + if cnt == 0 { + // 만료됨 + return errInvitationExpired } - rconn := gm.hasConn(mid) - if rconn == nil { - return gid, rpc.Make(gm).To(mid).Call(gid, mid, tid, member) + gd := &groupDoc{ + id: gid, + rh: gm.rh, } - oldFunc := rconn.unregistOnCloseFunc("member_remove") - if oldFunc != nil { - // 기존 멤버였으면 탈퇴 처리 - oldFunc() - } + memberDoc, err := gd.addMember(mid, member) + if err == nil { + // 기존 멤버에게 새 멤버를 알림 + gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ + Target: "#" + gid.Hex(), + Body: map[string]any{ + gd.tid(mid): memberDoc, + }, + Tag: []string{"MemberDocFull"}, + }) - inviteFunc := rconn.unregistOnCloseFunc("member_remove_invite") - rconn.registOnCloseFunc("member_remove", inviteFunc) - - result, isNew := gd.addMember(mid, &tid, member) - if result != nil { gm.sendEnterRoomMessage(gid, mid) - return gid, gm.memberSync(gid, mid, tid, result, isNew) + + // 새 멤버에 그룹 전체를 알림 + gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ + Target: "@" + mid.Hex(), + Body: gd.loadFull(), + Tag: []string{"GroupDocFull"}, + }) } // 실패 - return primitive.NilObjectID, nil + return err +} + +func (gm *groupInMemory) QueryGroupMembers(gid groupID) (bson.M, error) { + gd := groupDoc{ + id: gid, + rh: gm.rh, + } + return gd.getMembers() } func (gm *groupInMemory) DenyInvitation(gid groupID, mid accountID, tid ticketID) error { - gd := gm.groups.find(gid) - if gd == nil { - return errGroupNotExist + gm.rh.Del(context.Background(), mid.Hex()).Result() + gd := groupDoc{ + id: gid, + rh: gm.rh, } - - rconn := gm.hasConn(mid) - if rconn == nil { - return rpc.Make(gm).To(mid).Call(gid, mid, tid) - } - - inviteFunc := rconn.unregistOnCloseFunc("member_remove_invite") - if inviteFunc != nil { - inviteFunc() // removeMember는 여기에 들어있다. - return nil - } - - gd.removeMember(mid, &tid) - return gm.memberSync(gid, mid, tid, nil, false) + return gd.removeMember(mid) } func (gm *groupInMemory) QueryInvitations(mid accountID, after primitive.Timestamp) ([]bson.M, error) { @@ -708,168 +456,44 @@ func (gm *groupInMemory) FindAll(filter bson.M, projection string, after primiti func (gm *groupInMemory) FindOne(gid groupID, projection string) (bson.M, error) { return nil, nil } - -func (gm *groupInMemory) DropPausedMember(gid primitive.ObjectID, mid primitive.ObjectID) error { - gd := gm.groups.find(gid) - if gd == nil { - return errGroupNotExist +func (gm *groupInMemory) Leave(gid groupID, mid accountID) error { + gd := groupDoc{ + id: gid, + rh: gm.rh, + } + if err := gd.removeMember(mid); err != nil { + return err } - tid, memdoc := gd.memberByAccount(mid) - if memdoc == nil { - return errNotMember - } - - if _, ok := memdoc.Body["paused"]; ok { - // 드랍해야 한다. - if gd.InCharge == mid { - // 내가 방장인 경우 - return gm.groupDocSync(gid, nil) - } else { - // 내가 방장이 아닌 경우 - gd.removeMember(mid, &tid) - return gm.memberSync(gid, mid, tid, nil, false) - } - } - - return nil -} - -func (gm *groupInMemory) PauseMember(gid primitive.ObjectID, mid primitive.ObjectID) error { - rconn := gm.hasConn(mid) - if rconn == nil { - return rpc.Make(gm).To(mid).Call(gid, mid) - } - - // 접속은 끊기지만 그룹에서 제거하지는 않는 상태 - rconn.unregistOnCloseFunc("member_remove") - rconn.unregistOnCloseFunc("member_remove_invite") - gm.sendLeaveRoomMessage(gid, mid) - - gd := gm.groups.find(gid) - if gd == nil { - return errGroupNotExist - } - - tid := primitive.NilObjectID - newdoc := gd.modifyMemberDocument(mid, &tid, func(memdoc *memberDoc) { - memdoc.Body["paused"] = true - memdoc.rconn = nil - }) - - return gm.memberSync(gid, mid, tid, newdoc, false) -} - -func (gm *groupInMemory) QueryMembers(gid groupID, reqID accountID, projection string, after primitive.Timestamp) (map[string]bson.M, error) { - gd := gm.groups.find(gid) - if gd == nil { - return nil, errGroupNotExist - } - - if gd.InCharge != reqID { - return nil, errGroupNotExist - } - - outdocs := make(map[string]bson.M) - if len(projection) > 0 { - projkeys := map[string]bool{} - for _, p := range strings.Split(projection, ",") { - if p[0] == '+' { - projkeys[strings.TrimSpace(p[1:])] = true - } else { - projkeys[strings.TrimSpace(p)] = true - } - } - - gd.iterateMembers(func(tid ticketID, memdoc *memberDoc) { - outdoc := bson.M{} - for k := range projkeys { - if k[0] == '_' { - outdoc[k] = memdoc.Hidden[k] - } else { - outdoc[k] = memdoc.Body[k] - } - } - outdocs[memdoc.Mid.Hex()] = outdoc - }) - } else { - gd.iterateMembers(func(tid ticketID, memdoc *memberDoc) { - outdoc := bson.M{} - for k, v := range memdoc.Hidden { - outdoc[k] = v - } - for k, v := range memdoc.Body { - outdoc[k] = v - } - outdocs[memdoc.Mid.Hex()] = outdoc - }) - } - - return outdocs, nil -} - -func (gm *groupInMemory) QueryMember(gid groupID, mid accountID, tid ticketID, projection string) (bson.M, error) { - return nil, nil -} - -var errHaveNoAuthority = errors.New("cannot kick other member") -var errNotMember = errors.New("ticket is not in this group") - -func (gm *groupInMemory) Leave(gid groupID, mid accountID, tid ticketID) error { - gd := gm.groups.find(gid) - if gd == nil { - return errGroupNotExist - } - - // mid가 InCharge인 경우에는 tid가 누구든 내쫓고, - // mid가 InCharge가 아닌 경우는 tid가 mid일 경우에만 나갈 수 있다. - memdoc := gd.member(tid) - if memdoc == nil { - return errNotMember - } - targetmid := memdoc.Mid - - // 내가 방장이면 아무나 내보낼 수 있다. - if gd.InCharge != mid && targetmid != mid { - // targetmid와 mid가 같아야 한다. 방장이 아니므로 나는 나만 내보낼 수 있다. - return errHaveNoAuthority - } - - // targetmid의 "member_remove" 함수를 등록 해제해야 하므로 rconn이 있는 곳에서 하자 - rconn := gm.hasConn(targetmid) - if rconn == nil { - return rpc.Make(gm).To(targetmid).Call(gid, mid, tid) - } - - if oldfunc := rconn.unregistOnCloseFunc("member_remove"); oldfunc != nil { - oldfunc() // 이 안에 다 있다. - } - - // 나한테는 빈 FullGroupDoc을 보낸다. + // 나한테는 빈 GroupDocFull을 보낸다. 그러면 지워짐 gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ Target: "@" + mid.Hex(), - Body: FullGroupDoc{Gid: gid}, - Tag: []string{"FullGroupDoc", gid.Hex()}, + Body: bson.M{"gid": gid}, + Tag: []string{"GroupDocFull", gid.Hex()}, }) - gm.sendLeaveRoomMessage(gid, targetmid) + gm.sendLeaveRoomMessage(gid, mid) return nil } func (gm *groupInMemory) UpdateMemberDocument(gid groupID, mid accountID, doc bson.M) error { - gd := gm.groups.find(gid) - if gd == nil { - return errGroupNotExist + gd := &groupDoc{ + id: gid, + rh: gm.rh, } - tid := gd.ticket(mid) - bt, _ := json.Marshal(doc) + prefixPath := fmt.Sprintf("$._members.%s.", gd.tid(mid)) + err := gm.rh.JSONMSetRel(gd.strid(), prefixPath, doc) + if err != nil { + return err + } - personalized := []byte(fmt.Sprintf(`{"%s":%s}`, tid.Hex(), string(bt))) gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ Target: "#" + gid.Hex(), - Body: gd.updateBodyWithJson(personalized), - Tag: []string{"GroupDocBody"}, + Body: map[string]any{ + gd.tid(mid): doc, + }, + Tag: []string{"MemberDocFragment"}, }) return nil @@ -878,89 +502,42 @@ func (gm *groupInMemory) UpdateMemberDocument(gid groupID, mid accountID, doc bs func (gm *groupInMemory) Dismiss(gid groupID) error { return nil } -func (gm *groupInMemory) UpdateGroupDocument(gid groupID, body []byte) error { - gd := gm.groups.find(gid) - if gd == nil { - return errGroupNotExist + +func (gm *groupInMemory) UpdateGroupDocument(gid groupID, frag bson.M) error { + gd := groupDoc{ + id: gid, + rh: gm.rh, } - newbody, err := gd.updateBodyWithBson(body) - if err != nil { + if err := gm.rh.JSONMSetRel(gd.strid(), "$.", frag); err != nil { return err } - return gm.groupDocSync(gid, newbody) -} + // 업데이트 알림 + gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ + Target: "#" + gid.Hex(), + Body: frag, + Tag: []string{"GroupDocFragment"}, + }) -func (gm *groupInMemory) TargetExists(target primitive.ObjectID) bool { - return gm.hasConn(target) != nil + return nil } -var devflag = flagx.Bool("dev", false, "") - func (cfg *groupConfig) prepareInMemory(ctx context.Context, typename string, sub *subTavern) (group, error) { // group document // member document region := sub.region wsh := sub.wsh - - groupDocSyncChanName := fmt.Sprintf("d_mgc_%s_%s", region, typename) - memberSyncChanName := fmt.Sprintf("m_mgc_%s_%s", region, typename) - rpcChanName := fmt.Sprintf("r_mgc_%s_%s", region, typename) - - toHashHex := func(name string) string { - hash := md5.New() - hash.Write([]byte(name)) - if *devflag { - hn, _ := os.Hostname() - hash.Write([]byte(hn)) - } - - return hex.EncodeToString(hash.Sum(nil)[:8]) + storage := config.RegionStorage[sub.region] + redisClient, err := gocommon.NewRedisClient(storage.Redis["tavern"]) + if err != nil { + return nil, err } - - groupDocSyncChanName = toHashHex(groupDocSyncChanName) - memberSyncChanName = toHashHex(memberSyncChanName) - rpcChanName = toHashHex(rpcChanName) - // 여기서는 subscribe channel // 각 함수에서는 publish gm := &groupInMemory{ groupConfig: cfg, - groupDocSync: func(gid groupID, newbody []byte) error { - bt := []byte(fmt.Sprintf("%s%s", config.macAddr, gid.Hex())) - bt = append(bt, newbody...) - _, err := wsh.RedisSync.Publish(ctx, groupDocSyncChanName, bt).Result() - return err - }, - memberSync: func(gid groupID, mid accountID, tid ticketID, doc *memberDoc, newmember bool) error { - var payload string - if doc != nil { - bt, _ := json.Marshal(doc) - newmemberflag := func() string { - if newmember { - return "t" - } else { - return "f" - } - }() - payload = fmt.Sprintf("%s%s%s%s%s%s", config.macAddr, gid.Hex(), mid.Hex(), tid.Hex(), newmemberflag, string(bt)) - } else { - payload = fmt.Sprintf("%s%s%s%s", config.macAddr, gid.Hex(), mid.Hex(), tid.Hex()) - } - _, err := wsh.RedisSync.Publish(ctx, memberSyncChanName, payload).Result() - return err - }, - rpcCall: func(bt []byte) error { - _, err := wsh.RedisSync.Publish(ctx, rpcChanName, bt).Result() - return err - }, - hasConn: func(t accountID) *connection { - return sub.cm.get(t) - }, - groups: groupContainer{ - groupDocs: make(map[groupID]*groupDoc), - }, + rh: gocommon.NewRedisonHandler(ctx, redisClient), sendUpstreamMessage: func(msg *wshandler.UpstreamMessage) { wsh.SendUpstreamMessage(region, msg) }, @@ -972,175 +549,5 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, typename string, su }, } - rpc.RegistReceiver(gm) - - processChannelMessage := func(gm *groupInMemory, pubsub *redis.PubSub) *redis.PubSub { - defer func() { - r := recover() - if r != nil { - logger.Error(r) - } - }() - - for msg := range pubsub.Channel() { - if msg == nil { - pubsub = nil - break - } - - switch msg.Channel { - case groupDocSyncChanName: // 호스트들간 그룹 정보 동기화 채널 - payload := []byte(msg.Payload) - if len(payload) < len(config.macAddr) { - break - } - - senderHost, remain := payload[:len(config.macAddr)], payload[len(config.macAddr):] - if len(remain) < 24 { - break - } - - idstr, remain := remain[:24], remain[24:] - gid, _ := primitive.ObjectIDFromHex(string(idstr)) - gd := gm.groups.find(gid) - if gd != nil { - if len(remain) == 0 { - // gid 그룹 삭제 - // 그룹 안에 있는 멤버에게 알림 - gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ - Target: "#" + gid.Hex(), - Body: FullGroupDoc{Gid: gid}, - Tag: []string{"FullGroupDoc"}, - }) - gm.groups.delete(gid) - } else if string(senderHost) != config.macAddr { - gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ - Target: "#" + gid.Hex(), - Body: gd.updateBodyBsonToJson(remain), - Tag: []string{"GroupDocBody"}, - }) - } - } else if string(senderHost) != config.macAddr { - var newDoc groupDoc - if err := newDoc.updateBody(remain); err != nil { - logger.Error("groupDocSyncChanName message decode failed :", remain, err) - } else { - gm.groups.add(gid, &newDoc) - } - } - - case memberSyncChanName: // 호스트들간 멤버 정보 동기화 채널 - if len(msg.Payload) < len(config.macAddr) { - break - } - - senderHost, remain := msg.Payload[:len(config.macAddr)], msg.Payload[len(config.macAddr):] - if len(remain) < 24 { - break - } - - idstr, remain := remain[:24], remain[24:] - gid, _ := primitive.ObjectIDFromHex(idstr) - gd := gm.groups.find(gid) - if gd == nil { - // 미리 그룹을 없애고 싱크 메시지를 보낸후 받은 것일 수 있다. - break - } - - idstr, remain = remain[:24], remain[24:] - mid, _ := primitive.ObjectIDFromHex(idstr) - idstr, remain = remain[:24], remain[24:] - tid, _ := primitive.ObjectIDFromHex(idstr) - - isNewMember := false - if len(remain) > 0 { - idstr, remain = remain[:1], remain[1:] - isNewMember = idstr == "t" - } - - var updated *memberDoc - rconn := gm.hasConn(mid) - - if senderHost != config.macAddr { - // 내가 보낸 메시지가 아니면 멤버 도큐먼트 업데이트 하고 브로드캐스팅 - if len(remain) == 0 { - // mid 삭제 - gd.removeMember(mid, &tid) - updated = nil - } else { - updated = gd.overwriteMemberDocument(mid, &tid, []byte(remain)) - } - } else { - updated = gd.member(tid) - } - - if updated == nil { - // 멤버 삭제 알림 - if rconn != nil { - // gid에 이미 다른 값이 있을 수 있다. - // 정확하게 이 값이면 제거하고, 아니면 넘어간다. - rconn.removeTag("gid", fmt.Sprintf("%s@%s", gid.Hex(), gm.Name)) - } - gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ - Target: "#" + gid.Hex(), - Body: PublicMemberDoc{Tid: tid}, - Tag: []string{"PublicMemberDoc"}, - }) - } else { - if isNewMember && updated.rconn == nil && rconn != nil { - updated.rconn = rconn - } - // 업데이트 된 플레이어(새로 들어온 플레이어 포함)를 모두에게 알려준다. 본인 포함, invitee 제외 - gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ - Target: "#" + gid.Hex(), - Body: PublicMemberDoc{ - Tid: tid, - memberDocCommon: updated.memberDocCommon, - }, - Tag: []string{"PublicMemberDoc"}, - }) - } - - if isNewMember { - if rconn != nil { - // 새 멤버이므로 기존 멤버를 다 보내준다. - rconn.addTag("gid", fmt.Sprintf("%s@%s", gid.Hex(), gm.Name)) - gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ - Target: "@" + mid.Hex(), - Body: gd.serializeFull(gid), - Tag: []string{"FullGroupDoc"}, - }) - } - } - - default: - logger.Println("unknown channel") - } - } - return pubsub - } - - go func() { - defer func() { - r := recover() - if r != nil { - logger.Error(r) - } - }() - - var pubsub *redis.PubSub - for { - if pubsub == nil { - pubsub = wsh.RedisSync.Subscribe(ctx, groupDocSyncChanName, memberSyncChanName, rpcChanName) - } - - if pubsub == nil { - time.Sleep(time.Second) - continue - } - pubsub = processChannelMessage(gm, pubsub) - } - }() - return gm, nil } diff --git a/core/richconn.go b/core/richconn.go deleted file mode 100644 index 218b5e5..0000000 --- a/core/richconn.go +++ /dev/null @@ -1,97 +0,0 @@ -package core - -import ( - "fmt" - "strings" - "sync" -) - -type connection struct { - locker sync.Mutex - alias string - tags []string - onClose map[string]func() -} - -func (rc *connection) addTag(name, val string) { - rc.locker.Lock() - defer rc.locker.Unlock() - - prefix := name + "=" - for i, tag := range rc.tags { - if strings.HasPrefix(tag, prefix) { - rc.tags[i] = prefix + val - return - } - } - rc.tags = append(rc.tags, prefix+val) -} - -func (rc *connection) removeTag(name string, val string) { - rc.locker.Lock() - defer rc.locker.Unlock() - - whole := fmt.Sprintf("%s=%s", name, val) - for i, tag := range rc.tags { - if tag == whole { - if i == 0 && len(rc.tags) == 1 { - rc.tags = nil - } else { - lastidx := len(rc.tags) - 1 - if i < lastidx { - rc.tags[i] = rc.tags[lastidx] - } - rc.tags = rc.tags[:lastidx] - } - return - } - } -} - -func (rc *connection) registOnCloseFunc(name string, f func()) { - rc.locker.Lock() - defer rc.locker.Unlock() - - if rc.onClose == nil { - f() - return - } - rc.onClose[name] = f -} - -func (rc *connection) hasOnCloseFunc(name string) bool { - rc.locker.Lock() - defer rc.locker.Unlock() - - if rc.onClose == nil { - return false - } - - _, ok := rc.onClose[name] - return ok -} - -func (rc *connection) unregistOnCloseFunc(name string) (out func()) { - rc.locker.Lock() - defer rc.locker.Unlock() - - if rc.onClose == nil { - return - } - out = rc.onClose[name] - delete(rc.onClose, name) - return -} - -func (rc *connection) cleanup() { - rc.locker.Lock() - defer rc.locker.Unlock() - - cp := rc.onClose - rc.onClose = nil - go func() { - for _, f := range cp { - f() - } - }() -} diff --git a/core/tavern.go b/core/tavern.go index d415722..e8ebf2b 100644 --- a/core/tavern.go +++ b/core/tavern.go @@ -9,8 +9,9 @@ import ( "net/http" "reflect" "strings" - "sync" + "time" + "github.com/go-redis/redis/v8" "repositories.action2quare.com/ayo/gocommon" "repositories.action2quare.com/ayo/gocommon/logger" "repositories.action2quare.com/ayo/gocommon/wshandler" @@ -30,10 +31,6 @@ func writeBsonArr(w io.Writer, src []bson.M) error { }) } -func onlineGroupQueryKey(prefix string) string { - return prefix + "_olg" -} - func writeBsonDoc[T any](w io.Writer, src T) error { rw, err := bsonrw.NewBSONValueWriter(w) if err != nil { @@ -82,43 +79,6 @@ type TavernConfig struct { var config TavernConfig -type connectionMap struct { - sync.Mutex - conns map[primitive.ObjectID]*connection -} - -func (cm *connectionMap) add(accid accountID, alias string) { - cm.Lock() - defer cm.Unlock() - - old := cm.conns[accid] - if old != nil { - old.cleanup() - } - cm.conns[accid] = &connection{ - alias: alias, - onClose: make(map[string]func()), - } -} - -func (cm *connectionMap) remove(accid accountID) { - cm.Lock() - defer cm.Unlock() - - old := cm.conns[accid] - if old != nil { - delete(cm.conns, accid) - old.cleanup() - } -} - -func (cm *connectionMap) get(accid accountID) *connection { - cm.Lock() - defer cm.Unlock() - - return cm.conns[accid] -} - type Tavern struct { subTaverns []*subTavern wsh *wshandler.WebsocketHandler @@ -126,11 +86,11 @@ type Tavern struct { type subTavern struct { mongoClient gocommon.MongoClient + redisClient *redis.Client wsh *wshandler.WebsocketHandler region string groups map[string]group methods map[string]reflect.Method - cm connectionMap } func getMacAddr() (string, error) { @@ -183,25 +143,8 @@ func (tv *Tavern) Cleanup() { } } -// func (tv *Tavern) SocketMessageReceived(accid primitive.ObjectID, alias string, messageType wshandler.WebSocketMessageType, body io.Reader) { -// switch messageType { -// case wshandler.Connected: - -// case wshandler.Disconnected: -// } -// // gidtype := msg.Conn.GetTag("gid") -// // if len(gidtype) > 0 { -// // tokens := strings.SplitN(gidtype, "@", 2) -// // gidobj, _ := primitive.ObjectIDFromHex(tokens[0]) -// // gtype := tokens[1] -// // group := sub.groups[gtype] -// // if group != nil { -// // group.PauseMember(gidobj, msg.Alias, msg.Conn) -// // } -// } - func (tv *Tavern) prepare(ctx context.Context) error { - for region := range config.RegionStorage { + for region, addr := range config.RegionStorage { var dbconn gocommon.MongoClient var err error var groupinstance group @@ -214,14 +157,17 @@ func (tv *Tavern) prepare(ctx context.Context) error { methods[method.Name] = method } + redisClient, err := gocommon.NewRedisClient(addr.Redis["tavern"]) + if err != nil { + return err + } + sub := &subTavern{ wsh: tv.wsh, mongoClient: dbconn, + redisClient: redisClient, region: region, methods: methods, - cm: connectionMap{ - conns: make(map[primitive.ObjectID]*connection), - }, } groups := make(map[string]group) @@ -254,7 +200,7 @@ func (tv *Tavern) prepare(ctx context.Context) error { func (tv *Tavern) RegisterHandlers(ctx context.Context, serveMux *http.ServeMux, prefix string) error { for _, sub := range tv.subTaverns { - tv.wsh.RegisterReceiver(sub.region, sub.clientMessageReceived) + tv.wsh.RegisterReceiver(sub.region, sub) var pattern string if sub.region == "default" { pattern = gocommon.MakeHttpHandlerPattern(prefix, "api") @@ -267,11 +213,11 @@ func (tv *Tavern) RegisterHandlers(ctx context.Context, serveMux *http.ServeMux, return nil } -func (sub *subTavern) clientMessageReceived(sender *wshandler.Sender, messageType wshandler.WebSocketMessageType, body io.Reader) { +func (sub *subTavern) OnClientMessageReceived(sender *wshandler.Sender, messageType wshandler.WebSocketMessageType, body io.Reader) { if messageType == wshandler.Connected { - sub.cm.add(sender.Accid, sender.Alias) + logger.Println("OnClientMessageReceived : connected ", sender.Accid.Hex()) } else if messageType == wshandler.Disconnected { - sub.cm.remove(sender.Accid) + logger.Println("OnClientMessageReceived : disconnected ", sender.Accid.Hex()) } else if messageType == wshandler.BinaryMessage { var msg map[string][]any dec := json.NewDecoder(body) @@ -291,12 +237,34 @@ func (sub *subTavern) clientMessageReceived(sender *wshandler.Sender, messageTyp if group := sub.groups[typename]; group != nil { group.UpdateMemberDocument(gidobj, sender.Accid, doc) } + + case "UpdateGroupDocument": + typename := args[0].(string) + gidobj, _ := primitive.ObjectIDFromHex(args[1].(string)) + doc := args[2].(map[string]any) + if group := sub.groups[typename]; group != nil { + group.UpdateGroupDocument(gidobj, doc) + } } } } } } +func (sub *subTavern) OnRoomCreated(region, name string) { + _, err := sub.redisClient.Persist(context.Background(), name).Result() + if err != nil { + logger.Println("OnRoomCreate Persist failed :", err) + } +} + +func (sub *subTavern) OnRoomDestroyed(region, name string) { + _, err := sub.redisClient.Expire(context.Background(), name, 3600*time.Second).Result() + if err != nil { + logger.Println("OnRoomDestroyed Persist failed :", err) + } +} + func (sub *subTavern) api(w http.ResponseWriter, r *http.Request) { defer func() { s := recover() diff --git a/core/tavern_test.go b/core/tavern_test.go new file mode 100644 index 0000000..663360c --- /dev/null +++ b/core/tavern_test.go @@ -0,0 +1,77 @@ +// warroom project main.go +package core + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/go-redis/redis/v8" + "go.mongodb.org/mongo-driver/bson/primitive" + "repositories.action2quare.com/ayo/gocommon" + "repositories.action2quare.com/ayo/gocommon/logger" +) + +func TestPubSub(t *testing.T) { + opt0, _ := redis.ParseURL("redis://192.168.8.94:6380/0") + opt1, _ := redis.ParseURL("redis://192.168.8.94:6380/1") + + rc0 := redis.NewClient(opt0) + rc1 := redis.NewClient(opt1) + + go func() { + time.Sleep(time.Second) + rc1.Publish(context.Background(), "__testchan", "real???") + fmt.Println("published") + }() + + pubsub := rc0.Subscribe(context.Background(), "__testchan") + msg, err := pubsub.ReceiveMessage(context.Background()) + fmt.Println(msg.Payload, err) +} + +func TestReJSON(t *testing.T) { + rc := redis.NewClient(&redis.Options{Addr: "192.168.8.94:6380"}) + rh := gocommon.NewRedisonHandler(context.Background(), rc) + + testDoc := map[string]any{ + "members": map[string]any{ + "mid2": map[string]any{ + "key": "val", + "exp": 20202020, + }, + "mid1": map[string]any{ + "key": "val", + "exp": 10101010, + }, + }, + } + + gd := groupDoc{ + id: primitive.NewObjectID(), + } + + midin := primitive.NewObjectID() + tid := gd.tid(midin) + midout := gd.mid(tid) + logger.Println(midin, tid, midout) + + logger.Println(rh.JSONSet("jsontest", "$", testDoc)) + logger.Println(rh.JSONGet("jsontest", "$")) + logger.Println(rh.JSONResp("jsontest", "$.members")) + logger.Println(rh.JSONGetString("jsontest", "$.members..key")) + logger.Println(rh.JSONGetInt64("jsontest", "$.members..exp")) + logger.Println(rh.JSONObjKeys("jsontest", "$.members")) + + err := rh.JSONMSet("jsontest", map[string]any{ + "$.members.mid1.key": "newval", + "$.members.mid2.key": "newval", + }) + logger.Println(err) + + logger.Println(rh.JSONGet("jsontest", "$")) + logger.Println(rh.JSONMDel("jsontest", []string{"$.members.mid1", "$.members.mid2"})) + logger.Println(rh.JSONGet("jsontest", "$")) + logger.Println(rh.JSONObjLen("jsontest", "$.members")) +} diff --git a/go.mod b/go.mod index 5dbe3f9..7803b45 100644 --- a/go.mod +++ b/go.mod @@ -5,13 +5,14 @@ go 1.20 require ( github.com/go-redis/redis/v8 v8.11.5 go.mongodb.org/mongo-driver v1.11.7 - repositories.action2quare.com/ayo/gocommon v0.0.0-20230713080645-269fa0f8700e + repositories.action2quare.com/ayo/gocommon v0.0.0-20230719003337-29b2f258507d ) require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/golang/snappy v0.0.4 // indirect + github.com/google/go-cmp v0.5.4 // indirect github.com/gorilla/websocket v1.5.0 // indirect github.com/klauspost/compress v1.16.6 // indirect github.com/montanaflynn/stats v0.7.1 // indirect @@ -24,4 +25,5 @@ require ( golang.org/x/crypto v0.10.0 // indirect golang.org/x/sync v0.3.0 // indirect golang.org/x/text v0.10.0 // indirect + golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect ) diff --git a/go.sum b/go.sum index 388ec98..3b2f437 100644 --- a/go.sum +++ b/go.sum @@ -11,8 +11,9 @@ github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= +github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= @@ -93,8 +94,9 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= @@ -102,23 +104,31 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -repositories.action2quare.com/ayo/gocommon v0.0.0-20230710053024-a842845685ee h1:Aau1j/b9wI4nyvrM7m1Q+2xkcW1Qo7i3q+QBD4Umnzg= -repositories.action2quare.com/ayo/gocommon v0.0.0-20230710053024-a842845685ee/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns= -repositories.action2quare.com/ayo/gocommon v0.0.0-20230711003621-3bb985d0b617 h1:91mBIGIyxzcnvOaIdegUuV+i9xs8YTSRcmyRaIytzx8= -repositories.action2quare.com/ayo/gocommon v0.0.0-20230711003621-3bb985d0b617/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns= -repositories.action2quare.com/ayo/gocommon v0.0.0-20230711005604-a42eb2888e97 h1:ARzXt3HBmiAUDyACfNm5Kvz1JMTn7+ryE03kB8x/km0= -repositories.action2quare.com/ayo/gocommon v0.0.0-20230711005604-a42eb2888e97/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns= -repositories.action2quare.com/ayo/gocommon v0.0.0-20230711033135-d7b26608df94 h1:VrNj5gBFFN9/roWCxyBCZ2gu5k58eremNHQvQNPrfrU= -repositories.action2quare.com/ayo/gocommon v0.0.0-20230711033135-d7b26608df94/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns= -repositories.action2quare.com/ayo/gocommon v0.0.0-20230711035757-9fd0dd00cb7d h1:RdxKmMc7kHrTk+SvTYse2IGxmdDhbEDeM0fKAUW+G+w= -repositories.action2quare.com/ayo/gocommon v0.0.0-20230711035757-9fd0dd00cb7d/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns= -repositories.action2quare.com/ayo/gocommon v0.0.0-20230711053010-4acb81a20d9c h1:SktFqjnc/UOMjJrq/brSw5lQjW1IA+KkB5YgeovusmQ= -repositories.action2quare.com/ayo/gocommon v0.0.0-20230711053010-4acb81a20d9c/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns= -repositories.action2quare.com/ayo/gocommon v0.0.0-20230711062613-74829b93ac1b h1:04rlgT+zeKSpekyleb8Mfi8kENIoka5DYJLuk65wqxc= -repositories.action2quare.com/ayo/gocommon v0.0.0-20230711062613-74829b93ac1b/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns= -repositories.action2quare.com/ayo/gocommon v0.0.0-20230711084112-d48d4c0f2189 h1:4ugcv2AlTYjTEtw8ekjCfVzp+xNnNTOHpfWWRbugxvw= -repositories.action2quare.com/ayo/gocommon v0.0.0-20230711084112-d48d4c0f2189/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns= -repositories.action2quare.com/ayo/gocommon v0.0.0-20230713064012-522bd4a597bc h1:ToHccG1AAGFoAVldbJxCv+yBh/GvNyCd74sBp1Hf7YY= -repositories.action2quare.com/ayo/gocommon v0.0.0-20230713064012-522bd4a597bc/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw= -repositories.action2quare.com/ayo/gocommon v0.0.0-20230713080645-269fa0f8700e h1:94hUQRdZYbsYaTqm/cTI0pEBdp8zdfOSEfkxdO9kS9o= -repositories.action2quare.com/ayo/gocommon v0.0.0-20230713080645-269fa0f8700e/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230715080833-f0f459332d1a h1:n2FF/GQYtCsi57Lh5m9LyQ2IZQ8pIppscBzhpvugmZg= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230715080833-f0f459332d1a/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230716073702-8f6c87a8aeb8 h1:+wfozysATxEl9NOm03gUF7/kpAr3Chxjn5wjCnJsfQw= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230716073702-8f6c87a8aeb8/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230716093911-66aea48fb732 h1:Aq4E8kn1mN5z4ZpRYo5VFj2KektVNrTTuk0HocYMDCk= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230716093911-66aea48fb732/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230717084540-29843802ff0e h1:/eG6tAQzEaN178Aib+/erjHrE/+IjIVLRSmP4gx6D7E= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230717084540-29843802ff0e/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230718004527-4b35e0e6386b h1:K3YQXnVP/W6LzwGzqOxwKmFUD5IrrNPEWYcN/fSinck= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230718004527-4b35e0e6386b/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230718005518-289af24a8ffa h1:YmzJ1YccK3BxC/NbfB11SEUG1S6Lkz6ejg4kS3q/3Qc= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230718005518-289af24a8ffa/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230718020415-82abcddb497b h1:baO9csa0Esnp7UW+L8zJW/ygpjGHRve4rU2/1pVvXQg= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230718020415-82abcddb497b/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230718020838-c21017d2cd8b h1:FqLKDrFji0+giFwAJ3oV6dIOR6Sd/aaay76WgWIEVR8= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230718020838-c21017d2cd8b/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230718032106-40a603522d40 h1:VyFfS0d6pTX2HbZoDHOxJwag4aVSLOh/LrQXqfSJLBg= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230718032106-40a603522d40/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230718083804-d724cc84fa94 h1:iQPrRcZ6XfFblpVHxe/CIoWyTj7imF+3edIGSX6ZMM8= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230718083804-d724cc84fa94/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230718084512-89fa9e4ac585 h1:Wy6qjZ0uHfp02/H688zotRfzYGRPjun7Qay0Z9B/hSg= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230718084512-89fa9e4ac585/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230718105124-72a683fed2c0 h1:8LmRo2nKaLi4QCmO/agSpNTmCD0EdwFycjHjOweQJp8= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230718105124-72a683fed2c0/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230719003101-256bfd030c29 h1:ADScrqJgmk/TfyOu/6oXD3WkSH8sh3Bw360O8GKuEV8= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230719003101-256bfd030c29/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230719003337-29b2f258507d h1:eMzrvVkQfbs5X5dcw80TGGKtJ+6XELl7zNsWiuq4gzs= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230719003337-29b2f258507d/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw= diff --git a/main.go b/main.go index cd81f4c..0b86333 100644 --- a/main.go +++ b/main.go @@ -24,12 +24,11 @@ func main() { panic(err) } - authcache, err := common.NewAuthCollectionGlobal(ctx, config.MaingateApiToken) + wsh, err := wshandler.NewWebsocketHandler() if err != nil { panic(err) } - wsh := wshandler.NewWebsocketHandler(authcache) if tv, err := core.New(ctx, wsh, &config); err != nil { panic(err) } else {