Compare commits
7 Commits
3c14e7e4c5
...
e42eef1f51
| Author | SHA1 | Date | |
|---|---|---|---|
| e42eef1f51 | |||
| da240bd5dd | |||
| e71b29ed1c | |||
| 4df30ea19c | |||
| 8c3b279850 | |||
| ec0ed1ce06 | |||
| 8d0f21077d |
467
core/apiimpl.go
467
core/apiimpl.go
@ -5,48 +5,15 @@ import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
common "repositories.action2quare.com/ayo/gocommon"
|
||||
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||
"repositories.action2quare.com/ayo/gocommon/wshandler"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
)
|
||||
|
||||
func splitDocument(doc bson.M) bson.M {
|
||||
setdoc := bson.M{}
|
||||
unsetdoc := bson.M{}
|
||||
findoc := bson.M{}
|
||||
|
||||
for k, v := range doc {
|
||||
if k == "$set" {
|
||||
setdoc = v.(bson.M)
|
||||
} else if k == "$unset" {
|
||||
unsetdoc = v.(bson.M)
|
||||
}
|
||||
}
|
||||
|
||||
for k, v := range doc {
|
||||
if v == nil {
|
||||
unsetdoc[k] = 1
|
||||
} else if k[0] != '$' {
|
||||
setdoc[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
if len(setdoc) > 0 {
|
||||
findoc["$set"] = setdoc
|
||||
}
|
||||
if len(unsetdoc) > 0 {
|
||||
findoc["$unset"] = unsetdoc
|
||||
}
|
||||
|
||||
return findoc
|
||||
}
|
||||
|
||||
// CreateGroup : 그룹 생성
|
||||
// - 그룹 : 멤버와 권한을 관리할 수 있다. 그룹 타입에 따라 디비에 저장되거나 메모리에만 존재한다.
|
||||
// - 생성 요청이 오면 파티를 만든다. 파티을 만들 수 있는지 여부는 서비스에서 결정할 것이고, 이 요청을 호출했다는 것은 서비스가 결정한 그룹 생성 조건을 다 통과했다는 의미이다.
|
||||
@ -132,147 +99,6 @@ func (sub *subTavern) JoinGroup(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
func (sub *subTavern) EnterCandidateChannel(w http.ResponseWriter, r *http.Request) {
|
||||
typename, _ := common.ReadStringFormValue(r.Form, "type")
|
||||
if _, ok := sub.groups[typename]; !ok {
|
||||
logger.Println("EnterCandidateChannel failed. group type is missing :", r)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
midobj, ok := common.ReadObjectIDFormValue(r.Form, "mid")
|
||||
if !ok {
|
||||
logger.Println("EnterCandidateChannel failed. mid is missing :", r)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid")
|
||||
if !ok {
|
||||
logger.Println("EnterCandidateChannel failed. gid is missing :", r)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// candidate channel은 big endian 최상위 비트가 1
|
||||
gidobj[0] |= 0x80
|
||||
|
||||
if conn := sub.wsh.Conn(sub.region, midobj); conn != nil {
|
||||
richConnOuter{wsh: sub.wsh, rc: conn}.JoinTag(sub.region, gidobj, midobj, typename)
|
||||
} else {
|
||||
sub.wshRpc.caller.One(midobj).JoinTag(sub.region, gidobj, midobj, typename)
|
||||
}
|
||||
}
|
||||
|
||||
func (sub *subTavern) LeaveCandidateChannel(w http.ResponseWriter, r *http.Request) {
|
||||
typename, _ := common.ReadStringFormValue(r.Form, "type")
|
||||
if _, ok := sub.groups[typename]; !ok {
|
||||
logger.Println("EnterCandidateChannel failed. group type is missing :", r)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
midobj, ok := common.ReadObjectIDFormValue(r.Form, "mid")
|
||||
if !ok {
|
||||
logger.Println("EnterCandidateChannel failed. mid is missing :", r)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid")
|
||||
if !ok {
|
||||
logger.Println("EnterCandidateChannel failed. gid is missing :", r)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// candidate channel은 big endian 최상위 비트가 1
|
||||
gidobj[0] |= 0x80
|
||||
|
||||
if conn := sub.wsh.Conn(sub.region, midobj); conn != nil {
|
||||
richConnOuter{wsh: sub.wsh, rc: conn}.LeaveTag(sub.region, gidobj, midobj, typename)
|
||||
} else {
|
||||
sub.wshRpc.caller.One(midobj).LeaveTag(sub.region, gidobj, midobj, typename)
|
||||
}
|
||||
}
|
||||
|
||||
func (sub *subTavern) EnterGroupChannel(w http.ResponseWriter, r *http.Request) {
|
||||
typename, _ := common.ReadStringFormValue(r.Form, "type")
|
||||
group := sub.groups[typename]
|
||||
if group == nil {
|
||||
logger.Println("EnterGroupChannel failed. group type is missing :", r)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
midobj, ok := common.ReadObjectIDFormValue(r.Form, "mid")
|
||||
if !ok {
|
||||
logger.Println("EnterGroupChannel failed. mid is missing :", r)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid")
|
||||
if !ok {
|
||||
logger.Println("EnterGroupChannel failed. gid is missing :", r)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
tid := group.FindTicketID(gidobj, midobj)
|
||||
if tid.IsZero() {
|
||||
logger.Println("EnterGroupChannel failed. tid is zero")
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if conn := sub.wsh.Conn(sub.region, midobj); conn != nil {
|
||||
richConnOuter{wsh: sub.wsh, rc: conn}.JoinTag(sub.region, gidobj, tid, typename)
|
||||
} else {
|
||||
sub.wshRpc.caller.One(midobj).JoinTag(sub.region, gidobj, tid, typename)
|
||||
}
|
||||
writeBsonDoc(w, primitive.M{"_id": tid})
|
||||
}
|
||||
|
||||
func (sub *subTavern) SetStateInGroup(w http.ResponseWriter, r *http.Request) {
|
||||
gid, ok := common.ReadObjectIDFormValue(r.Form, "gid")
|
||||
if !ok {
|
||||
logger.Println("SetStateInGroup failed. tag is missing :", r)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
mid, ok := common.ReadObjectIDFormValue(r.Form, "mid")
|
||||
if !ok {
|
||||
logger.Println("SetStateInGroup failed. mid form value is missing")
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
state, ok := common.ReadStringFormValue(r.Form, "state")
|
||||
if !ok {
|
||||
logger.Println("SetStateInGroup failed. state is missing :", r)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
typename, ok := common.ReadStringFormValue(r.Form, "type")
|
||||
if !ok {
|
||||
logger.Println("SetStateInGroup failed. type is missing :", r)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
var doc bson.M
|
||||
if err := readBsonDoc(r.Body, &doc); err != nil {
|
||||
logger.Error("SetStateInGroup failed. readBsonDoc err :", err)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
tid := doc["_id"].(primitive.ObjectID)
|
||||
if conn := sub.wsh.Conn(sub.region, mid); conn != nil {
|
||||
richConnOuter{wsh: sub.wsh, rc: conn}.SetStateInTag(sub.region, gid, tid, state, typename)
|
||||
} else {
|
||||
sub.wshRpc.caller.One(mid).SetStateInTag(sub.region, gid, tid, state, typename)
|
||||
}
|
||||
}
|
||||
|
||||
// Invite : 초대
|
||||
// - type : 초대 타입 (required)
|
||||
// - from : 초대하는 자 (required)
|
||||
@ -320,52 +146,6 @@ func (sub *subTavern) Invite(w http.ResponseWriter, r *http.Request) {
|
||||
w.Write([]byte(result))
|
||||
}
|
||||
|
||||
func (sub *subTavern) UpdateGroupMember(w http.ResponseWriter, r *http.Request) {
|
||||
typename, _ := common.ReadStringFormValue(r.Form, "type")
|
||||
group := sub.groups[typename]
|
||||
if group == nil {
|
||||
logger.Println("UpdateGroupMember failed. group type is missing :", r.Form)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid")
|
||||
if !ok {
|
||||
logger.Println("UpdateGroupMember failed. gid is missing :", r.Form)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
midobj, midok := common.ReadObjectIDFormValue(r.Form, "mid")
|
||||
tidobj, tidok := common.ReadObjectIDFormValue(r.Form, "tid")
|
||||
if !midok && !tidok {
|
||||
// 둘다 없네?
|
||||
logger.Println("JoinGroup failed. tid or mid should be exist")
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
var err error
|
||||
delete, _ := common.ReadBoolFormValue(r.Form, "delete")
|
||||
if delete {
|
||||
err = group.UpdateGroupMember(gidobj, midobj, tidobj, nil)
|
||||
} else {
|
||||
var doc bson.M
|
||||
if err := readBsonDoc(r.Body, &doc); err != nil {
|
||||
logger.Error("UpdateGroupMember failed. readBsonDoc returns err :", err)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
err = group.UpdateGroupMember(gidobj, midobj, tidobj, doc)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
logger.Println("UpdateGroupMember failed. Update returns err :", err)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (sub *subTavern) CancelInvitation(w http.ResponseWriter, r *http.Request) {
|
||||
typename, _ := common.ReadStringFormValue(r.Form, "type")
|
||||
group := sub.groups[typename]
|
||||
@ -490,119 +270,6 @@ func (sub *subTavern) QueryInvitations(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
func (sub *subTavern) TurnGroupOnline(w http.ResponseWriter, r *http.Request) {
|
||||
// group을 online 상태로 만든다.
|
||||
// 요청을 보내는 클라이언트의 conn이 끊이면 online에서 제거한다.
|
||||
// online인 group을 가지고 뭘 할지는 게임이 알아서...
|
||||
typename, _ := common.ReadStringFormValue(r.Form, "type")
|
||||
group := sub.groups[typename]
|
||||
if group == nil {
|
||||
logger.Println("TurnGroupOnline failed. group type is missing :", r.Form)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
gid, ok := common.ReadObjectIDFormValue(r.Form, "_id")
|
||||
if !ok {
|
||||
logger.Println("TurnGroupOnline failed. group id '_id' form value is missing :", r.Form)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
mid, ok := common.ReadObjectIDFormValue(r.Form, "mid")
|
||||
if !ok {
|
||||
logger.Println("TurnGroupOnline failed. mid form value is missing")
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
var filter bson.M
|
||||
if err := readBsonDoc(r.Body, &filter); err != nil {
|
||||
logger.Error("TurnGroupOnline failed. readBsonDoc return err :", err)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
exist, err := group.Exist(gid, filter)
|
||||
if err != nil {
|
||||
logger.Error("TurnGroupOnline failed. FindOne return err :", err)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if !exist {
|
||||
logger.Println("TurnGroupOnline failed. filter not match", filter)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
score, ok := common.ReadFloatFormValue(r.Form, "score")
|
||||
if !ok {
|
||||
score = 100
|
||||
}
|
||||
|
||||
if conn := sub.wsh.Conn(sub.region, mid); conn != nil {
|
||||
err = richConnOuter{wsh: sub.wsh, rc: conn}.TurnGroupOnline(onlineGroupQueryKey(typename), gid, score)
|
||||
} else {
|
||||
err = sub.wshRpc.caller.One(mid).TurnGroupOnline(onlineGroupQueryKey(typename), gid, score)
|
||||
}
|
||||
if err != nil {
|
||||
logger.Error("TurnGroupOnline failed. TurnGroupOnline err :", err)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (sub *subTavern) TurnGroupOffline(w http.ResponseWriter, r *http.Request) {
|
||||
// group을 offline 상태로 만든다.
|
||||
// 요청을 보내는 클라이언트의 conn이 끊이면 online에서 제거한다.
|
||||
// online인 group을 가지고 뭘 할지는 게임이 알아서...
|
||||
typename, _ := common.ReadStringFormValue(r.Form, "type")
|
||||
group := sub.groups[typename]
|
||||
if group == nil {
|
||||
logger.Println("TurnGroupOffline failed. group type is missing :", r.Form)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
gid, ok := common.ReadObjectIDFormValue(r.Form, "_id")
|
||||
if !ok {
|
||||
logger.Println("TurnGroupOffline failed. group id '_id' form value is missing :", r.Form)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
mid, ok := common.ReadObjectIDFormValue(r.Form, "mid")
|
||||
if !ok {
|
||||
logger.Println("TurnGroupOffline failed. mid form value is missing")
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// onlinename := onlineGroupQueryKey(typename)
|
||||
// if onClose := conn.UnregistOnCloseFunc(onlinename); onClose != nil {
|
||||
// onClose()
|
||||
// } else {
|
||||
// gid, ok := common.ReadStringFormValue(form, "_id")
|
||||
// if ok {
|
||||
// sub.redisSync.ZRem(context.Background(), onlinename, gid)
|
||||
// }
|
||||
// }
|
||||
|
||||
var err error
|
||||
if conn := sub.wsh.Conn(sub.region, mid); conn != nil {
|
||||
err = richConnOuter{wsh: sub.wsh, rc: conn}.TurnGroupOffline(onlineGroupQueryKey(typename), gid)
|
||||
} else {
|
||||
err = sub.wshRpc.caller.One(mid).TurnGroupOffline(onlineGroupQueryKey(typename), gid)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
logger.Error("TurnGroupOffline failed. TurnGroupOnline err :", err)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (sub *subTavern) QueryOnlineGroup(w http.ResponseWriter, r *http.Request) {
|
||||
typename, _ := common.ReadStringFormValue(r.Form, "type")
|
||||
group := sub.groups[typename]
|
||||
@ -690,11 +357,7 @@ func (sub *subTavern) QueryOnlineState(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
state, err := sub.wsh.GetState(mid)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
state := sub.wsh.GetState(sub.region, mid)
|
||||
w.Write([]byte(state))
|
||||
}
|
||||
|
||||
@ -706,13 +369,7 @@ func (sub *subTavern) IsOnline(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
ok, err := sub.wsh.IsOnline(mid)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
if ok {
|
||||
if state := sub.wsh.GetState(sub.region, mid); len(state) > 0 {
|
||||
w.Write([]byte("true"))
|
||||
} else {
|
||||
w.Write([]byte("false"))
|
||||
@ -983,6 +640,32 @@ func (sub *subTavern) UpdateGroupDocument(w http.ResponseWriter, r *http.Request
|
||||
}
|
||||
}
|
||||
|
||||
func (sub *subTavern) PauseGroupMember(w http.ResponseWriter, r *http.Request) {
|
||||
typename, _ := common.ReadStringFormValue(r.Form, "type")
|
||||
group := sub.groups[typename]
|
||||
if group == nil {
|
||||
logger.Println("DismissGroup failed. type is missing")
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
midobj, ok := common.ReadObjectIDFormValue(r.Form, "mid")
|
||||
if !ok {
|
||||
logger.Println("UpdateMemberDocument failed. member_id is missing")
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid")
|
||||
if !ok {
|
||||
logger.Println("UpdateMemberDocument failed. _id is missing")
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
group.PauseMember(gidobj, midobj)
|
||||
}
|
||||
|
||||
func (sub *subTavern) DropPausedMember(w http.ResponseWriter, r *http.Request) {
|
||||
typename, _ := common.ReadStringFormValue(r.Form, "type")
|
||||
group := sub.groups[typename]
|
||||
@ -1013,55 +696,55 @@ func (sub *subTavern) DropPausedMember(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
func (sub *subTavern) deliveryMessageHandler(deliveryChan <-chan wshandler.DeliveryMessage) {
|
||||
defer func() {
|
||||
r := recover()
|
||||
if r != nil {
|
||||
logger.Error(r)
|
||||
}
|
||||
}()
|
||||
// func (sub *subTavern) deliveryMessageHandler(deliveryChan <-chan wshandler.DeliveryMessage) {
|
||||
// defer func() {
|
||||
// r := recover()
|
||||
// if r != nil {
|
||||
// logger.Error(r)
|
||||
// }
|
||||
// }()
|
||||
|
||||
redisSync := sub.wsh.RedisSync
|
||||
for msg := range deliveryChan {
|
||||
mid := msg.Alias
|
||||
if msg.Body != nil {
|
||||
buffer := msg.Body
|
||||
// redisSync := sub.wsh.RedisSync
|
||||
// for msg := range deliveryChan {
|
||||
// mid := msg.Alias
|
||||
// if msg.Body != nil {
|
||||
// buffer := msg.Body
|
||||
|
||||
var channame string
|
||||
for i, ch := range buffer {
|
||||
if ch == 0 {
|
||||
channame = string(buffer[:i])
|
||||
buffer = buffer[i+1:]
|
||||
break
|
||||
}
|
||||
}
|
||||
// var channame string
|
||||
// for i, ch := range buffer {
|
||||
// if ch == 0 {
|
||||
// channame = string(buffer[:i])
|
||||
// buffer = buffer[i+1:]
|
||||
// break
|
||||
// }
|
||||
// }
|
||||
|
||||
if len(channame) == 0 {
|
||||
continue
|
||||
}
|
||||
// if len(channame) == 0 {
|
||||
// continue
|
||||
// }
|
||||
|
||||
buffer = append(mid[:], buffer...)
|
||||
_, err := redisSync.Publish(context.Background(), channame, buffer).Result()
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
}
|
||||
}
|
||||
// buffer = append(mid[:], buffer...)
|
||||
// _, err := redisSync.Publish(context.Background(), channame, buffer).Result()
|
||||
// if err != nil {
|
||||
// logger.Error(err)
|
||||
// }
|
||||
// }
|
||||
|
||||
if len(msg.Command) > 0 {
|
||||
switch msg.Command {
|
||||
case "pause":
|
||||
gidtype := msg.Conn.GetTag("gid")
|
||||
if len(gidtype) > 0 {
|
||||
tokens := strings.SplitN(gidtype, "@", 2)
|
||||
gidobj, _ := primitive.ObjectIDFromHex(tokens[0])
|
||||
gtype := tokens[1]
|
||||
group := sub.groups[gtype]
|
||||
if group != nil {
|
||||
group.PauseMember(gidobj, msg.Alias, msg.Conn)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
logger.Println("delivery chan fin")
|
||||
}
|
||||
// if len(msg.Command) > 0 {
|
||||
// switch msg.Command {
|
||||
// case "pause":
|
||||
// gidtype := msg.Conn.GetTag("gid")
|
||||
// if len(gidtype) > 0 {
|
||||
// tokens := strings.SplitN(gidtype, "@", 2)
|
||||
// gidobj, _ := primitive.ObjectIDFromHex(tokens[0])
|
||||
// gtype := tokens[1]
|
||||
// group := sub.groups[gtype]
|
||||
// if group != nil {
|
||||
// group.PauseMember(gidobj, msg.Alias, msg.Conn)
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// logger.Println("delivery chan fin")
|
||||
// }
|
||||
|
||||
@ -3,8 +3,6 @@ package core
|
||||
import (
|
||||
"net/url"
|
||||
|
||||
"repositories.action2quare.com/ayo/gocommon/wshandler"
|
||||
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
)
|
||||
@ -30,7 +28,6 @@ type group interface {
|
||||
Join(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, doc bson.M) (newTicketID primitive.ObjectID, err error)
|
||||
FindTicketID(groupID primitive.ObjectID, memberID primitive.ObjectID) primitive.ObjectID
|
||||
Invite(groupID primitive.ObjectID, memberID primitive.ObjectID, inviterDoc bson.M, inviteeDoc bson.M) (string, error)
|
||||
UpdateGroupMember(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, doc bson.M) error
|
||||
CancelInvitation(groupID primitive.ObjectID, ticketID primitive.ObjectID) error
|
||||
AcceptInvitation(groupID primitive.ObjectID, mid primitive.ObjectID, ticketID primitive.ObjectID, member bson.M) (primitive.ObjectID, error)
|
||||
DenyInvitation(groupID primitive.ObjectID, mid primitive.ObjectID, ticketID primitive.ObjectID) error
|
||||
@ -42,7 +39,7 @@ type group interface {
|
||||
QueryMember(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, projection string) (bson.M, error)
|
||||
Leave(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID) error
|
||||
DropPausedMember(groupID primitive.ObjectID, memberID primitive.ObjectID) error
|
||||
PauseMember(groupID primitive.ObjectID, memberID primitive.ObjectID, conn *wshandler.Richconn) error
|
||||
PauseMember(groupID primitive.ObjectID, memberID primitive.ObjectID) error
|
||||
UpdateMemberDocument(groupID primitive.ObjectID, memberID primitive.ObjectID, doc bson.M) error
|
||||
Dismiss(groupID primitive.ObjectID) error
|
||||
UpdateGroupDocument(groupID primitive.ObjectID, body []byte) error
|
||||
|
||||
@ -10,20 +10,16 @@ import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"repositories.action2quare.com/ayo/gocommon/flagx"
|
||||
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||
"repositories.action2quare.com/ayo/gocommon/rpc"
|
||||
"repositories.action2quare.com/ayo/gocommon/wshandler"
|
||||
"repositories.action2quare.com/ayo/tavern/core/rpc"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/gorilla/websocket"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
)
|
||||
@ -32,8 +28,6 @@ type accountID = primitive.ObjectID
|
||||
type ticketID = primitive.ObjectID
|
||||
type groupID = primitive.ObjectID
|
||||
|
||||
var everyHost, _ = primitive.ObjectIDFromHex("010203040506070809101112")
|
||||
|
||||
type Invitation struct {
|
||||
GroupID groupID `json:"gid"`
|
||||
TicketID ticketID `json:"tid"`
|
||||
@ -56,7 +50,6 @@ type PublicMemberDoc struct {
|
||||
|
||||
type FullGroupDoc struct {
|
||||
Gid groupID
|
||||
DM string
|
||||
AllMembers []*PublicMemberDoc `json:",omitempty"`
|
||||
Body GroupDocBody `json:",omitempty"`
|
||||
}
|
||||
@ -69,7 +62,7 @@ type memberDoc struct {
|
||||
|
||||
// underscore keys in Hidden
|
||||
Hidden bson.M
|
||||
rconn *wshandler.Richconn
|
||||
rconn *connection
|
||||
Mid accountID
|
||||
}
|
||||
|
||||
@ -99,28 +92,28 @@ func (gd *groupDoc) updateBodyWithBson(src []byte) ([]byte, error) {
|
||||
return bson.Marshal(gd.Body)
|
||||
}
|
||||
|
||||
func (gd *groupDoc) updateBodyWithJson(src []byte) ([]byte, error) {
|
||||
func (gd *groupDoc) updateBodyWithJson(src []byte) GroupDocBody {
|
||||
gd.Lock()
|
||||
defer gd.Unlock()
|
||||
|
||||
err := json.Unmarshal(src, &gd.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil
|
||||
}
|
||||
|
||||
return json.Marshal(makeTypeMessage(gd.Body))
|
||||
return gd.Body
|
||||
}
|
||||
|
||||
func (gd *groupDoc) updateBodyBsonToJson(bsonSrc []byte) (jsonBt []byte, err error) {
|
||||
func (gd *groupDoc) updateBodyBsonToJson(bsonSrc []byte) GroupDocBody {
|
||||
gd.Lock()
|
||||
defer gd.Unlock()
|
||||
|
||||
err = bson.Unmarshal(bsonSrc, &gd.Body)
|
||||
err := bson.Unmarshal(bsonSrc, &gd.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil
|
||||
}
|
||||
|
||||
return json.Marshal(makeTypeMessage(gd.Body))
|
||||
return gd.Body
|
||||
}
|
||||
|
||||
func (gd *groupDoc) updateBody(bsonSrc []byte) error {
|
||||
@ -130,7 +123,7 @@ func (gd *groupDoc) updateBody(bsonSrc []byte) error {
|
||||
return bson.Unmarshal(bsonSrc, &gd.Body)
|
||||
}
|
||||
|
||||
func (gd *groupDoc) addInvite(inviteeDoc bson.M, rconn *wshandler.Richconn, ttl time.Duration, max int) (ticketID, *memberDoc) {
|
||||
func (gd *groupDoc) addInvite(inviteeDoc bson.M, rconn *connection, ttl time.Duration, max int) (ticketID, *memberDoc) {
|
||||
gd.Lock()
|
||||
defer gd.Unlock()
|
||||
|
||||
@ -194,7 +187,7 @@ func seperateHidden(in bson.M) (public bson.M, hidden bson.M) {
|
||||
return in, hidden
|
||||
}
|
||||
|
||||
func (gd *groupDoc) addInCharge(mid accountID, rconn *wshandler.Richconn, doc bson.M) (ticketID, *memberDoc) {
|
||||
func (gd *groupDoc) addInCharge(mid accountID, rconn *connection, doc bson.M) (ticketID, *memberDoc) {
|
||||
gd.Lock()
|
||||
defer gd.Unlock()
|
||||
|
||||
@ -291,21 +284,6 @@ func (gd *groupDoc) removeMember(mid accountID, tid *ticketID) {
|
||||
}
|
||||
}
|
||||
|
||||
func (gd *groupDoc) conns(includeInvitee bool) (out []*wshandler.Richconn) {
|
||||
gd.Lock()
|
||||
defer gd.Unlock()
|
||||
|
||||
for _, mem := range gd.tickets {
|
||||
if mem.rconn != nil {
|
||||
if !includeInvitee && mem.Invite {
|
||||
continue
|
||||
}
|
||||
out = append(out, mem.rconn)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (gd *groupDoc) ticket(mid accountID) ticketID {
|
||||
gd.Lock()
|
||||
defer gd.Unlock()
|
||||
@ -399,7 +377,7 @@ func (gd *groupDoc) iterateMembers(cb func(ticketID, *memberDoc)) {
|
||||
}
|
||||
}
|
||||
|
||||
func (gd *groupDoc) serializeFull(gid groupID, directMessageChanName string) []byte {
|
||||
func (gd *groupDoc) serializeFull(gid groupID) FullGroupDoc {
|
||||
gd.Lock()
|
||||
defer gd.Unlock()
|
||||
|
||||
@ -416,14 +394,11 @@ func (gd *groupDoc) serializeFull(gid groupID, directMessageChanName string) []b
|
||||
})
|
||||
}
|
||||
|
||||
bt, _ := json.Marshal(makeTypeMessage(FullGroupDoc{
|
||||
return FullGroupDoc{
|
||||
Gid: gid,
|
||||
DM: directMessageChanName,
|
||||
AllMembers: output,
|
||||
Body: gd.Body,
|
||||
}))
|
||||
|
||||
return bt
|
||||
}
|
||||
}
|
||||
|
||||
type groupContainer struct {
|
||||
@ -436,7 +411,11 @@ type groupInMemory struct {
|
||||
groupDocSync func(groupID, []byte) error
|
||||
memberSync func(groupID, accountID, ticketID, *memberDoc, bool) error
|
||||
rpcCall func([]byte) error
|
||||
hasConn func(accountID) *wshandler.Richconn
|
||||
hasConn func(accountID) *connection
|
||||
sendUpstreamMessage func(*wshandler.UpstreamMessage)
|
||||
sendEnterRoomMessage func(groupID, accountID)
|
||||
sendLeaveRoomMessage func(groupID, accountID)
|
||||
sendCloseMessage func(accountID, string)
|
||||
groups groupContainer
|
||||
}
|
||||
|
||||
@ -496,46 +475,6 @@ func (gm *groupInMemory) Candidate(gid groupID, mid accountID, doc bson.M) error
|
||||
}
|
||||
|
||||
var errGroupNotExist = errors.New("group does not exist")
|
||||
var errFuncNameIsMissing = errors.New("how func name is missin")
|
||||
|
||||
func (gm *groupInMemory) callProxyRpc(target accountID, name string, args ...any) error {
|
||||
bt, err := rpc.Encode(target, name, args...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return gm.rpcCall(bt)
|
||||
}
|
||||
|
||||
type rpcTarget struct {
|
||||
gm *groupInMemory
|
||||
target accountID
|
||||
}
|
||||
|
||||
func (rt rpcTarget) call(args ...any) error {
|
||||
pc := make([]uintptr, 1)
|
||||
n := runtime.Callers(2, pc[:])
|
||||
if n < 1 {
|
||||
return nil
|
||||
}
|
||||
|
||||
frame, _ := runtime.CallersFrames(pc).Next()
|
||||
funcname := path.Ext(frame.Func.Name())
|
||||
if len(funcname) > 0 {
|
||||
funcname = funcname[1:]
|
||||
return rt.gm.callProxyRpc(rt.target, funcname, args...)
|
||||
}
|
||||
|
||||
return errFuncNameIsMissing
|
||||
}
|
||||
|
||||
func (gm *groupInMemory) rpc(target accountID) rpcTarget {
|
||||
return rpcTarget{
|
||||
gm: gm,
|
||||
target: target,
|
||||
}
|
||||
}
|
||||
|
||||
var errNoEmptySlot = errors.New("no more seat in group")
|
||||
|
||||
func (gm *groupInMemory) Join(gid groupID, mid accountID, tid ticketID, doc bson.M) (ticketID, error) {
|
||||
@ -558,62 +497,40 @@ func (gm *groupInMemory) FindTicketID(gid groupID, mid groupID) ticketID {
|
||||
return primitive.NilObjectID
|
||||
}
|
||||
|
||||
func makeTypeMessage[T any](msg T) bson.M {
|
||||
var ptr *T
|
||||
name := reflect.TypeOf(ptr).Elem().Name()
|
||||
return bson.M{name: msg}
|
||||
}
|
||||
// func sendTypedMessageDirect[T any](rconn *wshandler.Richconn, msg T) {
|
||||
// bt, _ := json.Marshal(makeTypeMessage(msg))
|
||||
// rconn.WriteBytes(bt)
|
||||
// }
|
||||
|
||||
func sendTypedMessageDirect[T any](rconn *wshandler.Richconn, msg T) {
|
||||
bt, _ := json.Marshal(makeTypeMessage(msg))
|
||||
rconn.WriteBytes(bt)
|
||||
}
|
||||
// func sendTypedMessage[T any](gm *groupInMemory, target accountID, msg T) {
|
||||
// bt, _ := json.Marshal(makeTypeMessage(msg))
|
||||
// gm.SendMessage(target, bt)
|
||||
// }
|
||||
|
||||
func sendTypedMessage[T any](gm *groupInMemory, target accountID, msg T) {
|
||||
bt, _ := json.Marshal(makeTypeMessage(msg))
|
||||
gm.SendMessage(target, bt)
|
||||
}
|
||||
// func (gm *groupInMemory) SendMessage(target accountID, msg []byte) {
|
||||
// rconn := gm.hasConn(target)
|
||||
// if rconn != nil {
|
||||
// rconn.WriteBytes(msg)
|
||||
// } else {
|
||||
// gm.rpc(target).call(target, msg)
|
||||
// }
|
||||
// }
|
||||
|
||||
func (gm *groupInMemory) SendMessage(target accountID, msg []byte) {
|
||||
rconn := gm.hasConn(target)
|
||||
if rconn != nil {
|
||||
rconn.WriteBytes(msg)
|
||||
} else {
|
||||
gm.rpc(target).call(target, msg)
|
||||
}
|
||||
}
|
||||
// func multicast(conns []*wshandler.Richconn, raw []byte) {
|
||||
// for _, rconn := range conns {
|
||||
// rconn.WriteBytes(raw)
|
||||
// }
|
||||
// }
|
||||
|
||||
func multicast(conns []*wshandler.Richconn, raw []byte) {
|
||||
for _, rconn := range conns {
|
||||
rconn.WriteBytes(raw)
|
||||
}
|
||||
}
|
||||
func multicastTyped[T any](conns []*wshandler.Richconn, msg T) {
|
||||
bt, _ := json.Marshal(makeTypeMessage(msg))
|
||||
go multicast(conns, bt)
|
||||
}
|
||||
|
||||
func broadcastTypedMessage[T any](gm *groupInMemory, gid groupID, msg T) {
|
||||
if gd := gm.groups.find(gid); gd != nil {
|
||||
bt, _ := json.Marshal(makeTypeMessage(msg))
|
||||
go multicast(gd.conns(false), bt)
|
||||
}
|
||||
}
|
||||
// func broadcastTypedMessage[T any](gm *groupInMemory, gid groupID, msg T) {
|
||||
// if gd := gm.groups.find(gid); gd != nil {
|
||||
// bt, _ := json.Marshal(makeTypeMessage(msg))
|
||||
// go multicast(gd.conns(false), bt)
|
||||
// }
|
||||
// }
|
||||
|
||||
var errInviteeDocMidMissing = errors.New("inviteeDoc must have '_mid' field")
|
||||
|
||||
func (gm *groupInMemory) SendInvitationFailed(mid accountID, inviteeDoc bson.M) error {
|
||||
delete(inviteeDoc, "_mid")
|
||||
rconn := gm.hasConn(mid)
|
||||
if rconn == nil {
|
||||
return gm.rpc(mid).call(mid, inviteeDoc)
|
||||
}
|
||||
|
||||
sendTypedMessage(gm, mid, InvitationFail(inviteeDoc))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (gm *groupInMemory) InviteImplement(gid groupID, mid accountID, inviteeDoc bson.M, inviterDoc bson.M) error {
|
||||
targetid := inviteeDoc["_mid"].(accountID)
|
||||
|
||||
@ -621,7 +538,7 @@ func (gm *groupInMemory) InviteImplement(gid groupID, mid accountID, inviteeDoc
|
||||
// invitee의 rconn이 종료될 때 그룹에 반영해야 하므로 rconn을 찾자
|
||||
rconn := gm.hasConn(targetid)
|
||||
if rconn == nil {
|
||||
return gm.rpc(targetid).call(gid, inviteeDoc, inviterDoc)
|
||||
return rpc.Make(gm).To(targetid).Call(gid, mid, inviteeDoc, inviterDoc)
|
||||
}
|
||||
|
||||
gd := gm.groups.find(gid)
|
||||
@ -629,10 +546,15 @@ func (gm *groupInMemory) InviteImplement(gid groupID, mid accountID, inviteeDoc
|
||||
return errGroupNotExist
|
||||
}
|
||||
|
||||
if rconn.HasOnCloseFunc("member_remove_invite") {
|
||||
if rconn.hasOnCloseFunc("member_remove_invite") {
|
||||
// 이미 초대 중이다.
|
||||
// inviter한테 알려줘야 한다.
|
||||
return gm.SendInvitationFailed(mid, inviteeDoc)
|
||||
gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
|
||||
Target: "@" + mid.Hex(),
|
||||
Body: inviteeDoc,
|
||||
Tag: []string{"InvitationFail"},
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
tid, newdoc := gd.addInvite(inviteeDoc, rconn, time.Duration(gm.InviteExpire)*time.Second, gm.MaxMember)
|
||||
@ -640,17 +562,21 @@ func (gm *groupInMemory) InviteImplement(gid groupID, mid accountID, inviteeDoc
|
||||
return errNoEmptySlot
|
||||
}
|
||||
|
||||
rconn.RegistOnCloseFunc("member_remove_invite", func() {
|
||||
rconn.registOnCloseFunc("member_remove_invite", func() {
|
||||
gd.removeMember(targetid, &tid)
|
||||
gm.memberSync(gid, targetid, tid, nil, false)
|
||||
})
|
||||
|
||||
gm.memberSync(gid, targetid, tid, newdoc, false)
|
||||
sendTypedMessage(gm, targetid, Invitation{
|
||||
gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
|
||||
Target: "@" + targetid.Hex(),
|
||||
Body: Invitation{
|
||||
GroupID: gid,
|
||||
TicketID: tid,
|
||||
Inviter: inviterDoc,
|
||||
ExpireAtUTC: newdoc.InviteExpire.Unix(),
|
||||
},
|
||||
Tag: []string{"Invitation"},
|
||||
})
|
||||
|
||||
return nil
|
||||
@ -685,7 +611,7 @@ func (gm *groupInMemory) Invite(gid groupID, mid accountID, inviterDoc bson.M, i
|
||||
if rconn == nil {
|
||||
// mid가 있는 곳에서 처리를 해야 접속 끊겼을 때 콜백을 먼저 등록할 수 있다.
|
||||
// 콜백이 rconn에 먼저 등록되지 않으면 좀비 group이 생길 가능성이 생긴다.
|
||||
return gid.Hex(), gm.rpc(mid).call(gid, mid, inviteeDoc, inviteeDoc)
|
||||
return gid.Hex(), rpc.Make(gm).To(targetid).Call(gid, mid, inviteeDoc, inviterDoc)
|
||||
}
|
||||
|
||||
// 이제 여기는 mid가 InCharge이면서 rconn이 존재
|
||||
@ -693,7 +619,7 @@ func (gm *groupInMemory) Invite(gid groupID, mid accountID, inviterDoc bson.M, i
|
||||
if gd == nil {
|
||||
_, gd = gm.groups.createWithID(gid, bson.M{})
|
||||
tid, newdoc := gd.addInCharge(mid, rconn, inviterDoc)
|
||||
rconn.RegistOnCloseFunc("member_remove", func() {
|
||||
rconn.registOnCloseFunc("member_remove", func() {
|
||||
// 내가 InCharge이므로 접속이 종료될 때 그룹을 해체한다.
|
||||
gm.groupDocSync(gid, nil)
|
||||
})
|
||||
@ -703,6 +629,8 @@ func (gm *groupInMemory) Invite(gid groupID, mid accountID, inviterDoc bson.M, i
|
||||
}
|
||||
gm.groupDocSync(gid, bt)
|
||||
gm.memberSync(gid, mid, tid, newdoc, true)
|
||||
// 내가 wshandler room에 입장
|
||||
gm.sendEnterRoomMessage(gid, mid)
|
||||
} else {
|
||||
// targetid가 이미 멤버인지 미리 확인 가능
|
||||
if !gd.ticket(targetid).IsZero() {
|
||||
@ -714,9 +642,6 @@ func (gm *groupInMemory) Invite(gid groupID, mid accountID, inviterDoc bson.M, i
|
||||
return gid.Hex(), gm.InviteImplement(gid, mid, inviteeDoc, inviterDoc)
|
||||
}
|
||||
|
||||
func (gm *groupInMemory) UpdateGroupMember(gid groupID, mid accountID, tid ticketID, doc bson.M) error {
|
||||
return nil
|
||||
}
|
||||
func (gm *groupInMemory) CancelInvitation(gid groupID, tid ticketID) error {
|
||||
return nil
|
||||
}
|
||||
@ -729,20 +654,21 @@ func (gm *groupInMemory) AcceptInvitation(gid groupID, mid accountID, tid ticket
|
||||
|
||||
rconn := gm.hasConn(mid)
|
||||
if rconn == nil {
|
||||
return gid, gm.rpc(mid).call(gid, mid, tid, member)
|
||||
return gid, rpc.Make(gm).To(mid).Call(gid, mid, tid, member)
|
||||
}
|
||||
|
||||
oldFunc := rconn.UnregistOnCloseFunc("member_remove")
|
||||
oldFunc := rconn.unregistOnCloseFunc("member_remove")
|
||||
if oldFunc != nil {
|
||||
// 기존 멤버였으면 탈퇴 처리
|
||||
oldFunc()
|
||||
}
|
||||
|
||||
inviteFunc := rconn.UnregistOnCloseFunc("member_remove_invite")
|
||||
rconn.RegistOnCloseFunc("member_remove", inviteFunc)
|
||||
inviteFunc := rconn.unregistOnCloseFunc("member_remove_invite")
|
||||
rconn.registOnCloseFunc("member_remove", inviteFunc)
|
||||
|
||||
result, isNew := gd.addMember(mid, &tid, member)
|
||||
if result != nil {
|
||||
gm.sendEnterRoomMessage(gid, mid)
|
||||
return gid, gm.memberSync(gid, mid, tid, result, isNew)
|
||||
}
|
||||
|
||||
@ -758,10 +684,10 @@ func (gm *groupInMemory) DenyInvitation(gid groupID, mid accountID, tid ticketID
|
||||
|
||||
rconn := gm.hasConn(mid)
|
||||
if rconn == nil {
|
||||
return gm.rpc(mid).call(gid, mid, tid)
|
||||
return rpc.Make(gm).To(mid).Call(gid, mid, tid)
|
||||
}
|
||||
|
||||
inviteFunc := rconn.UnregistOnCloseFunc("member_remove_invite")
|
||||
inviteFunc := rconn.unregistOnCloseFunc("member_remove_invite")
|
||||
if inviteFunc != nil {
|
||||
inviteFunc() // removeMember는 여기에 들어있다.
|
||||
return nil
|
||||
@ -799,21 +725,27 @@ func (gm *groupInMemory) DropPausedMember(gid primitive.ObjectID, mid primitive.
|
||||
// 드랍해야 한다.
|
||||
if gd.InCharge == mid {
|
||||
// 내가 방장인 경우
|
||||
gm.groupDocSync(gid, nil)
|
||||
return gm.groupDocSync(gid, nil)
|
||||
} else {
|
||||
// 내가 방장이 아닌 경우
|
||||
gd.removeMember(mid, &tid)
|
||||
gm.memberSync(gid, mid, tid, nil, false)
|
||||
return gm.memberSync(gid, mid, tid, nil, false)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (gm *groupInMemory) PauseMember(gid primitive.ObjectID, mid primitive.ObjectID, rconn *wshandler.Richconn) error {
|
||||
rconn.UnregistOnCloseFunc("member_remove")
|
||||
rconn.UnregistOnCloseFunc("member_remove_invite")
|
||||
rconn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "pause"), time.Time{})
|
||||
func (gm *groupInMemory) PauseMember(gid primitive.ObjectID, mid primitive.ObjectID) error {
|
||||
rconn := gm.hasConn(mid)
|
||||
if rconn == nil {
|
||||
return rpc.Make(gm).To(mid).Call(gid, mid)
|
||||
}
|
||||
|
||||
// 접속은 끊기지만 그룹에서 제거하지는 않는 상태
|
||||
rconn.unregistOnCloseFunc("member_remove")
|
||||
rconn.unregistOnCloseFunc("member_remove_invite")
|
||||
gm.sendCloseMessage(mid, "pause")
|
||||
|
||||
gd := gm.groups.find(gid)
|
||||
if gd == nil {
|
||||
@ -907,24 +839,43 @@ func (gm *groupInMemory) Leave(gid groupID, mid accountID, tid ticketID) error {
|
||||
// targetmid의 "member_remove" 함수를 등록 해제해야 하므로 rconn이 있는 곳에서 하자
|
||||
rconn := gm.hasConn(targetmid)
|
||||
if rconn == nil {
|
||||
return gm.rpc(targetmid).call(gid, mid, tid)
|
||||
return rpc.Make(gm).To(targetmid).Call(gid, mid, tid)
|
||||
}
|
||||
|
||||
if oldfunc := rconn.UnregistOnCloseFunc("member_remove"); oldfunc != nil {
|
||||
if oldfunc := rconn.unregistOnCloseFunc("member_remove"); oldfunc != nil {
|
||||
oldfunc() // 이 안에 다 있다.
|
||||
}
|
||||
|
||||
// 나한테는 빈 FullGroupDoc을 보낸다.
|
||||
sendTypedMessageDirect(rconn, FullGroupDoc{
|
||||
Gid: gid,
|
||||
gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
|
||||
Target: "@" + mid.Hex(),
|
||||
Body: FullGroupDoc{Gid: gid},
|
||||
Tag: []string{"FullGroupDoc", gid.Hex()},
|
||||
})
|
||||
gm.sendLeaveRoomMessage(gid, targetmid)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (gm *groupInMemory) UpdateMemberDocument(gid groupID, mid accountID, doc bson.M) error {
|
||||
gd := gm.groups.find(gid)
|
||||
if gd == nil {
|
||||
return errGroupNotExist
|
||||
}
|
||||
|
||||
tid := gd.ticket(mid)
|
||||
bt, _ := json.Marshal(doc)
|
||||
|
||||
personalized := []byte(fmt.Sprintf(`{"%s":%s}`, tid.Hex(), string(bt)))
|
||||
gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
|
||||
Target: "#" + gid.Hex(),
|
||||
Body: gd.updateBodyWithJson(personalized),
|
||||
Tag: []string{"GroupDocBody"},
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (gm *groupInMemory) Dismiss(gid groupID) error {
|
||||
return nil
|
||||
}
|
||||
@ -942,15 +893,21 @@ func (gm *groupInMemory) UpdateGroupDocument(gid groupID, body []byte) error {
|
||||
return gm.groupDocSync(gid, newbody)
|
||||
}
|
||||
|
||||
func (gm *groupInMemory) TargetExists(target primitive.ObjectID) bool {
|
||||
return gm.hasConn(target) != nil
|
||||
}
|
||||
|
||||
var devflag = flagx.Bool("dev", false, "")
|
||||
|
||||
func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, typename string, wsh *wshandler.WebsocketHandler) (group, error) {
|
||||
func (cfg *groupConfig) prepareInMemory(ctx context.Context, typename string, sub *subTavern) (group, error) {
|
||||
// group document
|
||||
// member document
|
||||
region := sub.region
|
||||
wsh := sub.wsh
|
||||
|
||||
groupDocSyncChanName := fmt.Sprintf("d_mgc_%s_%s", region, typename)
|
||||
memberSyncChanName := fmt.Sprintf("m_mgc_%s_%s", region, typename)
|
||||
rpcChanName := fmt.Sprintf("r_mgc_%s_%s", region, typename)
|
||||
clientMessageChanName := fmt.Sprintf("c_mgc_%s_%s", region, typename)
|
||||
|
||||
toHashHex := func(name string) string {
|
||||
hash := md5.New()
|
||||
@ -966,7 +923,6 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type
|
||||
groupDocSyncChanName = toHashHex(groupDocSyncChanName)
|
||||
memberSyncChanName = toHashHex(memberSyncChanName)
|
||||
rpcChanName = toHashHex(rpcChanName)
|
||||
clientMessageChanName = toHashHex(clientMessageChanName)
|
||||
|
||||
// 여기서는 subscribe channel
|
||||
// 각 함수에서는 publish
|
||||
@ -1000,15 +956,28 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type
|
||||
_, err := wsh.RedisSync.Publish(ctx, rpcChanName, bt).Result()
|
||||
return err
|
||||
},
|
||||
hasConn: func(t accountID) *wshandler.Richconn {
|
||||
return wsh.Conn(region, t)
|
||||
hasConn: func(t accountID) *connection {
|
||||
return sub.cm.get(t)
|
||||
},
|
||||
groups: groupContainer{
|
||||
groupDocs: make(map[groupID]*groupDoc),
|
||||
},
|
||||
sendUpstreamMessage: func(msg *wshandler.UpstreamMessage) {
|
||||
wsh.SendUpstreamMessage(region, msg)
|
||||
},
|
||||
sendEnterRoomMessage: func(gid groupID, accid accountID) {
|
||||
wsh.EnterRoom(region, gid.Hex(), accid)
|
||||
},
|
||||
sendLeaveRoomMessage: func(gid groupID, accid accountID) {
|
||||
wsh.LeaveRoom(region, gid.Hex(), accid)
|
||||
},
|
||||
sendCloseMessage: func(target accountID, text string) {
|
||||
wsh.SendCloseMessage(region, target.Hex(), text)
|
||||
},
|
||||
}
|
||||
|
||||
// TODO : processChannelMessage 스레드 분리해보자
|
||||
rpc.RegistReceiver(gm)
|
||||
|
||||
processChannelMessage := func(gm *groupInMemory, pubsub *redis.PubSub) *redis.PubSub {
|
||||
defer func() {
|
||||
r := recover()
|
||||
@ -1024,34 +993,7 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type
|
||||
}
|
||||
|
||||
switch msg.Channel {
|
||||
case clientMessageChanName:
|
||||
bt := []byte(msg.Payload)
|
||||
if len(bt) < 24 {
|
||||
break
|
||||
}
|
||||
|
||||
// 주!! mid가 먼저
|
||||
var mid groupID
|
||||
copy(mid[:], bt[:12])
|
||||
bt = bt[12:]
|
||||
|
||||
var gid groupID
|
||||
copy(gid[:], bt[:12])
|
||||
bt = bt[12:]
|
||||
|
||||
gd := gm.groups.find(gid)
|
||||
if gd == nil {
|
||||
break
|
||||
}
|
||||
tid, _ := gd.memberByAccount(mid)
|
||||
if !tid.IsZero() {
|
||||
personalized := []byte(fmt.Sprintf(`{"%s":%s}`, tid.Hex(), string(bt)))
|
||||
if after, err := gd.updateBodyWithJson(personalized); err == nil {
|
||||
go multicast(gd.conns(false), after)
|
||||
}
|
||||
}
|
||||
|
||||
case groupDocSyncChanName:
|
||||
case groupDocSyncChanName: // 호스트들간 그룹 정보 동기화 채널
|
||||
payload := []byte(msg.Payload)
|
||||
if len(payload) < len(config.macAddr) {
|
||||
break
|
||||
@ -1069,17 +1011,18 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type
|
||||
if len(remain) == 0 {
|
||||
// gid 그룹 삭제
|
||||
// 그룹 안에 있는 멤버에게 알림
|
||||
bt, _ := json.Marshal(makeTypeMessage(FullGroupDoc{
|
||||
Gid: gid,
|
||||
}))
|
||||
go multicast(gd.conns(true), bt)
|
||||
gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
|
||||
Target: "#" + gid.Hex(),
|
||||
Body: FullGroupDoc{Gid: gid},
|
||||
Tag: []string{"FullGroupDoc"},
|
||||
})
|
||||
gm.groups.delete(gid)
|
||||
} else if string(senderHost) != config.macAddr {
|
||||
if r, err := gd.updateBodyBsonToJson(remain); err != nil {
|
||||
logger.Error("groupDocSyncChanName message decode failed :", remain, err)
|
||||
} else {
|
||||
go multicast(gd.conns(true), r)
|
||||
}
|
||||
gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
|
||||
Target: "#" + gid.Hex(),
|
||||
Body: gd.updateBodyBsonToJson(remain),
|
||||
Tag: []string{"GroupDocBody"},
|
||||
})
|
||||
}
|
||||
} else if string(senderHost) != config.macAddr {
|
||||
var newDoc groupDoc
|
||||
@ -1090,7 +1033,7 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type
|
||||
}
|
||||
}
|
||||
|
||||
case memberSyncChanName:
|
||||
case memberSyncChanName: // 호스트들간 멤버 정보 동기화 채널
|
||||
if len(msg.Payload) < len(config.macAddr) {
|
||||
break
|
||||
}
|
||||
@ -1120,7 +1063,7 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type
|
||||
}
|
||||
|
||||
var updated *memberDoc
|
||||
rconn := wsh.Conn(region, mid)
|
||||
rconn := gm.hasConn(mid)
|
||||
|
||||
if senderHost != config.macAddr {
|
||||
// 내가 보낸 메시지가 아니면 멤버 도큐먼트 업데이트 하고 브로드캐스팅
|
||||
@ -1140,56 +1083,40 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type
|
||||
if rconn != nil {
|
||||
// gid에 이미 다른 값이 있을 수 있다.
|
||||
// 정확하게 이 값이면 제거하고, 아니면 넘어간다.
|
||||
rconn.RemoveTag("gid", fmt.Sprintf("%s@%s", gid.Hex(), gm.Name))
|
||||
rconn.removeTag("gid", fmt.Sprintf("%s@%s", gid.Hex(), gm.Name))
|
||||
}
|
||||
broadcastTypedMessage(gm, gid, PublicMemberDoc{Tid: tid})
|
||||
gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
|
||||
Target: "#" + gid.Hex(),
|
||||
Body: PublicMemberDoc{Tid: tid},
|
||||
Tag: []string{"PublicMemberDoc"},
|
||||
})
|
||||
} else {
|
||||
if isNewMember && updated.rconn == nil && rconn != nil {
|
||||
updated.rconn = rconn
|
||||
}
|
||||
// 업데이트 된 플레이어(새로 들어온 플레이어 포함)를 모두에게 알려준다. 본인 포함, invitee 제외
|
||||
broadcastTypedMessage(gm, gid, PublicMemberDoc{
|
||||
gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
|
||||
Target: "#" + gid.Hex(),
|
||||
Body: PublicMemberDoc{
|
||||
Tid: tid,
|
||||
memberDocCommon: updated.memberDocCommon,
|
||||
},
|
||||
Tag: []string{"PublicMemberDoc"},
|
||||
})
|
||||
}
|
||||
|
||||
if isNewMember {
|
||||
if rconn != nil {
|
||||
// 새 멤버이므로 기존 멤버를 다 보내준다.
|
||||
rconn.AddTag("gid", fmt.Sprintf("%s@%s", gid.Hex(), gm.Name))
|
||||
rconn.WriteBytes(gd.serializeFull(gid, clientMessageChanName))
|
||||
rconn.addTag("gid", fmt.Sprintf("%s@%s", gid.Hex(), gm.Name))
|
||||
gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
|
||||
Target: "@" + mid.Hex(),
|
||||
Body: gd.serializeFull(gid),
|
||||
Tag: []string{"FullGroupDoc"},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
case rpcChanName:
|
||||
targetbt, fn, params, err := rpc.Decode[accountID]([]byte(msg.Payload))
|
||||
if err != nil {
|
||||
logger.Error("rpcChanName message decode failed :", msg.Payload, err)
|
||||
break
|
||||
}
|
||||
|
||||
call := func() {
|
||||
method, ok := reflect.TypeOf(gm).MethodByName(fn)
|
||||
if !ok {
|
||||
logger.Printf("%s message decode failed :", targetbt, msg.Payload, err)
|
||||
}
|
||||
|
||||
args := []reflect.Value{
|
||||
reflect.ValueOf(gm),
|
||||
}
|
||||
for _, arg := range params {
|
||||
args = append(args, reflect.ValueOf(arg))
|
||||
}
|
||||
|
||||
method.Func.Call(args)
|
||||
}
|
||||
|
||||
if *targetbt == everyHost {
|
||||
call()
|
||||
} else if rconn := wsh.Conn(region, *targetbt); rconn != nil {
|
||||
call()
|
||||
}
|
||||
default:
|
||||
logger.Println("unknown channel")
|
||||
}
|
||||
@ -1208,7 +1135,7 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type
|
||||
var pubsub *redis.PubSub
|
||||
for {
|
||||
if pubsub == nil {
|
||||
pubsub = wsh.RedisSync.Subscribe(ctx, groupDocSyncChanName, memberSyncChanName, rpcChanName, clientMessageChanName)
|
||||
pubsub = wsh.RedisSync.Subscribe(ctx, groupDocSyncChanName, memberSyncChanName, rpcChanName)
|
||||
}
|
||||
|
||||
if pubsub == nil {
|
||||
|
||||
1050
core/group_mongo.go
1050
core/group_mongo.go
File diff suppressed because it is too large
Load Diff
130
core/richconn.go
130
core/richconn.go
@ -1,73 +1,97 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||
"repositories.action2quare.com/ayo/gocommon/wshandler"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type richConnOuter struct {
|
||||
wsh *wshandler.WebsocketHandler
|
||||
rc *wshandler.Richconn
|
||||
type connection struct {
|
||||
locker sync.Mutex
|
||||
alias string
|
||||
tags []string
|
||||
onClose map[string]func()
|
||||
}
|
||||
|
||||
func (sub richConnOuter) JoinTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, hint string) error {
|
||||
sub.wsh.JoinTag(region, tag, tid, sub.rc, hint)
|
||||
func (rc *connection) addTag(name, val string) {
|
||||
rc.locker.Lock()
|
||||
defer rc.locker.Unlock()
|
||||
|
||||
wsh := sub.wsh
|
||||
sub.rc.RegistOnCloseFunc(tag.Hex(), func() {
|
||||
wsh.LeaveTag(region, tag, tid)
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sub richConnOuter) LeaveTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, hint string) error {
|
||||
sub.SetStateInTag(region, tag, tid, "", hint)
|
||||
return sub.wsh.LeaveTag(region, tag, tid)
|
||||
}
|
||||
|
||||
func (sub richConnOuter) SetStateInTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, state string, hint string) error {
|
||||
return sub.wsh.SetStateInTag(region, tag, tid, state, hint)
|
||||
}
|
||||
|
||||
func (sub richConnOuter) TurnGroupOnline(key string, gid primitive.ObjectID, score float64) error {
|
||||
gidhex := gid.Hex()
|
||||
_, err := sub.wsh.RedisSync.ZAdd(context.Background(), key, &redis.Z{Score: score, Member: gidhex}).Result()
|
||||
if err != nil {
|
||||
logger.Error("TurnGroupOnline failed. redis.ZAdd return err :", err)
|
||||
return err
|
||||
prefix := name + "="
|
||||
for i, tag := range rc.tags {
|
||||
if strings.HasPrefix(tag, prefix) {
|
||||
rc.tags[i] = prefix + val
|
||||
return
|
||||
}
|
||||
|
||||
sub.rc.RegistOnCloseFunc(key, func() {
|
||||
sub.wsh.RedisSync.ZRem(context.Background(), key, gidhex)
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
rc.tags = append(rc.tags, prefix+val)
|
||||
}
|
||||
|
||||
func (sub richConnOuter) TurnGroupOffline(key string, gid primitive.ObjectID) error {
|
||||
f := sub.rc.UnregistOnCloseFunc(key)
|
||||
if f != nil {
|
||||
f()
|
||||
func (rc *connection) removeTag(name string, val string) {
|
||||
rc.locker.Lock()
|
||||
defer rc.locker.Unlock()
|
||||
|
||||
whole := fmt.Sprintf("%s=%s", name, val)
|
||||
for i, tag := range rc.tags {
|
||||
if tag == whole {
|
||||
if i == 0 && len(rc.tags) == 1 {
|
||||
rc.tags = nil
|
||||
} else {
|
||||
sub.wsh.RedisSync.ZRem(context.Background(), key, gid.Hex())
|
||||
lastidx := len(rc.tags) - 1
|
||||
if i < lastidx {
|
||||
rc.tags[i] = rc.tags[lastidx]
|
||||
}
|
||||
rc.tags = rc.tags[:lastidx]
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sub richConnOuter) SendMessage(doc []byte) error {
|
||||
return sub.rc.WriteBytes(doc)
|
||||
func (rc *connection) registOnCloseFunc(name string, f func()) {
|
||||
rc.locker.Lock()
|
||||
defer rc.locker.Unlock()
|
||||
|
||||
if rc.onClose == nil {
|
||||
f()
|
||||
return
|
||||
}
|
||||
rc.onClose[name] = f
|
||||
}
|
||||
|
||||
func (sub richConnOuter) SendMessageToTag(region string, tag primitive.ObjectID, msg []byte) error {
|
||||
sub.wsh.BroadcastRaw(region, tag, msg)
|
||||
return nil
|
||||
func (rc *connection) hasOnCloseFunc(name string) bool {
|
||||
rc.locker.Lock()
|
||||
defer rc.locker.Unlock()
|
||||
|
||||
if rc.onClose == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
_, ok := rc.onClose[name]
|
||||
return ok
|
||||
}
|
||||
|
||||
func (sub richConnOuter) CloseOnPurpose() error {
|
||||
return sub.rc.Close()
|
||||
func (rc *connection) unregistOnCloseFunc(name string) (out func()) {
|
||||
rc.locker.Lock()
|
||||
defer rc.locker.Unlock()
|
||||
|
||||
if rc.onClose == nil {
|
||||
return
|
||||
}
|
||||
out = rc.onClose[name]
|
||||
delete(rc.onClose, name)
|
||||
return
|
||||
}
|
||||
|
||||
func (rc *connection) cleanup() {
|
||||
rc.locker.Lock()
|
||||
defer rc.locker.Unlock()
|
||||
|
||||
cp := rc.onClose
|
||||
rc.onClose = nil
|
||||
go func() {
|
||||
for _, f := range cp {
|
||||
f()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
@ -1,195 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/gob"
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||
"repositories.action2quare.com/ayo/gocommon/wshandler"
|
||||
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
)
|
||||
|
||||
var Everybody = primitive.ObjectID([12]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11})
|
||||
|
||||
func init() {
|
||||
gob.Register(bson.M{})
|
||||
gob.Register(primitive.ObjectID{})
|
||||
gob.Register(primitive.Timestamp{})
|
||||
}
|
||||
|
||||
type RpcCaller struct {
|
||||
publish func(bt []byte) error
|
||||
}
|
||||
|
||||
func NewRpcCaller(f func(bt []byte) error) RpcCaller {
|
||||
return RpcCaller{
|
||||
publish: f,
|
||||
}
|
||||
}
|
||||
|
||||
type rpcCallContext struct {
|
||||
alias primitive.ObjectID
|
||||
publish func(bt []byte) error
|
||||
}
|
||||
|
||||
func (c *RpcCaller) One(alias primitive.ObjectID) rpcCallContext {
|
||||
return rpcCallContext{
|
||||
alias: alias,
|
||||
publish: c.publish,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *RpcCaller) Everybody() rpcCallContext {
|
||||
return rpcCallContext{
|
||||
alias: Everybody,
|
||||
publish: c.publish,
|
||||
}
|
||||
}
|
||||
|
||||
func IsCallerCalleeMethodMatch[Callee any]() error {
|
||||
var caller rpcCallContext
|
||||
var callee Callee
|
||||
|
||||
callerType := reflect.TypeOf(caller)
|
||||
calleeType := reflect.TypeOf(callee)
|
||||
for i := 0; i < callerType.NumMethod(); i++ {
|
||||
callerMethod := callerType.Method(i)
|
||||
calleeMethod, ok := calleeType.MethodByName(callerMethod.Name)
|
||||
if !ok {
|
||||
return fmt.Errorf("method '%s' of '%s' is missing", callerMethod.Name, calleeType.Name())
|
||||
}
|
||||
|
||||
if calleeMethod.Func.Type().NumIn() != callerMethod.Func.Type().NumIn() {
|
||||
return fmt.Errorf("method '%s' argument num is not match", callerMethod.Name)
|
||||
}
|
||||
|
||||
if calleeMethod.Func.Type().NumOut() != callerMethod.Func.Type().NumOut() {
|
||||
return fmt.Errorf("method '%s' out num is not match", callerMethod.Name)
|
||||
}
|
||||
|
||||
for i := 1; i < calleeMethod.Func.Type().NumIn(); i++ {
|
||||
if calleeMethod.Func.Type().In(i) != callerMethod.Func.Type().In(i) {
|
||||
return fmt.Errorf("method '%s' argument is not match. %s-%s", callerMethod.Name, calleeMethod.Func.Type().In(i).Name(), callerMethod.Func.Type().In(i).Name())
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type fnsig struct {
|
||||
FunctionName string `bson:"fn"`
|
||||
Args []any `bson:"args"`
|
||||
}
|
||||
|
||||
func Encode[T any](prefix T, fn string, args ...any) ([]byte, error) {
|
||||
m := append([]any{
|
||||
prefix,
|
||||
fn,
|
||||
}, args...)
|
||||
|
||||
buff := new(bytes.Buffer)
|
||||
encoder := gob.NewEncoder(buff)
|
||||
err := encoder.Encode(m)
|
||||
if err != nil {
|
||||
logger.Error("rpcCallContext.send err :", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return buff.Bytes(), nil
|
||||
}
|
||||
|
||||
func Decode[T any](src []byte) (*T, string, []any, error) {
|
||||
var m []any
|
||||
decoder := gob.NewDecoder(bytes.NewReader(src))
|
||||
if err := decoder.Decode(&m); err != nil {
|
||||
logger.Error("RpcCallee.Call err :", err)
|
||||
return nil, "", nil, err
|
||||
}
|
||||
|
||||
prfix := m[0].(T)
|
||||
fn := m[1].(string)
|
||||
|
||||
return &prfix, fn, m[2:], nil
|
||||
}
|
||||
|
||||
func decode(src []byte) (string, []any, error) {
|
||||
var sig fnsig
|
||||
decoder := gob.NewDecoder(bytes.NewReader(src))
|
||||
if err := decoder.Decode(&sig); err != nil {
|
||||
logger.Error("RpcCallee.Call err :", err)
|
||||
return "", nil, err
|
||||
}
|
||||
|
||||
return sig.FunctionName, sig.Args, nil
|
||||
}
|
||||
|
||||
func (c *rpcCallContext) send(fn string, args ...any) error {
|
||||
bt, err := Encode(c.alias, fn, args...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return c.publish(bt)
|
||||
}
|
||||
|
||||
type RpcCallee[T any] struct {
|
||||
methods map[string]reflect.Method
|
||||
create func(*wshandler.Richconn) *T
|
||||
}
|
||||
|
||||
func NewRpcCallee[T any](createReceiverFunc func(*wshandler.Richconn) *T) RpcCallee[T] {
|
||||
out := RpcCallee[T]{
|
||||
methods: make(map[string]reflect.Method),
|
||||
create: createReceiverFunc,
|
||||
}
|
||||
|
||||
var tmp *T
|
||||
|
||||
tp := reflect.TypeOf(tmp)
|
||||
for i := 0; i < tp.NumMethod(); i++ {
|
||||
method := tp.Method(i)
|
||||
out.methods[method.Name] = method
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func (r RpcCallee[T]) Call(rc *wshandler.Richconn, src []byte) error {
|
||||
defer func() {
|
||||
s := recover()
|
||||
if s != nil {
|
||||
logger.Error(s)
|
||||
}
|
||||
}()
|
||||
|
||||
fn, params, err := decode(src)
|
||||
if err != nil {
|
||||
logger.Error("RpcCallee.Call err :", err)
|
||||
return err
|
||||
}
|
||||
|
||||
method, ok := r.methods[fn]
|
||||
if !ok {
|
||||
err := fmt.Errorf("method '%s' is missing", fn)
|
||||
logger.Error("RpcCallee.Call err :", err)
|
||||
return err
|
||||
}
|
||||
|
||||
receiver := r.create(rc)
|
||||
args := []reflect.Value{
|
||||
reflect.ValueOf(receiver),
|
||||
}
|
||||
for _, arg := range params {
|
||||
args = append(args, reflect.ValueOf(arg))
|
||||
}
|
||||
|
||||
rets := method.Func.Call(args)
|
||||
if len(rets) > 0 && rets[len(rets)-1].Interface() != nil {
|
||||
return rets[len(rets)-1].Interface().(error)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -1,33 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
)
|
||||
|
||||
func (c rpcCallContext) JoinTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, hint string) error {
|
||||
return c.send("JoinTag", region, tag, tid, hint)
|
||||
}
|
||||
|
||||
func (c rpcCallContext) LeaveTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, hint string) error {
|
||||
return c.send("LeaveTag", region, tag, tid, hint)
|
||||
}
|
||||
|
||||
func (c rpcCallContext) TurnGroupOnline(key string, gid primitive.ObjectID, score float64) error {
|
||||
return c.send("TurnGroupOnline", key, gid, score)
|
||||
}
|
||||
|
||||
func (c rpcCallContext) TurnGroupOffline(key string, gid primitive.ObjectID) error {
|
||||
return c.send("TurnGroupOffline", key, gid)
|
||||
}
|
||||
func (c rpcCallContext) SetStateInTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, state string, hint string) error {
|
||||
return c.send("SetStateInTag", region, tag, tid, state, hint)
|
||||
}
|
||||
func (c rpcCallContext) SendMessage(doc []byte) error {
|
||||
return c.send("SendMessage", doc)
|
||||
}
|
||||
func (c rpcCallContext) SendMessageToTag(region string, gid primitive.ObjectID, msg []byte) error {
|
||||
return c.send("SendMessageToTag", region, gid, msg)
|
||||
}
|
||||
func (c rpcCallContext) CloseOnPurpose() error {
|
||||
return c.send("CloseOnPurpose")
|
||||
}
|
||||
169
core/tavern.go
169
core/tavern.go
@ -8,13 +8,12 @@ import (
|
||||
"net/http"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
common "repositories.action2quare.com/ayo/gocommon"
|
||||
"repositories.action2quare.com/ayo/gocommon"
|
||||
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||
"repositories.action2quare.com/ayo/gocommon/wshandler"
|
||||
"repositories.action2quare.com/ayo/tavern/core/rpc"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/bsonrw"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
@ -71,58 +70,66 @@ func readBsonDoc(r io.Reader, src any) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type rpcCallDomain[T any] struct {
|
||||
rpcCallChanName string
|
||||
caller rpc.RpcCaller
|
||||
callee rpc.RpcCallee[T]
|
||||
methods map[string]reflect.Method
|
||||
}
|
||||
|
||||
func createRpcCallDomain[CalleeType any](syncConn *redis.Client, creator func(*wshandler.Richconn) *CalleeType) rpcCallDomain[CalleeType] {
|
||||
var tmp *CalleeType
|
||||
methods := make(map[string]reflect.Method)
|
||||
tp := reflect.TypeOf(tmp)
|
||||
for i := 0; i < tp.NumMethod(); i++ {
|
||||
method := tp.Method(i)
|
||||
methods[method.Name] = method
|
||||
}
|
||||
|
||||
rpcChanName := "conn_rpc_channel_" + tp.Name()
|
||||
publishFunc := func(bt []byte) error {
|
||||
_, err := syncConn.Publish(context.Background(), rpcChanName, bt).Result()
|
||||
return err
|
||||
}
|
||||
|
||||
return rpcCallDomain[CalleeType]{
|
||||
rpcCallChanName: rpcChanName,
|
||||
caller: rpc.NewRpcCaller(publishFunc),
|
||||
callee: rpc.NewRpcCallee(creator),
|
||||
methods: methods,
|
||||
}
|
||||
}
|
||||
|
||||
type TavernConfig struct {
|
||||
common.RegionStorageConfig `json:",inline"`
|
||||
gocommon.RegionStorageConfig `json:",inline"`
|
||||
|
||||
GroupTypes map[string]*groupConfig `json:"tavern_group_types"`
|
||||
MaingateApiToken string `json:"maingate_api_token"`
|
||||
RedisURL string `json:"tavern_redis_url"`
|
||||
macAddr string
|
||||
}
|
||||
|
||||
var config TavernConfig
|
||||
|
||||
type connectionMap struct {
|
||||
sync.Mutex
|
||||
conns map[primitive.ObjectID]*connection
|
||||
}
|
||||
|
||||
func (cm *connectionMap) add(accid accountID, alias string) {
|
||||
cm.Lock()
|
||||
defer cm.Unlock()
|
||||
|
||||
old := cm.conns[accid]
|
||||
if old != nil {
|
||||
old.cleanup()
|
||||
}
|
||||
cm.conns[accid] = &connection{
|
||||
alias: alias,
|
||||
onClose: make(map[string]func()),
|
||||
}
|
||||
}
|
||||
|
||||
func (cm *connectionMap) remove(accid accountID) {
|
||||
cm.Lock()
|
||||
defer cm.Unlock()
|
||||
|
||||
old := cm.conns[accid]
|
||||
if old != nil {
|
||||
delete(cm.conns, accid)
|
||||
old.cleanup()
|
||||
}
|
||||
}
|
||||
|
||||
func (cm *connectionMap) get(accid accountID) *connection {
|
||||
cm.Lock()
|
||||
defer cm.Unlock()
|
||||
|
||||
return cm.conns[accid]
|
||||
}
|
||||
|
||||
type Tavern struct {
|
||||
subTaverns []*subTavern
|
||||
wsh *wshandler.WebsocketHandler
|
||||
}
|
||||
|
||||
type subTavern struct {
|
||||
mongoClient common.MongoClient
|
||||
mongoClient gocommon.MongoClient
|
||||
wsh *wshandler.WebsocketHandler
|
||||
region string
|
||||
groups map[string]group
|
||||
methods map[string]reflect.Method
|
||||
wshRpc rpcCallDomain[richConnOuter]
|
||||
cm connectionMap
|
||||
}
|
||||
|
||||
func getMacAddr() (string, error) {
|
||||
@ -145,7 +152,7 @@ func getMacAddr() (string, error) {
|
||||
func New(context context.Context, wsh *wshandler.WebsocketHandler, inconfig *TavernConfig) (*Tavern, error) {
|
||||
if inconfig == nil {
|
||||
var loaded TavernConfig
|
||||
if err := common.LoadConfig(&loaded); err != nil {
|
||||
if err := gocommon.LoadConfig(&loaded); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
inconfig = &loaded
|
||||
@ -157,7 +164,7 @@ func New(context context.Context, wsh *wshandler.WebsocketHandler, inconfig *Tav
|
||||
return nil, err
|
||||
}
|
||||
config.macAddr = macaddr
|
||||
tv := Tavern{
|
||||
tv := &Tavern{
|
||||
wsh: wsh,
|
||||
}
|
||||
|
||||
@ -166,39 +173,38 @@ func New(context context.Context, wsh *wshandler.WebsocketHandler, inconfig *Tav
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &tv, nil
|
||||
return tv, nil
|
||||
}
|
||||
|
||||
func (tv *Tavern) Destructor() {
|
||||
tv.wsh.Destructor()
|
||||
func (tv *Tavern) Cleanup() {
|
||||
for _, st := range tv.subTaverns {
|
||||
st.mongoClient.Close()
|
||||
}
|
||||
}
|
||||
|
||||
type groupPipelineDocument struct {
|
||||
OperationType string `bson:"operationType"`
|
||||
FullDocument map[string]any `bson:"fullDocument"`
|
||||
DocumentKey struct {
|
||||
Id primitive.ObjectID `bson:"_id"`
|
||||
} `bson:"documentKey"`
|
||||
UpdateDescription struct {
|
||||
UpdatedFields bson.M `bson:"updatedFields"`
|
||||
RemovedFileds bson.A `bson:"removedFields"`
|
||||
TruncatedArrays bson.A `bson:"truncatedArrays"`
|
||||
} `bson:"updateDescription"`
|
||||
}
|
||||
// func (tv *Tavern) SocketMessageReceived(accid primitive.ObjectID, alias string, messageType wshandler.WebSocketMessageType, body io.Reader) {
|
||||
// switch messageType {
|
||||
// case wshandler.Connected:
|
||||
|
||||
// case wshandler.Disconnected:
|
||||
// }
|
||||
// // gidtype := msg.Conn.GetTag("gid")
|
||||
// // if len(gidtype) > 0 {
|
||||
// // tokens := strings.SplitN(gidtype, "@", 2)
|
||||
// // gidobj, _ := primitive.ObjectIDFromHex(tokens[0])
|
||||
// // gtype := tokens[1]
|
||||
// // group := sub.groups[gtype]
|
||||
// // if group != nil {
|
||||
// // group.PauseMember(gidobj, msg.Alias, msg.Conn)
|
||||
// // }
|
||||
// }
|
||||
|
||||
func (tv *Tavern) prepare(ctx context.Context) error {
|
||||
for region, url := range config.RegionStorage {
|
||||
var dbconn common.MongoClient
|
||||
for region := range config.RegionStorage {
|
||||
var dbconn gocommon.MongoClient
|
||||
var err error
|
||||
var groupinstance group
|
||||
|
||||
if err := rpc.IsCallerCalleeMethodMatch[richConnOuter](); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var tmp *subTavern
|
||||
methods := make(map[string]reflect.Method)
|
||||
tp := reflect.TypeOf(tmp)
|
||||
@ -212,25 +218,25 @@ func (tv *Tavern) prepare(ctx context.Context) error {
|
||||
mongoClient: dbconn,
|
||||
region: region,
|
||||
methods: methods,
|
||||
cm: connectionMap{
|
||||
conns: make(map[primitive.ObjectID]*connection),
|
||||
},
|
||||
}
|
||||
|
||||
sub.wshRpc = createRpcCallDomain(tv.wsh.RedisSync, func(rc *wshandler.Richconn) *richConnOuter {
|
||||
return &richConnOuter{wsh: sub.wsh, rc: rc}
|
||||
})
|
||||
|
||||
groups := make(map[string]group)
|
||||
for typename, cfg := range config.GroupTypes {
|
||||
cfg.Name = typename
|
||||
if cfg.Transient {
|
||||
groupinstance, err = cfg.prepareInMemory(ctx, region, typename, tv.wsh)
|
||||
} else {
|
||||
if !dbconn.Connected() {
|
||||
dbconn, err = common.NewMongoClient(ctx, url.Mongo, region)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
groupinstance, err = cfg.preparePersistent(ctx, region, dbconn, tv.wsh)
|
||||
groupinstance, err = cfg.prepareInMemory(ctx, typename, sub)
|
||||
//} else {
|
||||
// TODO : db
|
||||
// if !dbconn.Connected() {
|
||||
// dbconn, err = gocommon.NewMongoClient(ctx, url.Mongo, region)
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
// }
|
||||
// groupinstance, err = cfg.preparePersistent(ctx, region, dbconn, tv.wsh)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
@ -246,25 +252,28 @@ func (tv *Tavern) prepare(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func (tv *Tavern) RegisterHandlers(ctx context.Context, serveMux *http.ServeMux, prefix string) error {
|
||||
// request는 항상 서비스 서버를 거쳐서 들어온다. [client] <--tls--> [service server] <--http--> tavern
|
||||
// 클라이언트는 tavern으로부터 메시지를 수신할 뿐, 송신하지 못한다.
|
||||
// 단, 요청은 https 서비스 서버를 통해 들어오고 클라이언트는 ws으로 수신만 한다는 원칙이 유지되어야 한다.(채팅 메시지는 예외?)
|
||||
for _, sub := range tv.subTaverns {
|
||||
tv.wsh.RegisterReceiver(sub.region, sub.clientMessageReceived)
|
||||
var pattern string
|
||||
if sub.region == "default" {
|
||||
pattern = common.MakeHttpHandlerPattern(prefix, "api")
|
||||
pattern = gocommon.MakeHttpHandlerPattern(prefix, "api")
|
||||
} else {
|
||||
pattern = common.MakeHttpHandlerPattern(prefix, sub.region, "api")
|
||||
pattern = gocommon.MakeHttpHandlerPattern(prefix, sub.region, "api")
|
||||
}
|
||||
serveMux.HandleFunc(pattern, sub.api)
|
||||
|
||||
deliveryChan := tv.wsh.DeliveryChannel(sub.region)
|
||||
go sub.deliveryMessageHandler(deliveryChan)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sub *subTavern) clientMessageReceived(sender *wshandler.Sender, messageType wshandler.WebSocketMessageType, body io.Reader) {
|
||||
if messageType == wshandler.Connected {
|
||||
sub.cm.add(sender.Accid, sender.Alias)
|
||||
} else if messageType == wshandler.Disconnected {
|
||||
sub.cm.remove(sender.Accid)
|
||||
}
|
||||
}
|
||||
|
||||
func (sub *subTavern) api(w http.ResponseWriter, r *http.Request) {
|
||||
defer func() {
|
||||
s := recover()
|
||||
|
||||
4
go.mod
4
go.mod
@ -4,15 +4,15 @@ go 1.19
|
||||
|
||||
require (
|
||||
github.com/go-redis/redis/v8 v8.11.5
|
||||
github.com/gorilla/websocket v1.5.0
|
||||
go.mongodb.org/mongo-driver v1.11.7
|
||||
repositories.action2quare.com/ayo/gocommon v0.0.0-20230621052811-06ef97f11d22
|
||||
repositories.action2quare.com/ayo/gocommon v0.0.0-20230711062613-74829b93ac1b
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||
github.com/golang/snappy v0.0.4 // indirect
|
||||
github.com/gorilla/websocket v1.5.0 // indirect
|
||||
github.com/klauspost/compress v1.16.6 // indirect
|
||||
github.com/montanaflynn/stats v0.7.1 // indirect
|
||||
github.com/pires/go-proxyproto v0.7.0 // indirect
|
||||
|
||||
16
go.sum
16
go.sum
@ -102,5 +102,17 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
repositories.action2quare.com/ayo/gocommon v0.0.0-20230621052811-06ef97f11d22 h1:DImSGNxZrc+Q4WlS1OKMsLAScEfDYLX4XMJdjAaVnXc=
|
||||
repositories.action2quare.com/ayo/gocommon v0.0.0-20230621052811-06ef97f11d22/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns=
|
||||
repositories.action2quare.com/ayo/gocommon v0.0.0-20230710053024-a842845685ee h1:Aau1j/b9wI4nyvrM7m1Q+2xkcW1Qo7i3q+QBD4Umnzg=
|
||||
repositories.action2quare.com/ayo/gocommon v0.0.0-20230710053024-a842845685ee/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns=
|
||||
repositories.action2quare.com/ayo/gocommon v0.0.0-20230711003621-3bb985d0b617 h1:91mBIGIyxzcnvOaIdegUuV+i9xs8YTSRcmyRaIytzx8=
|
||||
repositories.action2quare.com/ayo/gocommon v0.0.0-20230711003621-3bb985d0b617/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns=
|
||||
repositories.action2quare.com/ayo/gocommon v0.0.0-20230711005604-a42eb2888e97 h1:ARzXt3HBmiAUDyACfNm5Kvz1JMTn7+ryE03kB8x/km0=
|
||||
repositories.action2quare.com/ayo/gocommon v0.0.0-20230711005604-a42eb2888e97/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns=
|
||||
repositories.action2quare.com/ayo/gocommon v0.0.0-20230711033135-d7b26608df94 h1:VrNj5gBFFN9/roWCxyBCZ2gu5k58eremNHQvQNPrfrU=
|
||||
repositories.action2quare.com/ayo/gocommon v0.0.0-20230711033135-d7b26608df94/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns=
|
||||
repositories.action2quare.com/ayo/gocommon v0.0.0-20230711035757-9fd0dd00cb7d h1:RdxKmMc7kHrTk+SvTYse2IGxmdDhbEDeM0fKAUW+G+w=
|
||||
repositories.action2quare.com/ayo/gocommon v0.0.0-20230711035757-9fd0dd00cb7d/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns=
|
||||
repositories.action2quare.com/ayo/gocommon v0.0.0-20230711053010-4acb81a20d9c h1:SktFqjnc/UOMjJrq/brSw5lQjW1IA+KkB5YgeovusmQ=
|
||||
repositories.action2quare.com/ayo/gocommon v0.0.0-20230711053010-4acb81a20d9c/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns=
|
||||
repositories.action2quare.com/ayo/gocommon v0.0.0-20230711062613-74829b93ac1b h1:04rlgT+zeKSpekyleb8Mfi8kENIoka5DYJLuk65wqxc=
|
||||
repositories.action2quare.com/ayo/gocommon v0.0.0-20230711062613-74829b93ac1b/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns=
|
||||
|
||||
7
main.go
7
main.go
@ -34,13 +34,14 @@ func main() {
|
||||
panic(err)
|
||||
} else {
|
||||
serveMux := http.NewServeMux()
|
||||
wsh.RegisterHandlers(ctx, serveMux, *prefix)
|
||||
wsh.RegisterHandlers(serveMux, *prefix)
|
||||
tv.RegisterHandlers(ctx, serveMux, *prefix)
|
||||
server := common.NewHTTPServer(serveMux)
|
||||
logger.Println("tavern is started")
|
||||
wsh.Start(ctx)
|
||||
server.Start()
|
||||
cancel()
|
||||
tv.Destructor()
|
||||
wsh.Destructor()
|
||||
tv.Cleanup()
|
||||
wsh.Cleanup()
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user