Files
tavern/core/group_memory.go

1147 lines
27 KiB
Go

package core
import (
"context"
"crypto/md5"
"encoding/gob"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"net/url"
"os"
"strings"
"sync"
"time"
"repositories.action2quare.com/ayo/gocommon/flagx"
"repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/rpc"
"repositories.action2quare.com/ayo/gocommon/wshandler"
"github.com/go-redis/redis/v8"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)
// Identifier aliases. Accounts, tickets and groups are all keyed by MongoDB
// ObjectIDs; the aliases only document which kind of id a value carries.
type (
	accountID = primitive.ObjectID
	ticketID  = primitive.ObjectID
	groupID   = primitive.ObjectID
)
// Invitation is the body of the upstream message tagged "Invitation" that is
// delivered to an invitee.
type Invitation struct {
	// GroupID is the group the invitee is being invited to.
	GroupID groupID `json:"gid"`
	// TicketID is the ticket reserved for the invitee.
	TicketID ticketID `json:"tid"`
	// Inviter is the inviter's document as supplied by the caller.
	Inviter bson.M `json:"inviter"`
	// ExpireAtUTC is the invitation deadline as a Unix timestamp.
	ExpireAtUTC int64 `json:"expire_at_utc"`
}
// memberDocCommon holds the member fields shared by the internal memberDoc
// and the client-facing PublicMemberDoc.
type memberDocCommon struct {
	// Body is the member's public document.
	Body bson.M
	// Invite is true while the ticket is only an invitation that has not
	// been accepted yet.
	Invite bool
	// InviteExpire is the moment a pending invitation stops being valid.
	InviteExpire time.Time
	// JoinTime is the Unix time at which the member joined; zero until then.
	JoinTime int64
}
// PublicMemberDoc is the member information that is shared with players.
type PublicMemberDoc struct {
	memberDocCommon `json:",inline"`

	// Tid is the ticket under which the member is stored in the group.
	Tid ticketID
}
// FullGroupDoc is a complete snapshot of a group: its id, its joined members
// and its body. A FullGroupDoc carrying only Gid is what this file sends when
// a group is deleted or a member leaves.
type FullGroupDoc struct {
	Gid        groupID
	AllMembers []*PublicMemberDoc `json:",omitempty"`
	Body       GroupDocBody       `json:",omitempty"`
}
type (
	// GroupDocBody is the free-form document attached to a group.
	GroupDocBody bson.M

	// InvitationFail is a free-form document type; its name matches the
	// "InvitationFail" message tag used in this file.
	InvitationFail bson.M
)
// memberDoc is the server-side record of one ticket holder (member or
// invitee) inside a group.
type memberDoc struct {
	memberDocCommon `json:",inline"`

	// Hidden holds the keys of the member document that start with an
	// underscore; they are kept out of the public Body.
	Hidden bson.M
	// rconn is the member's connection on this host, if any.
	rconn *connection
	// Mid is the account that owns this ticket.
	Mid accountID
}
// groupDoc is the in-memory state of a single group. The embedded mutex
// guards every field; methods on *groupDoc take it themselves.
type groupDoc struct {
	sync.Mutex

	// Body is the group's shared document.
	Body GroupDocBody
	// InCharge is the account that owns the group; zero when unset.
	InCharge accountID

	// tickets maps each issued ticket to its member or invitee record.
	tickets map[ticketID]*memberDoc
	// createTime is set by newGroupDoc when the document is created.
	createTime time.Time
}
// init registers PublicMemberDoc with encoding/gob so that values of this
// type can be gob-encoded when they travel inside interface values.
func init() {
	var sample PublicMemberDoc
	gob.Register(sample)
}
// updateBodyWithBson decodes the BSON document src into the group body and
// returns the resulting body re-encoded as BSON. A decode error is returned
// as is, with a nil slice.
func (gd *groupDoc) updateBodyWithBson(src []byte) ([]byte, error) {
	gd.Lock()
	defer gd.Unlock()

	if decodeErr := bson.Unmarshal(src, &gd.Body); decodeErr != nil {
		return nil, decodeErr
	}
	return bson.Marshal(gd.Body)
}
// updateBodyWithJson decodes the JSON document src into the group body and
// returns the body, or nil when src cannot be decoded.
func (gd *groupDoc) updateBodyWithJson(src []byte) GroupDocBody {
	gd.Lock()
	defer gd.Unlock()

	if decodeErr := json.Unmarshal(src, &gd.Body); decodeErr != nil {
		return nil
	}
	return gd.Body
}
// updateBodyBsonToJson decodes the BSON document bsonSrc into the group body
// and returns the body, or nil when bsonSrc cannot be decoded.
func (gd *groupDoc) updateBodyBsonToJson(bsonSrc []byte) GroupDocBody {
	gd.Lock()
	defer gd.Unlock()

	if decodeErr := bson.Unmarshal(bsonSrc, &gd.Body); decodeErr != nil {
		return nil
	}
	return gd.Body
}
// updateBody decodes the BSON document bsonSrc into the group body and
// reports the decode error, if any.
func (gd *groupDoc) updateBody(bsonSrc []byte) error {
	gd.Lock()
	defer gd.Unlock()

	decodeErr := bson.Unmarshal(bsonSrc, &gd.Body)
	return decodeErr
}
// addInvite reserves a ticket for an invitee.
//
// inviteeDoc must carry the invitee's account id under "_mid"; its "body"
// entry becomes the invitee's public document (an empty document is used when
// "body" is absent or not a map). rconn is the invitee's connection, ttl the
// lifetime of the invitation and max the maximum number of tickets the group
// may hold.
//
// While the group has fewer than max tickets a new one is simply added. When
// the group is full, one invitation that has already expired is evicted to
// make room. The new ticket and its record are returned; the nil ObjectID and
// a nil record are returned when "_mid" is missing or no seat is available.
func (gd *groupDoc) addInvite(inviteeDoc bson.M, rconn *connection, ttl time.Duration, max int) (ticketID, *memberDoc) {
	gd.Lock()
	defer gd.Unlock()

	// Checked assertions: a malformed invitee document must not panic while
	// the group lock is held.
	mid, ok := inviteeDoc["_mid"].(accountID)
	if !ok {
		return primitive.NilObjectID, nil
	}
	var body bson.M
	switch b := inviteeDoc["body"].(type) {
	case bson.M:
		body = b
	case map[string]interface{}:
		body = bson.M(b)
	}
	if body == nil {
		body = bson.M{}
	}

	// A groupDoc that was not built by newGroupDoc has a nil ticket map, and
	// writing to a nil map panics.
	if gd.tickets == nil {
		gd.tickets = make(map[ticketID]*memberDoc)
	}

	now := time.Now().UTC()

	// Is there a free seat to invite into?
	if len(gd.tickets) >= max {
		// No free seat: try to reclaim one from an expired invitation.
		reclaimed := false
		for oldtid, mem := range gd.tickets {
			if mem.Invite && mem.InviteExpire.Before(now) {
				delete(gd.tickets, oldtid)
				reclaimed = true
				break
			}
		}
		if !reclaimed {
			return primitive.NilObjectID, nil
		}
	}

	tid := primitive.NewObjectID()
	newdoc := &memberDoc{
		memberDocCommon: memberDocCommon{
			Body:         body,
			Invite:       true,
			InviteExpire: now.Add(ttl),
		},
		rconn: rconn,
		Mid:   mid,
	}
	gd.tickets[tid] = newdoc
	return tid, newdoc
}
// seperateHidden splits a member document into its public and hidden parts.
// Every key that starts with an underscore is moved out of in and into the
// returned hidden map; in itself (now without those keys) is returned as
// public. hidden is nil when in has no underscore keys. An empty key is
// treated as public.
func seperateHidden(in bson.M) (public bson.M, hidden bson.M) {
	for k, v := range in {
		// HasPrefix instead of k[0]: indexing an empty key would panic.
		if !strings.HasPrefix(k, "_") {
			continue
		}
		if hidden == nil {
			hidden = make(bson.M)
		}
		hidden[k] = v
		// Deleting the current key while ranging over a map is safe in Go.
		delete(in, k)
	}
	return in, hidden
}
// addInCharge installs mid as the owner of the group and gives it a fresh
// ticket. doc is split into public and hidden parts (underscore keys are
// hidden). The owner's ticket, hex encoded, is stored in the group body under
// "incharge". When the group already has an owner nothing changes and the nil
// ObjectID and a nil record are returned.
func (gd *groupDoc) addInCharge(mid accountID, rconn *connection, doc bson.M) (ticketID, *memberDoc) {
	gd.Lock()
	defer gd.Unlock()

	if alreadyOwned := !gd.InCharge.IsZero(); alreadyOwned {
		return primitive.NilObjectID, nil
	}
	gd.InCharge = mid

	ownerTid := primitive.NewObjectID()
	public, hidden := seperateHidden(doc)

	owner := &memberDoc{rconn: rconn, Mid: mid, Hidden: hidden}
	owner.Body = public
	owner.Invite = false
	owner.JoinTime = time.Now().UTC().Unix()
	gd.tickets[ownerTid] = owner

	if gd.Body == nil {
		gd.Body = GroupDocBody(make(bson.M))
	}
	gd.Body["incharge"] = ownerTid.Hex()

	return ownerTid, owner
}
// addMember turns an existing ticket into a joined member, or refreshes the
// document of a member that has already joined.
//
// With a non-zero *tid the ticket must exist and belong to mid, otherwise
// (nil, false) is returned. With a zero *tid the ticket is looked up by
// account id and *tid is set to the ticket that was found.
//
// doc is split into public and hidden parts and stored on the record. The
// boolean result is true when the record was located by account id or when a
// pending invitation was converted into a membership.
func (gd *groupDoc) addMember(mid accountID, tid *ticketID, doc bson.M) (*memberDoc, bool) {
	gd.Lock()
	defer gd.Unlock()

	var (
		target *memberDoc
		fresh  bool
	)
	if !tid.IsZero() {
		found, exists := gd.tickets[*tid]
		// Either there is no such ticket, or it is not mid's ticket.
		if !exists || found.Mid != mid {
			return nil, false
		}
		target = found
	} else {
		for existingTid, candidate := range gd.tickets {
			if candidate.Mid != mid {
				continue
			}
			target, fresh = candidate, true
			*tid = existingTid
			break
		}
	}

	public, hidden := seperateHidden(doc)
	if target == nil {
		return nil, fresh
	}

	target.Body = public
	target.Hidden = hidden
	if target.Invite {
		fresh = true
		target.Invite = false
	}
	if target.JoinTime == 0 {
		target.JoinTime = time.Now().UTC().Unix()
	}
	return target, fresh
}
// removeMember deletes a ticket from the group. When *tid is zero the ticket
// is looked up by account id and *tid is set to the ticket that was found.
// If mid is the group's owner, the owner slot is cleared as well.
func (gd *groupDoc) removeMember(mid accountID, tid *ticketID) {
	gd.Lock()
	defer gd.Unlock()

	if tid.IsZero() {
		for t, mem := range gd.tickets {
			if mem.Mid == mid {
				*tid = t
				break
			}
		}
	}

	// Deliberately no early return after the lookup above: both paths must
	// reach the owner check below, otherwise removing the owner by account id
	// would leave a dangling InCharge.
	delete(gd.tickets, *tid)
	if gd.InCharge == mid {
		gd.InCharge = primitive.NilObjectID
	}
}
// ticket returns the ticket held by account mid, or the nil ObjectID when mid
// is zero or holds no ticket in this group.
func (gd *groupDoc) ticket(mid accountID) ticketID {
	gd.Lock()
	defer gd.Unlock()

	if !mid.IsZero() {
		for tid, holder := range gd.tickets {
			if holder.Mid == mid {
				return tid
			}
		}
	}
	return primitive.NilObjectID
}
// member returns the record stored under ticket tid, or nil when the ticket
// is unknown.
func (gd *groupDoc) member(tid ticketID) *memberDoc {
	gd.Lock()
	defer gd.Unlock()

	holder := gd.tickets[tid]
	return holder
}
// memberByAccount returns the ticket and record owned by account mid, or the
// nil ObjectID and nil when mid holds no ticket in this group.
func (gd *groupDoc) memberByAccount(mid accountID) (ticketID, *memberDoc) {
	gd.Lock()
	defer gd.Unlock()

	for candidateTid, holder := range gd.tickets {
		if holder.Mid != mid {
			continue
		}
		return candidateTid, holder
	}
	return primitive.NilObjectID, nil
}
// modifyMemberDocument runs cb on a member record while the group lock is
// held and returns that record. When *tid is zero the ticket is looked up by
// account id and *tid is set to the ticket that was found. nil is returned,
// and cb is not called, when no matching record exists.
func (gd *groupDoc) modifyMemberDocument(mid accountID, tid *ticketID, cb func(b *memberDoc)) *memberDoc {
	gd.Lock()
	defer gd.Unlock()

	if tid.IsZero() {
		for candidateTid, holder := range gd.tickets {
			if holder.Mid == mid {
				*tid = candidateTid
				break
			}
		}
		if tid.IsZero() {
			return nil
		}
	}

	target := gd.tickets[*tid]
	if target == nil {
		return nil
	}
	cb(target)
	return target
}
// overwriteMemberDocument applies the JSON document raw to a member's Body
// and returns the member record, or nil when no matching record exists.
//
// With a zero *tid the record is looked up by account id, *tid is set to the
// ticket that was found and raw is decoded into the existing Body (keys not
// present in raw are kept). With a non-zero *tid the Body is replaced by the
// decoded document.
//
// When raw cannot be decoded, or decodes to null, the record is returned with
// its Body untouched.
//
// NOTE(review): the caller in this file passes the JSON encoding of a whole
// memberDoc as raw, so it is the entire record that ends up inside Body —
// confirm whether decoding into the record itself was intended.
func (gd *groupDoc) overwriteMemberDocument(mid accountID, tid *ticketID, raw []byte) *memberDoc {
	gd.Lock()
	defer gd.Unlock()

	if tid.IsZero() {
		for t, mem := range gd.tickets {
			if mem.Mid != mid {
				continue
			}
			*tid = t
			// The error is deliberately not acted on: json.Unmarshal
			// validates raw before it stores anything, so a failed decode
			// leaves mem.Body exactly as it was.
			_ = json.Unmarshal(raw, &mem.Body)
			return mem
		}
	}

	mem := gd.tickets[*tid]
	if mem == nil {
		return nil
	}

	var newbody primitive.M
	if err := json.Unmarshal(raw, &newbody); err != nil || newbody == nil {
		// Keep the previous Body: replacing it with a nil map would make any
		// later write to mem.Body panic.
		return mem
	}
	mem.Body = newbody
	return mem
}
// iterateMembers calls cb once for every ticket in the group, invitations
// included, while the group lock is held. cb therefore must not call back
// into methods of gd that take the lock.
func (gd *groupDoc) iterateMembers(cb func(ticketID, *memberDoc)) {
	gd.Lock()
	defer gd.Unlock()

	for tid, holder := range gd.tickets {
		cb(tid, holder)
	}
}
// serializeFull builds the client-facing snapshot of the group: gid, the
// group body and the public part of every joined member. Tickets that are
// still pending invitations are left out.
func (gd *groupDoc) serializeFull(gid groupID) FullGroupDoc {
	gd.Lock()
	defer gd.Unlock()

	var joined []*PublicMemberDoc
	for tid, holder := range gd.tickets {
		if !holder.Invite {
			joined = append(joined, &PublicMemberDoc{
				memberDocCommon: holder.memberDocCommon,
				Tid:             tid,
			})
		}
	}

	snapshot := FullGroupDoc{Gid: gid, Body: gd.Body}
	snapshot.AllMembers = joined
	return snapshot
}
// groupContainer is the mutex-guarded set of groups known to this host.
type groupContainer struct {
	sync.Mutex

	// groupDocs maps a group id to its in-memory document.
	groupDocs map[groupID]*groupDoc
}
// groupInMemory is the in-memory group backend. The function fields are
// filled in by prepareInMemory; they isolate this type from Redis and from
// the websocket handler.
type groupInMemory struct {
	*groupConfig

	// groupDocSync publishes a group body on the group sync channel; a nil
	// body announces the deletion of the group.
	groupDocSync func(groupID, []byte) error
	// memberSync publishes a member record on the member sync channel; a nil
	// record announces the removal of the member. The bool marks a new member.
	memberSync func(groupID, accountID, ticketID, *memberDoc, bool) error
	// rpcCall publishes raw bytes on the RPC channel.
	rpcCall func([]byte) error
	// hasConn returns the account's connection on this host, or nil.
	hasConn func(accountID) *connection

	sendUpstreamMessage  func(*wshandler.UpstreamMessage)
	sendEnterRoomMessage func(groupID, accountID)
	sendLeaveRoomMessage func(groupID, accountID)

	// groups holds every group known to this host.
	groups groupContainer
}
// add stores doc under id, replacing any document already stored there.
func (gc *groupContainer) add(id groupID, doc *groupDoc) {
	gc.Lock()
	gc.groupDocs[id] = doc
	gc.Unlock()
}
// createWithID creates a new group document with body base under newid and
// returns both. When newid is already taken nothing is created and the nil
// ObjectID and a nil document are returned.
func (gc *groupContainer) createWithID(newid groupID, base bson.M) (groupID, *groupDoc) {
	gc.Lock()
	defer gc.Unlock()

	if _, taken := gc.groupDocs[newid]; !taken {
		created := newGroupDoc(base)
		gc.groupDocs[newid] = created
		return newid, created
	}
	return primitive.NilObjectID, nil
}
// delete forgets the group stored under gid; unknown ids are ignored.
func (gc *groupContainer) delete(gid groupID) {
	gc.Lock()
	delete(gc.groupDocs, gid)
	gc.Unlock()
}
// find returns the group document stored under id, or nil when there is none.
func (gc *groupContainer) find(id groupID) *groupDoc {
	gc.Lock()
	defer gc.Unlock()

	// Indexing a missing key yields the zero value, which is a nil pointer.
	return gc.groupDocs[id]
}
// newGroupDoc returns an empty group whose body is base, whose ticket map is
// ready for use and whose creation time is the current UTC time.
func newGroupDoc(base bson.M) *groupDoc {
	doc := new(groupDoc)
	doc.Body = GroupDocBody(base)
	doc.tickets = make(map[ticketID]*memberDoc)
	doc.createTime = time.Now().UTC()
	return doc
}
// Create is a stub in the in-memory backend: it ignores its arguments and
// always returns the nil ObjectID with a nil error.
func (gm *groupInMemory) Create(form url.Values, base bson.M) (groupID, error) {
	var none groupID
	return none, nil
}
// Candidate is not implemented by the in-memory backend. It logs that fact
// as an error and returns nil.
func (gm *groupInMemory) Candidate(gid groupID, mid accountID, doc bson.M) error {
	const notice = "not implemented func : Canidate"
	logger.Error(notice)
	return nil
}
var (
	// errGroupNotExist is returned when the requested group is unknown.
	errGroupNotExist = errors.New("group does not exist")
	// errNoEmptySlot is returned when a group cannot take another ticket.
	errNoEmptySlot = errors.New("no more seat in group")
)
// Join lets mid take its seat in group gid, or refreshes mid's member
// document when it has already joined; the caller does not have to be on the
// host that holds the group owner's connection.
//
// tid is the caller's ticket; a zero tid makes the group look the ticket up
// by account id. The ticket in effect is returned. errGroupNotExist is
// returned when the group is unknown, and a failure to publish the member
// update to the other hosts is returned to the caller.
func (gm *groupInMemory) Join(gid groupID, mid accountID, tid ticketID, doc bson.M) (ticketID, error) {
	group := gm.groups.find(gid)
	if group == nil {
		// No such group: fail.
		return primitive.NilObjectID, errGroupNotExist
	}

	memdoc, isNew := group.addMember(mid, &tid, doc)
	if memdoc == nil {
		// NOTE(review): an unknown or foreign ticket is reported as success
		// with the caller's tid echoed back; kept as is because callers are
		// not visible from here.
		return tid, nil
	}

	// The member is already stored locally, so the ticket is returned even
	// when the publish fails; the error tells the caller that the other hosts
	// were not informed.
	if err := gm.memberSync(gid, mid, tid, memdoc, isNew); err != nil {
		return tid, err
	}
	return tid, nil
}
// FindTicketID is a stub in the in-memory backend: it always returns the nil
// ObjectID.
func (gm *groupInMemory) FindTicketID(gid groupID, mid groupID) ticketID {
	var none ticketID
	return none
}
// func sendTypedMessageDirect[T any](rconn *wshandler.Richconn, msg T) {
// bt, _ := json.Marshal(makeTypeMessage(msg))
// rconn.WriteBytes(bt)
// }
// func sendTypedMessage[T any](gm *groupInMemory, target accountID, msg T) {
// bt, _ := json.Marshal(makeTypeMessage(msg))
// gm.SendMessage(target, bt)
// }
// func (gm *groupInMemory) SendMessage(target accountID, msg []byte) {
// rconn := gm.hasConn(target)
// if rconn != nil {
// rconn.WriteBytes(msg)
// } else {
// gm.rpc(target).call(target, msg)
// }
// }
// func multicast(conns []*wshandler.Richconn, raw []byte) {
// for _, rconn := range conns {
// rconn.WriteBytes(raw)
// }
// }
// func broadcastTypedMessage[T any](gm *groupInMemory, gid groupID, msg T) {
// if gd := gm.groups.find(gid); gd != nil {
// bt, _ := json.Marshal(makeTypeMessage(msg))
// go multicast(gd.conns(false), bt)
// }
// }
// errInviteeDocMidMissing is returned when an invitee document does not carry
// an account id under "_mid".
var errInviteeDocMidMissing = errors.New("inviteeDoc must have '_mid' field")

// InviteImplement performs the invitee side of an invitation. It must run on
// the host that holds the invitee's connection, because a close callback is
// attached to that connection; when the invitee is not connected here the
// call is forwarded through rpc to the invitee.
//
// It reserves a ticket in group gid, registers the "member_remove_invite"
// close callback that withdraws the ticket when the invitee disconnects,
// publishes the new ticket to the other hosts and sends the invitee an
// "Invitation" message. When the invitee already has a pending invitation the
// inviter mid is sent an "InvitationFail" message instead and nil is returned.
//
// Errors: errInviteeDocMidMissing, errGroupNotExist, errNoEmptySlot, or the
// error of the forwarded rpc call.
func (gm *groupInMemory) InviteImplement(gid groupID, mid accountID, inviteeDoc bson.M, inviterDoc bson.M) error {
	// Checked assertion: this method is also an rpc entry point, so a
	// malformed document must produce an error rather than a panic.
	targetid, ok := inviteeDoc["_mid"].(accountID)
	if !ok {
		return errInviteeDocMidMissing
	}

	// The invitee has to be notified, and the group has to be updated when
	// the invitee's connection closes, so find that connection first.
	rconn := gm.hasConn(targetid)
	if rconn == nil {
		return rpc.Make(gm).To(targetid).Call(gid, mid, inviteeDoc, inviterDoc)
	}

	gd := gm.groups.find(gid)
	if gd == nil {
		return errGroupNotExist
	}

	if rconn.hasOnCloseFunc("member_remove_invite") {
		// The invitee is already being invited; tell the inviter.
		gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
			Target: "@" + mid.Hex(),
			Body:   inviteeDoc,
			Tag:    []string{"InvitationFail"},
		})
		return nil
	}

	tid, newdoc := gd.addInvite(inviteeDoc, rconn, time.Duration(gm.InviteExpire)*time.Second, gm.MaxMember)
	if newdoc == nil {
		return errNoEmptySlot
	}

	rconn.registOnCloseFunc("member_remove_invite", func() {
		gd.removeMember(targetid, &tid)
		gm.memberSync(gid, targetid, tid, nil, false)
	})
	gm.memberSync(gid, targetid, tid, newdoc, false)

	gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
		Target: "@" + targetid.Hex(),
		Body: Invitation{
			GroupID:     gid,
			TicketID:    tid,
			Inviter:     inviterDoc,
			ExpireAtUTC: newdoc.InviteExpire.Unix(),
		},
		Tag: []string{"Invitation"},
	})
	return nil
}
// errAlreayMember is returned when the invitee already holds a ticket in the
// group.
var (
	errAlreayMember = errors.New("this target is already member")
)
// Invite invites the account named by inviteeDoc["_mid"] into group gid on
// behalf of mid and returns the hex id of the group.
//
// A zero gid means the inviter does not own a group yet; a fresh id is
// allocated and the group is created with mid as its owner. Only the owner of
// an existing group may invite; a request from anyone else is dropped and
// ("", nil) is returned.
//
// The owner side runs on the host that holds mid's connection, so that the
// close callback that dissolves the group is registered before the group
// becomes visible. When mid is not connected here the call is forwarded to
// mid's host.
//
// Errors: errInviteeDocMidMissing, errAlreayMember, a BSON encoding error for
// the new group body, or whatever InviteImplement or the forwarded rpc call
// returns.
func (gm *groupInMemory) Invite(gid groupID, mid accountID, inviterDoc bson.M, inviteeDoc bson.M) (string, error) {
	targetid, ok := inviteeDoc["_mid"].(accountID)
	if !ok {
		return "", errInviteeDocMidMissing
	}

	if !gid.IsZero() {
		if gd := gm.groups.find(gid); gd != nil && gd.InCharge != mid {
			// This must not happen: only the owner may invite.
			return "", nil
		}
	}

	// The gid has to exist up front. The inviting client may not own a group
	// yet, and it must receive the gid even when mid's connection is not on
	// this host.
	if gid.IsZero() {
		gid = primitive.NewObjectID()
	}

	rconn := gm.hasConn(mid)
	if rconn == nil {
		// This has to be handled where mid is connected so that the close
		// callback can be registered first; without it a zombie group could
		// be left behind. Forward to mid's host (not the invitee's) with the
		// arguments in this method's own order, like every other forwarder in
		// this file.
		// NOTE(review): assumes rpc.Call re-invokes the calling method with
		// the given arguments — confirm against the rpc package.
		return gid.Hex(), rpc.Make(gm).To(mid).Call(gid, mid, inviterDoc, inviteeDoc)
	}

	// From here on mid owns (or will own) the group and is connected here.
	gd := gm.groups.find(gid)
	if gd == nil {
		_, gd = gm.groups.createWithID(gid, bson.M{})
		tid, newdoc := gd.addInCharge(mid, rconn, inviterDoc)
		rconn.registOnCloseFunc("member_remove", func() {
			// mid is the owner, so the group is dissolved when it disconnects.
			gm.groupDocSync(gid, nil)
		})

		bt, err := bson.Marshal(gd.Body)
		if err != nil {
			return "", err
		}
		gm.groupDocSync(gid, bt)
		gm.memberSync(gid, mid, tid, newdoc, true)

		// The owner enters the wshandler room of the group.
		gm.sendEnterRoomMessage(gid, mid)
	} else if !gd.ticket(targetid).IsZero() {
		// For an existing group membership can be checked up front: the
		// invitee already holds a ticket.
		return "", errAlreayMember
	}

	return gid.Hex(), gm.InviteImplement(gid, mid, inviteeDoc, inviterDoc)
}
// CancelInvitation is a stub in the in-memory backend: it does nothing and
// returns nil.
func (gm *groupInMemory) CancelInvitation(gid groupID, tid ticketID) error {
	return nil
}
// AcceptInvitation converts mid's invitation ticket tid in group gid into a
// membership, storing member as the member document. It runs on the host that
// holds mid's connection and forwards itself through rpc otherwise.
//
// If mid is still a member of another group, that membership is ended first
// by running its "member_remove" close callback. The invitation's close
// callback is then re-registered under "member_remove" so that a disconnect
// removes the member.
//
// On success gid and the result of publishing the member update are returned.
// errGroupNotExist is returned for an unknown group; an unknown or foreign
// ticket yields the nil ObjectID with a nil error.
func (gm *groupInMemory) AcceptInvitation(gid groupID, mid accountID, tid ticketID, member bson.M) (groupID, error) {
	gd := gm.groups.find(gid)
	if gd == nil {
		return primitive.NilObjectID, errGroupNotExist
	}

	rconn := gm.hasConn(mid)
	if rconn == nil {
		return gid, rpc.Make(gm).To(mid).Call(gid, mid, tid, member)
	}

	if oldFunc := rconn.unregistOnCloseFunc("member_remove"); oldFunc != nil {
		// mid was a member of a group already: leave that one first.
		oldFunc()
	}

	// Promote the invitation's cleanup callback to the membership slot. The
	// callback is absent when no invitation was pending on this connection;
	// registering a nil function would be called on disconnect.
	if inviteFunc := rconn.unregistOnCloseFunc("member_remove_invite"); inviteFunc != nil {
		rconn.registOnCloseFunc("member_remove", inviteFunc)
	}

	result, isNew := gd.addMember(mid, &tid, member)
	if result == nil {
		// Failed: the ticket is unknown or belongs to someone else.
		return primitive.NilObjectID, nil
	}

	gm.sendEnterRoomMessage(gid, mid)
	return gid, gm.memberSync(gid, mid, tid, result, isNew)
}
// DenyInvitation withdraws mid's invitation ticket tid from group gid. It
// runs on the host that holds mid's connection and forwards itself through
// rpc otherwise. errGroupNotExist is returned for an unknown group.
func (gm *groupInMemory) DenyInvitation(gid groupID, mid accountID, tid ticketID) error {
	gd := gm.groups.find(gid)
	if gd == nil {
		return errGroupNotExist
	}

	conn := gm.hasConn(mid)
	if conn == nil {
		return rpc.Make(gm).To(mid).Call(gid, mid, tid)
	}

	// The invitation's close callback already removes the member and
	// publishes the removal, so running it is all that is needed.
	if withdraw := conn.unregistOnCloseFunc("member_remove_invite"); withdraw != nil {
		withdraw()
		return nil
	}

	// No callback registered: remove the ticket and publish by hand.
	gd.removeMember(mid, &tid)
	syncErr := gm.memberSync(gid, mid, tid, nil, false)
	return syncErr
}
// QueryInvitations is a stub in the in-memory backend: it always returns
// (nil, nil).
func (gm *groupInMemory) QueryInvitations(mid accountID, after primitive.Timestamp) ([]bson.M, error) {
	var none []bson.M
	return none, nil
}
// Exist is a stub in the in-memory backend: it always returns (false, nil).
func (gm *groupInMemory) Exist(gid groupID, filter bson.M) (bool, error) {
	const found = false
	return found, nil
}
// FindAll is a stub in the in-memory backend: it always returns (nil, nil).
func (gm *groupInMemory) FindAll(filter bson.M, projection string, after primitive.Timestamp) ([]bson.M, error) {
	var none []bson.M
	return none, nil
}
// FindOne is a stub in the in-memory backend: it always returns (nil, nil).
func (gm *groupInMemory) FindOne(gid groupID, projection string) (bson.M, error) {
	var none bson.M
	return none, nil
}
// DropPausedMember removes mid from group gid if, and only if, mid's member
// document carries a "paused" key. A paused owner causes the deletion of the
// whole group to be published; any other paused member is removed and the
// removal is published. Members that are not paused are left alone and nil is
// returned.
//
// Errors: errGroupNotExist, errNotMember, or the error of the publish.
func (gm *groupInMemory) DropPausedMember(gid primitive.ObjectID, mid primitive.ObjectID) error {
	gd := gm.groups.find(gid)
	if gd == nil {
		return errGroupNotExist
	}

	tid, memdoc := gd.memberByAccount(mid)
	if memdoc == nil {
		return errNotMember
	}

	if _, paused := memdoc.Body["paused"]; !paused {
		return nil
	}

	// The member has to be dropped.
	if gd.InCharge == mid {
		// mid is the owner of the group.
		return gm.groupDocSync(gid, nil)
	}

	// mid is an ordinary member.
	gd.removeMember(mid, &tid)
	return gm.memberSync(gid, mid, tid, nil, false)
}
// PauseMember marks mid as paused in group gid: the connection is about to
// go away but the member keeps its seat. It runs on the host that holds mid's
// connection and forwards itself through rpc otherwise.
//
// Both close callbacks are unregistered so that the disconnect does not
// remove the member, mid leaves the group's room, the member document gets
// "paused": true and loses its connection reference, and the updated record
// is published.
//
// Errors: errGroupNotExist, or the error of the rpc call or of the publish.
func (gm *groupInMemory) PauseMember(gid primitive.ObjectID, mid primitive.ObjectID) error {
	rconn := gm.hasConn(mid)
	if rconn == nil {
		return rpc.Make(gm).To(mid).Call(gid, mid)
	}

	// The connection will drop, but the member stays in the group.
	rconn.unregistOnCloseFunc("member_remove")
	rconn.unregistOnCloseFunc("member_remove_invite")
	gm.sendLeaveRoomMessage(gid, mid)

	gd := gm.groups.find(gid)
	if gd == nil {
		return errGroupNotExist
	}

	tid := primitive.NilObjectID
	newdoc := gd.modifyMemberDocument(mid, &tid, func(memdoc *memberDoc) {
		// Body is nil when the member joined with an empty document, and
		// writing to a nil map panics.
		if memdoc.Body == nil {
			memdoc.Body = primitive.M{}
		}
		memdoc.Body["paused"] = true
		memdoc.rconn = nil
	})

	// NOTE(review): when mid holds no ticket newdoc is nil and tid is zero,
	// so this publishes a removal for the zero ticket — confirm whether an
	// error should be returned instead.
	return gm.memberSync(gid, mid, tid, newdoc, false)
}
// QueryMembers returns the documents of every ticket holder of group gid,
// keyed by the hex account id. Only the group owner may ask; for anyone else,
// and for an unknown group, errGroupNotExist is returned.
//
// projection is a comma separated list of keys; a leading "+" and surrounding
// whitespace are ignored, as are empty entries. Keys that start with an
// underscore are read from the member's hidden document, all other keys from
// the public body. With an empty projection the hidden and public documents
// are merged, public keys winning on collision.
func (gm *groupInMemory) QueryMembers(gid groupID, reqID accountID, projection string, after primitive.Timestamp) (map[string]bson.M, error) {
	gd := gm.groups.find(gid)
	if gd == nil {
		return nil, errGroupNotExist
	}
	if gd.InCharge != reqID {
		return nil, errGroupNotExist
	}

	outdocs := make(map[string]bson.M)

	if len(projection) == 0 {
		gd.iterateMembers(func(_ ticketID, memdoc *memberDoc) {
			outdoc := bson.M{}
			for k, v := range memdoc.Hidden {
				outdoc[k] = v
			}
			for k, v := range memdoc.Body {
				outdoc[k] = v
			}
			outdocs[memdoc.Mid.Hex()] = outdoc
		})
		return outdocs, nil
	}

	projkeys := map[string]bool{}
	for _, p := range strings.Split(projection, ",") {
		// Trim before looking at the first character: entries such as "",
		// " " or " +name" must neither panic nor keep their "+".
		key := strings.TrimSpace(strings.TrimPrefix(strings.TrimSpace(p), "+"))
		if key == "" {
			continue
		}
		projkeys[key] = true
	}

	gd.iterateMembers(func(_ ticketID, memdoc *memberDoc) {
		outdoc := bson.M{}
		for k := range projkeys {
			if strings.HasPrefix(k, "_") {
				outdoc[k] = memdoc.Hidden[k]
			} else {
				outdoc[k] = memdoc.Body[k]
			}
		}
		outdocs[memdoc.Mid.Hex()] = outdoc
	})
	return outdocs, nil
}
// QueryMember is a stub in the in-memory backend: it always returns
// (nil, nil).
func (gm *groupInMemory) QueryMember(gid groupID, mid accountID, tid ticketID, projection string) (bson.M, error) {
	var none bson.M
	return none, nil
}
var (
	// errHaveNoAuthority is returned when a non-owner tries to remove
	// somebody other than itself.
	errHaveNoAuthority = errors.New("cannot kick other member")
	// errNotMember is returned when the ticket or account is not part of the
	// group.
	errNotMember = errors.New("ticket is not in this group")
)
// Leave removes the holder of ticket tid from group gid at the request of
// mid. The group owner may remove anybody; everyone else may only remove
// themselves.
//
// The removal runs on the host that holds the removed member's connection
// (the call is forwarded through rpc otherwise), because it is carried out by
// that connection's "member_remove" close callback. Afterwards the removed
// member is sent an empty FullGroupDoc for gid and leaves the group's room.
//
// Errors: errGroupNotExist, errNotMember, errHaveNoAuthority, or the error of
// the forwarded rpc call.
func (gm *groupInMemory) Leave(gid groupID, mid accountID, tid ticketID) error {
	gd := gm.groups.find(gid)
	if gd == nil {
		return errGroupNotExist
	}

	memdoc := gd.member(tid)
	if memdoc == nil {
		return errNotMember
	}
	targetmid := memdoc.Mid

	// The owner can remove anyone; a non-owner can only remove itself, so
	// targetmid and mid must then be the same account.
	if gd.InCharge != mid && targetmid != mid {
		return errHaveNoAuthority
	}

	// targetmid's "member_remove" callback has to be unregistered, so do the
	// work where its connection lives.
	rconn := gm.hasConn(targetmid)
	if rconn == nil {
		return rpc.Make(gm).To(targetmid).Call(gid, mid, tid)
	}

	if oldfunc := rconn.unregistOnCloseFunc("member_remove"); oldfunc != nil {
		oldfunc() // the callback performs the actual removal and publish
	}

	// The empty FullGroupDoc tells a client that it is no longer in the
	// group, so it goes to the removed member. When the owner removes someone
	// else that is targetmid, not the requesting owner.
	gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
		Target: "@" + targetmid.Hex(),
		Body:   FullGroupDoc{Gid: gid},
		Tag:    []string{"FullGroupDoc", gid.Hex()},
	})
	gm.sendLeaveRoomMessage(gid, targetmid)
	return nil
}
// UpdateMemberDocument stores doc in the body of group gid under the hex
// ticket of mid and broadcasts the resulting group body to the group's room
// with the tag "GroupDocBody".
//
// Errors: errGroupNotExist, or the JSON encoding error of doc.
//
// NOTE(review): when mid holds no ticket the document is stored under the hex
// form of the zero ticket; callers are not visible from here, so this is left
// unchanged.
func (gm *groupInMemory) UpdateMemberDocument(gid groupID, mid accountID, doc bson.M) error {
	gd := gm.groups.find(gid)
	if gd == nil {
		return errGroupNotExist
	}

	tid := gd.ticket(mid)

	bt, err := json.Marshal(doc)
	if err != nil {
		// Without the encoded document the patch below would be invalid JSON
		// and a nil body would be broadcast to the room.
		return err
	}

	// The hex ticket contains only [0-9a-f], so it needs no JSON escaping.
	personalized := []byte(fmt.Sprintf(`{"%s":%s}`, tid.Hex(), string(bt)))
	gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
		Target: "#" + gid.Hex(),
		Body:   gd.updateBodyWithJson(personalized),
		Tag:    []string{"GroupDocBody"},
	})
	return nil
}
// Dismiss is a stub in the in-memory backend: it does nothing and returns
// nil.
func (gm *groupInMemory) Dismiss(gid groupID) error {
	return nil
}
// UpdateGroupDocument applies the BSON document body to the body of group gid
// and publishes the resulting body to the other hosts.
//
// Errors: errGroupNotExist, the BSON decode or encode error, or the error of
// the publish.
func (gm *groupInMemory) UpdateGroupDocument(gid groupID, body []byte) error {
	target := gm.groups.find(gid)
	if target == nil {
		return errGroupNotExist
	}

	merged, mergeErr := target.updateBodyWithBson(body)
	if mergeErr != nil {
		return mergeErr
	}
	return gm.groupDocSync(gid, merged)
}
// TargetExists reports whether the account target has a connection on this
// host.
func (gm *groupInMemory) TargetExists(target primitive.ObjectID) bool {
	conn := gm.hasConn(target)
	return conn != nil
}
// devflag is the "dev" command line switch. When it is set, prepareInMemory
// mixes the local hostname into the hashed Redis channel names.
var devflag = flagx.Bool("dev", false, "")
// prepareInMemory builds the in-memory group backend for typename and starts
// the goroutine that keeps it in sync with the other hosts over Redis pub/sub.
//
// Three channels are used, one for group documents, one for member documents
// and one for rpc. Their names are derived from the region and typename and
// then hashed; with the "dev" flag the local hostname is mixed in as well.
// The functions stored on the returned backend publish to these channels; the
// goroutine subscribes to them and applies what it receives.
//
// Wire formats, both prefixed by the sender's config.macAddr:
//
//	group sync : <mac><gid hex>[<bson body>]                  (no body = delete)
//	member sync: <mac><gid hex><mid hex><tid hex>[<t|f><json>] (no tail = delete)
//
// The goroutine stops, and closes its subscription, when ctx is cancelled.
func (cfg *groupConfig) prepareInMemory(ctx context.Context, typename string, sub *subTavern) (group, error) {
	region := sub.region
	wsh := sub.wsh

	// idHexLen is the length of a hex encoded ObjectID.
	const idHexLen = 24

	toHashHex := func(name string) string {
		hash := md5.New()
		hash.Write([]byte(name))
		if *devflag {
			hn, _ := os.Hostname()
			hash.Write([]byte(hn))
		}
		return hex.EncodeToString(hash.Sum(nil)[:8])
	}

	groupDocSyncChanName := toHashHex(fmt.Sprintf("d_mgc_%s_%s", region, typename))
	memberSyncChanName := toHashHex(fmt.Sprintf("m_mgc_%s_%s", region, typename))
	rpcChanName := toHashHex(fmt.Sprintf("r_mgc_%s_%s", region, typename))

	// The channels are subscribed to below; the functions on gm publish.
	gm := &groupInMemory{
		groupConfig: cfg,
		groupDocSync: func(gid groupID, newbody []byte) error {
			bt := []byte(fmt.Sprintf("%s%s", config.macAddr, gid.Hex()))
			bt = append(bt, newbody...)
			_, err := wsh.RedisSync.Publish(ctx, groupDocSyncChanName, bt).Result()
			return err
		},
		memberSync: func(gid groupID, mid accountID, tid ticketID, doc *memberDoc, newmember bool) error {
			payload := fmt.Sprintf("%s%s%s%s", config.macAddr, gid.Hex(), mid.Hex(), tid.Hex())
			if doc != nil {
				bt, err := json.Marshal(doc)
				if err != nil {
					// Publishing the header alone would be read as a removal.
					return err
				}
				newmemberflag := "f"
				if newmember {
					newmemberflag = "t"
				}
				payload += newmemberflag + string(bt)
			}
			_, err := wsh.RedisSync.Publish(ctx, memberSyncChanName, payload).Result()
			return err
		},
		rpcCall: func(bt []byte) error {
			_, err := wsh.RedisSync.Publish(ctx, rpcChanName, bt).Result()
			return err
		},
		hasConn: func(t accountID) *connection {
			return sub.cm.get(t)
		},
		groups: groupContainer{
			groupDocs: make(map[groupID]*groupDoc),
		},
		sendUpstreamMessage: func(msg *wshandler.UpstreamMessage) {
			wsh.SendUpstreamMessage(region, msg)
		},
		sendEnterRoomMessage: func(gid groupID, accid accountID) {
			wsh.EnterRoom(region, gid.Hex(), accid)
		},
		sendLeaveRoomMessage: func(gid groupID, accid accountID) {
			wsh.LeaveRoom(region, gid.Hex(), accid)
		},
	}

	rpc.RegistReceiver(gm)

	// handleGroupDocSync applies one message of the group sync channel.
	handleGroupDocSync := func(payload []byte) {
		macLen := len(config.macAddr)
		if len(payload) < macLen+idHexLen {
			return
		}
		fromOtherHost := string(payload[:macLen]) != config.macAddr
		gid, err := primitive.ObjectIDFromHex(string(payload[macLen : macLen+idHexLen]))
		if err != nil {
			logger.Error("groupDocSyncChanName message has invalid group id :", err)
			return
		}
		remain := payload[macLen+idHexLen:]

		gd := gm.groups.find(gid)
		switch {
		case gd != nil && len(remain) == 0:
			// The group is deleted: tell the members in its room, then
			// forget it.
			gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
				Target: "#" + gid.Hex(),
				Body:   FullGroupDoc{Gid: gid},
				Tag:    []string{"FullGroupDoc"},
			})
			gm.groups.delete(gid)

		case gd != nil && fromOtherHost:
			// A known group was changed on another host.
			gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
				Target: "#" + gid.Hex(),
				Body:   gd.updateBodyBsonToJson(remain),
				Tag:    []string{"GroupDocBody"},
			})

		case gd == nil && fromOtherHost:
			if len(remain) == 0 {
				// Deletion of a group this host never knew about.
				return
			}
			// A group created on another host. The ticket map must be
			// initialised: a zero groupDoc has a nil map and the first
			// invitation into it would panic.
			newDoc := &groupDoc{
				tickets:    make(map[ticketID]*memberDoc),
				createTime: time.Now().UTC(),
			}
			if err := newDoc.updateBody(remain); err != nil {
				logger.Error("groupDocSyncChanName message decode failed :", remain, err)
				return
			}
			gm.groups.add(gid, newDoc)
		}
	}

	// handleMemberSync applies one message of the member sync channel.
	handleMemberSync := func(payload string) {
		macLen := len(config.macAddr)
		if len(payload) < macLen+idHexLen {
			return
		}
		senderHost, remain := payload[:macLen], payload[macLen:]

		gid, err := primitive.ObjectIDFromHex(remain[:idHexLen])
		if err != nil {
			logger.Error("memberSyncChanName message has invalid group id :", err)
			return
		}
		remain = remain[idHexLen:]

		gd := gm.groups.find(gid)
		if gd == nil {
			// The group may have been removed before this message arrived.
			return
		}

		// The member id and the ticket must both be present; slicing a
		// shorter payload would panic.
		if len(remain) < 2*idHexLen {
			logger.Error("memberSyncChanName message is truncated :", payload)
			return
		}
		mid, err := primitive.ObjectIDFromHex(remain[:idHexLen])
		if err != nil {
			logger.Error("memberSyncChanName message has invalid member id :", err)
			return
		}
		tid, err := primitive.ObjectIDFromHex(remain[idHexLen : 2*idHexLen])
		if err != nil {
			logger.Error("memberSyncChanName message has invalid ticket id :", err)
			return
		}
		remain = remain[2*idHexLen:]

		isNewMember := false
		if len(remain) > 0 {
			isNewMember = remain[0] == 't'
			remain = remain[1:]
		}

		var updated *memberDoc
		rconn := gm.hasConn(mid)
		if senderHost != config.macAddr {
			// Not our own message: update the local copy before
			// broadcasting.
			if len(remain) == 0 {
				gd.removeMember(mid, &tid)
			} else {
				updated = gd.overwriteMemberDocument(mid, &tid, []byte(remain))
			}
		} else {
			updated = gd.member(tid)
		}

		if updated == nil {
			// Announce the removal of the member.
			if rconn != nil {
				// The "gid" tag may hold a different value by now; it is
				// removed only when it is exactly this one.
				rconn.removeTag("gid", fmt.Sprintf("%s@%s", gid.Hex(), gm.Name))
			}
			gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
				Target: "#" + gid.Hex(),
				Body:   PublicMemberDoc{Tid: tid},
				Tag:    []string{"PublicMemberDoc"},
			})
		} else {
			if isNewMember && updated.rconn == nil && rconn != nil {
				updated.rconn = rconn
			}
			// Tell everyone in the room about the updated (or new) member,
			// the member itself included; invitees are not in the room.
			gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
				Target: "#" + gid.Hex(),
				Body: PublicMemberDoc{
					Tid:             tid,
					memberDocCommon: updated.memberDocCommon,
				},
				Tag: []string{"PublicMemberDoc"},
			})
		}

		if isNewMember && rconn != nil {
			// A new member on this host gets the full group snapshot.
			rconn.addTag("gid", fmt.Sprintf("%s@%s", gid.Hex(), gm.Name))
			gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
				Target: "@" + mid.Hex(),
				Body:   gd.serializeFull(gid),
				Tag:    []string{"FullGroupDoc"},
			})
		}
	}

	// handleMessage dispatches one pub/sub message. A panic while handling a
	// message is logged and confined to that message.
	handleMessage := func(msg *redis.Message) {
		defer func() {
			if r := recover(); r != nil {
				logger.Error(r)
			}
		}()

		switch msg.Channel {
		case groupDocSyncChanName: // group documents, host to host
			handleGroupDocSync([]byte(msg.Payload))
		case memberSyncChanName: // member documents, host to host
			handleMemberSync(msg.Payload)
		default:
			// NOTE(review): rpcChanName is subscribed below but has no case
			// here, so its messages are reported as unknown — confirm where
			// rpc traffic is meant to be consumed.
			logger.Println("unknown channel")
		}
	}

	// processChannelMessage consumes messages until ctx is cancelled or the
	// subscription ends. It returns the subscription when it is still usable
	// and nil when the caller has to subscribe again.
	processChannelMessage := func(pubsub *redis.PubSub) *redis.PubSub {
		ch := pubsub.Channel()
		for {
			select {
			case <-ctx.Done():
				return pubsub
			case msg, ok := <-ch:
				if !ok || msg == nil {
					return nil
				}
				handleMessage(msg)
			}
		}
	}

	go func() {
		var pubsub *redis.PubSub
		defer func() {
			if r := recover(); r != nil {
				logger.Error(r)
			}
			if pubsub != nil {
				pubsub.Close()
			}
		}()

		for ctx.Err() == nil {
			if pubsub == nil {
				pubsub = wsh.RedisSync.Subscribe(ctx, groupDocSyncChanName, memberSyncChanName, rpcChanName)
			}
			if pubsub == nil {
				time.Sleep(time.Second)
				continue
			}
			if pubsub = processChannelMessage(pubsub); pubsub == nil {
				// The subscription ended: back off briefly before
				// subscribing again instead of spinning.
				time.Sleep(time.Second)
			}
		}
	}()

	return gm, nil
}