Compare commits

...

23 Commits

Author SHA1 Message Date
8e1b232d57 InMemory 그룹을 redis로 변경 2023-07-19 09:37:02 +09:00
01da5bb3a4 body를 marshaling하고 클라이언트에서 flatten함 2023-07-18 01:31:39 +09:00
ba61a11659 gob 등록 2023-07-17 17:47:07 +09:00
67cca13326 모듈 업데이트 2023-07-16 18:41:24 +09:00
272c696c59 json value 다시 되돌림 2023-07-16 17:29:21 +09:00
aa568ec3fa SetOption 타입 변경 2023-07-16 17:26:19 +09:00
b9c4d8b21b objvalue marshalling 수정 2023-07-16 17:15:08 +09:00
99834c1461 objlen 수정 2023-07-16 17:01:06 +09:00
592112219e gocommon 업데이트 2023-07-16 16:38:05 +09:00
62485b6d54 redis json 마이그레이션 완료 2023-07-16 15:36:20 +09:00
d36dd13bb7 redis stack 사용 2023-07-16 02:51:41 +09:00
454aae5294 멤버 도큐먼트를 직접 메시지 보내는 방식으로 변경 2023-07-14 15:09:20 +09:00
30005ea0e3 binary message는 커맨드 2023-07-14 01:27:11 +09:00
9df68a4d07 sendCloseMessage대신 LeaveRoomMessage 2023-07-13 17:09:47 +09:00
8f2860165b 모듈 업데이트 2023-07-13 15:41:47 +09:00
3a1d0da531 gocommon master로 변경 2023-07-11 17:42:26 +09:00
e42eef1f51 모듈 업데이트 2023-07-11 15:26:37 +09:00
da240bd5dd message의 body를 any로 변경 2023-07-11 14:33:38 +09:00
e71b29ed1c 모듈 업데이트 2023-07-11 12:32:15 +09:00
4df30ea19c redisClient 제거 2023-07-11 11:38:09 +09:00
8c3b279850 message receiver 시그니쳐 변경 2023-07-11 11:08:31 +09:00
ec0ed1ce06 rpc 패키지 적용 2023-07-10 15:39:56 +09:00
8d0f21077d wshandler와 분리 중 2023-07-06 00:53:53 +09:00
14 changed files with 653 additions and 3107 deletions

View File

@ -1,65 +1,72 @@
{
"region_storage" : {
"default" : {
"mongo" : "mongodb://192.168.8.94:27017/?replicaSet=repl01&retrywrites=false",
"redis" : {
"url" : "redis://192.168.8.94:6379",
"offset" : {
"cache" : 0,
"session" : 1,
"ranking" : 2
"region_storage": {
"default": {
"mongo": "mongodb://192.168.8.94:27017/?replicaSet=repl01&retrywrites=false",
"redis": {
"cache": "redis://192.168.8.94:6380/0",
"session": "redis://192.168.8.94:6380/1",
"ranking": "redis://192.168.8.94:6380/2",
"wshandler": "redis://192.168.8.94:6380/3",
"tavern": "redis://192.168.8.94:6380/4"
}
},
"dev": {
"mongo": "mongodb://192.168.8.94:27017/?replicaSet=repl01&retrywrites=false",
"redis": {
"cache": "redis://192.168.8.94:6380/4",
"session": "redis://192.168.8.94:6380/5",
"ranking": "redis://192.168.8.94:6380/6",
"wshandler": "redis://192.168.8.94:6380/7",
"tavern": "redis://192.168.8.94:6380/8"
}
}
},
"dev" : {
"mongo" : "mongodb://192.168.8.94:27017/?replicaSet=repl01&retrywrites=false",
"redis" : {
"url" : "redis://192.168.8.94:6379",
"offset" : {
"cache" : 0,
"session" : 1,
"ranking" : 2
}
}
}
"maingate_mongodb_url": "mongodb://192.168.8.94:27017/?replicaSet=repl01&retrywrites=false",
"maingate_service_url": "http://localhost/maingate",
"maingate_api_token": "63d08aa34f0162622c11284b",
"tavern_service_url": "http://localhost/tavern",
"tavern_group_types": {
"subjugate": {
"text_search_field": [
"name"
],
"unique_index": [
"name,_id",
"_id,members",
"name,hidden"
],
"search_index": [
"rules"
],
"member_index": [
"_gid,candidate,luts",
"_gid,luts",
"_gid,expiring"
],
"invite_ttl": 30,
"candidate_ttl": 3600,
"invitee_exlusive": true,
"invitee_is_member": true,
"max_member": 4
},
"maingate_mongodb_url" : "mongodb://192.168.8.94:27017/?replicaSet=repl01&retrywrites=false",
"maingate_service_url" : "http://localhost/maingate",
"maingate_api_token" : "63d08aa34f0162622c11284b",
"tavern_service_url" : "http://localhost/tavern",
"tavern_group_types" : {
"subjugate" : {
"text_search_field" : ["name"],
"unique_index" : ["name,_id", "_id,members", "name,hidden"],
"search_index" : ["rules"],
"member_index" : ["_gid,candidate,luts","_gid,luts","_gid,expiring"],
"invite_ttl" : 30,
"candidate_ttl" : 3600,
"invitee_exlusive" : true,
"invitee_is_member" : true,
"max_member" : 4
},
"lobby" : {
"max_member" : 3,
"invitee_exlusive" : true,
"invitee_is_member" : true,
"transient" : true,
"invite_ttl" : 30
"lobby": {
"max_member": 3,
"invitee_exlusive": true,
"invitee_is_member": true,
"transient": true,
"invite_ttl": 30
}
},
"ws_sync_pipeline" : "redis://192.168.8.94:6379/3",
"services" : {
"kingdom" : {
"개발중" : {
"url" :"http://localhost/warehouse/dev",
"development" : true
"services": {
"kingdom": {
"개발중": {
"url": "http://localhost/warehouse/dev",
"development": true
},
"개인서버" : {
"url" : "http://localhost/warehouse/private",
"development" : false
"개인서버": {
"url": "http://localhost/warehouse/private",
"development": false
}
}
}

View File

@ -1,52 +1,15 @@
package core
import (
"context"
"encoding/json"
"io"
"net/http"
"strings"
common "repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/wshandler"
"github.com/go-redis/redis/v8"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)
func splitDocument(doc bson.M) bson.M {
setdoc := bson.M{}
unsetdoc := bson.M{}
findoc := bson.M{}
for k, v := range doc {
if k == "$set" {
setdoc = v.(bson.M)
} else if k == "$unset" {
unsetdoc = v.(bson.M)
}
}
for k, v := range doc {
if v == nil {
unsetdoc[k] = 1
} else if k[0] != '$' {
setdoc[k] = v
}
}
if len(setdoc) > 0 {
findoc["$set"] = setdoc
}
if len(unsetdoc) > 0 {
findoc["$unset"] = unsetdoc
}
return findoc
}
// CreateGroup : 그룹 생성
// - 그룹 : 멤버와 권한을 관리할 수 있다. 그룹 타입에 따라 디비에 저장되거나 메모리에만 존재한다.
// - 생성 요청이 오면 파티를 만든다. 파티을 만들 수 있는지 여부는 서비스에서 결정할 것이고, 이 요청을 호출했다는 것은 서비스가 결정한 그룹 생성 조건을 다 통과했다는 의미이다.
@ -116,11 +79,11 @@ func (sub *subTavern) JoinGroup(w http.ResponseWriter, r *http.Request) {
if candidate, ok := common.ReadBoolFormValue(r.Form, "candidate"); ok && candidate {
err = group.Candidate(gidobj, midobj, doc)
} else {
tidobj, err = group.Join(gidobj, midobj, tidobj, doc)
err = group.Join(gidobj, midobj, doc)
}
if err == nil {
json.NewEncoder(w).Encode(map[string]string{
writeBsonDoc(w, map[string]string{
"gid": gidobj.Hex(),
"tid": tidobj.Hex(),
})
@ -132,147 +95,6 @@ func (sub *subTavern) JoinGroup(w http.ResponseWriter, r *http.Request) {
}
}
func (sub *subTavern) EnterCandidateChannel(w http.ResponseWriter, r *http.Request) {
typename, _ := common.ReadStringFormValue(r.Form, "type")
if _, ok := sub.groups[typename]; !ok {
logger.Println("EnterCandidateChannel failed. group type is missing :", r)
w.WriteHeader(http.StatusBadRequest)
return
}
midobj, ok := common.ReadObjectIDFormValue(r.Form, "mid")
if !ok {
logger.Println("EnterCandidateChannel failed. mid is missing :", r)
w.WriteHeader(http.StatusBadRequest)
return
}
gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid")
if !ok {
logger.Println("EnterCandidateChannel failed. gid is missing :", r)
w.WriteHeader(http.StatusBadRequest)
return
}
// candidate channel은 big endian 최상위 비트가 1
gidobj[0] |= 0x80
if conn := sub.wsh.Conn(sub.region, midobj); conn != nil {
richConnOuter{wsh: sub.wsh, rc: conn}.JoinTag(sub.region, gidobj, midobj, typename)
} else {
sub.wshRpc.caller.One(midobj).JoinTag(sub.region, gidobj, midobj, typename)
}
}
func (sub *subTavern) LeaveCandidateChannel(w http.ResponseWriter, r *http.Request) {
typename, _ := common.ReadStringFormValue(r.Form, "type")
if _, ok := sub.groups[typename]; !ok {
logger.Println("EnterCandidateChannel failed. group type is missing :", r)
w.WriteHeader(http.StatusBadRequest)
return
}
midobj, ok := common.ReadObjectIDFormValue(r.Form, "mid")
if !ok {
logger.Println("EnterCandidateChannel failed. mid is missing :", r)
w.WriteHeader(http.StatusBadRequest)
return
}
gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid")
if !ok {
logger.Println("EnterCandidateChannel failed. gid is missing :", r)
w.WriteHeader(http.StatusBadRequest)
return
}
// candidate channel은 big endian 최상위 비트가 1
gidobj[0] |= 0x80
if conn := sub.wsh.Conn(sub.region, midobj); conn != nil {
richConnOuter{wsh: sub.wsh, rc: conn}.LeaveTag(sub.region, gidobj, midobj, typename)
} else {
sub.wshRpc.caller.One(midobj).LeaveTag(sub.region, gidobj, midobj, typename)
}
}
func (sub *subTavern) EnterGroupChannel(w http.ResponseWriter, r *http.Request) {
typename, _ := common.ReadStringFormValue(r.Form, "type")
group := sub.groups[typename]
if group == nil {
logger.Println("EnterGroupChannel failed. group type is missing :", r)
w.WriteHeader(http.StatusBadRequest)
return
}
midobj, ok := common.ReadObjectIDFormValue(r.Form, "mid")
if !ok {
logger.Println("EnterGroupChannel failed. mid is missing :", r)
w.WriteHeader(http.StatusBadRequest)
return
}
gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid")
if !ok {
logger.Println("EnterGroupChannel failed. gid is missing :", r)
w.WriteHeader(http.StatusBadRequest)
return
}
tid := group.FindTicketID(gidobj, midobj)
if tid.IsZero() {
logger.Println("EnterGroupChannel failed. tid is zero")
w.WriteHeader(http.StatusBadRequest)
return
}
if conn := sub.wsh.Conn(sub.region, midobj); conn != nil {
richConnOuter{wsh: sub.wsh, rc: conn}.JoinTag(sub.region, gidobj, tid, typename)
} else {
sub.wshRpc.caller.One(midobj).JoinTag(sub.region, gidobj, tid, typename)
}
writeBsonDoc(w, primitive.M{"_id": tid})
}
func (sub *subTavern) SetStateInGroup(w http.ResponseWriter, r *http.Request) {
gid, ok := common.ReadObjectIDFormValue(r.Form, "gid")
if !ok {
logger.Println("SetStateInGroup failed. tag is missing :", r)
w.WriteHeader(http.StatusBadRequest)
return
}
mid, ok := common.ReadObjectIDFormValue(r.Form, "mid")
if !ok {
logger.Println("SetStateInGroup failed. mid form value is missing")
w.WriteHeader(http.StatusBadRequest)
return
}
state, ok := common.ReadStringFormValue(r.Form, "state")
if !ok {
logger.Println("SetStateInGroup failed. state is missing :", r)
w.WriteHeader(http.StatusBadRequest)
return
}
typename, ok := common.ReadStringFormValue(r.Form, "type")
if !ok {
logger.Println("SetStateInGroup failed. type is missing :", r)
w.WriteHeader(http.StatusBadRequest)
return
}
var doc bson.M
if err := readBsonDoc(r.Body, &doc); err != nil {
logger.Error("SetStateInGroup failed. readBsonDoc err :", err)
w.WriteHeader(http.StatusBadRequest)
return
}
tid := doc["_id"].(primitive.ObjectID)
if conn := sub.wsh.Conn(sub.region, mid); conn != nil {
richConnOuter{wsh: sub.wsh, rc: conn}.SetStateInTag(sub.region, gid, tid, state, typename)
} else {
sub.wshRpc.caller.One(mid).SetStateInTag(sub.region, gid, tid, state, typename)
}
}
// Invite : 초대
// - type : 초대 타입 (required)
// - from : 초대하는 자 (required)
@ -305,14 +127,14 @@ func (sub *subTavern) Invite(w http.ResponseWriter, r *http.Request) {
Invitee bson.M `bson:"invitee"`
}
if err := readBsonDoc(r.Body, &reqdoc); err != nil {
logger.Error("Invite failed. readBsonDoc returns err :", err)
logger.Println("Invite failed. readBsonDoc returns err :", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
result, err := group.Invite(gid, mid, reqdoc.Inviter, reqdoc.Invitee)
if err != nil {
logger.Error("Invite failed. group.Invite returns err :", err)
logger.Println("Invite failed. group.Invite returns err :", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
@ -320,52 +142,6 @@ func (sub *subTavern) Invite(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(result))
}
func (sub *subTavern) UpdateGroupMember(w http.ResponseWriter, r *http.Request) {
typename, _ := common.ReadStringFormValue(r.Form, "type")
group := sub.groups[typename]
if group == nil {
logger.Println("UpdateGroupMember failed. group type is missing :", r.Form)
w.WriteHeader(http.StatusBadRequest)
return
}
gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid")
if !ok {
logger.Println("UpdateGroupMember failed. gid is missing :", r.Form)
w.WriteHeader(http.StatusBadRequest)
return
}
midobj, midok := common.ReadObjectIDFormValue(r.Form, "mid")
tidobj, tidok := common.ReadObjectIDFormValue(r.Form, "tid")
if !midok && !tidok {
// 둘다 없네?
logger.Println("JoinGroup failed. tid or mid should be exist")
w.WriteHeader(http.StatusBadRequest)
return
}
var err error
delete, _ := common.ReadBoolFormValue(r.Form, "delete")
if delete {
err = group.UpdateGroupMember(gidobj, midobj, tidobj, nil)
} else {
var doc bson.M
if err := readBsonDoc(r.Body, &doc); err != nil {
logger.Error("UpdateGroupMember failed. readBsonDoc returns err :", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
err = group.UpdateGroupMember(gidobj, midobj, tidobj, doc)
}
if err != nil {
logger.Println("UpdateGroupMember failed. Update returns err :", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
}
func (sub *subTavern) CancelInvitation(w http.ResponseWriter, r *http.Request) {
typename, _ := common.ReadStringFormValue(r.Form, "type")
group := sub.groups[typename]
@ -404,12 +180,6 @@ func (sub *subTavern) AcceptInvitation(w http.ResponseWriter, r *http.Request) {
gid, _ := common.ReadObjectIDFormValue(r.Form, "gid")
mid, _ := common.ReadObjectIDFormValue(r.Form, "mid")
tid, ok := common.ReadObjectIDFormValue(r.Form, "tid")
if !ok {
logger.Println("CancelInvitation failed. form value 'tid' is missing")
w.WriteHeader(http.StatusBadRequest)
return
}
var member bson.M
if err := readBsonDoc(r.Body, &member); err != nil {
@ -418,14 +188,12 @@ func (sub *subTavern) AcceptInvitation(w http.ResponseWriter, r *http.Request) {
return
}
gidbytes, err := group.AcceptInvitation(gid, mid, tid, member)
err := group.AcceptInvitation(gid, mid, member)
if err != nil {
logger.Error("AcceptInvitation failed. group.AcceptInvitation returns err :", err)
logger.Println("AcceptInvitation failed. group.AcceptInvitation returns err :", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Write([]byte(gidbytes.Hex()))
}
func (sub *subTavern) DenyInvitation(w http.ResponseWriter, r *http.Request) {
@ -490,162 +258,6 @@ func (sub *subTavern) QueryInvitations(w http.ResponseWriter, r *http.Request) {
}
}
func (sub *subTavern) TurnGroupOnline(w http.ResponseWriter, r *http.Request) {
// group을 online 상태로 만든다.
// 요청을 보내는 클라이언트의 conn이 끊이면 online에서 제거한다.
// online인 group을 가지고 뭘 할지는 게임이 알아서...
typename, _ := common.ReadStringFormValue(r.Form, "type")
group := sub.groups[typename]
if group == nil {
logger.Println("TurnGroupOnline failed. group type is missing :", r.Form)
w.WriteHeader(http.StatusBadRequest)
return
}
gid, ok := common.ReadObjectIDFormValue(r.Form, "_id")
if !ok {
logger.Println("TurnGroupOnline failed. group id '_id' form value is missing :", r.Form)
w.WriteHeader(http.StatusBadRequest)
return
}
mid, ok := common.ReadObjectIDFormValue(r.Form, "mid")
if !ok {
logger.Println("TurnGroupOnline failed. mid form value is missing")
w.WriteHeader(http.StatusBadRequest)
return
}
var filter bson.M
if err := readBsonDoc(r.Body, &filter); err != nil {
logger.Error("TurnGroupOnline failed. readBsonDoc return err :", err)
w.WriteHeader(http.StatusBadRequest)
return
}
exist, err := group.Exist(gid, filter)
if err != nil {
logger.Error("TurnGroupOnline failed. FindOne return err :", err)
w.WriteHeader(http.StatusBadRequest)
return
}
if !exist {
logger.Println("TurnGroupOnline failed. filter not match", filter)
w.WriteHeader(http.StatusBadRequest)
return
}
score, ok := common.ReadFloatFormValue(r.Form, "score")
if !ok {
score = 100
}
if conn := sub.wsh.Conn(sub.region, mid); conn != nil {
err = richConnOuter{wsh: sub.wsh, rc: conn}.TurnGroupOnline(onlineGroupQueryKey(typename), gid, score)
} else {
err = sub.wshRpc.caller.One(mid).TurnGroupOnline(onlineGroupQueryKey(typename), gid, score)
}
if err != nil {
logger.Error("TurnGroupOnline failed. TurnGroupOnline err :", err)
w.WriteHeader(http.StatusBadRequest)
return
}
}
func (sub *subTavern) TurnGroupOffline(w http.ResponseWriter, r *http.Request) {
// group을 offline 상태로 만든다.
// 요청을 보내는 클라이언트의 conn이 끊이면 online에서 제거한다.
// online인 group을 가지고 뭘 할지는 게임이 알아서...
typename, _ := common.ReadStringFormValue(r.Form, "type")
group := sub.groups[typename]
if group == nil {
logger.Println("TurnGroupOffline failed. group type is missing :", r.Form)
w.WriteHeader(http.StatusBadRequest)
return
}
gid, ok := common.ReadObjectIDFormValue(r.Form, "_id")
if !ok {
logger.Println("TurnGroupOffline failed. group id '_id' form value is missing :", r.Form)
w.WriteHeader(http.StatusBadRequest)
return
}
mid, ok := common.ReadObjectIDFormValue(r.Form, "mid")
if !ok {
logger.Println("TurnGroupOffline failed. mid form value is missing")
w.WriteHeader(http.StatusBadRequest)
return
}
// onlinename := onlineGroupQueryKey(typename)
// if onClose := conn.UnregistOnCloseFunc(onlinename); onClose != nil {
// onClose()
// } else {
// gid, ok := common.ReadStringFormValue(form, "_id")
// if ok {
// sub.redisSync.ZRem(context.Background(), onlinename, gid)
// }
// }
var err error
if conn := sub.wsh.Conn(sub.region, mid); conn != nil {
err = richConnOuter{wsh: sub.wsh, rc: conn}.TurnGroupOffline(onlineGroupQueryKey(typename), gid)
} else {
err = sub.wshRpc.caller.One(mid).TurnGroupOffline(onlineGroupQueryKey(typename), gid)
}
if err != nil {
logger.Error("TurnGroupOffline failed. TurnGroupOnline err :", err)
w.WriteHeader(http.StatusBadRequest)
return
}
}
func (sub *subTavern) QueryOnlineGroup(w http.ResponseWriter, r *http.Request) {
typename, _ := common.ReadStringFormValue(r.Form, "type")
group := sub.groups[typename]
if group == nil {
logger.Println("QueryOnlineGroup failed. group type is missing :", r.Form)
w.WriteHeader(http.StatusBadRequest)
return
}
var cmd *redis.StringSliceCmd
scoreStart, _ := common.ReadStringFormValue(r.Form, "score_start")
scoreStop, _ := common.ReadStringFormValue(r.Form, "score_stop")
if len(scoreStart) > 0 || len(scoreStop) > 0 {
if len(scoreStart) == 0 {
scoreStart = "-inf"
}
if len(scoreStop) == 0 {
scoreStop = "+inf"
}
cmd = sub.wsh.RedisSync.ZRangeArgs(context.Background(), redis.ZRangeArgs{
Key: onlineGroupQueryKey(typename),
ByScore: true,
Start: scoreStart,
Stop: scoreStop,
Rev: true,
Count: 1,
})
} else {
// 아무거나
cmd = sub.wsh.RedisSync.ZRandMember(context.Background(), onlineGroupQueryKey(typename), 1, false)
}
result, err := cmd.Result()
if err != nil {
logger.Error("QueryOnlineGroup failed. redid.ZRandMember returns err :", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
writeBsonDoc(w, bson.M{"r": result})
}
func (sub *subTavern) SearchGroup(w http.ResponseWriter, r *http.Request) {
typename, _ := common.ReadStringFormValue(r.Form, "type")
group := sub.groups[typename]
@ -676,7 +288,7 @@ func (sub *subTavern) SearchGroup(w http.ResponseWriter, r *http.Request) {
}
if err := writeBsonArr(w, result); err != nil {
logger.Error("json marshal failed :", err)
logger.Error("bson marshal failed :", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
@ -690,11 +302,7 @@ func (sub *subTavern) QueryOnlineState(w http.ResponseWriter, r *http.Request) {
return
}
state, err := sub.wsh.GetState(mid)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
state := sub.wsh.GetState(sub.region, mid)
w.Write([]byte(state))
}
@ -706,13 +314,7 @@ func (sub *subTavern) IsOnline(w http.ResponseWriter, r *http.Request) {
return
}
ok, err := sub.wsh.IsOnline(mid)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
if ok {
if state := sub.wsh.GetState(sub.region, mid); len(state) > 0 {
w.Write([]byte("true"))
} else {
w.Write([]byte("false"))
@ -763,96 +365,7 @@ func (sub *subTavern) QueryGroup(w http.ResponseWriter, r *http.Request) {
}
if err := writeBsonDoc(w, result); err != nil {
logger.Error("json marshal failed :", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
}
// QueryGroupMembers : 그룹내 멤버 조회
// - type : 그룹 타입
// - 그룹 타입에 맞는 키(주로 _id)
// - projection : select할 필드. ,로 구분
func (sub *subTavern) QueryGroupMembers(w http.ResponseWriter, r *http.Request) {
typename, _ := common.ReadStringFormValue(r.Form, "type")
group := sub.groups[typename]
if group == nil {
logger.Println("QueryGroupMembers failed. group type is missing :", r.Form)
w.WriteHeader(http.StatusBadRequest)
return
}
gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid")
if !ok {
logger.Println("QueryGroupMembers failed. _id is missing :", r.Form)
w.WriteHeader(http.StatusBadRequest)
return
}
midobj, _ := common.ReadObjectIDFormValue(r.Form, "mid")
var after primitive.Timestamp
if ts, ok := common.ReadStringFormValue(r.Form, "after"); ok && ts != "0.0" {
after = common.DotStringToTimestamp(ts)
}
projection, _ := common.ReadStringFormValue(r.Form, "projection")
result, err := group.QueryMembers(gidobj, midobj, projection, after)
if err != nil {
logger.Error("QueryGroupMembers failed. FindAll err :", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
if result == nil {
return
}
if err := writeBsonDoc(w, result); err != nil {
logger.Error("QueryGroupMembers failed. writeBsonArr err :", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
}
func (sub *subTavern) QueryGroupMember(w http.ResponseWriter, r *http.Request) {
typename, _ := common.ReadStringFormValue(r.Form, "type")
group := sub.groups[typename]
if group == nil {
logger.Println("QueryGroupMember failed. group type is missing :", r.Form)
w.WriteHeader(http.StatusBadRequest)
return
}
gid, ok := common.ReadObjectIDFormValue(r.Form, "gid")
if !ok {
logger.Println("QueryGroupMember failed. gid is missing :", r.Form)
w.WriteHeader(http.StatusBadRequest)
return
}
mid, midok := common.ReadObjectIDFormValue(r.Form, "mid")
tid, tidok := common.ReadObjectIDFormValue(r.Form, "tid")
if !midok && !tidok {
// 둘 중 하나는 있어야지
logger.Println("QueryGroupMember failed. tid and mid are both missing")
w.WriteHeader(http.StatusBadRequest)
return
}
projection, _ := common.ReadStringFormValue(r.Form, "projection")
result, err := group.QueryMember(gid, mid, tid, projection)
if err != nil {
logger.Println("QueryGroupMember failed. group.QueryMember returns err :", err)
w.WriteHeader(http.StatusBadRequest)
return
}
if result == nil {
return
}
if err := writeBsonDoc(w, result); err != nil {
logger.Error("QueryGroupMember failed. writeBsonDoc err :", err)
logger.Error("bson marshal failed :", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
@ -878,15 +391,14 @@ func (sub *subTavern) LeaveGroup(w http.ResponseWriter, r *http.Request) {
return
}
mid, midok := common.ReadObjectIDFormValue(r.Form, "mid")
tid, tidok := common.ReadObjectIDFormValue(r.Form, "tid")
if !midok && !tidok {
// 둘 중 하나는 있어야지
logger.Println("LeaveGroup failed. tid and mid are both missing")
if !midok {
logger.Println("LeaveGroup failed. mid is missing")
w.WriteHeader(http.StatusBadRequest)
return
}
if err := group.Leave(gid, mid, tid); err != nil {
if err := group.Leave(gid, mid); err != nil {
// 둘 중 하나는 있어야지
logger.Println("LeaveGroup failed. group.Leave returns err :", err)
w.WriteHeader(http.StatusBadRequest)
@ -969,99 +481,46 @@ func (sub *subTavern) UpdateGroupDocument(w http.ResponseWriter, r *http.Request
return
}
body, err := io.ReadAll(r.Body)
if err != nil {
var frag bson.M
if err := readBsonDoc(r.Body, &frag); err != nil {
logger.Error("UpdateGroupDocument failed. readBsonDoc err :", err)
w.WriteHeader(http.StatusBadRequest)
return
}
if err := group.UpdateGroupDocument(gid, body); err != nil {
if err := group.UpdateGroupDocument(gid, frag); err != nil {
logger.Error("UpdateGroupDocument failed. group.UpdateGroupDocument returns err :", err)
w.WriteHeader(http.StatusBadRequest)
return
}
}
func (sub *subTavern) DropPausedMember(w http.ResponseWriter, r *http.Request) {
func (sub *subTavern) QueryGroupMembers(w http.ResponseWriter, r *http.Request) {
typename, _ := common.ReadStringFormValue(r.Form, "type")
group := sub.groups[typename]
if group == nil {
logger.Println("DropDeadMember failed. type is missing")
logger.Println("QueryGroupMembers failed. type is missing")
w.WriteHeader(http.StatusBadRequest)
return
}
gid, ok := common.ReadObjectIDFormValue(r.Form, "gid")
if !ok {
logger.Println("DropDeadMember failed. gid is missing")
logger.Println("QueryGroupMembers failed. gid is missing")
w.WriteHeader(http.StatusBadRequest)
return
}
mid, ok := common.ReadObjectIDFormValue(r.Form, "mid")
if !ok {
logger.Println("DropDeadMember failed. mid is missing")
w.WriteHeader(http.StatusBadRequest)
return
}
if err := group.DropPausedMember(gid, mid); err != nil {
logger.Error("DropDeadMember failed. group.DropDeadMember returns err :", err)
w.WriteHeader(http.StatusBadRequest)
return
}
}
func (sub *subTavern) deliveryMessageHandler(deliveryChan <-chan wshandler.DeliveryMessage) {
defer func() {
r := recover()
if r != nil {
logger.Error(r)
}
}()
redisSync := sub.wsh.RedisSync
for msg := range deliveryChan {
mid := msg.Alias
if msg.Body != nil {
buffer := msg.Body
var channame string
for i, ch := range buffer {
if ch == 0 {
channame = string(buffer[:i])
buffer = buffer[i+1:]
break
}
}
if len(channame) == 0 {
continue
}
buffer = append(mid[:], buffer...)
_, err := redisSync.Publish(context.Background(), channame, buffer).Result()
members, err := group.QueryGroupMembers(gid)
if err != nil {
logger.Error(err)
}
logger.Error("QueryGroupMembers failed. group.QueryGroupMembers returns err :", err)
w.WriteHeader(http.StatusBadRequest)
return
}
if len(msg.Command) > 0 {
switch msg.Command {
case "pause":
gidtype := msg.Conn.GetTag("gid")
if len(gidtype) > 0 {
tokens := strings.SplitN(gidtype, "@", 2)
gidobj, _ := primitive.ObjectIDFromHex(tokens[0])
gtype := tokens[1]
group := sub.groups[gtype]
if group != nil {
group.PauseMember(gidobj, msg.Alias, msg.Conn)
if err := writeBsonDoc(w, members); err != nil {
logger.Error("QueryGroupMembers failed. writeBsonDoc return err :", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
}
}
}
}
logger.Println("delivery chan fin")
}

1
core/config.json Normal file
View File

@ -0,0 +1 @@
{}

View File

@ -3,8 +3,6 @@ package core
import (
"net/url"
"repositories.action2quare.com/ayo/gocommon/wshandler"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)
@ -27,23 +25,18 @@ type groupConfig struct {
type group interface {
Create(form url.Values, doc bson.M) (primitive.ObjectID, error)
Candidate(groupID primitive.ObjectID, memberID primitive.ObjectID, doc bson.M) error
Join(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, doc bson.M) (newTicketID primitive.ObjectID, err error)
FindTicketID(groupID primitive.ObjectID, memberID primitive.ObjectID) primitive.ObjectID
Join(groupID primitive.ObjectID, memberID primitive.ObjectID, doc bson.M) error
Invite(groupID primitive.ObjectID, memberID primitive.ObjectID, inviterDoc bson.M, inviteeDoc bson.M) (string, error)
UpdateGroupMember(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, doc bson.M) error
CancelInvitation(groupID primitive.ObjectID, ticketID primitive.ObjectID) error
AcceptInvitation(groupID primitive.ObjectID, mid primitive.ObjectID, ticketID primitive.ObjectID, member bson.M) (primitive.ObjectID, error)
AcceptInvitation(groupID primitive.ObjectID, mid primitive.ObjectID, member bson.M) error
DenyInvitation(groupID primitive.ObjectID, mid primitive.ObjectID, ticketID primitive.ObjectID) error
QueryInvitations(memberID primitive.ObjectID, after primitive.Timestamp) ([]bson.M, error)
Exist(groupID primitive.ObjectID, filter bson.M) (bool, error)
FindAll(filter bson.M, projection string, after primitive.Timestamp) ([]bson.M, error)
FindOne(groupID primitive.ObjectID, projection string) (bson.M, error)
QueryMembers(groupID primitive.ObjectID, requesterID primitive.ObjectID, projection string, after primitive.Timestamp) (map[string]bson.M, error)
QueryMember(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, projection string) (bson.M, error)
Leave(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID) error
DropPausedMember(groupID primitive.ObjectID, memberID primitive.ObjectID) error
PauseMember(groupID primitive.ObjectID, memberID primitive.ObjectID, conn *wshandler.Richconn) error
Leave(groupID primitive.ObjectID, memberID primitive.ObjectID) error
UpdateMemberDocument(groupID primitive.ObjectID, memberID primitive.ObjectID, doc bson.M) error
Dismiss(groupID primitive.ObjectID) error
UpdateGroupDocument(groupID primitive.ObjectID, body []byte) error
UpdateGroupDocument(groupID primitive.ObjectID, doc bson.M) error
QueryGroupMembers(groupID primitive.ObjectID) (bson.M, error)
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1,73 +0,0 @@
package core
import (
"context"
"repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/wshandler"
"github.com/go-redis/redis/v8"
"go.mongodb.org/mongo-driver/bson/primitive"
)
type richConnOuter struct {
wsh *wshandler.WebsocketHandler
rc *wshandler.Richconn
}
func (sub richConnOuter) JoinTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, hint string) error {
sub.wsh.JoinTag(region, tag, tid, sub.rc, hint)
wsh := sub.wsh
sub.rc.RegistOnCloseFunc(tag.Hex(), func() {
wsh.LeaveTag(region, tag, tid)
})
return nil
}
func (sub richConnOuter) LeaveTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, hint string) error {
sub.SetStateInTag(region, tag, tid, "", hint)
return sub.wsh.LeaveTag(region, tag, tid)
}
func (sub richConnOuter) SetStateInTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, state string, hint string) error {
return sub.wsh.SetStateInTag(region, tag, tid, state, hint)
}
func (sub richConnOuter) TurnGroupOnline(key string, gid primitive.ObjectID, score float64) error {
gidhex := gid.Hex()
_, err := sub.wsh.RedisSync.ZAdd(context.Background(), key, &redis.Z{Score: score, Member: gidhex}).Result()
if err != nil {
logger.Error("TurnGroupOnline failed. redis.ZAdd return err :", err)
return err
}
sub.rc.RegistOnCloseFunc(key, func() {
sub.wsh.RedisSync.ZRem(context.Background(), key, gidhex)
})
return nil
}
func (sub richConnOuter) TurnGroupOffline(key string, gid primitive.ObjectID) error {
f := sub.rc.UnregistOnCloseFunc(key)
if f != nil {
f()
} else {
sub.wsh.RedisSync.ZRem(context.Background(), key, gid.Hex())
}
return nil
}
func (sub richConnOuter) SendMessage(doc []byte) error {
return sub.rc.WriteBytes(doc)
}
func (sub richConnOuter) SendMessageToTag(region string, tag primitive.ObjectID, msg []byte) error {
sub.wsh.BroadcastRaw(region, tag, msg)
return nil
}
func (sub richConnOuter) CloseOnPurpose() error {
return sub.rc.Close()
}

View File

@ -1,195 +0,0 @@
package rpc
import (
"bytes"
"encoding/gob"
"fmt"
"reflect"
"repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/wshandler"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)
var Everybody = primitive.ObjectID([12]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11})
func init() {
gob.Register(bson.M{})
gob.Register(primitive.ObjectID{})
gob.Register(primitive.Timestamp{})
}
type RpcCaller struct {
publish func(bt []byte) error
}
func NewRpcCaller(f func(bt []byte) error) RpcCaller {
return RpcCaller{
publish: f,
}
}
type rpcCallContext struct {
alias primitive.ObjectID
publish func(bt []byte) error
}
func (c *RpcCaller) One(alias primitive.ObjectID) rpcCallContext {
return rpcCallContext{
alias: alias,
publish: c.publish,
}
}
func (c *RpcCaller) Everybody() rpcCallContext {
return rpcCallContext{
alias: Everybody,
publish: c.publish,
}
}
func IsCallerCalleeMethodMatch[Callee any]() error {
var caller rpcCallContext
var callee Callee
callerType := reflect.TypeOf(caller)
calleeType := reflect.TypeOf(callee)
for i := 0; i < callerType.NumMethod(); i++ {
callerMethod := callerType.Method(i)
calleeMethod, ok := calleeType.MethodByName(callerMethod.Name)
if !ok {
return fmt.Errorf("method '%s' of '%s' is missing", callerMethod.Name, calleeType.Name())
}
if calleeMethod.Func.Type().NumIn() != callerMethod.Func.Type().NumIn() {
return fmt.Errorf("method '%s' argument num is not match", callerMethod.Name)
}
if calleeMethod.Func.Type().NumOut() != callerMethod.Func.Type().NumOut() {
return fmt.Errorf("method '%s' out num is not match", callerMethod.Name)
}
for i := 1; i < calleeMethod.Func.Type().NumIn(); i++ {
if calleeMethod.Func.Type().In(i) != callerMethod.Func.Type().In(i) {
return fmt.Errorf("method '%s' argument is not match. %s-%s", callerMethod.Name, calleeMethod.Func.Type().In(i).Name(), callerMethod.Func.Type().In(i).Name())
}
}
}
return nil
}
type fnsig struct {
FunctionName string `bson:"fn"`
Args []any `bson:"args"`
}
func Encode[T any](prefix T, fn string, args ...any) ([]byte, error) {
m := append([]any{
prefix,
fn,
}, args...)
buff := new(bytes.Buffer)
encoder := gob.NewEncoder(buff)
err := encoder.Encode(m)
if err != nil {
logger.Error("rpcCallContext.send err :", err)
return nil, err
}
return buff.Bytes(), nil
}
func Decode[T any](src []byte) (*T, string, []any, error) {
var m []any
decoder := gob.NewDecoder(bytes.NewReader(src))
if err := decoder.Decode(&m); err != nil {
logger.Error("RpcCallee.Call err :", err)
return nil, "", nil, err
}
prfix := m[0].(T)
fn := m[1].(string)
return &prfix, fn, m[2:], nil
}
func decode(src []byte) (string, []any, error) {
var sig fnsig
decoder := gob.NewDecoder(bytes.NewReader(src))
if err := decoder.Decode(&sig); err != nil {
logger.Error("RpcCallee.Call err :", err)
return "", nil, err
}
return sig.FunctionName, sig.Args, nil
}
func (c *rpcCallContext) send(fn string, args ...any) error {
bt, err := Encode(c.alias, fn, args...)
if err != nil {
return err
}
return c.publish(bt)
}
type RpcCallee[T any] struct {
methods map[string]reflect.Method
create func(*wshandler.Richconn) *T
}
func NewRpcCallee[T any](createReceiverFunc func(*wshandler.Richconn) *T) RpcCallee[T] {
out := RpcCallee[T]{
methods: make(map[string]reflect.Method),
create: createReceiverFunc,
}
var tmp *T
tp := reflect.TypeOf(tmp)
for i := 0; i < tp.NumMethod(); i++ {
method := tp.Method(i)
out.methods[method.Name] = method
}
return out
}
func (r RpcCallee[T]) Call(rc *wshandler.Richconn, src []byte) error {
defer func() {
s := recover()
if s != nil {
logger.Error(s)
}
}()
fn, params, err := decode(src)
if err != nil {
logger.Error("RpcCallee.Call err :", err)
return err
}
method, ok := r.methods[fn]
if !ok {
err := fmt.Errorf("method '%s' is missing", fn)
logger.Error("RpcCallee.Call err :", err)
return err
}
receiver := r.create(rc)
args := []reflect.Value{
reflect.ValueOf(receiver),
}
for _, arg := range params {
args = append(args, reflect.ValueOf(arg))
}
rets := method.Func.Call(args)
if len(rets) > 0 && rets[len(rets)-1].Interface() != nil {
return rets[len(rets)-1].Interface().(error)
}
return nil
}

View File

@ -1,33 +0,0 @@
package rpc
import (
"go.mongodb.org/mongo-driver/bson/primitive"
)
func (c rpcCallContext) JoinTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, hint string) error {
return c.send("JoinTag", region, tag, tid, hint)
}
func (c rpcCallContext) LeaveTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, hint string) error {
return c.send("LeaveTag", region, tag, tid, hint)
}
func (c rpcCallContext) TurnGroupOnline(key string, gid primitive.ObjectID, score float64) error {
return c.send("TurnGroupOnline", key, gid, score)
}
func (c rpcCallContext) TurnGroupOffline(key string, gid primitive.ObjectID) error {
return c.send("TurnGroupOffline", key, gid)
}
func (c rpcCallContext) SetStateInTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, state string, hint string) error {
return c.send("SetStateInTag", region, tag, tid, state, hint)
}
func (c rpcCallContext) SendMessage(doc []byte) error {
return c.send("SendMessage", doc)
}
func (c rpcCallContext) SendMessageToTag(region string, gid primitive.ObjectID, msg []byte) error {
return c.send("SendMessageToTag", region, gid, msg)
}
func (c rpcCallContext) CloseOnPurpose() error {
return c.send("CloseOnPurpose")
}

View File

@ -2,19 +2,20 @@ package core
import (
"context"
"encoding/json"
"errors"
"io"
"net"
"net/http"
"reflect"
"strings"
common "repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/wshandler"
"repositories.action2quare.com/ayo/tavern/core/rpc"
"time"
"github.com/go-redis/redis/v8"
"repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/wshandler"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/bsonrw"
"go.mongodb.org/mongo-driver/bson/primitive"
@ -30,10 +31,6 @@ func writeBsonArr(w io.Writer, src []bson.M) error {
})
}
func onlineGroupQueryKey(prefix string) string {
return prefix + "_olg"
}
func writeBsonDoc[T any](w io.Writer, src T) error {
rw, err := bsonrw.NewBSONValueWriter(w)
if err != nil {
@ -71,41 +68,12 @@ func readBsonDoc(r io.Reader, src any) error {
return nil
}
type rpcCallDomain[T any] struct {
rpcCallChanName string
caller rpc.RpcCaller
callee rpc.RpcCallee[T]
methods map[string]reflect.Method
}
func createRpcCallDomain[CalleeType any](syncConn *redis.Client, creator func(*wshandler.Richconn) *CalleeType) rpcCallDomain[CalleeType] {
var tmp *CalleeType
methods := make(map[string]reflect.Method)
tp := reflect.TypeOf(tmp)
for i := 0; i < tp.NumMethod(); i++ {
method := tp.Method(i)
methods[method.Name] = method
}
rpcChanName := "conn_rpc_channel_" + tp.Name()
publishFunc := func(bt []byte) error {
_, err := syncConn.Publish(context.Background(), rpcChanName, bt).Result()
return err
}
return rpcCallDomain[CalleeType]{
rpcCallChanName: rpcChanName,
caller: rpc.NewRpcCaller(publishFunc),
callee: rpc.NewRpcCallee(creator),
methods: methods,
}
}
type TavernConfig struct {
common.RegionStorageConfig `json:",inline"`
gocommon.RegionStorageConfig `json:",inline"`
GroupTypes map[string]*groupConfig `json:"tavern_group_types"`
MaingateApiToken string `json:"maingate_api_token"`
RedisURL string `json:"tavern_redis_url"`
macAddr string
}
@ -117,12 +85,12 @@ type Tavern struct {
}
type subTavern struct {
mongoClient common.MongoClient
mongoClient gocommon.MongoClient
redisClient *redis.Client
wsh *wshandler.WebsocketHandler
region string
groups map[string]group
methods map[string]reflect.Method
wshRpc rpcCallDomain[richConnOuter]
}
func getMacAddr() (string, error) {
@ -145,7 +113,7 @@ func getMacAddr() (string, error) {
func New(context context.Context, wsh *wshandler.WebsocketHandler, inconfig *TavernConfig) (*Tavern, error) {
if inconfig == nil {
var loaded TavernConfig
if err := common.LoadConfig(&loaded); err != nil {
if err := gocommon.LoadConfig(&loaded); err != nil {
return nil, err
}
inconfig = &loaded
@ -157,7 +125,7 @@ func New(context context.Context, wsh *wshandler.WebsocketHandler, inconfig *Tav
return nil, err
}
config.macAddr = macaddr
tv := Tavern{
tv := &Tavern{
wsh: wsh,
}
@ -166,39 +134,21 @@ func New(context context.Context, wsh *wshandler.WebsocketHandler, inconfig *Tav
return nil, err
}
return &tv, nil
return tv, nil
}
func (tv *Tavern) Destructor() {
tv.wsh.Destructor()
func (tv *Tavern) Cleanup() {
for _, st := range tv.subTaverns {
st.mongoClient.Close()
}
}
type groupPipelineDocument struct {
OperationType string `bson:"operationType"`
FullDocument map[string]any `bson:"fullDocument"`
DocumentKey struct {
Id primitive.ObjectID `bson:"_id"`
} `bson:"documentKey"`
UpdateDescription struct {
UpdatedFields bson.M `bson:"updatedFields"`
RemovedFileds bson.A `bson:"removedFields"`
TruncatedArrays bson.A `bson:"truncatedArrays"`
} `bson:"updateDescription"`
}
func (tv *Tavern) prepare(ctx context.Context) error {
for region, url := range config.RegionStorage {
var dbconn common.MongoClient
for region, addr := range config.RegionStorage {
var dbconn gocommon.MongoClient
var err error
var groupinstance group
if err := rpc.IsCallerCalleeMethodMatch[richConnOuter](); err != nil {
return err
}
var tmp *subTavern
methods := make(map[string]reflect.Method)
tp := reflect.TypeOf(tmp)
@ -207,30 +157,33 @@ func (tv *Tavern) prepare(ctx context.Context) error {
methods[method.Name] = method
}
redisClient, err := gocommon.NewRedisClient(addr.Redis["tavern"])
if err != nil {
return err
}
sub := &subTavern{
wsh: tv.wsh,
mongoClient: dbconn,
redisClient: redisClient,
region: region,
methods: methods,
}
sub.wshRpc = createRpcCallDomain(tv.wsh.RedisSync, func(rc *wshandler.Richconn) *richConnOuter {
return &richConnOuter{wsh: sub.wsh, rc: rc}
})
groups := make(map[string]group)
for typename, cfg := range config.GroupTypes {
cfg.Name = typename
if cfg.Transient {
groupinstance, err = cfg.prepareInMemory(ctx, region, typename, tv.wsh)
} else {
if !dbconn.Connected() {
dbconn, err = common.NewMongoClient(ctx, url.Mongo, region)
if err != nil {
return err
}
}
groupinstance, err = cfg.preparePersistent(ctx, region, dbconn, tv.wsh)
groupinstance, err = cfg.prepareInMemory(ctx, typename, sub)
//} else {
// TODO : db
// if !dbconn.Connected() {
// dbconn, err = gocommon.NewMongoClient(ctx, url.Mongo, region)
// if err != nil {
// return err
// }
// }
// groupinstance, err = cfg.preparePersistent(ctx, region, dbconn, tv.wsh)
}
if err != nil {
return err
@ -246,25 +199,72 @@ func (tv *Tavern) prepare(ctx context.Context) error {
}
func (tv *Tavern) RegisterHandlers(ctx context.Context, serveMux *http.ServeMux, prefix string) error {
// request는 항상 서비스 서버를 거쳐서 들어온다. [client] <--tls--> [service server] <--http--> tavern
// 클라이언트는 tavern으로부터 메시지를 수신할 뿐, 송신하지 못한다.
// 단, 요청은 https 서비스 서버를 통해 들어오고 클라이언트는 ws으로 수신만 한다는 원칙이 유지되어야 한다.(채팅 메시지는 예외?)
for _, sub := range tv.subTaverns {
tv.wsh.RegisterReceiver(sub.region, sub)
var pattern string
if sub.region == "default" {
pattern = common.MakeHttpHandlerPattern(prefix, "api")
pattern = gocommon.MakeHttpHandlerPattern(prefix, "api")
} else {
pattern = common.MakeHttpHandlerPattern(prefix, sub.region, "api")
pattern = gocommon.MakeHttpHandlerPattern(prefix, sub.region, "api")
}
serveMux.HandleFunc(pattern, sub.api)
deliveryChan := tv.wsh.DeliveryChannel(sub.region)
go sub.deliveryMessageHandler(deliveryChan)
}
return nil
}
func (sub *subTavern) OnClientMessageReceived(sender *wshandler.Sender, messageType wshandler.WebSocketMessageType, body io.Reader) {
if messageType == wshandler.Connected {
logger.Println("OnClientMessageReceived : connected ", sender.Accid.Hex())
} else if messageType == wshandler.Disconnected {
logger.Println("OnClientMessageReceived : disconnected ", sender.Accid.Hex())
} else if messageType == wshandler.BinaryMessage {
var msg map[string][]any
dec := json.NewDecoder(body)
if err := dec.Decode(&msg); err == nil {
for cmd, args := range msg {
switch cmd {
case "EnterChannel":
sub.wsh.EnterRoom(sub.region, args[0].(string), sender.Accid)
case "LeaveChannel":
sub.wsh.LeaveRoom(sub.region, args[0].(string), sender.Accid)
case "UpdateGroupMemberDocument":
typename := args[0].(string)
gidobj, _ := primitive.ObjectIDFromHex(args[1].(string))
doc := args[2].(map[string]any)
if group := sub.groups[typename]; group != nil {
group.UpdateMemberDocument(gidobj, sender.Accid, doc)
}
case "UpdateGroupDocument":
typename := args[0].(string)
gidobj, _ := primitive.ObjectIDFromHex(args[1].(string))
doc := args[2].(map[string]any)
if group := sub.groups[typename]; group != nil {
group.UpdateGroupDocument(gidobj, doc)
}
}
}
}
}
}
func (sub *subTavern) OnRoomCreated(region, name string) {
_, err := sub.redisClient.Persist(context.Background(), name).Result()
if err != nil {
logger.Println("OnRoomCreate Persist failed :", err)
}
}
func (sub *subTavern) OnRoomDestroyed(region, name string) {
_, err := sub.redisClient.Expire(context.Background(), name, 3600*time.Second).Result()
if err != nil {
logger.Println("OnRoomDestroyed Persist failed :", err)
}
}
func (sub *subTavern) api(w http.ResponseWriter, r *http.Request) {
defer func() {
s := recover()

77
core/tavern_test.go Normal file
View File

@ -0,0 +1,77 @@
// warroom project main.go
package core
import (
"context"
"fmt"
"testing"
"time"
"github.com/go-redis/redis/v8"
"go.mongodb.org/mongo-driver/bson/primitive"
"repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/logger"
)
func TestPubSub(t *testing.T) {
opt0, _ := redis.ParseURL("redis://192.168.8.94:6380/0")
opt1, _ := redis.ParseURL("redis://192.168.8.94:6380/1")
rc0 := redis.NewClient(opt0)
rc1 := redis.NewClient(opt1)
go func() {
time.Sleep(time.Second)
rc1.Publish(context.Background(), "__testchan", "real???")
fmt.Println("published")
}()
pubsub := rc0.Subscribe(context.Background(), "__testchan")
msg, err := pubsub.ReceiveMessage(context.Background())
fmt.Println(msg.Payload, err)
}
func TestReJSON(t *testing.T) {
rc := redis.NewClient(&redis.Options{Addr: "192.168.8.94:6380"})
rh := gocommon.NewRedisonHandler(context.Background(), rc)
testDoc := map[string]any{
"members": map[string]any{
"mid2": map[string]any{
"key": "val",
"exp": 20202020,
},
"mid1": map[string]any{
"key": "val",
"exp": 10101010,
},
},
}
gd := groupDoc{
id: primitive.NewObjectID(),
}
midin := primitive.NewObjectID()
tid := gd.tid(midin)
midout := gd.mid(tid)
logger.Println(midin, tid, midout)
logger.Println(rh.JSONSet("jsontest", "$", testDoc))
logger.Println(rh.JSONGet("jsontest", "$"))
logger.Println(rh.JSONResp("jsontest", "$.members"))
logger.Println(rh.JSONGetString("jsontest", "$.members..key"))
logger.Println(rh.JSONGetInt64("jsontest", "$.members..exp"))
logger.Println(rh.JSONObjKeys("jsontest", "$.members"))
err := rh.JSONMSet("jsontest", map[string]any{
"$.members.mid1.key": "newval",
"$.members.mid2.key": "newval",
})
logger.Println(err)
logger.Println(rh.JSONGet("jsontest", "$"))
logger.Println(rh.JSONMDel("jsontest", []string{"$.members.mid1", "$.members.mid2"}))
logger.Println(rh.JSONGet("jsontest", "$"))
logger.Println(rh.JSONObjLen("jsontest", "$.members"))
}

8
go.mod
View File

@ -1,18 +1,19 @@
module repositories.action2quare.com/ayo/tavern
go 1.19
go 1.20
require (
github.com/go-redis/redis/v8 v8.11.5
github.com/gorilla/websocket v1.5.0
go.mongodb.org/mongo-driver v1.11.7
repositories.action2quare.com/ayo/gocommon v0.0.0-20230621052811-06ef97f11d22
repositories.action2quare.com/ayo/gocommon v0.0.0-20230719003337-29b2f258507d
)
require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.5.4 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/klauspost/compress v1.16.6 // indirect
github.com/montanaflynn/stats v0.7.1 // indirect
github.com/pires/go-proxyproto v0.7.0 // indirect
@ -24,4 +25,5 @@ require (
golang.org/x/crypto v0.10.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/text v0.10.0 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
)

36
go.sum
View File

@ -11,8 +11,9 @@ github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
@ -93,8 +94,9 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
@ -102,5 +104,31 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230621052811-06ef97f11d22 h1:DImSGNxZrc+Q4WlS1OKMsLAScEfDYLX4XMJdjAaVnXc=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230621052811-06ef97f11d22/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230715080833-f0f459332d1a h1:n2FF/GQYtCsi57Lh5m9LyQ2IZQ8pIppscBzhpvugmZg=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230715080833-f0f459332d1a/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230716073702-8f6c87a8aeb8 h1:+wfozysATxEl9NOm03gUF7/kpAr3Chxjn5wjCnJsfQw=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230716073702-8f6c87a8aeb8/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230716093911-66aea48fb732 h1:Aq4E8kn1mN5z4ZpRYo5VFj2KektVNrTTuk0HocYMDCk=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230716093911-66aea48fb732/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230717084540-29843802ff0e h1:/eG6tAQzEaN178Aib+/erjHrE/+IjIVLRSmP4gx6D7E=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230717084540-29843802ff0e/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230718004527-4b35e0e6386b h1:K3YQXnVP/W6LzwGzqOxwKmFUD5IrrNPEWYcN/fSinck=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230718004527-4b35e0e6386b/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230718005518-289af24a8ffa h1:YmzJ1YccK3BxC/NbfB11SEUG1S6Lkz6ejg4kS3q/3Qc=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230718005518-289af24a8ffa/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230718020415-82abcddb497b h1:baO9csa0Esnp7UW+L8zJW/ygpjGHRve4rU2/1pVvXQg=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230718020415-82abcddb497b/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230718020838-c21017d2cd8b h1:FqLKDrFji0+giFwAJ3oV6dIOR6Sd/aaay76WgWIEVR8=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230718020838-c21017d2cd8b/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230718032106-40a603522d40 h1:VyFfS0d6pTX2HbZoDHOxJwag4aVSLOh/LrQXqfSJLBg=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230718032106-40a603522d40/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230718083804-d724cc84fa94 h1:iQPrRcZ6XfFblpVHxe/CIoWyTj7imF+3edIGSX6ZMM8=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230718083804-d724cc84fa94/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230718084512-89fa9e4ac585 h1:Wy6qjZ0uHfp02/H688zotRfzYGRPjun7Qay0Z9B/hSg=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230718084512-89fa9e4ac585/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230718105124-72a683fed2c0 h1:8LmRo2nKaLi4QCmO/agSpNTmCD0EdwFycjHjOweQJp8=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230718105124-72a683fed2c0/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230719003101-256bfd030c29 h1:ADScrqJgmk/TfyOu/6oXD3WkSH8sh3Bw360O8GKuEV8=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230719003101-256bfd030c29/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230719003337-29b2f258507d h1:eMzrvVkQfbs5X5dcw80TGGKtJ+6XELl7zNsWiuq4gzs=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230719003337-29b2f258507d/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw=

10
main.go
View File

@ -24,23 +24,23 @@ func main() {
panic(err)
}
authcache, err := common.NewAuthCollectionGlobal(ctx, config.MaingateApiToken)
wsh, err := wshandler.NewWebsocketHandler()
if err != nil {
panic(err)
}
wsh := wshandler.NewWebsocketHandler(authcache)
if tv, err := core.New(ctx, wsh, &config); err != nil {
panic(err)
} else {
serveMux := http.NewServeMux()
wsh.RegisterHandlers(ctx, serveMux, *prefix)
wsh.RegisterHandlers(serveMux, *prefix)
tv.RegisterHandlers(ctx, serveMux, *prefix)
server := common.NewHTTPServer(serveMux)
logger.Println("tavern is started")
wsh.Start(ctx)
server.Start()
cancel()
tv.Destructor()
wsh.Destructor()
tv.Cleanup()
wsh.Cleanup()
}
}