Compare commits

...

7 Commits

Author SHA1 Message Date
e42eef1f51 모듈 업데이트 2023-07-11 15:26:37 +09:00
da240bd5dd message의 body를 any로 변경 2023-07-11 14:33:38 +09:00
e71b29ed1c 모듈 업데이트 2023-07-11 12:32:15 +09:00
4df30ea19c redisClient 제거 2023-07-11 11:38:09 +09:00
8c3b279850 message receiver 시그니쳐 변경 2023-07-11 11:08:31 +09:00
ec0ed1ce06 rpc 패키지 적용 2023-07-10 15:39:56 +09:00
8d0f21077d wshandler와 분리 중 2023-07-06 00:53:53 +09:00
11 changed files with 433 additions and 2058 deletions

View File

@ -5,48 +5,15 @@ import (
"encoding/json"
"io"
"net/http"
"strings"
common "repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/wshandler"
"github.com/go-redis/redis/v8"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)
func splitDocument(doc bson.M) bson.M {
setdoc := bson.M{}
unsetdoc := bson.M{}
findoc := bson.M{}
for k, v := range doc {
if k == "$set" {
setdoc = v.(bson.M)
} else if k == "$unset" {
unsetdoc = v.(bson.M)
}
}
for k, v := range doc {
if v == nil {
unsetdoc[k] = 1
} else if k[0] != '$' {
setdoc[k] = v
}
}
if len(setdoc) > 0 {
findoc["$set"] = setdoc
}
if len(unsetdoc) > 0 {
findoc["$unset"] = unsetdoc
}
return findoc
}
// CreateGroup : 그룹 생성
// - 그룹 : 멤버와 권한을 관리할 수 있다. 그룹 타입에 따라 디비에 저장되거나 메모리에만 존재한다.
// - 생성 요청이 오면 파티를 만든다. 파티을 만들 수 있는지 여부는 서비스에서 결정할 것이고, 이 요청을 호출했다는 것은 서비스가 결정한 그룹 생성 조건을 다 통과했다는 의미이다.
@ -132,147 +99,6 @@ func (sub *subTavern) JoinGroup(w http.ResponseWriter, r *http.Request) {
}
}
func (sub *subTavern) EnterCandidateChannel(w http.ResponseWriter, r *http.Request) {
typename, _ := common.ReadStringFormValue(r.Form, "type")
if _, ok := sub.groups[typename]; !ok {
logger.Println("EnterCandidateChannel failed. group type is missing :", r)
w.WriteHeader(http.StatusBadRequest)
return
}
midobj, ok := common.ReadObjectIDFormValue(r.Form, "mid")
if !ok {
logger.Println("EnterCandidateChannel failed. mid is missing :", r)
w.WriteHeader(http.StatusBadRequest)
return
}
gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid")
if !ok {
logger.Println("EnterCandidateChannel failed. gid is missing :", r)
w.WriteHeader(http.StatusBadRequest)
return
}
// candidate channel은 big endian 최상위 비트가 1
gidobj[0] |= 0x80
if conn := sub.wsh.Conn(sub.region, midobj); conn != nil {
richConnOuter{wsh: sub.wsh, rc: conn}.JoinTag(sub.region, gidobj, midobj, typename)
} else {
sub.wshRpc.caller.One(midobj).JoinTag(sub.region, gidobj, midobj, typename)
}
}
func (sub *subTavern) LeaveCandidateChannel(w http.ResponseWriter, r *http.Request) {
typename, _ := common.ReadStringFormValue(r.Form, "type")
if _, ok := sub.groups[typename]; !ok {
logger.Println("EnterCandidateChannel failed. group type is missing :", r)
w.WriteHeader(http.StatusBadRequest)
return
}
midobj, ok := common.ReadObjectIDFormValue(r.Form, "mid")
if !ok {
logger.Println("EnterCandidateChannel failed. mid is missing :", r)
w.WriteHeader(http.StatusBadRequest)
return
}
gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid")
if !ok {
logger.Println("EnterCandidateChannel failed. gid is missing :", r)
w.WriteHeader(http.StatusBadRequest)
return
}
// candidate channel은 big endian 최상위 비트가 1
gidobj[0] |= 0x80
if conn := sub.wsh.Conn(sub.region, midobj); conn != nil {
richConnOuter{wsh: sub.wsh, rc: conn}.LeaveTag(sub.region, gidobj, midobj, typename)
} else {
sub.wshRpc.caller.One(midobj).LeaveTag(sub.region, gidobj, midobj, typename)
}
}
func (sub *subTavern) EnterGroupChannel(w http.ResponseWriter, r *http.Request) {
typename, _ := common.ReadStringFormValue(r.Form, "type")
group := sub.groups[typename]
if group == nil {
logger.Println("EnterGroupChannel failed. group type is missing :", r)
w.WriteHeader(http.StatusBadRequest)
return
}
midobj, ok := common.ReadObjectIDFormValue(r.Form, "mid")
if !ok {
logger.Println("EnterGroupChannel failed. mid is missing :", r)
w.WriteHeader(http.StatusBadRequest)
return
}
gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid")
if !ok {
logger.Println("EnterGroupChannel failed. gid is missing :", r)
w.WriteHeader(http.StatusBadRequest)
return
}
tid := group.FindTicketID(gidobj, midobj)
if tid.IsZero() {
logger.Println("EnterGroupChannel failed. tid is zero")
w.WriteHeader(http.StatusBadRequest)
return
}
if conn := sub.wsh.Conn(sub.region, midobj); conn != nil {
richConnOuter{wsh: sub.wsh, rc: conn}.JoinTag(sub.region, gidobj, tid, typename)
} else {
sub.wshRpc.caller.One(midobj).JoinTag(sub.region, gidobj, tid, typename)
}
writeBsonDoc(w, primitive.M{"_id": tid})
}
func (sub *subTavern) SetStateInGroup(w http.ResponseWriter, r *http.Request) {
gid, ok := common.ReadObjectIDFormValue(r.Form, "gid")
if !ok {
logger.Println("SetStateInGroup failed. tag is missing :", r)
w.WriteHeader(http.StatusBadRequest)
return
}
mid, ok := common.ReadObjectIDFormValue(r.Form, "mid")
if !ok {
logger.Println("SetStateInGroup failed. mid form value is missing")
w.WriteHeader(http.StatusBadRequest)
return
}
state, ok := common.ReadStringFormValue(r.Form, "state")
if !ok {
logger.Println("SetStateInGroup failed. state is missing :", r)
w.WriteHeader(http.StatusBadRequest)
return
}
typename, ok := common.ReadStringFormValue(r.Form, "type")
if !ok {
logger.Println("SetStateInGroup failed. type is missing :", r)
w.WriteHeader(http.StatusBadRequest)
return
}
var doc bson.M
if err := readBsonDoc(r.Body, &doc); err != nil {
logger.Error("SetStateInGroup failed. readBsonDoc err :", err)
w.WriteHeader(http.StatusBadRequest)
return
}
tid := doc["_id"].(primitive.ObjectID)
if conn := sub.wsh.Conn(sub.region, mid); conn != nil {
richConnOuter{wsh: sub.wsh, rc: conn}.SetStateInTag(sub.region, gid, tid, state, typename)
} else {
sub.wshRpc.caller.One(mid).SetStateInTag(sub.region, gid, tid, state, typename)
}
}
// Invite : 초대
// - type : 초대 타입 (required)
// - from : 초대하는 자 (required)
@ -320,52 +146,6 @@ func (sub *subTavern) Invite(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(result))
}
func (sub *subTavern) UpdateGroupMember(w http.ResponseWriter, r *http.Request) {
typename, _ := common.ReadStringFormValue(r.Form, "type")
group := sub.groups[typename]
if group == nil {
logger.Println("UpdateGroupMember failed. group type is missing :", r.Form)
w.WriteHeader(http.StatusBadRequest)
return
}
gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid")
if !ok {
logger.Println("UpdateGroupMember failed. gid is missing :", r.Form)
w.WriteHeader(http.StatusBadRequest)
return
}
midobj, midok := common.ReadObjectIDFormValue(r.Form, "mid")
tidobj, tidok := common.ReadObjectIDFormValue(r.Form, "tid")
if !midok && !tidok {
// 둘다 없네?
logger.Println("JoinGroup failed. tid or mid should be exist")
w.WriteHeader(http.StatusBadRequest)
return
}
var err error
delete, _ := common.ReadBoolFormValue(r.Form, "delete")
if delete {
err = group.UpdateGroupMember(gidobj, midobj, tidobj, nil)
} else {
var doc bson.M
if err := readBsonDoc(r.Body, &doc); err != nil {
logger.Error("UpdateGroupMember failed. readBsonDoc returns err :", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
err = group.UpdateGroupMember(gidobj, midobj, tidobj, doc)
}
if err != nil {
logger.Println("UpdateGroupMember failed. Update returns err :", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
}
func (sub *subTavern) CancelInvitation(w http.ResponseWriter, r *http.Request) {
typename, _ := common.ReadStringFormValue(r.Form, "type")
group := sub.groups[typename]
@ -490,119 +270,6 @@ func (sub *subTavern) QueryInvitations(w http.ResponseWriter, r *http.Request) {
}
}
func (sub *subTavern) TurnGroupOnline(w http.ResponseWriter, r *http.Request) {
// group을 online 상태로 만든다.
// 요청을 보내는 클라이언트의 conn이 끊이면 online에서 제거한다.
// online인 group을 가지고 뭘 할지는 게임이 알아서...
typename, _ := common.ReadStringFormValue(r.Form, "type")
group := sub.groups[typename]
if group == nil {
logger.Println("TurnGroupOnline failed. group type is missing :", r.Form)
w.WriteHeader(http.StatusBadRequest)
return
}
gid, ok := common.ReadObjectIDFormValue(r.Form, "_id")
if !ok {
logger.Println("TurnGroupOnline failed. group id '_id' form value is missing :", r.Form)
w.WriteHeader(http.StatusBadRequest)
return
}
mid, ok := common.ReadObjectIDFormValue(r.Form, "mid")
if !ok {
logger.Println("TurnGroupOnline failed. mid form value is missing")
w.WriteHeader(http.StatusBadRequest)
return
}
var filter bson.M
if err := readBsonDoc(r.Body, &filter); err != nil {
logger.Error("TurnGroupOnline failed. readBsonDoc return err :", err)
w.WriteHeader(http.StatusBadRequest)
return
}
exist, err := group.Exist(gid, filter)
if err != nil {
logger.Error("TurnGroupOnline failed. FindOne return err :", err)
w.WriteHeader(http.StatusBadRequest)
return
}
if !exist {
logger.Println("TurnGroupOnline failed. filter not match", filter)
w.WriteHeader(http.StatusBadRequest)
return
}
score, ok := common.ReadFloatFormValue(r.Form, "score")
if !ok {
score = 100
}
if conn := sub.wsh.Conn(sub.region, mid); conn != nil {
err = richConnOuter{wsh: sub.wsh, rc: conn}.TurnGroupOnline(onlineGroupQueryKey(typename), gid, score)
} else {
err = sub.wshRpc.caller.One(mid).TurnGroupOnline(onlineGroupQueryKey(typename), gid, score)
}
if err != nil {
logger.Error("TurnGroupOnline failed. TurnGroupOnline err :", err)
w.WriteHeader(http.StatusBadRequest)
return
}
}
func (sub *subTavern) TurnGroupOffline(w http.ResponseWriter, r *http.Request) {
// group을 offline 상태로 만든다.
// 요청을 보내는 클라이언트의 conn이 끊이면 online에서 제거한다.
// online인 group을 가지고 뭘 할지는 게임이 알아서...
typename, _ := common.ReadStringFormValue(r.Form, "type")
group := sub.groups[typename]
if group == nil {
logger.Println("TurnGroupOffline failed. group type is missing :", r.Form)
w.WriteHeader(http.StatusBadRequest)
return
}
gid, ok := common.ReadObjectIDFormValue(r.Form, "_id")
if !ok {
logger.Println("TurnGroupOffline failed. group id '_id' form value is missing :", r.Form)
w.WriteHeader(http.StatusBadRequest)
return
}
mid, ok := common.ReadObjectIDFormValue(r.Form, "mid")
if !ok {
logger.Println("TurnGroupOffline failed. mid form value is missing")
w.WriteHeader(http.StatusBadRequest)
return
}
// onlinename := onlineGroupQueryKey(typename)
// if onClose := conn.UnregistOnCloseFunc(onlinename); onClose != nil {
// onClose()
// } else {
// gid, ok := common.ReadStringFormValue(form, "_id")
// if ok {
// sub.redisSync.ZRem(context.Background(), onlinename, gid)
// }
// }
var err error
if conn := sub.wsh.Conn(sub.region, mid); conn != nil {
err = richConnOuter{wsh: sub.wsh, rc: conn}.TurnGroupOffline(onlineGroupQueryKey(typename), gid)
} else {
err = sub.wshRpc.caller.One(mid).TurnGroupOffline(onlineGroupQueryKey(typename), gid)
}
if err != nil {
logger.Error("TurnGroupOffline failed. TurnGroupOnline err :", err)
w.WriteHeader(http.StatusBadRequest)
return
}
}
func (sub *subTavern) QueryOnlineGroup(w http.ResponseWriter, r *http.Request) {
typename, _ := common.ReadStringFormValue(r.Form, "type")
group := sub.groups[typename]
@ -690,11 +357,7 @@ func (sub *subTavern) QueryOnlineState(w http.ResponseWriter, r *http.Request) {
return
}
state, err := sub.wsh.GetState(mid)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
state := sub.wsh.GetState(sub.region, mid)
w.Write([]byte(state))
}
@ -706,13 +369,7 @@ func (sub *subTavern) IsOnline(w http.ResponseWriter, r *http.Request) {
return
}
ok, err := sub.wsh.IsOnline(mid)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
if ok {
if state := sub.wsh.GetState(sub.region, mid); len(state) > 0 {
w.Write([]byte("true"))
} else {
w.Write([]byte("false"))
@ -983,6 +640,32 @@ func (sub *subTavern) UpdateGroupDocument(w http.ResponseWriter, r *http.Request
}
}
func (sub *subTavern) PauseGroupMember(w http.ResponseWriter, r *http.Request) {
typename, _ := common.ReadStringFormValue(r.Form, "type")
group := sub.groups[typename]
if group == nil {
logger.Println("DismissGroup failed. type is missing")
w.WriteHeader(http.StatusBadRequest)
return
}
midobj, ok := common.ReadObjectIDFormValue(r.Form, "mid")
if !ok {
logger.Println("UpdateMemberDocument failed. member_id is missing")
w.WriteHeader(http.StatusBadRequest)
return
}
gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid")
if !ok {
logger.Println("UpdateMemberDocument failed. _id is missing")
w.WriteHeader(http.StatusBadRequest)
return
}
group.PauseMember(gidobj, midobj)
}
func (sub *subTavern) DropPausedMember(w http.ResponseWriter, r *http.Request) {
typename, _ := common.ReadStringFormValue(r.Form, "type")
group := sub.groups[typename]
@ -1013,55 +696,55 @@ func (sub *subTavern) DropPausedMember(w http.ResponseWriter, r *http.Request) {
}
}
func (sub *subTavern) deliveryMessageHandler(deliveryChan <-chan wshandler.DeliveryMessage) {
defer func() {
r := recover()
if r != nil {
logger.Error(r)
}
}()
// func (sub *subTavern) deliveryMessageHandler(deliveryChan <-chan wshandler.DeliveryMessage) {
// defer func() {
// r := recover()
// if r != nil {
// logger.Error(r)
// }
// }()
redisSync := sub.wsh.RedisSync
for msg := range deliveryChan {
mid := msg.Alias
if msg.Body != nil {
buffer := msg.Body
// redisSync := sub.wsh.RedisSync
// for msg := range deliveryChan {
// mid := msg.Alias
// if msg.Body != nil {
// buffer := msg.Body
var channame string
for i, ch := range buffer {
if ch == 0 {
channame = string(buffer[:i])
buffer = buffer[i+1:]
break
}
}
// var channame string
// for i, ch := range buffer {
// if ch == 0 {
// channame = string(buffer[:i])
// buffer = buffer[i+1:]
// break
// }
// }
if len(channame) == 0 {
continue
}
// if len(channame) == 0 {
// continue
// }
buffer = append(mid[:], buffer...)
_, err := redisSync.Publish(context.Background(), channame, buffer).Result()
if err != nil {
logger.Error(err)
}
}
// buffer = append(mid[:], buffer...)
// _, err := redisSync.Publish(context.Background(), channame, buffer).Result()
// if err != nil {
// logger.Error(err)
// }
// }
if len(msg.Command) > 0 {
switch msg.Command {
case "pause":
gidtype := msg.Conn.GetTag("gid")
if len(gidtype) > 0 {
tokens := strings.SplitN(gidtype, "@", 2)
gidobj, _ := primitive.ObjectIDFromHex(tokens[0])
gtype := tokens[1]
group := sub.groups[gtype]
if group != nil {
group.PauseMember(gidobj, msg.Alias, msg.Conn)
}
}
}
}
}
logger.Println("delivery chan fin")
}
// if len(msg.Command) > 0 {
// switch msg.Command {
// case "pause":
// gidtype := msg.Conn.GetTag("gid")
// if len(gidtype) > 0 {
// tokens := strings.SplitN(gidtype, "@", 2)
// gidobj, _ := primitive.ObjectIDFromHex(tokens[0])
// gtype := tokens[1]
// group := sub.groups[gtype]
// if group != nil {
// group.PauseMember(gidobj, msg.Alias, msg.Conn)
// }
// }
// }
// }
// }
// logger.Println("delivery chan fin")
// }

View File

@ -3,8 +3,6 @@ package core
import (
"net/url"
"repositories.action2quare.com/ayo/gocommon/wshandler"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)
@ -30,7 +28,6 @@ type group interface {
Join(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, doc bson.M) (newTicketID primitive.ObjectID, err error)
FindTicketID(groupID primitive.ObjectID, memberID primitive.ObjectID) primitive.ObjectID
Invite(groupID primitive.ObjectID, memberID primitive.ObjectID, inviterDoc bson.M, inviteeDoc bson.M) (string, error)
UpdateGroupMember(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, doc bson.M) error
CancelInvitation(groupID primitive.ObjectID, ticketID primitive.ObjectID) error
AcceptInvitation(groupID primitive.ObjectID, mid primitive.ObjectID, ticketID primitive.ObjectID, member bson.M) (primitive.ObjectID, error)
DenyInvitation(groupID primitive.ObjectID, mid primitive.ObjectID, ticketID primitive.ObjectID) error
@ -42,7 +39,7 @@ type group interface {
QueryMember(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, projection string) (bson.M, error)
Leave(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID) error
DropPausedMember(groupID primitive.ObjectID, memberID primitive.ObjectID) error
PauseMember(groupID primitive.ObjectID, memberID primitive.ObjectID, conn *wshandler.Richconn) error
PauseMember(groupID primitive.ObjectID, memberID primitive.ObjectID) error
UpdateMemberDocument(groupID primitive.ObjectID, memberID primitive.ObjectID, doc bson.M) error
Dismiss(groupID primitive.ObjectID) error
UpdateGroupDocument(groupID primitive.ObjectID, body []byte) error

View File

@ -10,20 +10,16 @@ import (
"fmt"
"net/url"
"os"
"path"
"reflect"
"runtime"
"strings"
"sync"
"time"
"repositories.action2quare.com/ayo/gocommon/flagx"
"repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/rpc"
"repositories.action2quare.com/ayo/gocommon/wshandler"
"repositories.action2quare.com/ayo/tavern/core/rpc"
"github.com/go-redis/redis/v8"
"github.com/gorilla/websocket"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)
@ -32,8 +28,6 @@ type accountID = primitive.ObjectID
type ticketID = primitive.ObjectID
type groupID = primitive.ObjectID
var everyHost, _ = primitive.ObjectIDFromHex("010203040506070809101112")
type Invitation struct {
GroupID groupID `json:"gid"`
TicketID ticketID `json:"tid"`
@ -56,7 +50,6 @@ type PublicMemberDoc struct {
type FullGroupDoc struct {
Gid groupID
DM string
AllMembers []*PublicMemberDoc `json:",omitempty"`
Body GroupDocBody `json:",omitempty"`
}
@ -69,7 +62,7 @@ type memberDoc struct {
// underscore keys in Hidden
Hidden bson.M
rconn *wshandler.Richconn
rconn *connection
Mid accountID
}
@ -99,28 +92,28 @@ func (gd *groupDoc) updateBodyWithBson(src []byte) ([]byte, error) {
return bson.Marshal(gd.Body)
}
func (gd *groupDoc) updateBodyWithJson(src []byte) ([]byte, error) {
func (gd *groupDoc) updateBodyWithJson(src []byte) GroupDocBody {
gd.Lock()
defer gd.Unlock()
err := json.Unmarshal(src, &gd.Body)
if err != nil {
return nil, err
return nil
}
return json.Marshal(makeTypeMessage(gd.Body))
return gd.Body
}
func (gd *groupDoc) updateBodyBsonToJson(bsonSrc []byte) (jsonBt []byte, err error) {
func (gd *groupDoc) updateBodyBsonToJson(bsonSrc []byte) GroupDocBody {
gd.Lock()
defer gd.Unlock()
err = bson.Unmarshal(bsonSrc, &gd.Body)
err := bson.Unmarshal(bsonSrc, &gd.Body)
if err != nil {
return nil, err
return nil
}
return json.Marshal(makeTypeMessage(gd.Body))
return gd.Body
}
func (gd *groupDoc) updateBody(bsonSrc []byte) error {
@ -130,7 +123,7 @@ func (gd *groupDoc) updateBody(bsonSrc []byte) error {
return bson.Unmarshal(bsonSrc, &gd.Body)
}
func (gd *groupDoc) addInvite(inviteeDoc bson.M, rconn *wshandler.Richconn, ttl time.Duration, max int) (ticketID, *memberDoc) {
func (gd *groupDoc) addInvite(inviteeDoc bson.M, rconn *connection, ttl time.Duration, max int) (ticketID, *memberDoc) {
gd.Lock()
defer gd.Unlock()
@ -194,7 +187,7 @@ func seperateHidden(in bson.M) (public bson.M, hidden bson.M) {
return in, hidden
}
func (gd *groupDoc) addInCharge(mid accountID, rconn *wshandler.Richconn, doc bson.M) (ticketID, *memberDoc) {
func (gd *groupDoc) addInCharge(mid accountID, rconn *connection, doc bson.M) (ticketID, *memberDoc) {
gd.Lock()
defer gd.Unlock()
@ -291,21 +284,6 @@ func (gd *groupDoc) removeMember(mid accountID, tid *ticketID) {
}
}
func (gd *groupDoc) conns(includeInvitee bool) (out []*wshandler.Richconn) {
gd.Lock()
defer gd.Unlock()
for _, mem := range gd.tickets {
if mem.rconn != nil {
if !includeInvitee && mem.Invite {
continue
}
out = append(out, mem.rconn)
}
}
return
}
func (gd *groupDoc) ticket(mid accountID) ticketID {
gd.Lock()
defer gd.Unlock()
@ -399,7 +377,7 @@ func (gd *groupDoc) iterateMembers(cb func(ticketID, *memberDoc)) {
}
}
func (gd *groupDoc) serializeFull(gid groupID, directMessageChanName string) []byte {
func (gd *groupDoc) serializeFull(gid groupID) FullGroupDoc {
gd.Lock()
defer gd.Unlock()
@ -416,14 +394,11 @@ func (gd *groupDoc) serializeFull(gid groupID, directMessageChanName string) []b
})
}
bt, _ := json.Marshal(makeTypeMessage(FullGroupDoc{
return FullGroupDoc{
Gid: gid,
DM: directMessageChanName,
AllMembers: output,
Body: gd.Body,
}))
return bt
}
}
type groupContainer struct {
@ -433,11 +408,15 @@ type groupContainer struct {
type groupInMemory struct {
*groupConfig
groupDocSync func(groupID, []byte) error
memberSync func(groupID, accountID, ticketID, *memberDoc, bool) error
rpcCall func([]byte) error
hasConn func(accountID) *wshandler.Richconn
groups groupContainer
groupDocSync func(groupID, []byte) error
memberSync func(groupID, accountID, ticketID, *memberDoc, bool) error
rpcCall func([]byte) error
hasConn func(accountID) *connection
sendUpstreamMessage func(*wshandler.UpstreamMessage)
sendEnterRoomMessage func(groupID, accountID)
sendLeaveRoomMessage func(groupID, accountID)
sendCloseMessage func(accountID, string)
groups groupContainer
}
func (gc *groupContainer) add(id groupID, doc *groupDoc) {
@ -496,46 +475,6 @@ func (gm *groupInMemory) Candidate(gid groupID, mid accountID, doc bson.M) error
}
var errGroupNotExist = errors.New("group does not exist")
var errFuncNameIsMissing = errors.New("how func name is missin")
func (gm *groupInMemory) callProxyRpc(target accountID, name string, args ...any) error {
bt, err := rpc.Encode(target, name, args...)
if err != nil {
return err
}
return gm.rpcCall(bt)
}
type rpcTarget struct {
gm *groupInMemory
target accountID
}
func (rt rpcTarget) call(args ...any) error {
pc := make([]uintptr, 1)
n := runtime.Callers(2, pc[:])
if n < 1 {
return nil
}
frame, _ := runtime.CallersFrames(pc).Next()
funcname := path.Ext(frame.Func.Name())
if len(funcname) > 0 {
funcname = funcname[1:]
return rt.gm.callProxyRpc(rt.target, funcname, args...)
}
return errFuncNameIsMissing
}
func (gm *groupInMemory) rpc(target accountID) rpcTarget {
return rpcTarget{
gm: gm,
target: target,
}
}
var errNoEmptySlot = errors.New("no more seat in group")
func (gm *groupInMemory) Join(gid groupID, mid accountID, tid ticketID, doc bson.M) (ticketID, error) {
@ -558,62 +497,40 @@ func (gm *groupInMemory) FindTicketID(gid groupID, mid groupID) ticketID {
return primitive.NilObjectID
}
func makeTypeMessage[T any](msg T) bson.M {
var ptr *T
name := reflect.TypeOf(ptr).Elem().Name()
return bson.M{name: msg}
}
// func sendTypedMessageDirect[T any](rconn *wshandler.Richconn, msg T) {
// bt, _ := json.Marshal(makeTypeMessage(msg))
// rconn.WriteBytes(bt)
// }
func sendTypedMessageDirect[T any](rconn *wshandler.Richconn, msg T) {
bt, _ := json.Marshal(makeTypeMessage(msg))
rconn.WriteBytes(bt)
}
// func sendTypedMessage[T any](gm *groupInMemory, target accountID, msg T) {
// bt, _ := json.Marshal(makeTypeMessage(msg))
// gm.SendMessage(target, bt)
// }
func sendTypedMessage[T any](gm *groupInMemory, target accountID, msg T) {
bt, _ := json.Marshal(makeTypeMessage(msg))
gm.SendMessage(target, bt)
}
// func (gm *groupInMemory) SendMessage(target accountID, msg []byte) {
// rconn := gm.hasConn(target)
// if rconn != nil {
// rconn.WriteBytes(msg)
// } else {
// gm.rpc(target).call(target, msg)
// }
// }
func (gm *groupInMemory) SendMessage(target accountID, msg []byte) {
rconn := gm.hasConn(target)
if rconn != nil {
rconn.WriteBytes(msg)
} else {
gm.rpc(target).call(target, msg)
}
}
// func multicast(conns []*wshandler.Richconn, raw []byte) {
// for _, rconn := range conns {
// rconn.WriteBytes(raw)
// }
// }
func multicast(conns []*wshandler.Richconn, raw []byte) {
for _, rconn := range conns {
rconn.WriteBytes(raw)
}
}
func multicastTyped[T any](conns []*wshandler.Richconn, msg T) {
bt, _ := json.Marshal(makeTypeMessage(msg))
go multicast(conns, bt)
}
func broadcastTypedMessage[T any](gm *groupInMemory, gid groupID, msg T) {
if gd := gm.groups.find(gid); gd != nil {
bt, _ := json.Marshal(makeTypeMessage(msg))
go multicast(gd.conns(false), bt)
}
}
// func broadcastTypedMessage[T any](gm *groupInMemory, gid groupID, msg T) {
// if gd := gm.groups.find(gid); gd != nil {
// bt, _ := json.Marshal(makeTypeMessage(msg))
// go multicast(gd.conns(false), bt)
// }
// }
var errInviteeDocMidMissing = errors.New("inviteeDoc must have '_mid' field")
func (gm *groupInMemory) SendInvitationFailed(mid accountID, inviteeDoc bson.M) error {
delete(inviteeDoc, "_mid")
rconn := gm.hasConn(mid)
if rconn == nil {
return gm.rpc(mid).call(mid, inviteeDoc)
}
sendTypedMessage(gm, mid, InvitationFail(inviteeDoc))
return nil
}
func (gm *groupInMemory) InviteImplement(gid groupID, mid accountID, inviteeDoc bson.M, inviterDoc bson.M) error {
targetid := inviteeDoc["_mid"].(accountID)
@ -621,7 +538,7 @@ func (gm *groupInMemory) InviteImplement(gid groupID, mid accountID, inviteeDoc
// invitee의 rconn이 종료될 때 그룹에 반영해야 하므로 rconn을 찾자
rconn := gm.hasConn(targetid)
if rconn == nil {
return gm.rpc(targetid).call(gid, inviteeDoc, inviterDoc)
return rpc.Make(gm).To(targetid).Call(gid, mid, inviteeDoc, inviterDoc)
}
gd := gm.groups.find(gid)
@ -629,10 +546,15 @@ func (gm *groupInMemory) InviteImplement(gid groupID, mid accountID, inviteeDoc
return errGroupNotExist
}
if rconn.HasOnCloseFunc("member_remove_invite") {
if rconn.hasOnCloseFunc("member_remove_invite") {
// 이미 초대 중이다.
// inviter한테 알려줘야 한다.
return gm.SendInvitationFailed(mid, inviteeDoc)
gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "@" + mid.Hex(),
Body: inviteeDoc,
Tag: []string{"InvitationFail"},
})
return nil
}
tid, newdoc := gd.addInvite(inviteeDoc, rconn, time.Duration(gm.InviteExpire)*time.Second, gm.MaxMember)
@ -640,17 +562,21 @@ func (gm *groupInMemory) InviteImplement(gid groupID, mid accountID, inviteeDoc
return errNoEmptySlot
}
rconn.RegistOnCloseFunc("member_remove_invite", func() {
rconn.registOnCloseFunc("member_remove_invite", func() {
gd.removeMember(targetid, &tid)
gm.memberSync(gid, targetid, tid, nil, false)
})
gm.memberSync(gid, targetid, tid, newdoc, false)
sendTypedMessage(gm, targetid, Invitation{
GroupID: gid,
TicketID: tid,
Inviter: inviterDoc,
ExpireAtUTC: newdoc.InviteExpire.Unix(),
gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "@" + targetid.Hex(),
Body: Invitation{
GroupID: gid,
TicketID: tid,
Inviter: inviterDoc,
ExpireAtUTC: newdoc.InviteExpire.Unix(),
},
Tag: []string{"Invitation"},
})
return nil
@ -685,7 +611,7 @@ func (gm *groupInMemory) Invite(gid groupID, mid accountID, inviterDoc bson.M, i
if rconn == nil {
// mid가 있는 곳에서 처리를 해야 접속 끊겼을 때 콜백을 먼저 등록할 수 있다.
// 콜백이 rconn에 먼저 등록되지 않으면 좀비 group이 생길 가능성이 생긴다.
return gid.Hex(), gm.rpc(mid).call(gid, mid, inviteeDoc, inviteeDoc)
return gid.Hex(), rpc.Make(gm).To(targetid).Call(gid, mid, inviteeDoc, inviterDoc)
}
// 이제 여기는 mid가 InCharge이면서 rconn이 존재
@ -693,7 +619,7 @@ func (gm *groupInMemory) Invite(gid groupID, mid accountID, inviterDoc bson.M, i
if gd == nil {
_, gd = gm.groups.createWithID(gid, bson.M{})
tid, newdoc := gd.addInCharge(mid, rconn, inviterDoc)
rconn.RegistOnCloseFunc("member_remove", func() {
rconn.registOnCloseFunc("member_remove", func() {
// 내가 InCharge이므로 접속이 종료될 때 그룹을 해체한다.
gm.groupDocSync(gid, nil)
})
@ -703,6 +629,8 @@ func (gm *groupInMemory) Invite(gid groupID, mid accountID, inviterDoc bson.M, i
}
gm.groupDocSync(gid, bt)
gm.memberSync(gid, mid, tid, newdoc, true)
// 내가 wshandler room에 입장
gm.sendEnterRoomMessage(gid, mid)
} else {
// targetid가 이미 멤버인지 미리 확인 가능
if !gd.ticket(targetid).IsZero() {
@ -714,9 +642,6 @@ func (gm *groupInMemory) Invite(gid groupID, mid accountID, inviterDoc bson.M, i
return gid.Hex(), gm.InviteImplement(gid, mid, inviteeDoc, inviterDoc)
}
func (gm *groupInMemory) UpdateGroupMember(gid groupID, mid accountID, tid ticketID, doc bson.M) error {
return nil
}
func (gm *groupInMemory) CancelInvitation(gid groupID, tid ticketID) error {
return nil
}
@ -729,20 +654,21 @@ func (gm *groupInMemory) AcceptInvitation(gid groupID, mid accountID, tid ticket
rconn := gm.hasConn(mid)
if rconn == nil {
return gid, gm.rpc(mid).call(gid, mid, tid, member)
return gid, rpc.Make(gm).To(mid).Call(gid, mid, tid, member)
}
oldFunc := rconn.UnregistOnCloseFunc("member_remove")
oldFunc := rconn.unregistOnCloseFunc("member_remove")
if oldFunc != nil {
// 기존 멤버였으면 탈퇴 처리
oldFunc()
}
inviteFunc := rconn.UnregistOnCloseFunc("member_remove_invite")
rconn.RegistOnCloseFunc("member_remove", inviteFunc)
inviteFunc := rconn.unregistOnCloseFunc("member_remove_invite")
rconn.registOnCloseFunc("member_remove", inviteFunc)
result, isNew := gd.addMember(mid, &tid, member)
if result != nil {
gm.sendEnterRoomMessage(gid, mid)
return gid, gm.memberSync(gid, mid, tid, result, isNew)
}
@ -758,10 +684,10 @@ func (gm *groupInMemory) DenyInvitation(gid groupID, mid accountID, tid ticketID
rconn := gm.hasConn(mid)
if rconn == nil {
return gm.rpc(mid).call(gid, mid, tid)
return rpc.Make(gm).To(mid).Call(gid, mid, tid)
}
inviteFunc := rconn.UnregistOnCloseFunc("member_remove_invite")
inviteFunc := rconn.unregistOnCloseFunc("member_remove_invite")
if inviteFunc != nil {
inviteFunc() // removeMember는 여기에 들어있다.
return nil
@ -799,21 +725,27 @@ func (gm *groupInMemory) DropPausedMember(gid primitive.ObjectID, mid primitive.
// 드랍해야 한다.
if gd.InCharge == mid {
// 내가 방장인 경우
gm.groupDocSync(gid, nil)
return gm.groupDocSync(gid, nil)
} else {
// 내가 방장이 아닌 경우
gd.removeMember(mid, &tid)
gm.memberSync(gid, mid, tid, nil, false)
return gm.memberSync(gid, mid, tid, nil, false)
}
}
return nil
}
func (gm *groupInMemory) PauseMember(gid primitive.ObjectID, mid primitive.ObjectID, rconn *wshandler.Richconn) error {
rconn.UnregistOnCloseFunc("member_remove")
rconn.UnregistOnCloseFunc("member_remove_invite")
rconn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "pause"), time.Time{})
func (gm *groupInMemory) PauseMember(gid primitive.ObjectID, mid primitive.ObjectID) error {
rconn := gm.hasConn(mid)
if rconn == nil {
return rpc.Make(gm).To(mid).Call(gid, mid)
}
// 접속은 끊기지만 그룹에서 제거하지는 않는 상태
rconn.unregistOnCloseFunc("member_remove")
rconn.unregistOnCloseFunc("member_remove_invite")
gm.sendCloseMessage(mid, "pause")
gd := gm.groups.find(gid)
if gd == nil {
@ -907,24 +839,43 @@ func (gm *groupInMemory) Leave(gid groupID, mid accountID, tid ticketID) error {
// targetmid의 "member_remove" 함수를 등록 해제해야 하므로 rconn이 있는 곳에서 하자
rconn := gm.hasConn(targetmid)
if rconn == nil {
return gm.rpc(targetmid).call(gid, mid, tid)
return rpc.Make(gm).To(targetmid).Call(gid, mid, tid)
}
if oldfunc := rconn.UnregistOnCloseFunc("member_remove"); oldfunc != nil {
if oldfunc := rconn.unregistOnCloseFunc("member_remove"); oldfunc != nil {
oldfunc() // 이 안에 다 있다.
}
// 나한테는 빈 FullGroupDoc을 보낸다.
sendTypedMessageDirect(rconn, FullGroupDoc{
Gid: gid,
gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "@" + mid.Hex(),
Body: FullGroupDoc{Gid: gid},
Tag: []string{"FullGroupDoc", gid.Hex()},
})
gm.sendLeaveRoomMessage(gid, targetmid)
return nil
}
func (gm *groupInMemory) UpdateMemberDocument(gid groupID, mid accountID, doc bson.M) error {
gd := gm.groups.find(gid)
if gd == nil {
return errGroupNotExist
}
tid := gd.ticket(mid)
bt, _ := json.Marshal(doc)
personalized := []byte(fmt.Sprintf(`{"%s":%s}`, tid.Hex(), string(bt)))
gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + gid.Hex(),
Body: gd.updateBodyWithJson(personalized),
Tag: []string{"GroupDocBody"},
})
return nil
}
func (gm *groupInMemory) Dismiss(gid groupID) error {
return nil
}
@ -942,15 +893,21 @@ func (gm *groupInMemory) UpdateGroupDocument(gid groupID, body []byte) error {
return gm.groupDocSync(gid, newbody)
}
func (gm *groupInMemory) TargetExists(target primitive.ObjectID) bool {
return gm.hasConn(target) != nil
}
var devflag = flagx.Bool("dev", false, "")
func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, typename string, wsh *wshandler.WebsocketHandler) (group, error) {
func (cfg *groupConfig) prepareInMemory(ctx context.Context, typename string, sub *subTavern) (group, error) {
// group document
// member document
region := sub.region
wsh := sub.wsh
groupDocSyncChanName := fmt.Sprintf("d_mgc_%s_%s", region, typename)
memberSyncChanName := fmt.Sprintf("m_mgc_%s_%s", region, typename)
rpcChanName := fmt.Sprintf("r_mgc_%s_%s", region, typename)
clientMessageChanName := fmt.Sprintf("c_mgc_%s_%s", region, typename)
toHashHex := func(name string) string {
hash := md5.New()
@ -966,7 +923,6 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type
groupDocSyncChanName = toHashHex(groupDocSyncChanName)
memberSyncChanName = toHashHex(memberSyncChanName)
rpcChanName = toHashHex(rpcChanName)
clientMessageChanName = toHashHex(clientMessageChanName)
// 여기서는 subscribe channel
// 각 함수에서는 publish
@ -1000,15 +956,28 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type
_, err := wsh.RedisSync.Publish(ctx, rpcChanName, bt).Result()
return err
},
hasConn: func(t accountID) *wshandler.Richconn {
return wsh.Conn(region, t)
hasConn: func(t accountID) *connection {
return sub.cm.get(t)
},
groups: groupContainer{
groupDocs: make(map[groupID]*groupDoc),
},
sendUpstreamMessage: func(msg *wshandler.UpstreamMessage) {
wsh.SendUpstreamMessage(region, msg)
},
sendEnterRoomMessage: func(gid groupID, accid accountID) {
wsh.EnterRoom(region, gid.Hex(), accid)
},
sendLeaveRoomMessage: func(gid groupID, accid accountID) {
wsh.LeaveRoom(region, gid.Hex(), accid)
},
sendCloseMessage: func(target accountID, text string) {
wsh.SendCloseMessage(region, target.Hex(), text)
},
}
// TODO : processChannelMessage 스레드 분리해보자
rpc.RegistReceiver(gm)
processChannelMessage := func(gm *groupInMemory, pubsub *redis.PubSub) *redis.PubSub {
defer func() {
r := recover()
@ -1024,34 +993,7 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type
}
switch msg.Channel {
case clientMessageChanName:
bt := []byte(msg.Payload)
if len(bt) < 24 {
break
}
// 주!! mid가 먼저
var mid groupID
copy(mid[:], bt[:12])
bt = bt[12:]
var gid groupID
copy(gid[:], bt[:12])
bt = bt[12:]
gd := gm.groups.find(gid)
if gd == nil {
break
}
tid, _ := gd.memberByAccount(mid)
if !tid.IsZero() {
personalized := []byte(fmt.Sprintf(`{"%s":%s}`, tid.Hex(), string(bt)))
if after, err := gd.updateBodyWithJson(personalized); err == nil {
go multicast(gd.conns(false), after)
}
}
case groupDocSyncChanName:
case groupDocSyncChanName: // 호스트들간 그룹 정보 동기화 채널
payload := []byte(msg.Payload)
if len(payload) < len(config.macAddr) {
break
@ -1069,17 +1011,18 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type
if len(remain) == 0 {
// gid 그룹 삭제
// 그룹 안에 있는 멤버에게 알림
bt, _ := json.Marshal(makeTypeMessage(FullGroupDoc{
Gid: gid,
}))
go multicast(gd.conns(true), bt)
gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + gid.Hex(),
Body: FullGroupDoc{Gid: gid},
Tag: []string{"FullGroupDoc"},
})
gm.groups.delete(gid)
} else if string(senderHost) != config.macAddr {
if r, err := gd.updateBodyBsonToJson(remain); err != nil {
logger.Error("groupDocSyncChanName message decode failed :", remain, err)
} else {
go multicast(gd.conns(true), r)
}
gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + gid.Hex(),
Body: gd.updateBodyBsonToJson(remain),
Tag: []string{"GroupDocBody"},
})
}
} else if string(senderHost) != config.macAddr {
var newDoc groupDoc
@ -1090,7 +1033,7 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type
}
}
case memberSyncChanName:
case memberSyncChanName: // 호스트들간 멤버 정보 동기화 채널
if len(msg.Payload) < len(config.macAddr) {
break
}
@ -1120,7 +1063,7 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type
}
var updated *memberDoc
rconn := wsh.Conn(region, mid)
rconn := gm.hasConn(mid)
if senderHost != config.macAddr {
// 내가 보낸 메시지가 아니면 멤버 도큐먼트 업데이트 하고 브로드캐스팅
@ -1140,56 +1083,40 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type
if rconn != nil {
// gid에 이미 다른 값이 있을 수 있다.
// 정확하게 이 값이면 제거하고, 아니면 넘어간다.
rconn.RemoveTag("gid", fmt.Sprintf("%s@%s", gid.Hex(), gm.Name))
rconn.removeTag("gid", fmt.Sprintf("%s@%s", gid.Hex(), gm.Name))
}
broadcastTypedMessage(gm, gid, PublicMemberDoc{Tid: tid})
gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + gid.Hex(),
Body: PublicMemberDoc{Tid: tid},
Tag: []string{"PublicMemberDoc"},
})
} else {
if isNewMember && updated.rconn == nil && rconn != nil {
updated.rconn = rconn
}
// 업데이트 된 플레이어(새로 들어온 플레이어 포함)를 모두에게 알려준다. 본인 포함, invitee 제외
broadcastTypedMessage(gm, gid, PublicMemberDoc{
Tid: tid,
memberDocCommon: updated.memberDocCommon,
gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + gid.Hex(),
Body: PublicMemberDoc{
Tid: tid,
memberDocCommon: updated.memberDocCommon,
},
Tag: []string{"PublicMemberDoc"},
})
}
if isNewMember {
if rconn != nil {
// 새 멤버이므로 기존 멤버를 다 보내준다.
rconn.AddTag("gid", fmt.Sprintf("%s@%s", gid.Hex(), gm.Name))
rconn.WriteBytes(gd.serializeFull(gid, clientMessageChanName))
rconn.addTag("gid", fmt.Sprintf("%s@%s", gid.Hex(), gm.Name))
gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "@" + mid.Hex(),
Body: gd.serializeFull(gid),
Tag: []string{"FullGroupDoc"},
})
}
}
case rpcChanName:
targetbt, fn, params, err := rpc.Decode[accountID]([]byte(msg.Payload))
if err != nil {
logger.Error("rpcChanName message decode failed :", msg.Payload, err)
break
}
call := func() {
method, ok := reflect.TypeOf(gm).MethodByName(fn)
if !ok {
logger.Printf("%s message decode failed :", targetbt, msg.Payload, err)
}
args := []reflect.Value{
reflect.ValueOf(gm),
}
for _, arg := range params {
args = append(args, reflect.ValueOf(arg))
}
method.Func.Call(args)
}
if *targetbt == everyHost {
call()
} else if rconn := wsh.Conn(region, *targetbt); rconn != nil {
call()
}
default:
logger.Println("unknown channel")
}
@ -1208,7 +1135,7 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type
var pubsub *redis.PubSub
for {
if pubsub == nil {
pubsub = wsh.RedisSync.Subscribe(ctx, groupDocSyncChanName, memberSyncChanName, rpcChanName, clientMessageChanName)
pubsub = wsh.RedisSync.Subscribe(ctx, groupDocSyncChanName, memberSyncChanName, rpcChanName)
}
if pubsub == nil {

File diff suppressed because it is too large Load Diff

View File

@ -1,73 +1,97 @@
package core
import (
"context"
"repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/wshandler"
"github.com/go-redis/redis/v8"
"go.mongodb.org/mongo-driver/bson/primitive"
"fmt"
"strings"
"sync"
)
type richConnOuter struct {
wsh *wshandler.WebsocketHandler
rc *wshandler.Richconn
type connection struct {
locker sync.Mutex
alias string
tags []string
onClose map[string]func()
}
func (sub richConnOuter) JoinTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, hint string) error {
sub.wsh.JoinTag(region, tag, tid, sub.rc, hint)
func (rc *connection) addTag(name, val string) {
rc.locker.Lock()
defer rc.locker.Unlock()
wsh := sub.wsh
sub.rc.RegistOnCloseFunc(tag.Hex(), func() {
wsh.LeaveTag(region, tag, tid)
})
return nil
}
func (sub richConnOuter) LeaveTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, hint string) error {
sub.SetStateInTag(region, tag, tid, "", hint)
return sub.wsh.LeaveTag(region, tag, tid)
}
func (sub richConnOuter) SetStateInTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, state string, hint string) error {
return sub.wsh.SetStateInTag(region, tag, tid, state, hint)
}
func (sub richConnOuter) TurnGroupOnline(key string, gid primitive.ObjectID, score float64) error {
gidhex := gid.Hex()
_, err := sub.wsh.RedisSync.ZAdd(context.Background(), key, &redis.Z{Score: score, Member: gidhex}).Result()
if err != nil {
logger.Error("TurnGroupOnline failed. redis.ZAdd return err :", err)
return err
prefix := name + "="
for i, tag := range rc.tags {
if strings.HasPrefix(tag, prefix) {
rc.tags[i] = prefix + val
return
}
}
sub.rc.RegistOnCloseFunc(key, func() {
sub.wsh.RedisSync.ZRem(context.Background(), key, gidhex)
})
return nil
rc.tags = append(rc.tags, prefix+val)
}
func (sub richConnOuter) TurnGroupOffline(key string, gid primitive.ObjectID) error {
f := sub.rc.UnregistOnCloseFunc(key)
if f != nil {
func (rc *connection) removeTag(name string, val string) {
rc.locker.Lock()
defer rc.locker.Unlock()
whole := fmt.Sprintf("%s=%s", name, val)
for i, tag := range rc.tags {
if tag == whole {
if i == 0 && len(rc.tags) == 1 {
rc.tags = nil
} else {
lastidx := len(rc.tags) - 1
if i < lastidx {
rc.tags[i] = rc.tags[lastidx]
}
rc.tags = rc.tags[:lastidx]
}
return
}
}
}
func (rc *connection) registOnCloseFunc(name string, f func()) {
rc.locker.Lock()
defer rc.locker.Unlock()
if rc.onClose == nil {
f()
} else {
sub.wsh.RedisSync.ZRem(context.Background(), key, gid.Hex())
return
}
return nil
rc.onClose[name] = f
}
func (sub richConnOuter) SendMessage(doc []byte) error {
return sub.rc.WriteBytes(doc)
func (rc *connection) hasOnCloseFunc(name string) bool {
rc.locker.Lock()
defer rc.locker.Unlock()
if rc.onClose == nil {
return false
}
_, ok := rc.onClose[name]
return ok
}
func (sub richConnOuter) SendMessageToTag(region string, tag primitive.ObjectID, msg []byte) error {
sub.wsh.BroadcastRaw(region, tag, msg)
return nil
func (rc *connection) unregistOnCloseFunc(name string) (out func()) {
rc.locker.Lock()
defer rc.locker.Unlock()
if rc.onClose == nil {
return
}
out = rc.onClose[name]
delete(rc.onClose, name)
return
}
func (sub richConnOuter) CloseOnPurpose() error {
return sub.rc.Close()
func (rc *connection) cleanup() {
rc.locker.Lock()
defer rc.locker.Unlock()
cp := rc.onClose
rc.onClose = nil
go func() {
for _, f := range cp {
f()
}
}()
}

View File

@ -1,195 +0,0 @@
package rpc
import (
"bytes"
"encoding/gob"
"fmt"
"reflect"
"repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/wshandler"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)
var Everybody = primitive.ObjectID([12]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11})
func init() {
gob.Register(bson.M{})
gob.Register(primitive.ObjectID{})
gob.Register(primitive.Timestamp{})
}
type RpcCaller struct {
publish func(bt []byte) error
}
func NewRpcCaller(f func(bt []byte) error) RpcCaller {
return RpcCaller{
publish: f,
}
}
type rpcCallContext struct {
alias primitive.ObjectID
publish func(bt []byte) error
}
func (c *RpcCaller) One(alias primitive.ObjectID) rpcCallContext {
return rpcCallContext{
alias: alias,
publish: c.publish,
}
}
func (c *RpcCaller) Everybody() rpcCallContext {
return rpcCallContext{
alias: Everybody,
publish: c.publish,
}
}
func IsCallerCalleeMethodMatch[Callee any]() error {
var caller rpcCallContext
var callee Callee
callerType := reflect.TypeOf(caller)
calleeType := reflect.TypeOf(callee)
for i := 0; i < callerType.NumMethod(); i++ {
callerMethod := callerType.Method(i)
calleeMethod, ok := calleeType.MethodByName(callerMethod.Name)
if !ok {
return fmt.Errorf("method '%s' of '%s' is missing", callerMethod.Name, calleeType.Name())
}
if calleeMethod.Func.Type().NumIn() != callerMethod.Func.Type().NumIn() {
return fmt.Errorf("method '%s' argument num is not match", callerMethod.Name)
}
if calleeMethod.Func.Type().NumOut() != callerMethod.Func.Type().NumOut() {
return fmt.Errorf("method '%s' out num is not match", callerMethod.Name)
}
for i := 1; i < calleeMethod.Func.Type().NumIn(); i++ {
if calleeMethod.Func.Type().In(i) != callerMethod.Func.Type().In(i) {
return fmt.Errorf("method '%s' argument is not match. %s-%s", callerMethod.Name, calleeMethod.Func.Type().In(i).Name(), callerMethod.Func.Type().In(i).Name())
}
}
}
return nil
}
type fnsig struct {
FunctionName string `bson:"fn"`
Args []any `bson:"args"`
}
func Encode[T any](prefix T, fn string, args ...any) ([]byte, error) {
m := append([]any{
prefix,
fn,
}, args...)
buff := new(bytes.Buffer)
encoder := gob.NewEncoder(buff)
err := encoder.Encode(m)
if err != nil {
logger.Error("rpcCallContext.send err :", err)
return nil, err
}
return buff.Bytes(), nil
}
func Decode[T any](src []byte) (*T, string, []any, error) {
var m []any
decoder := gob.NewDecoder(bytes.NewReader(src))
if err := decoder.Decode(&m); err != nil {
logger.Error("RpcCallee.Call err :", err)
return nil, "", nil, err
}
prfix := m[0].(T)
fn := m[1].(string)
return &prfix, fn, m[2:], nil
}
func decode(src []byte) (string, []any, error) {
var sig fnsig
decoder := gob.NewDecoder(bytes.NewReader(src))
if err := decoder.Decode(&sig); err != nil {
logger.Error("RpcCallee.Call err :", err)
return "", nil, err
}
return sig.FunctionName, sig.Args, nil
}
func (c *rpcCallContext) send(fn string, args ...any) error {
bt, err := Encode(c.alias, fn, args...)
if err != nil {
return err
}
return c.publish(bt)
}
type RpcCallee[T any] struct {
methods map[string]reflect.Method
create func(*wshandler.Richconn) *T
}
func NewRpcCallee[T any](createReceiverFunc func(*wshandler.Richconn) *T) RpcCallee[T] {
out := RpcCallee[T]{
methods: make(map[string]reflect.Method),
create: createReceiverFunc,
}
var tmp *T
tp := reflect.TypeOf(tmp)
for i := 0; i < tp.NumMethod(); i++ {
method := tp.Method(i)
out.methods[method.Name] = method
}
return out
}
func (r RpcCallee[T]) Call(rc *wshandler.Richconn, src []byte) error {
defer func() {
s := recover()
if s != nil {
logger.Error(s)
}
}()
fn, params, err := decode(src)
if err != nil {
logger.Error("RpcCallee.Call err :", err)
return err
}
method, ok := r.methods[fn]
if !ok {
err := fmt.Errorf("method '%s' is missing", fn)
logger.Error("RpcCallee.Call err :", err)
return err
}
receiver := r.create(rc)
args := []reflect.Value{
reflect.ValueOf(receiver),
}
for _, arg := range params {
args = append(args, reflect.ValueOf(arg))
}
rets := method.Func.Call(args)
if len(rets) > 0 && rets[len(rets)-1].Interface() != nil {
return rets[len(rets)-1].Interface().(error)
}
return nil
}

View File

@ -1,33 +0,0 @@
package rpc
import (
"go.mongodb.org/mongo-driver/bson/primitive"
)
func (c rpcCallContext) JoinTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, hint string) error {
return c.send("JoinTag", region, tag, tid, hint)
}
func (c rpcCallContext) LeaveTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, hint string) error {
return c.send("LeaveTag", region, tag, tid, hint)
}
func (c rpcCallContext) TurnGroupOnline(key string, gid primitive.ObjectID, score float64) error {
return c.send("TurnGroupOnline", key, gid, score)
}
func (c rpcCallContext) TurnGroupOffline(key string, gid primitive.ObjectID) error {
return c.send("TurnGroupOffline", key, gid)
}
func (c rpcCallContext) SetStateInTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, state string, hint string) error {
return c.send("SetStateInTag", region, tag, tid, state, hint)
}
func (c rpcCallContext) SendMessage(doc []byte) error {
return c.send("SendMessage", doc)
}
func (c rpcCallContext) SendMessageToTag(region string, gid primitive.ObjectID, msg []byte) error {
return c.send("SendMessageToTag", region, gid, msg)
}
func (c rpcCallContext) CloseOnPurpose() error {
return c.send("CloseOnPurpose")
}

View File

@ -8,13 +8,12 @@ import (
"net/http"
"reflect"
"strings"
"sync"
common "repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/wshandler"
"repositories.action2quare.com/ayo/tavern/core/rpc"
"github.com/go-redis/redis/v8"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/bsonrw"
"go.mongodb.org/mongo-driver/bson/primitive"
@ -71,58 +70,66 @@ func readBsonDoc(r io.Reader, src any) error {
return nil
}
type rpcCallDomain[T any] struct {
rpcCallChanName string
caller rpc.RpcCaller
callee rpc.RpcCallee[T]
methods map[string]reflect.Method
}
func createRpcCallDomain[CalleeType any](syncConn *redis.Client, creator func(*wshandler.Richconn) *CalleeType) rpcCallDomain[CalleeType] {
var tmp *CalleeType
methods := make(map[string]reflect.Method)
tp := reflect.TypeOf(tmp)
for i := 0; i < tp.NumMethod(); i++ {
method := tp.Method(i)
methods[method.Name] = method
}
rpcChanName := "conn_rpc_channel_" + tp.Name()
publishFunc := func(bt []byte) error {
_, err := syncConn.Publish(context.Background(), rpcChanName, bt).Result()
return err
}
return rpcCallDomain[CalleeType]{
rpcCallChanName: rpcChanName,
caller: rpc.NewRpcCaller(publishFunc),
callee: rpc.NewRpcCallee(creator),
methods: methods,
}
}
type TavernConfig struct {
common.RegionStorageConfig `json:",inline"`
gocommon.RegionStorageConfig `json:",inline"`
GroupTypes map[string]*groupConfig `json:"tavern_group_types"`
MaingateApiToken string `json:"maingate_api_token"`
RedisURL string `json:"tavern_redis_url"`
macAddr string
}
var config TavernConfig
type connectionMap struct {
sync.Mutex
conns map[primitive.ObjectID]*connection
}
func (cm *connectionMap) add(accid accountID, alias string) {
cm.Lock()
defer cm.Unlock()
old := cm.conns[accid]
if old != nil {
old.cleanup()
}
cm.conns[accid] = &connection{
alias: alias,
onClose: make(map[string]func()),
}
}
func (cm *connectionMap) remove(accid accountID) {
cm.Lock()
defer cm.Unlock()
old := cm.conns[accid]
if old != nil {
delete(cm.conns, accid)
old.cleanup()
}
}
func (cm *connectionMap) get(accid accountID) *connection {
cm.Lock()
defer cm.Unlock()
return cm.conns[accid]
}
type Tavern struct {
subTaverns []*subTavern
wsh *wshandler.WebsocketHandler
}
type subTavern struct {
mongoClient common.MongoClient
mongoClient gocommon.MongoClient
wsh *wshandler.WebsocketHandler
region string
groups map[string]group
methods map[string]reflect.Method
wshRpc rpcCallDomain[richConnOuter]
cm connectionMap
}
func getMacAddr() (string, error) {
@ -145,7 +152,7 @@ func getMacAddr() (string, error) {
func New(context context.Context, wsh *wshandler.WebsocketHandler, inconfig *TavernConfig) (*Tavern, error) {
if inconfig == nil {
var loaded TavernConfig
if err := common.LoadConfig(&loaded); err != nil {
if err := gocommon.LoadConfig(&loaded); err != nil {
return nil, err
}
inconfig = &loaded
@ -157,7 +164,7 @@ func New(context context.Context, wsh *wshandler.WebsocketHandler, inconfig *Tav
return nil, err
}
config.macAddr = macaddr
tv := Tavern{
tv := &Tavern{
wsh: wsh,
}
@ -166,39 +173,38 @@ func New(context context.Context, wsh *wshandler.WebsocketHandler, inconfig *Tav
return nil, err
}
return &tv, nil
return tv, nil
}
func (tv *Tavern) Destructor() {
tv.wsh.Destructor()
func (tv *Tavern) Cleanup() {
for _, st := range tv.subTaverns {
st.mongoClient.Close()
}
}
type groupPipelineDocument struct {
OperationType string `bson:"operationType"`
FullDocument map[string]any `bson:"fullDocument"`
DocumentKey struct {
Id primitive.ObjectID `bson:"_id"`
} `bson:"documentKey"`
UpdateDescription struct {
UpdatedFields bson.M `bson:"updatedFields"`
RemovedFileds bson.A `bson:"removedFields"`
TruncatedArrays bson.A `bson:"truncatedArrays"`
} `bson:"updateDescription"`
}
// func (tv *Tavern) SocketMessageReceived(accid primitive.ObjectID, alias string, messageType wshandler.WebSocketMessageType, body io.Reader) {
// switch messageType {
// case wshandler.Connected:
// case wshandler.Disconnected:
// }
// // gidtype := msg.Conn.GetTag("gid")
// // if len(gidtype) > 0 {
// // tokens := strings.SplitN(gidtype, "@", 2)
// // gidobj, _ := primitive.ObjectIDFromHex(tokens[0])
// // gtype := tokens[1]
// // group := sub.groups[gtype]
// // if group != nil {
// // group.PauseMember(gidobj, msg.Alias, msg.Conn)
// // }
// }
func (tv *Tavern) prepare(ctx context.Context) error {
for region, url := range config.RegionStorage {
var dbconn common.MongoClient
for region := range config.RegionStorage {
var dbconn gocommon.MongoClient
var err error
var groupinstance group
if err := rpc.IsCallerCalleeMethodMatch[richConnOuter](); err != nil {
return err
}
var tmp *subTavern
methods := make(map[string]reflect.Method)
tp := reflect.TypeOf(tmp)
@ -212,25 +218,25 @@ func (tv *Tavern) prepare(ctx context.Context) error {
mongoClient: dbconn,
region: region,
methods: methods,
cm: connectionMap{
conns: make(map[primitive.ObjectID]*connection),
},
}
sub.wshRpc = createRpcCallDomain(tv.wsh.RedisSync, func(rc *wshandler.Richconn) *richConnOuter {
return &richConnOuter{wsh: sub.wsh, rc: rc}
})
groups := make(map[string]group)
for typename, cfg := range config.GroupTypes {
cfg.Name = typename
if cfg.Transient {
groupinstance, err = cfg.prepareInMemory(ctx, region, typename, tv.wsh)
} else {
if !dbconn.Connected() {
dbconn, err = common.NewMongoClient(ctx, url.Mongo, region)
if err != nil {
return err
}
}
groupinstance, err = cfg.preparePersistent(ctx, region, dbconn, tv.wsh)
groupinstance, err = cfg.prepareInMemory(ctx, typename, sub)
//} else {
// TODO : db
// if !dbconn.Connected() {
// dbconn, err = gocommon.NewMongoClient(ctx, url.Mongo, region)
// if err != nil {
// return err
// }
// }
// groupinstance, err = cfg.preparePersistent(ctx, region, dbconn, tv.wsh)
}
if err != nil {
return err
@ -246,25 +252,28 @@ func (tv *Tavern) prepare(ctx context.Context) error {
}
func (tv *Tavern) RegisterHandlers(ctx context.Context, serveMux *http.ServeMux, prefix string) error {
// request는 항상 서비스 서버를 거쳐서 들어온다. [client] <--tls--> [service server] <--http--> tavern
// 클라이언트는 tavern으로부터 메시지를 수신할 뿐, 송신하지 못한다.
// 단, 요청은 https 서비스 서버를 통해 들어오고 클라이언트는 ws으로 수신만 한다는 원칙이 유지되어야 한다.(채팅 메시지는 예외?)
for _, sub := range tv.subTaverns {
tv.wsh.RegisterReceiver(sub.region, sub.clientMessageReceived)
var pattern string
if sub.region == "default" {
pattern = common.MakeHttpHandlerPattern(prefix, "api")
pattern = gocommon.MakeHttpHandlerPattern(prefix, "api")
} else {
pattern = common.MakeHttpHandlerPattern(prefix, sub.region, "api")
pattern = gocommon.MakeHttpHandlerPattern(prefix, sub.region, "api")
}
serveMux.HandleFunc(pattern, sub.api)
deliveryChan := tv.wsh.DeliveryChannel(sub.region)
go sub.deliveryMessageHandler(deliveryChan)
}
return nil
}
func (sub *subTavern) clientMessageReceived(sender *wshandler.Sender, messageType wshandler.WebSocketMessageType, body io.Reader) {
if messageType == wshandler.Connected {
sub.cm.add(sender.Accid, sender.Alias)
} else if messageType == wshandler.Disconnected {
sub.cm.remove(sender.Accid)
}
}
func (sub *subTavern) api(w http.ResponseWriter, r *http.Request) {
defer func() {
s := recover()

4
go.mod
View File

@ -4,15 +4,15 @@ go 1.19
require (
github.com/go-redis/redis/v8 v8.11.5
github.com/gorilla/websocket v1.5.0
go.mongodb.org/mongo-driver v1.11.7
repositories.action2quare.com/ayo/gocommon v0.0.0-20230621052811-06ef97f11d22
repositories.action2quare.com/ayo/gocommon v0.0.0-20230711062613-74829b93ac1b
)
require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/klauspost/compress v1.16.6 // indirect
github.com/montanaflynn/stats v0.7.1 // indirect
github.com/pires/go-proxyproto v0.7.0 // indirect

16
go.sum
View File

@ -102,5 +102,17 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230621052811-06ef97f11d22 h1:DImSGNxZrc+Q4WlS1OKMsLAScEfDYLX4XMJdjAaVnXc=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230621052811-06ef97f11d22/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230710053024-a842845685ee h1:Aau1j/b9wI4nyvrM7m1Q+2xkcW1Qo7i3q+QBD4Umnzg=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230710053024-a842845685ee/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230711003621-3bb985d0b617 h1:91mBIGIyxzcnvOaIdegUuV+i9xs8YTSRcmyRaIytzx8=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230711003621-3bb985d0b617/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230711005604-a42eb2888e97 h1:ARzXt3HBmiAUDyACfNm5Kvz1JMTn7+ryE03kB8x/km0=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230711005604-a42eb2888e97/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230711033135-d7b26608df94 h1:VrNj5gBFFN9/roWCxyBCZ2gu5k58eremNHQvQNPrfrU=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230711033135-d7b26608df94/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230711035757-9fd0dd00cb7d h1:RdxKmMc7kHrTk+SvTYse2IGxmdDhbEDeM0fKAUW+G+w=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230711035757-9fd0dd00cb7d/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230711053010-4acb81a20d9c h1:SktFqjnc/UOMjJrq/brSw5lQjW1IA+KkB5YgeovusmQ=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230711053010-4acb81a20d9c/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230711062613-74829b93ac1b h1:04rlgT+zeKSpekyleb8Mfi8kENIoka5DYJLuk65wqxc=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230711062613-74829b93ac1b/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns=

View File

@ -34,13 +34,14 @@ func main() {
panic(err)
} else {
serveMux := http.NewServeMux()
wsh.RegisterHandlers(ctx, serveMux, *prefix)
wsh.RegisterHandlers(serveMux, *prefix)
tv.RegisterHandlers(ctx, serveMux, *prefix)
server := common.NewHTTPServer(serveMux)
logger.Println("tavern is started")
wsh.Start(ctx)
server.Start()
cancel()
tv.Destructor()
wsh.Destructor()
tv.Cleanup()
wsh.Cleanup()
}
}