rpc 패키지 적용

This commit is contained in:
2023-07-10 15:39:56 +09:00
parent 8d0f21077d
commit ec0ed1ce06
10 changed files with 325 additions and 651 deletions

View File

@ -5,48 +5,15 @@ import (
"encoding/json" "encoding/json"
"io" "io"
"net/http" "net/http"
"strings"
common "repositories.action2quare.com/ayo/gocommon" common "repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/logger" "repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/wshandler"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
"go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/bson/primitive"
) )
func splitDocument(doc bson.M) bson.M {
setdoc := bson.M{}
unsetdoc := bson.M{}
findoc := bson.M{}
for k, v := range doc {
if k == "$set" {
setdoc = v.(bson.M)
} else if k == "$unset" {
unsetdoc = v.(bson.M)
}
}
for k, v := range doc {
if v == nil {
unsetdoc[k] = 1
} else if k[0] != '$' {
setdoc[k] = v
}
}
if len(setdoc) > 0 {
findoc["$set"] = setdoc
}
if len(unsetdoc) > 0 {
findoc["$unset"] = unsetdoc
}
return findoc
}
// CreateGroup : 그룹 생성 // CreateGroup : 그룹 생성
// - 그룹 : 멤버와 권한을 관리할 수 있다. 그룹 타입에 따라 디비에 저장되거나 메모리에만 존재한다. // - 그룹 : 멤버와 권한을 관리할 수 있다. 그룹 타입에 따라 디비에 저장되거나 메모리에만 존재한다.
// - 생성 요청이 오면 파티를 만든다. 파티을 만들 수 있는지 여부는 서비스에서 결정할 것이고, 이 요청을 호출했다는 것은 서비스가 결정한 그룹 생성 조건을 다 통과했다는 의미이다. // - 생성 요청이 오면 파티를 만든다. 파티을 만들 수 있는지 여부는 서비스에서 결정할 것이고, 이 요청을 호출했다는 것은 서비스가 결정한 그룹 생성 조건을 다 통과했다는 의미이다.
@ -179,52 +146,6 @@ func (sub *subTavern) Invite(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(result)) w.Write([]byte(result))
} }
func (sub *subTavern) UpdateGroupMember(w http.ResponseWriter, r *http.Request) {
typename, _ := common.ReadStringFormValue(r.Form, "type")
group := sub.groups[typename]
if group == nil {
logger.Println("UpdateGroupMember failed. group type is missing :", r.Form)
w.WriteHeader(http.StatusBadRequest)
return
}
gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid")
if !ok {
logger.Println("UpdateGroupMember failed. gid is missing :", r.Form)
w.WriteHeader(http.StatusBadRequest)
return
}
midobj, midok := common.ReadObjectIDFormValue(r.Form, "mid")
tidobj, tidok := common.ReadObjectIDFormValue(r.Form, "tid")
if !midok && !tidok {
// 둘다 없네?
logger.Println("JoinGroup failed. tid or mid should be exist")
w.WriteHeader(http.StatusBadRequest)
return
}
var err error
delete, _ := common.ReadBoolFormValue(r.Form, "delete")
if delete {
err = group.UpdateGroupMember(gidobj, midobj, tidobj, nil)
} else {
var doc bson.M
if err := readBsonDoc(r.Body, &doc); err != nil {
logger.Error("UpdateGroupMember failed. readBsonDoc returns err :", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
err = group.UpdateGroupMember(gidobj, midobj, tidobj, doc)
}
if err != nil {
logger.Println("UpdateGroupMember failed. Update returns err :", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
}
func (sub *subTavern) CancelInvitation(w http.ResponseWriter, r *http.Request) { func (sub *subTavern) CancelInvitation(w http.ResponseWriter, r *http.Request) {
typename, _ := common.ReadStringFormValue(r.Form, "type") typename, _ := common.ReadStringFormValue(r.Form, "type")
group := sub.groups[typename] group := sub.groups[typename]
@ -719,6 +640,32 @@ func (sub *subTavern) UpdateGroupDocument(w http.ResponseWriter, r *http.Request
} }
} }
func (sub *subTavern) PauseGroupMember(w http.ResponseWriter, r *http.Request) {
typename, _ := common.ReadStringFormValue(r.Form, "type")
group := sub.groups[typename]
if group == nil {
logger.Println("DismissGroup failed. type is missing")
w.WriteHeader(http.StatusBadRequest)
return
}
midobj, ok := common.ReadObjectIDFormValue(r.Form, "mid")
if !ok {
logger.Println("UpdateMemberDocument failed. member_id is missing")
w.WriteHeader(http.StatusBadRequest)
return
}
gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid")
if !ok {
logger.Println("UpdateMemberDocument failed. _id is missing")
w.WriteHeader(http.StatusBadRequest)
return
}
group.PauseMember(gidobj, midobj)
}
func (sub *subTavern) DropPausedMember(w http.ResponseWriter, r *http.Request) { func (sub *subTavern) DropPausedMember(w http.ResponseWriter, r *http.Request) {
typename, _ := common.ReadStringFormValue(r.Form, "type") typename, _ := common.ReadStringFormValue(r.Form, "type")
group := sub.groups[typename] group := sub.groups[typename]
@ -749,55 +696,55 @@ func (sub *subTavern) DropPausedMember(w http.ResponseWriter, r *http.Request) {
} }
} }
func (sub *subTavern) deliveryMessageHandler(deliveryChan <-chan wshandler.DeliveryMessage) { // func (sub *subTavern) deliveryMessageHandler(deliveryChan <-chan wshandler.DeliveryMessage) {
defer func() { // defer func() {
r := recover() // r := recover()
if r != nil { // if r != nil {
logger.Error(r) // logger.Error(r)
} // }
}() // }()
redisSync := sub.wsh.RedisSync // redisSync := sub.wsh.RedisSync
for msg := range deliveryChan { // for msg := range deliveryChan {
mid := msg.Alias // mid := msg.Alias
if msg.Body != nil { // if msg.Body != nil {
buffer := msg.Body // buffer := msg.Body
var channame string // var channame string
for i, ch := range buffer { // for i, ch := range buffer {
if ch == 0 { // if ch == 0 {
channame = string(buffer[:i]) // channame = string(buffer[:i])
buffer = buffer[i+1:] // buffer = buffer[i+1:]
break // break
} // }
} // }
if len(channame) == 0 { // if len(channame) == 0 {
continue // continue
} // }
buffer = append(mid[:], buffer...) // buffer = append(mid[:], buffer...)
_, err := redisSync.Publish(context.Background(), channame, buffer).Result() // _, err := redisSync.Publish(context.Background(), channame, buffer).Result()
if err != nil { // if err != nil {
logger.Error(err) // logger.Error(err)
} // }
} // }
if len(msg.Command) > 0 { // if len(msg.Command) > 0 {
switch msg.Command { // switch msg.Command {
case "pause": // case "pause":
gidtype := msg.Conn.GetTag("gid") // gidtype := msg.Conn.GetTag("gid")
if len(gidtype) > 0 { // if len(gidtype) > 0 {
tokens := strings.SplitN(gidtype, "@", 2) // tokens := strings.SplitN(gidtype, "@", 2)
gidobj, _ := primitive.ObjectIDFromHex(tokens[0]) // gidobj, _ := primitive.ObjectIDFromHex(tokens[0])
gtype := tokens[1] // gtype := tokens[1]
group := sub.groups[gtype] // group := sub.groups[gtype]
if group != nil { // if group != nil {
group.PauseMember(gidobj, msg.Alias, msg.Conn) // group.PauseMember(gidobj, msg.Alias, msg.Conn)
} // }
} // }
} // }
} // }
} // }
logger.Println("delivery chan fin") // logger.Println("delivery chan fin")
} // }

View File

@ -3,8 +3,6 @@ package core
import ( import (
"net/url" "net/url"
"repositories.action2quare.com/ayo/gocommon/wshandler"
"go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/bson/primitive"
) )
@ -30,7 +28,6 @@ type group interface {
Join(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, doc bson.M) (newTicketID primitive.ObjectID, err error) Join(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, doc bson.M) (newTicketID primitive.ObjectID, err error)
FindTicketID(groupID primitive.ObjectID, memberID primitive.ObjectID) primitive.ObjectID FindTicketID(groupID primitive.ObjectID, memberID primitive.ObjectID) primitive.ObjectID
Invite(groupID primitive.ObjectID, memberID primitive.ObjectID, inviterDoc bson.M, inviteeDoc bson.M) (string, error) Invite(groupID primitive.ObjectID, memberID primitive.ObjectID, inviterDoc bson.M, inviteeDoc bson.M) (string, error)
UpdateGroupMember(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, doc bson.M) error
CancelInvitation(groupID primitive.ObjectID, ticketID primitive.ObjectID) error CancelInvitation(groupID primitive.ObjectID, ticketID primitive.ObjectID) error
AcceptInvitation(groupID primitive.ObjectID, mid primitive.ObjectID, ticketID primitive.ObjectID, member bson.M) (primitive.ObjectID, error) AcceptInvitation(groupID primitive.ObjectID, mid primitive.ObjectID, ticketID primitive.ObjectID, member bson.M) (primitive.ObjectID, error)
DenyInvitation(groupID primitive.ObjectID, mid primitive.ObjectID, ticketID primitive.ObjectID) error DenyInvitation(groupID primitive.ObjectID, mid primitive.ObjectID, ticketID primitive.ObjectID) error
@ -42,7 +39,7 @@ type group interface {
QueryMember(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, projection string) (bson.M, error) QueryMember(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, projection string) (bson.M, error)
Leave(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID) error Leave(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID) error
DropPausedMember(groupID primitive.ObjectID, memberID primitive.ObjectID) error DropPausedMember(groupID primitive.ObjectID, memberID primitive.ObjectID) error
PauseMember(groupID primitive.ObjectID, memberID primitive.ObjectID, conn *wshandler.Richconn) error PauseMember(groupID primitive.ObjectID, memberID primitive.ObjectID) error
UpdateMemberDocument(groupID primitive.ObjectID, memberID primitive.ObjectID, doc bson.M) error UpdateMemberDocument(groupID primitive.ObjectID, memberID primitive.ObjectID, doc bson.M) error
Dismiss(groupID primitive.ObjectID) error Dismiss(groupID primitive.ObjectID) error
UpdateGroupDocument(groupID primitive.ObjectID, body []byte) error UpdateGroupDocument(groupID primitive.ObjectID, body []byte) error

View File

@ -10,20 +10,17 @@ import (
"fmt" "fmt"
"net/url" "net/url"
"os" "os"
"path"
"reflect" "reflect"
"runtime"
"strings" "strings"
"sync" "sync"
"time" "time"
"repositories.action2quare.com/ayo/gocommon/flagx" "repositories.action2quare.com/ayo/gocommon/flagx"
"repositories.action2quare.com/ayo/gocommon/logger" "repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/rpc"
"repositories.action2quare.com/ayo/gocommon/wshandler" "repositories.action2quare.com/ayo/gocommon/wshandler"
"repositories.action2quare.com/ayo/tavern/core/rpc"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
"github.com/gorilla/websocket"
"go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/bson/primitive"
) )
@ -32,8 +29,6 @@ type accountID = primitive.ObjectID
type ticketID = primitive.ObjectID type ticketID = primitive.ObjectID
type groupID = primitive.ObjectID type groupID = primitive.ObjectID
var everyHost, _ = primitive.ObjectIDFromHex("010203040506070809101112")
type Invitation struct { type Invitation struct {
GroupID groupID `json:"gid"` GroupID groupID `json:"gid"`
TicketID ticketID `json:"tid"` TicketID ticketID `json:"tid"`
@ -56,7 +51,6 @@ type PublicMemberDoc struct {
type FullGroupDoc struct { type FullGroupDoc struct {
Gid groupID Gid groupID
DM string
AllMembers []*PublicMemberDoc `json:",omitempty"` AllMembers []*PublicMemberDoc `json:",omitempty"`
Body GroupDocBody `json:",omitempty"` Body GroupDocBody `json:",omitempty"`
} }
@ -69,7 +63,7 @@ type memberDoc struct {
// underscore keys in Hidden // underscore keys in Hidden
Hidden bson.M Hidden bson.M
rconn *wshandler.Richconn rconn *connection
Mid accountID Mid accountID
} }
@ -99,28 +93,28 @@ func (gd *groupDoc) updateBodyWithBson(src []byte) ([]byte, error) {
return bson.Marshal(gd.Body) return bson.Marshal(gd.Body)
} }
func (gd *groupDoc) updateBodyWithJson(src []byte) ([]byte, error) { func (gd *groupDoc) updateBodyWithJson(src []byte) []byte {
gd.Lock() gd.Lock()
defer gd.Unlock() defer gd.Unlock()
err := json.Unmarshal(src, &gd.Body) err := json.Unmarshal(src, &gd.Body)
if err != nil { if err != nil {
return nil, err return nil
} }
return json.Marshal(makeTypeMessage(gd.Body)) return makeTypeMessage(gd.Body)
} }
func (gd *groupDoc) updateBodyBsonToJson(bsonSrc []byte) (jsonBt []byte, err error) { func (gd *groupDoc) updateBodyBsonToJson(bsonSrc []byte) []byte {
gd.Lock() gd.Lock()
defer gd.Unlock() defer gd.Unlock()
err = bson.Unmarshal(bsonSrc, &gd.Body) err := bson.Unmarshal(bsonSrc, &gd.Body)
if err != nil { if err != nil {
return nil, err return nil
} }
return json.Marshal(makeTypeMessage(gd.Body)) return makeTypeMessage(gd.Body)
} }
func (gd *groupDoc) updateBody(bsonSrc []byte) error { func (gd *groupDoc) updateBody(bsonSrc []byte) error {
@ -130,7 +124,7 @@ func (gd *groupDoc) updateBody(bsonSrc []byte) error {
return bson.Unmarshal(bsonSrc, &gd.Body) return bson.Unmarshal(bsonSrc, &gd.Body)
} }
func (gd *groupDoc) addInvite(inviteeDoc bson.M, rconn *wshandler.Richconn, ttl time.Duration, max int) (ticketID, *memberDoc) { func (gd *groupDoc) addInvite(inviteeDoc bson.M, rconn *connection, ttl time.Duration, max int) (ticketID, *memberDoc) {
gd.Lock() gd.Lock()
defer gd.Unlock() defer gd.Unlock()
@ -194,7 +188,7 @@ func seperateHidden(in bson.M) (public bson.M, hidden bson.M) {
return in, hidden return in, hidden
} }
func (gd *groupDoc) addInCharge(mid accountID, rconn *wshandler.Richconn, doc bson.M) (ticketID, *memberDoc) { func (gd *groupDoc) addInCharge(mid accountID, rconn *connection, doc bson.M) (ticketID, *memberDoc) {
gd.Lock() gd.Lock()
defer gd.Unlock() defer gd.Unlock()
@ -291,21 +285,6 @@ func (gd *groupDoc) removeMember(mid accountID, tid *ticketID) {
} }
} }
func (gd *groupDoc) conns(includeInvitee bool) (out []*wshandler.Richconn) {
gd.Lock()
defer gd.Unlock()
for _, mem := range gd.tickets {
if mem.rconn != nil {
if !includeInvitee && mem.Invite {
continue
}
out = append(out, mem.rconn)
}
}
return
}
func (gd *groupDoc) ticket(mid accountID) ticketID { func (gd *groupDoc) ticket(mid accountID) ticketID {
gd.Lock() gd.Lock()
defer gd.Unlock() defer gd.Unlock()
@ -399,7 +378,7 @@ func (gd *groupDoc) iterateMembers(cb func(ticketID, *memberDoc)) {
} }
} }
func (gd *groupDoc) serializeFull(gid groupID, directMessageChanName string) []byte { func (gd *groupDoc) serializeFull(gid groupID) []byte {
gd.Lock() gd.Lock()
defer gd.Unlock() defer gd.Unlock()
@ -416,14 +395,11 @@ func (gd *groupDoc) serializeFull(gid groupID, directMessageChanName string) []b
}) })
} }
bt, _ := json.Marshal(makeTypeMessage(FullGroupDoc{ return makeTypeMessage(FullGroupDoc{
Gid: gid, Gid: gid,
DM: directMessageChanName,
AllMembers: output, AllMembers: output,
Body: gd.Body, Body: gd.Body,
})) })
return bt
} }
type groupContainer struct { type groupContainer struct {
@ -433,11 +409,13 @@ type groupContainer struct {
type groupInMemory struct { type groupInMemory struct {
*groupConfig *groupConfig
groupDocSync func(groupID, []byte) error groupDocSync func(groupID, []byte) error
memberSync func(groupID, accountID, ticketID, *memberDoc, bool) error memberSync func(groupID, accountID, ticketID, *memberDoc, bool) error
rpcCall func([]byte) error rpcCall func([]byte) error
hasConn func(accountID) *wshandler.Richconn hasConn func(accountID) *connection
groups groupContainer sendUpstreamMessage func(*wshandler.UpstreamMessage)
sendCloseMessage func(accountID, string)
groups groupContainer
} }
func (gc *groupContainer) add(id groupID, doc *groupDoc) { func (gc *groupContainer) add(id groupID, doc *groupDoc) {
@ -496,46 +474,6 @@ func (gm *groupInMemory) Candidate(gid groupID, mid accountID, doc bson.M) error
} }
var errGroupNotExist = errors.New("group does not exist") var errGroupNotExist = errors.New("group does not exist")
var errFuncNameIsMissing = errors.New("how func name is missin")
func (gm *groupInMemory) callProxyRpc(target accountID, name string, args ...any) error {
bt, err := rpc.Encode(target, name, args...)
if err != nil {
return err
}
return gm.rpcCall(bt)
}
type rpcTarget struct {
gm *groupInMemory
target accountID
}
func (rt rpcTarget) call(args ...any) error {
pc := make([]uintptr, 1)
n := runtime.Callers(2, pc[:])
if n < 1 {
return nil
}
frame, _ := runtime.CallersFrames(pc).Next()
funcname := path.Ext(frame.Func.Name())
if len(funcname) > 0 {
funcname = funcname[1:]
return rt.gm.callProxyRpc(rt.target, funcname, args...)
}
return errFuncNameIsMissing
}
func (gm *groupInMemory) rpc(target accountID) rpcTarget {
return rpcTarget{
gm: gm,
target: target,
}
}
var errNoEmptySlot = errors.New("no more seat in group") var errNoEmptySlot = errors.New("no more seat in group")
func (gm *groupInMemory) Join(gid groupID, mid accountID, tid ticketID, doc bson.M) (ticketID, error) { func (gm *groupInMemory) Join(gid groupID, mid accountID, tid ticketID, doc bson.M) (ticketID, error) {
@ -558,58 +496,47 @@ func (gm *groupInMemory) FindTicketID(gid groupID, mid groupID) ticketID {
return primitive.NilObjectID return primitive.NilObjectID
} }
func makeTypeMessage[T any](msg T) bson.M { func makeTypeMessage[T any](msg T) []byte {
var ptr *T var ptr *T
name := reflect.TypeOf(ptr).Elem().Name() name := reflect.TypeOf(ptr).Elem().Name()
return bson.M{name: msg} bt, _ := json.Marshal(bson.M{name: msg})
return bt
} }
func sendTypedMessageDirect[T any](rconn *wshandler.Richconn, msg T) { // func sendTypedMessageDirect[T any](rconn *wshandler.Richconn, msg T) {
bt, _ := json.Marshal(makeTypeMessage(msg)) // bt, _ := json.Marshal(makeTypeMessage(msg))
rconn.WriteBytes(bt) // rconn.WriteBytes(bt)
} // }
func sendTypedMessage[T any](gm *groupInMemory, target accountID, msg T) { // func sendTypedMessage[T any](gm *groupInMemory, target accountID, msg T) {
bt, _ := json.Marshal(makeTypeMessage(msg)) // bt, _ := json.Marshal(makeTypeMessage(msg))
gm.SendMessage(target, bt) // gm.SendMessage(target, bt)
} // }
func (gm *groupInMemory) SendMessage(target accountID, msg []byte) { // func (gm *groupInMemory) SendMessage(target accountID, msg []byte) {
rconn := gm.hasConn(target) // rconn := gm.hasConn(target)
if rconn != nil { // if rconn != nil {
rconn.WriteBytes(msg) // rconn.WriteBytes(msg)
} else { // } else {
gm.rpc(target).call(target, msg) // gm.rpc(target).call(target, msg)
} // }
} // }
func multicast(conns []*wshandler.Richconn, raw []byte) { // func multicast(conns []*wshandler.Richconn, raw []byte) {
for _, rconn := range conns { // for _, rconn := range conns {
rconn.WriteBytes(raw) // rconn.WriteBytes(raw)
} // }
} // }
func broadcastTypedMessage[T any](gm *groupInMemory, gid groupID, msg T) { // func broadcastTypedMessage[T any](gm *groupInMemory, gid groupID, msg T) {
if gd := gm.groups.find(gid); gd != nil { // if gd := gm.groups.find(gid); gd != nil {
bt, _ := json.Marshal(makeTypeMessage(msg)) // bt, _ := json.Marshal(makeTypeMessage(msg))
go multicast(gd.conns(false), bt) // go multicast(gd.conns(false), bt)
} // }
} // }
var errInviteeDocMidMissing = errors.New("inviteeDoc must have '_mid' field") var errInviteeDocMidMissing = errors.New("inviteeDoc must have '_mid' field")
func (gm *groupInMemory) SendInvitationFailed(mid accountID, inviteeDoc bson.M) error {
delete(inviteeDoc, "_mid")
rconn := gm.hasConn(mid)
if rconn == nil {
return gm.rpc(mid).call(mid, inviteeDoc)
}
sendTypedMessage(gm, mid, InvitationFail(inviteeDoc))
return nil
}
func (gm *groupInMemory) InviteImplement(gid groupID, mid accountID, inviteeDoc bson.M, inviterDoc bson.M) error { func (gm *groupInMemory) InviteImplement(gid groupID, mid accountID, inviteeDoc bson.M, inviterDoc bson.M) error {
targetid := inviteeDoc["_mid"].(accountID) targetid := inviteeDoc["_mid"].(accountID)
@ -617,7 +544,7 @@ func (gm *groupInMemory) InviteImplement(gid groupID, mid accountID, inviteeDoc
// invitee의 rconn이 종료될 때 그룹에 반영해야 하므로 rconn을 찾자 // invitee의 rconn이 종료될 때 그룹에 반영해야 하므로 rconn을 찾자
rconn := gm.hasConn(targetid) rconn := gm.hasConn(targetid)
if rconn == nil { if rconn == nil {
return gm.rpc(targetid).call(gid, inviteeDoc, inviterDoc) return rpc.Make(gm).To(targetid).Call(gid, mid, inviteeDoc, inviterDoc)
} }
gd := gm.groups.find(gid) gd := gm.groups.find(gid)
@ -625,10 +552,14 @@ func (gm *groupInMemory) InviteImplement(gid groupID, mid accountID, inviteeDoc
return errGroupNotExist return errGroupNotExist
} }
if rconn.HasOnCloseFunc("member_remove_invite") { if rconn.hasOnCloseFunc("member_remove_invite") {
// 이미 초대 중이다. // 이미 초대 중이다.
// inviter한테 알려줘야 한다. // inviter한테 알려줘야 한다.
return gm.SendInvitationFailed(mid, inviteeDoc) gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "@" + mid.Hex(),
Body: makeTypeMessage(InvitationFail(inviteeDoc)),
})
return nil
} }
tid, newdoc := gd.addInvite(inviteeDoc, rconn, time.Duration(gm.InviteExpire)*time.Second, gm.MaxMember) tid, newdoc := gd.addInvite(inviteeDoc, rconn, time.Duration(gm.InviteExpire)*time.Second, gm.MaxMember)
@ -636,17 +567,20 @@ func (gm *groupInMemory) InviteImplement(gid groupID, mid accountID, inviteeDoc
return errNoEmptySlot return errNoEmptySlot
} }
rconn.RegistOnCloseFunc("member_remove_invite", func() { rconn.registOnCloseFunc("member_remove_invite", func() {
gd.removeMember(targetid, &tid) gd.removeMember(targetid, &tid)
gm.memberSync(gid, targetid, tid, nil, false) gm.memberSync(gid, targetid, tid, nil, false)
}) })
gm.memberSync(gid, targetid, tid, newdoc, false) gm.memberSync(gid, targetid, tid, newdoc, false)
sendTypedMessage(gm, targetid, Invitation{ gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
GroupID: gid, Target: "@" + targetid.Hex(),
TicketID: tid, Body: makeTypeMessage(Invitation{
Inviter: inviterDoc, GroupID: gid,
ExpireAtUTC: newdoc.InviteExpire.Unix(), TicketID: tid,
Inviter: inviterDoc,
ExpireAtUTC: newdoc.InviteExpire.Unix(),
}),
}) })
return nil return nil
@ -681,7 +615,7 @@ func (gm *groupInMemory) Invite(gid groupID, mid accountID, inviterDoc bson.M, i
if rconn == nil { if rconn == nil {
// mid가 있는 곳에서 처리를 해야 접속 끊겼을 때 콜백을 먼저 등록할 수 있다. // mid가 있는 곳에서 처리를 해야 접속 끊겼을 때 콜백을 먼저 등록할 수 있다.
// 콜백이 rconn에 먼저 등록되지 않으면 좀비 group이 생길 가능성이 생긴다. // 콜백이 rconn에 먼저 등록되지 않으면 좀비 group이 생길 가능성이 생긴다.
return gid.Hex(), gm.rpc(mid).call(gid, mid, inviteeDoc, inviteeDoc) return "", rpc.Make(gm).To(targetid).Call(gid, mid, inviteeDoc, inviterDoc)
} }
// 이제 여기는 mid가 InCharge이면서 rconn이 존재 // 이제 여기는 mid가 InCharge이면서 rconn이 존재
@ -689,7 +623,7 @@ func (gm *groupInMemory) Invite(gid groupID, mid accountID, inviterDoc bson.M, i
if gd == nil { if gd == nil {
_, gd = gm.groups.createWithID(gid, bson.M{}) _, gd = gm.groups.createWithID(gid, bson.M{})
tid, newdoc := gd.addInCharge(mid, rconn, inviterDoc) tid, newdoc := gd.addInCharge(mid, rconn, inviterDoc)
rconn.RegistOnCloseFunc("member_remove", func() { rconn.registOnCloseFunc("member_remove", func() {
// 내가 InCharge이므로 접속이 종료될 때 그룹을 해체한다. // 내가 InCharge이므로 접속이 종료될 때 그룹을 해체한다.
gm.groupDocSync(gid, nil) gm.groupDocSync(gid, nil)
}) })
@ -710,9 +644,6 @@ func (gm *groupInMemory) Invite(gid groupID, mid accountID, inviterDoc bson.M, i
return gid.Hex(), gm.InviteImplement(gid, mid, inviteeDoc, inviterDoc) return gid.Hex(), gm.InviteImplement(gid, mid, inviteeDoc, inviterDoc)
} }
func (gm *groupInMemory) UpdateGroupMember(gid groupID, mid accountID, tid ticketID, doc bson.M) error {
return nil
}
func (gm *groupInMemory) CancelInvitation(gid groupID, tid ticketID) error { func (gm *groupInMemory) CancelInvitation(gid groupID, tid ticketID) error {
return nil return nil
} }
@ -725,17 +656,17 @@ func (gm *groupInMemory) AcceptInvitation(gid groupID, mid accountID, tid ticket
rconn := gm.hasConn(mid) rconn := gm.hasConn(mid)
if rconn == nil { if rconn == nil {
return gid, gm.rpc(mid).call(gid, mid, tid, member) return gid, rpc.Make(gm).To(mid).Call(gid, mid, tid, member)
} }
oldFunc := rconn.UnregistOnCloseFunc("member_remove") oldFunc := rconn.unregistOnCloseFunc("member_remove")
if oldFunc != nil { if oldFunc != nil {
// 기존 멤버였으면 탈퇴 처리 // 기존 멤버였으면 탈퇴 처리
oldFunc() oldFunc()
} }
inviteFunc := rconn.UnregistOnCloseFunc("member_remove_invite") inviteFunc := rconn.unregistOnCloseFunc("member_remove_invite")
rconn.RegistOnCloseFunc("member_remove", inviteFunc) rconn.registOnCloseFunc("member_remove", inviteFunc)
result, isNew := gd.addMember(mid, &tid, member) result, isNew := gd.addMember(mid, &tid, member)
if result != nil { if result != nil {
@ -754,10 +685,10 @@ func (gm *groupInMemory) DenyInvitation(gid groupID, mid accountID, tid ticketID
rconn := gm.hasConn(mid) rconn := gm.hasConn(mid)
if rconn == nil { if rconn == nil {
return gm.rpc(mid).call(gid, mid, tid) return rpc.Make(gm).To(mid).Call(gid, mid, tid)
} }
inviteFunc := rconn.UnregistOnCloseFunc("member_remove_invite") inviteFunc := rconn.unregistOnCloseFunc("member_remove_invite")
if inviteFunc != nil { if inviteFunc != nil {
inviteFunc() // removeMember는 여기에 들어있다. inviteFunc() // removeMember는 여기에 들어있다.
return nil return nil
@ -806,10 +737,16 @@ func (gm *groupInMemory) DropPausedMember(gid primitive.ObjectID, mid primitive.
return nil return nil
} }
func (gm *groupInMemory) PauseMember(gid primitive.ObjectID, mid primitive.ObjectID, rconn *wshandler.Richconn) error { func (gm *groupInMemory) PauseMember(gid primitive.ObjectID, mid primitive.ObjectID) error {
rconn.UnregistOnCloseFunc("member_remove") rconn := gm.hasConn(mid)
rconn.UnregistOnCloseFunc("member_remove_invite") if rconn == nil {
rconn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "pause"), time.Time{}) return rpc.Make(gm).To(mid).Call(gid, mid)
}
// 접속은 끊기지만 그룹에서 제거하지는 않는 상태
rconn.unregistOnCloseFunc("member_remove")
rconn.unregistOnCloseFunc("member_remove_invite")
gm.sendCloseMessage(mid, "pause")
gd := gm.groups.find(gid) gd := gm.groups.find(gid)
if gd == nil { if gd == nil {
@ -903,24 +840,40 @@ func (gm *groupInMemory) Leave(gid groupID, mid accountID, tid ticketID) error {
// targetmid의 "member_remove" 함수를 등록 해제해야 하므로 rconn이 있는 곳에서 하자 // targetmid의 "member_remove" 함수를 등록 해제해야 하므로 rconn이 있는 곳에서 하자
rconn := gm.hasConn(targetmid) rconn := gm.hasConn(targetmid)
if rconn == nil { if rconn == nil {
return gm.rpc(targetmid).call(gid, mid, tid) return rpc.Make(gm).To(targetmid).Call(gid, mid, tid)
} }
if oldfunc := rconn.UnregistOnCloseFunc("member_remove"); oldfunc != nil { if oldfunc := rconn.unregistOnCloseFunc("member_remove"); oldfunc != nil {
oldfunc() // 이 안에 다 있다. oldfunc() // 이 안에 다 있다.
} }
// 나한테는 빈 FullGroupDoc을 보낸다. // 나한테는 빈 FullGroupDoc을 보낸다.
sendTypedMessageDirect(rconn, FullGroupDoc{ gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
Gid: gid, Target: "@" + mid.Hex(),
Body: makeTypeMessage(FullGroupDoc{Gid: gid}),
}) })
return nil return nil
} }
func (gm *groupInMemory) UpdateMemberDocument(gid groupID, mid accountID, doc bson.M) error { func (gm *groupInMemory) UpdateMemberDocument(gid groupID, mid accountID, doc bson.M) error {
gd := gm.groups.find(gid)
if gd == nil {
return errGroupNotExist
}
tid := gd.ticket(mid)
bt, _ := json.Marshal(doc)
personalized := []byte(fmt.Sprintf(`{"%s":%s}`, tid.Hex(), string(bt)))
gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + gid.Hex(),
Body: gd.updateBodyWithJson(personalized),
})
return nil return nil
} }
func (gm *groupInMemory) Dismiss(gid groupID) error { func (gm *groupInMemory) Dismiss(gid groupID) error {
return nil return nil
} }
@ -938,15 +891,21 @@ func (gm *groupInMemory) UpdateGroupDocument(gid groupID, body []byte) error {
return gm.groupDocSync(gid, newbody) return gm.groupDocSync(gid, newbody)
} }
func (gm *groupInMemory) TargetExists(target primitive.ObjectID) bool {
return gm.hasConn(target) != nil
}
var devflag = flagx.Bool("dev", false, "") var devflag = flagx.Bool("dev", false, "")
func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, typename string, wsh *wshandler.WebsocketHandler) (group, error) { func (cfg *groupConfig) prepareInMemory(ctx context.Context, typename string, sub *subTavern) (group, error) {
// group document // group document
// member document // member document
region := sub.region
wsh := sub.wsh
groupDocSyncChanName := fmt.Sprintf("d_mgc_%s_%s", region, typename) groupDocSyncChanName := fmt.Sprintf("d_mgc_%s_%s", region, typename)
memberSyncChanName := fmt.Sprintf("m_mgc_%s_%s", region, typename) memberSyncChanName := fmt.Sprintf("m_mgc_%s_%s", region, typename)
rpcChanName := fmt.Sprintf("r_mgc_%s_%s", region, typename) rpcChanName := fmt.Sprintf("r_mgc_%s_%s", region, typename)
clientMessageChanName := fmt.Sprintf("c_mgc_%s_%s", region, typename)
toHashHex := func(name string) string { toHashHex := func(name string) string {
hash := md5.New() hash := md5.New()
@ -962,7 +921,6 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type
groupDocSyncChanName = toHashHex(groupDocSyncChanName) groupDocSyncChanName = toHashHex(groupDocSyncChanName)
memberSyncChanName = toHashHex(memberSyncChanName) memberSyncChanName = toHashHex(memberSyncChanName)
rpcChanName = toHashHex(rpcChanName) rpcChanName = toHashHex(rpcChanName)
clientMessageChanName = toHashHex(clientMessageChanName)
// 여기서는 subscribe channel // 여기서는 subscribe channel
// 각 함수에서는 publish // 각 함수에서는 publish
@ -996,15 +954,22 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type
_, err := wsh.RedisSync.Publish(ctx, rpcChanName, bt).Result() _, err := wsh.RedisSync.Publish(ctx, rpcChanName, bt).Result()
return err return err
}, },
hasConn: func(t accountID) *wshandler.Richconn { hasConn: func(t accountID) *connection {
return wsh.Conn(region, t) return sub.cm.get(t)
}, },
groups: groupContainer{ groups: groupContainer{
groupDocs: make(map[groupID]*groupDoc), groupDocs: make(map[groupID]*groupDoc),
}, },
sendUpstreamMessage: func(msg *wshandler.UpstreamMessage) {
wsh.SendUpstreamMessage(region, msg)
},
sendCloseMessage: func(target accountID, text string) {
wsh.SendCloseMessage(region, target.Hex(), text)
},
} }
// TODO : processChannelMessage 스레드 분리해보자 rpc.RegistReceiver(gm)
processChannelMessage := func(gm *groupInMemory, pubsub *redis.PubSub) *redis.PubSub { processChannelMessage := func(gm *groupInMemory, pubsub *redis.PubSub) *redis.PubSub {
defer func() { defer func() {
r := recover() r := recover()
@ -1020,34 +985,7 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type
} }
switch msg.Channel { switch msg.Channel {
case clientMessageChanName: case groupDocSyncChanName: // 호스트들간 그룹 정보 동기화 채널
bt := []byte(msg.Payload)
if len(bt) < 24 {
break
}
// 주!! mid가 먼저
var mid groupID
copy(mid[:], bt[:12])
bt = bt[12:]
var gid groupID
copy(gid[:], bt[:12])
bt = bt[12:]
gd := gm.groups.find(gid)
if gd == nil {
break
}
tid, _ := gd.memberByAccount(mid)
if !tid.IsZero() {
personalized := []byte(fmt.Sprintf(`{"%s":%s}`, tid.Hex(), string(bt)))
if after, err := gd.updateBodyWithJson(personalized); err == nil {
go multicast(gd.conns(false), after)
}
}
case groupDocSyncChanName:
payload := []byte(msg.Payload) payload := []byte(msg.Payload)
if len(payload) < len(config.macAddr) { if len(payload) < len(config.macAddr) {
break break
@ -1065,17 +1003,17 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type
if len(remain) == 0 { if len(remain) == 0 {
// gid 그룹 삭제 // gid 그룹 삭제
// 그룹 안에 있는 멤버에게 알림 // 그룹 안에 있는 멤버에게 알림
bt, _ := json.Marshal(makeTypeMessage(FullGroupDoc{ bt := makeTypeMessage(FullGroupDoc{Gid: gid})
Gid: gid, gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
})) Target: "#" + gid.Hex(),
go multicast(gd.conns(true), bt) Body: bt,
})
gm.groups.delete(gid) gm.groups.delete(gid)
} else if string(senderHost) != config.macAddr { } else if string(senderHost) != config.macAddr {
if r, err := gd.updateBodyBsonToJson(remain); err != nil { gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
logger.Error("groupDocSyncChanName message decode failed :", remain, err) Target: "#" + gid.Hex(),
} else { Body: gd.updateBodyBsonToJson(remain),
go multicast(gd.conns(true), r) })
}
} }
} else if string(senderHost) != config.macAddr { } else if string(senderHost) != config.macAddr {
var newDoc groupDoc var newDoc groupDoc
@ -1086,7 +1024,7 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type
} }
} }
case memberSyncChanName: case memberSyncChanName: // 호스트들간 멤버 정보 동기화 채널
if len(msg.Payload) < len(config.macAddr) { if len(msg.Payload) < len(config.macAddr) {
break break
} }
@ -1116,7 +1054,7 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type
} }
var updated *memberDoc var updated *memberDoc
rconn := wsh.Conn(region, mid) rconn := gm.hasConn(mid)
if senderHost != config.macAddr { if senderHost != config.macAddr {
// 내가 보낸 메시지가 아니면 멤버 도큐먼트 업데이트 하고 브로드캐스팅 // 내가 보낸 메시지가 아니면 멤버 도큐먼트 업데이트 하고 브로드캐스팅
@ -1136,56 +1074,37 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type
if rconn != nil { if rconn != nil {
// gid에 이미 다른 값이 있을 수 있다. // gid에 이미 다른 값이 있을 수 있다.
// 정확하게 이 값이면 제거하고, 아니면 넘어간다. // 정확하게 이 값이면 제거하고, 아니면 넘어간다.
rconn.RemoveTag("gid", fmt.Sprintf("%s@%s", gid.Hex(), gm.Name)) rconn.removeTag("gid", fmt.Sprintf("%s@%s", gid.Hex(), gm.Name))
} }
broadcastTypedMessage(gm, gid, PublicMemberDoc{Tid: tid}) gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + gid.Hex(),
Body: makeTypeMessage(PublicMemberDoc{Tid: tid}),
})
} else { } else {
if isNewMember && updated.rconn == nil && rconn != nil { if isNewMember && updated.rconn == nil && rconn != nil {
updated.rconn = rconn updated.rconn = rconn
} }
// 업데이트 된 플레이어(새로 들어온 플레이어 포함)를 모두에게 알려준다. 본인 포함, invitee 제외 // 업데이트 된 플레이어(새로 들어온 플레이어 포함)를 모두에게 알려준다. 본인 포함, invitee 제외
broadcastTypedMessage(gm, gid, PublicMemberDoc{ gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
Tid: tid, Target: "#" + gid.Hex(),
memberDocCommon: updated.memberDocCommon, Body: makeTypeMessage(PublicMemberDoc{
Tid: tid,
memberDocCommon: updated.memberDocCommon,
}),
}) })
} }
if isNewMember { if isNewMember {
if rconn != nil { if rconn != nil {
// 새 멤버이므로 기존 멤버를 다 보내준다. // 새 멤버이므로 기존 멤버를 다 보내준다.
rconn.AddTag("gid", fmt.Sprintf("%s@%s", gid.Hex(), gm.Name)) rconn.addTag("gid", fmt.Sprintf("%s@%s", gid.Hex(), gm.Name))
rconn.WriteBytes(gd.serializeFull(gid, clientMessageChanName)) gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "@" + mid.Hex(),
Body: gd.serializeFull(gid),
})
} }
} }
case rpcChanName:
targetbt, fn, params, err := rpc.Decode[accountID]([]byte(msg.Payload))
if err != nil {
logger.Error("rpcChanName message decode failed :", msg.Payload, err)
break
}
call := func() {
method, ok := reflect.TypeOf(gm).MethodByName(fn)
if !ok {
logger.Println("message decode failed :", err, targetbt, msg.Payload)
}
args := []reflect.Value{
reflect.ValueOf(gm),
}
for _, arg := range params {
args = append(args, reflect.ValueOf(arg))
}
method.Func.Call(args)
}
if *targetbt == everyHost {
call()
} else if rconn := wsh.Conn(region, *targetbt); rconn != nil {
call()
}
default: default:
logger.Println("unknown channel") logger.Println("unknown channel")
} }
@ -1204,7 +1123,7 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type
var pubsub *redis.PubSub var pubsub *redis.PubSub
for { for {
if pubsub == nil { if pubsub == nil {
pubsub = wsh.RedisSync.Subscribe(ctx, groupDocSyncChanName, memberSyncChanName, rpcChanName, clientMessageChanName) pubsub = wsh.RedisSync.Subscribe(ctx, groupDocSyncChanName, memberSyncChanName, rpcChanName)
} }
if pubsub == nil { if pubsub == nil {

View File

@ -27,19 +27,6 @@ func (rc *connection) addTag(name, val string) {
rc.tags = append(rc.tags, prefix+val) rc.tags = append(rc.tags, prefix+val)
} }
func (rc *connection) getTag(name string) string {
rc.locker.Lock()
defer rc.locker.Unlock()
prefix := name + "="
for _, tag := range rc.tags {
if strings.HasPrefix(tag, prefix) {
return tag[len(prefix):]
}
}
return ""
}
func (rc *connection) removeTag(name string, val string) { func (rc *connection) removeTag(name string, val string) {
rc.locker.Lock() rc.locker.Lock()
defer rc.locker.Unlock() defer rc.locker.Unlock()
@ -95,3 +82,16 @@ func (rc *connection) unregistOnCloseFunc(name string) (out func()) {
delete(rc.onClose, name) delete(rc.onClose, name)
return return
} }
func (rc *connection) cleanup() {
rc.locker.Lock()
defer rc.locker.Unlock()
cp := rc.onClose
rc.onClose = nil
go func() {
for _, f := range cp {
f()
}
}()
}

View File

@ -1,195 +0,0 @@
package rpc
import (
"bytes"
"encoding/gob"
"fmt"
"reflect"
"repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/wshandler"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)
var Everybody = primitive.ObjectID([12]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11})
func init() {
gob.Register(bson.M{})
gob.Register(primitive.ObjectID{})
gob.Register(primitive.Timestamp{})
}
type RpcCaller struct {
publish func(bt []byte) error
}
func NewRpcCaller(f func(bt []byte) error) RpcCaller {
return RpcCaller{
publish: f,
}
}
type rpcCallContext struct {
alias primitive.ObjectID
publish func(bt []byte) error
}
func (c *RpcCaller) One(alias primitive.ObjectID) rpcCallContext {
return rpcCallContext{
alias: alias,
publish: c.publish,
}
}
func (c *RpcCaller) Everybody() rpcCallContext {
return rpcCallContext{
alias: Everybody,
publish: c.publish,
}
}
func IsCallerCalleeMethodMatch[Callee any]() error {
var caller rpcCallContext
var callee Callee
callerType := reflect.TypeOf(caller)
calleeType := reflect.TypeOf(callee)
for i := 0; i < callerType.NumMethod(); i++ {
callerMethod := callerType.Method(i)
calleeMethod, ok := calleeType.MethodByName(callerMethod.Name)
if !ok {
return fmt.Errorf("method '%s' of '%s' is missing", callerMethod.Name, calleeType.Name())
}
if calleeMethod.Func.Type().NumIn() != callerMethod.Func.Type().NumIn() {
return fmt.Errorf("method '%s' argument num is not match", callerMethod.Name)
}
if calleeMethod.Func.Type().NumOut() != callerMethod.Func.Type().NumOut() {
return fmt.Errorf("method '%s' out num is not match", callerMethod.Name)
}
for i := 1; i < calleeMethod.Func.Type().NumIn(); i++ {
if calleeMethod.Func.Type().In(i) != callerMethod.Func.Type().In(i) {
return fmt.Errorf("method '%s' argument is not match. %s-%s", callerMethod.Name, calleeMethod.Func.Type().In(i).Name(), callerMethod.Func.Type().In(i).Name())
}
}
}
return nil
}
type fnsig struct {
FunctionName string `bson:"fn"`
Args []any `bson:"args"`
}
func Encode[T any](prefix T, fn string, args ...any) ([]byte, error) {
m := append([]any{
prefix,
fn,
}, args...)
buff := new(bytes.Buffer)
encoder := gob.NewEncoder(buff)
err := encoder.Encode(m)
if err != nil {
logger.Error("rpcCallContext.send err :", err)
return nil, err
}
return buff.Bytes(), nil
}
func Decode[T any](src []byte) (*T, string, []any, error) {
var m []any
decoder := gob.NewDecoder(bytes.NewReader(src))
if err := decoder.Decode(&m); err != nil {
logger.Error("RpcCallee.Call err :", err)
return nil, "", nil, err
}
prfix := m[0].(T)
fn := m[1].(string)
return &prfix, fn, m[2:], nil
}
func decode(src []byte) (string, []any, error) {
var sig fnsig
decoder := gob.NewDecoder(bytes.NewReader(src))
if err := decoder.Decode(&sig); err != nil {
logger.Error("RpcCallee.Call err :", err)
return "", nil, err
}
return sig.FunctionName, sig.Args, nil
}
func (c *rpcCallContext) send(fn string, args ...any) error {
bt, err := Encode(c.alias, fn, args...)
if err != nil {
return err
}
return c.publish(bt)
}
type RpcCallee[T any] struct {
methods map[string]reflect.Method
create func(*wshandler.Richconn) *T
}
func NewRpcCallee[T any](createReceiverFunc func(*wshandler.Richconn) *T) RpcCallee[T] {
out := RpcCallee[T]{
methods: make(map[string]reflect.Method),
create: createReceiverFunc,
}
var tmp *T
tp := reflect.TypeOf(tmp)
for i := 0; i < tp.NumMethod(); i++ {
method := tp.Method(i)
out.methods[method.Name] = method
}
return out
}
func (r RpcCallee[T]) Call(rc *wshandler.Richconn, src []byte) error {
defer func() {
s := recover()
if s != nil {
logger.Error(s)
}
}()
fn, params, err := decode(src)
if err != nil {
logger.Error("RpcCallee.Call err :", err)
return err
}
method, ok := r.methods[fn]
if !ok {
err := fmt.Errorf("method '%s' is missing", fn)
logger.Error("RpcCallee.Call err :", err)
return err
}
receiver := r.create(rc)
args := []reflect.Value{
reflect.ValueOf(receiver),
}
for _, arg := range params {
args = append(args, reflect.ValueOf(arg))
}
rets := method.Func.Call(args)
if len(rets) > 0 && rets[len(rets)-1].Interface() != nil {
return rets[len(rets)-1].Interface().(error)
}
return nil
}

View File

@ -1,33 +0,0 @@
package rpc
import (
"go.mongodb.org/mongo-driver/bson/primitive"
)
func (c rpcCallContext) JoinTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, hint string) error {
return c.send("JoinTag", region, tag, tid, hint)
}
func (c rpcCallContext) LeaveTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, hint string) error {
return c.send("LeaveTag", region, tag, tid, hint)
}
func (c rpcCallContext) TurnGroupOnline(key string, gid primitive.ObjectID, score float64) error {
return c.send("TurnGroupOnline", key, gid, score)
}
func (c rpcCallContext) TurnGroupOffline(key string, gid primitive.ObjectID) error {
return c.send("TurnGroupOffline", key, gid)
}
func (c rpcCallContext) SetStateInTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, state string, hint string) error {
return c.send("SetStateInTag", region, tag, tid, state, hint)
}
func (c rpcCallContext) SendMessage(doc []byte) error {
return c.send("SendMessage", doc)
}
func (c rpcCallContext) SendMessageToTag(region string, gid primitive.ObjectID, msg []byte) error {
return c.send("SendMessageToTag", region, gid, msg)
}
func (c rpcCallContext) CloseOnPurpose() error {
return c.send("CloseOnPurpose")
}

View File

@ -8,12 +8,11 @@ import (
"net/http" "net/http"
"reflect" "reflect"
"strings" "strings"
"sync"
"repositories.action2quare.com/ayo/gocommon" "repositories.action2quare.com/ayo/gocommon"
common "repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/logger" "repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/wshandler" "repositories.action2quare.com/ayo/gocommon/wshandler"
"repositories.action2quare.com/ayo/tavern/core/rpc"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
"go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson"
@ -73,7 +72,7 @@ func readBsonDoc(r io.Reader, src any) error {
} }
type TavernConfig struct { type TavernConfig struct {
common.RegionStorageConfig `json:",inline"` gocommon.RegionStorageConfig `json:",inline"`
GroupTypes map[string]*groupConfig `json:"tavern_group_types"` GroupTypes map[string]*groupConfig `json:"tavern_group_types"`
MaingateApiToken string `json:"maingate_api_token"` MaingateApiToken string `json:"maingate_api_token"`
@ -83,17 +82,55 @@ type TavernConfig struct {
var config TavernConfig var config TavernConfig
type connectionMap struct {
sync.Mutex
conns map[primitive.ObjectID]*connection
}
func (cm *connectionMap) add(accid accountID, alias string) {
cm.Lock()
defer cm.Unlock()
old := cm.conns[accid]
if old != nil {
old.cleanup()
}
cm.conns[accid] = &connection{
alias: alias,
onClose: make(map[string]func()),
}
}
func (cm *connectionMap) remove(accid accountID) {
cm.Lock()
defer cm.Unlock()
old := cm.conns[accid]
if old != nil {
delete(cm.conns, accid)
old.cleanup()
}
}
func (cm *connectionMap) get(accid accountID) *connection {
cm.Lock()
defer cm.Unlock()
return cm.conns[accid]
}
type Tavern struct { type Tavern struct {
subTaverns []*subTavern subTaverns []*subTavern
wsh *wshandler.WebsocketHandler wsh *wshandler.WebsocketHandler
} }
type subTavern struct { type subTavern struct {
mongoClient common.MongoClient mongoClient gocommon.MongoClient
wsh *wshandler.WebsocketHandler wsh *wshandler.WebsocketHandler
region string region string
groups map[string]group groups map[string]group
methods map[string]reflect.Method methods map[string]reflect.Method
cm connectionMap
redisClient *redis.Client redisClient *redis.Client
} }
@ -118,7 +155,7 @@ func getMacAddr() (string, error) {
func New(context context.Context, wsh *wshandler.WebsocketHandler, inconfig *TavernConfig) (*Tavern, error) { func New(context context.Context, wsh *wshandler.WebsocketHandler, inconfig *TavernConfig) (*Tavern, error) {
if inconfig == nil { if inconfig == nil {
var loaded TavernConfig var loaded TavernConfig
if err := common.LoadConfig(&loaded); err != nil { if err := gocommon.LoadConfig(&loaded); err != nil {
return nil, err return nil, err
} }
inconfig = &loaded inconfig = &loaded
@ -130,7 +167,7 @@ func New(context context.Context, wsh *wshandler.WebsocketHandler, inconfig *Tav
return nil, err return nil, err
} }
config.macAddr = macaddr config.macAddr = macaddr
tv := Tavern{ tv := &Tavern{
wsh: wsh, wsh: wsh,
} }
@ -139,27 +176,29 @@ func New(context context.Context, wsh *wshandler.WebsocketHandler, inconfig *Tav
return nil, err return nil, err
} }
return &tv, nil return tv, nil
} }
func (tv *Tavern) Destructor() { func (tv *Tavern) Cleanup() {
tv.wsh.Destructor()
for _, st := range tv.subTaverns { for _, st := range tv.subTaverns {
st.mongoClient.Close() st.mongoClient.Close()
} }
} }
type groupPipelineDocument struct { func (tv *Tavern) SocketMessageReceived(accid primitive.ObjectID, alias string, messageType wshandler.WebSocketMessageType, body io.Reader) {
OperationType string `bson:"operationType"` switch messageType {
FullDocument map[string]any `bson:"fullDocument"` case wshandler.Connected:
DocumentKey struct { case wshandler.Disconnected:
Id primitive.ObjectID `bson:"_id"` }
} `bson:"documentKey"` // gidtype := msg.Conn.GetTag("gid")
UpdateDescription struct { // if len(gidtype) > 0 {
UpdatedFields bson.M `bson:"updatedFields"` // tokens := strings.SplitN(gidtype, "@", 2)
RemovedFileds bson.A `bson:"removedFields"` // gidobj, _ := primitive.ObjectIDFromHex(tokens[0])
TruncatedArrays bson.A `bson:"truncatedArrays"` // gtype := tokens[1]
} `bson:"updateDescription"` // group := sub.groups[gtype]
// if group != nil {
// group.PauseMember(gidobj, msg.Alias, msg.Conn)
// }
} }
func (tv *Tavern) prepare(ctx context.Context) error { func (tv *Tavern) prepare(ctx context.Context) error {
@ -169,15 +208,11 @@ func (tv *Tavern) prepare(ctx context.Context) error {
return err return err
} }
for region, url := range config.RegionStorage { for region := range config.RegionStorage {
var dbconn common.MongoClient var dbconn gocommon.MongoClient
var err error var err error
var groupinstance group var groupinstance group
if err := rpc.IsCallerCalleeMethodMatch[connection](); err != nil {
return err
}
var tmp *subTavern var tmp *subTavern
methods := make(map[string]reflect.Method) methods := make(map[string]reflect.Method)
tp := reflect.TypeOf(tmp) tp := reflect.TypeOf(tmp)
@ -192,17 +227,20 @@ func (tv *Tavern) prepare(ctx context.Context) error {
region: region, region: region,
methods: methods, methods: methods,
redisClient: redisClient, redisClient: redisClient,
cm: connectionMap{
conns: make(map[primitive.ObjectID]*connection),
},
} }
groups := make(map[string]group) groups := make(map[string]group)
for typename, cfg := range config.GroupTypes { for typename, cfg := range config.GroupTypes {
cfg.Name = typename cfg.Name = typename
if cfg.Transient { if cfg.Transient {
groupinstance, err = cfg.prepareInMemory(ctx, region, typename, tv.wsh) groupinstance, err = cfg.prepareInMemory(ctx, typename, sub)
} else { //} else {
// TODO : db // TODO : db
// if !dbconn.Connected() { // if !dbconn.Connected() {
// dbconn, err = common.NewMongoClient(ctx, url.Mongo, region) // dbconn, err = gocommon.NewMongoClient(ctx, url.Mongo, region)
// if err != nil { // if err != nil {
// return err // return err
// } // }
@ -224,21 +262,27 @@ func (tv *Tavern) prepare(ctx context.Context) error {
func (tv *Tavern) RegisterHandlers(ctx context.Context, serveMux *http.ServeMux, prefix string) error { func (tv *Tavern) RegisterHandlers(ctx context.Context, serveMux *http.ServeMux, prefix string) error {
for _, sub := range tv.subTaverns { for _, sub := range tv.subTaverns {
tv.wsh.RegisterReceiver(sub.region, sub.clientMessageReceived)
var pattern string var pattern string
if sub.region == "default" { if sub.region == "default" {
pattern = common.MakeHttpHandlerPattern(prefix, "api") pattern = gocommon.MakeHttpHandlerPattern(prefix, "api")
} else { } else {
pattern = common.MakeHttpHandlerPattern(prefix, sub.region, "api") pattern = gocommon.MakeHttpHandlerPattern(prefix, sub.region, "api")
} }
serveMux.HandleFunc(pattern, sub.api) serveMux.HandleFunc(pattern, sub.api)
deliveryChan := tv.wsh.DeliveryChannel(sub.region)
go sub.deliveryMessageHandler(deliveryChan)
} }
return nil return nil
} }
func (sub *subTavern) clientMessageReceived(accid primitive.ObjectID, alias string, messageType wshandler.WebSocketMessageType, body io.Reader) {
if messageType == wshandler.Connected {
sub.cm.add(accid, alias)
} else if messageType == wshandler.Disconnected {
sub.cm.remove(accid)
}
}
func (sub *subTavern) api(w http.ResponseWriter, r *http.Request) { func (sub *subTavern) api(w http.ResponseWriter, r *http.Request) {
defer func() { defer func() {
s := recover() s := recover()

4
go.mod
View File

@ -4,15 +4,15 @@ go 1.19
require ( require (
github.com/go-redis/redis/v8 v8.11.5 github.com/go-redis/redis/v8 v8.11.5
github.com/gorilla/websocket v1.5.0
go.mongodb.org/mongo-driver v1.11.7 go.mongodb.org/mongo-driver v1.11.7
repositories.action2quare.com/ayo/gocommon v0.0.0-20230705150145-7ef3e68d161e repositories.action2quare.com/ayo/gocommon v0.0.0-20230710053024-a842845685ee
) )
require ( require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/golang/snappy v0.0.4 // indirect github.com/golang/snappy v0.0.4 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/klauspost/compress v1.16.6 // indirect github.com/klauspost/compress v1.16.6 // indirect
github.com/montanaflynn/stats v0.7.1 // indirect github.com/montanaflynn/stats v0.7.1 // indirect
github.com/pires/go-proxyproto v0.7.0 // indirect github.com/pires/go-proxyproto v0.7.0 // indirect

10
go.sum
View File

@ -102,11 +102,5 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230621052811-06ef97f11d22 h1:DImSGNxZrc+Q4WlS1OKMsLAScEfDYLX4XMJdjAaVnXc= repositories.action2quare.com/ayo/gocommon v0.0.0-20230710053024-a842845685ee h1:Aau1j/b9wI4nyvrM7m1Q+2xkcW1Qo7i3q+QBD4Umnzg=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230621052811-06ef97f11d22/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns= repositories.action2quare.com/ayo/gocommon v0.0.0-20230710053024-a842845685ee/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230705022744-edd2f7aab52e h1:l9aqmNpEF8V1o0b3eCT/nhC+O1dXMUcPzBPewbshuDI=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230705022744-edd2f7aab52e/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230705132657-822681bf74cf h1:VQ78wRZaKHnWOM+Y2ZxB/EVNopzC4DNbwihledqjwy8=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230705132657-822681bf74cf/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230705150145-7ef3e68d161e h1:LNzK2Fhl1X8JQfn7XsoQwz2H/LY7YmMehEPqCyXgV1U=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230705150145-7ef3e68d161e/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns=

View File

@ -34,13 +34,14 @@ func main() {
panic(err) panic(err)
} else { } else {
serveMux := http.NewServeMux() serveMux := http.NewServeMux()
wsh.RegisterHandlers(ctx, serveMux, *prefix) wsh.RegisterHandlers(serveMux, *prefix)
tv.RegisterHandlers(ctx, serveMux, *prefix) tv.RegisterHandlers(ctx, serveMux, *prefix)
server := common.NewHTTPServer(serveMux) server := common.NewHTTPServer(serveMux)
logger.Println("tavern is started") logger.Println("tavern is started")
wsh.Start(ctx)
server.Start() server.Start()
cancel() cancel()
tv.Destructor() tv.Cleanup()
wsh.Destructor() wsh.Cleanup()
} }
} }