package core import ( "context" "crypto/md5" "encoding/gob" "encoding/hex" "encoding/json" "errors" "fmt" "net/url" "os" "strings" "sync" "time" "repositories.action2quare.com/ayo/gocommon/flagx" "repositories.action2quare.com/ayo/gocommon/logger" "repositories.action2quare.com/ayo/gocommon/rpc" "repositories.action2quare.com/ayo/gocommon/wshandler" "github.com/go-redis/redis/v8" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" ) type accountID = primitive.ObjectID type ticketID = primitive.ObjectID type groupID = primitive.ObjectID type Invitation struct { GroupID groupID `json:"gid"` TicketID ticketID `json:"tid"` Inviter bson.M `json:"inviter"` ExpireAtUTC int64 `json:"expire_at_utc"` } type memberDocCommon struct { Body bson.M Invite bool InviteExpire time.Time JoinTime int64 } // 플레이어한테 공유하는 멤버 정보 type PublicMemberDoc struct { memberDocCommon `json:",inline"` Tid ticketID } type FullGroupDoc struct { Gid groupID AllMembers []*PublicMemberDoc `json:",omitempty"` Body GroupDocBody `json:",omitempty"` } type GroupDocBody bson.M type InvitationFail bson.M type memberDoc struct { memberDocCommon `json:",inline"` // underscore keys in Hidden Hidden bson.M rconn *connection Mid accountID } type groupDoc struct { sync.Mutex Body GroupDocBody InCharge accountID tickets map[ticketID]*memberDoc createTime time.Time } func init() { gob.Register(PublicMemberDoc{}) } func (gd *groupDoc) updateBodyWithBson(src []byte) ([]byte, error) { gd.Lock() defer gd.Unlock() err := bson.Unmarshal(src, &gd.Body) if err != nil { return nil, err } return bson.Marshal(gd.Body) } func (gd *groupDoc) updateBodyWithJson(src []byte) GroupDocBody { gd.Lock() defer gd.Unlock() err := json.Unmarshal(src, &gd.Body) if err != nil { return nil } return gd.Body } func (gd *groupDoc) updateBodyBsonToJson(bsonSrc []byte) GroupDocBody { gd.Lock() defer gd.Unlock() err := bson.Unmarshal(bsonSrc, &gd.Body) if err != nil { return nil } return gd.Body } func (gd *groupDoc) updateBody(bsonSrc []byte) error { gd.Lock() defer gd.Unlock() return bson.Unmarshal(bsonSrc, &gd.Body) } func (gd *groupDoc) addInvite(inviteeDoc bson.M, rconn *connection, ttl time.Duration, max int) (ticketID, *memberDoc) { gd.Lock() defer gd.Unlock() mid := inviteeDoc["_mid"].(accountID) body := inviteeDoc["body"].(bson.M) // 초대 가능한 빈 자리가 있나 now := time.Now().UTC() if len(gd.tickets) < max { tid := primitive.NewObjectID() newdoc := &memberDoc{ memberDocCommon: memberDocCommon{ Body: body, Invite: true, InviteExpire: now.Add(ttl), }, rconn: rconn, Mid: mid, } gd.tickets[tid] = newdoc return tid, newdoc } for oldtid, mem := range gd.tickets { if !mem.Invite { continue } if mem.InviteExpire.Before(now) { delete(gd.tickets, oldtid) tid := primitive.NewObjectID() newdoc := &memberDoc{ memberDocCommon: memberDocCommon{ Body: body, Invite: true, InviteExpire: now.Add(ttl), }, rconn: rconn, Mid: mid, } gd.tickets[tid] = newdoc return tid, newdoc } } return primitive.NilObjectID, nil } func seperateHidden(in bson.M) (public bson.M, hidden bson.M) { for k, v := range in { if k[0] == '_' { if hidden == nil { hidden = make(bson.M) } hidden[k] = v } } for k := range hidden { delete(in, k) } return in, hidden } func (gd *groupDoc) addInCharge(mid accountID, rconn *connection, doc bson.M) (ticketID, *memberDoc) { gd.Lock() defer gd.Unlock() if !gd.InCharge.IsZero() { return primitive.NilObjectID, nil } gd.InCharge = mid newtid := primitive.NewObjectID() doc, hidden := seperateHidden(doc) newdoc := &memberDoc{ memberDocCommon: memberDocCommon{ Body: doc, Invite: false, JoinTime: time.Now().UTC().Unix(), }, rconn: rconn, Mid: mid, Hidden: hidden, } gd.tickets[newtid] = newdoc if gd.Body == nil { gd.Body = GroupDocBody(make(bson.M)) } gd.Body["incharge"] = newtid.Hex() return newtid, newdoc } func (gd *groupDoc) addMember(mid accountID, tid *ticketID, doc bson.M) (*memberDoc, bool) { gd.Lock() defer gd.Unlock() var memdoc *memberDoc isNew := false if tid.IsZero() { for oldtid, d := range gd.tickets { if d.Mid == mid { memdoc = d *tid = oldtid isNew = true break } } } else { var ok bool memdoc, ok = gd.tickets[*tid] if !ok { // 티켓이 업네? return nil, false } if memdoc.Mid != mid { // 내 티켓이 아니네? return nil, false } } doc, hidden := seperateHidden(doc) if memdoc != nil { memdoc.Body = doc memdoc.Hidden = hidden if memdoc.Invite { isNew = true memdoc.Invite = false } if memdoc.JoinTime == 0 { memdoc.JoinTime = time.Now().UTC().Unix() } } return memdoc, isNew } func (gd *groupDoc) removeMember(mid accountID, tid *ticketID) { gd.Lock() defer gd.Unlock() if tid.IsZero() { for t, mem := range gd.tickets { if mem.Mid == mid { *tid = t delete(gd.tickets, t) return } } } delete(gd.tickets, *tid) if gd.InCharge == mid { gd.InCharge = primitive.NilObjectID } } func (gd *groupDoc) ticket(mid accountID) ticketID { gd.Lock() defer gd.Unlock() if mid.IsZero() { return primitive.NilObjectID } for t, m := range gd.tickets { if m.Mid == mid { return t } } return primitive.NilObjectID } func (gd *groupDoc) member(tid ticketID) *memberDoc { gd.Lock() defer gd.Unlock() return gd.tickets[tid] } func (gd *groupDoc) memberByAccount(mid accountID) (ticketID, *memberDoc) { gd.Lock() defer gd.Unlock() for tid, doc := range gd.tickets { if doc.Mid == mid { return tid, doc } } return primitive.NilObjectID, nil } func (gd *groupDoc) modifyMemberDocument(mid accountID, tid *ticketID, cb func(b *memberDoc)) *memberDoc { gd.Lock() defer gd.Unlock() if tid.IsZero() { for t, mem := range gd.tickets { if mem.Mid == mid { *tid = t break } } } if tid.IsZero() { return nil } if mem := gd.tickets[*tid]; mem != nil { cb(mem) return mem } return nil } func (gd *groupDoc) overwriteMemberDocument(mid accountID, tid *ticketID, raw []byte) *memberDoc { gd.Lock() defer gd.Unlock() if tid.IsZero() { for t, mem := range gd.tickets { if mem.Mid == mid { *tid = t json.Unmarshal(raw, &mem.Body) return mem } } } if mem := gd.tickets[*tid]; mem != nil { var newbody primitive.M json.Unmarshal(raw, &newbody) mem.Body = newbody return mem } return nil } func (gd *groupDoc) iterateMembers(cb func(ticketID, *memberDoc)) { gd.Lock() defer gd.Unlock() for k, v := range gd.tickets { cb(k, v) } } func (gd *groupDoc) serializeFull(gid groupID) FullGroupDoc { gd.Lock() defer gd.Unlock() var output []*PublicMemberDoc for k, v := range gd.tickets { if v.Invite { // 아직 초대 중인 대상. 패스 continue } output = append(output, &PublicMemberDoc{ memberDocCommon: v.memberDocCommon, Tid: k, }) } return FullGroupDoc{ Gid: gid, AllMembers: output, Body: gd.Body, } } type groupContainer struct { sync.Mutex groupDocs map[groupID]*groupDoc } type groupInMemory struct { *groupConfig groupDocSync func(groupID, []byte) error memberSync func(groupID, accountID, ticketID, *memberDoc, bool) error rpcCall func([]byte) error hasConn func(accountID) *connection sendUpstreamMessage func(*wshandler.UpstreamMessage) sendEnterRoomMessage func(groupID, accountID) sendLeaveRoomMessage func(groupID, accountID) sendCloseMessage func(accountID, string) groups groupContainer } func (gc *groupContainer) add(id groupID, doc *groupDoc) { gc.Lock() defer gc.Unlock() gc.groupDocs[id] = doc } func (gc *groupContainer) createWithID(newid groupID, base bson.M) (groupID, *groupDoc) { gc.Lock() defer gc.Unlock() if _, ok := gc.groupDocs[newid]; ok { return primitive.NilObjectID, nil } newdoc := newGroupDoc(base) gc.groupDocs[newid] = newdoc return newid, newdoc } func (gc *groupContainer) delete(gid groupID) { gc.Lock() defer gc.Unlock() delete(gc.groupDocs, gid) } func (gc *groupContainer) find(id groupID) *groupDoc { gc.Lock() defer gc.Unlock() if found, ok := gc.groupDocs[id]; ok { return found } return nil } func newGroupDoc(base bson.M) *groupDoc { return &groupDoc{ Body: GroupDocBody(base), createTime: time.Now().UTC(), tickets: make(map[ticketID]*memberDoc), } } func (gm *groupInMemory) Create(form url.Values, base bson.M) (groupID, error) { return primitive.NilObjectID, nil } func (gm *groupInMemory) Candidate(gid groupID, mid accountID, doc bson.M) error { logger.Error("not implemented func : Canidate") return nil } var errGroupNotExist = errors.New("group does not exist") var errNoEmptySlot = errors.New("no more seat in group") func (gm *groupInMemory) Join(gid groupID, mid accountID, tid ticketID, doc bson.M) (ticketID, error) { group := gm.groups.find(gid) if group == nil { // 그룹이 없다. 실패 return primitive.NilObjectID, errGroupNotExist } // 내 정보 업데이트할 때에도 사용됨 // 굳이 InCharge가 있는 호스트가 아니어도 가능 if memdoc, isNew := group.addMember(mid, &tid, doc); memdoc != nil { gm.memberSync(gid, mid, tid, memdoc, isNew) } return tid, nil } func (gm *groupInMemory) FindTicketID(gid groupID, mid groupID) ticketID { return primitive.NilObjectID } // func sendTypedMessageDirect[T any](rconn *wshandler.Richconn, msg T) { // bt, _ := json.Marshal(makeTypeMessage(msg)) // rconn.WriteBytes(bt) // } // func sendTypedMessage[T any](gm *groupInMemory, target accountID, msg T) { // bt, _ := json.Marshal(makeTypeMessage(msg)) // gm.SendMessage(target, bt) // } // func (gm *groupInMemory) SendMessage(target accountID, msg []byte) { // rconn := gm.hasConn(target) // if rconn != nil { // rconn.WriteBytes(msg) // } else { // gm.rpc(target).call(target, msg) // } // } // func multicast(conns []*wshandler.Richconn, raw []byte) { // for _, rconn := range conns { // rconn.WriteBytes(raw) // } // } // func broadcastTypedMessage[T any](gm *groupInMemory, gid groupID, msg T) { // if gd := gm.groups.find(gid); gd != nil { // bt, _ := json.Marshal(makeTypeMessage(msg)) // go multicast(gd.conns(false), bt) // } // } var errInviteeDocMidMissing = errors.New("inviteeDoc must have '_mid' field") func (gm *groupInMemory) InviteImplement(gid groupID, mid accountID, inviteeDoc bson.M, inviterDoc bson.M) error { targetid := inviteeDoc["_mid"].(accountID) // invitee에게 알림 // invitee의 rconn이 종료될 때 그룹에 반영해야 하므로 rconn을 찾자 rconn := gm.hasConn(targetid) if rconn == nil { return rpc.Make(gm).To(targetid).Call(gid, mid, inviteeDoc, inviterDoc) } gd := gm.groups.find(gid) if gd == nil { return errGroupNotExist } if rconn.hasOnCloseFunc("member_remove_invite") { // 이미 초대 중이다. // inviter한테 알려줘야 한다. gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ Target: "@" + mid.Hex(), Body: inviteeDoc, Tag: []string{"InvitationFail"}, }) return nil } tid, newdoc := gd.addInvite(inviteeDoc, rconn, time.Duration(gm.InviteExpire)*time.Second, gm.MaxMember) if newdoc == nil { return errNoEmptySlot } rconn.registOnCloseFunc("member_remove_invite", func() { gd.removeMember(targetid, &tid) gm.memberSync(gid, targetid, tid, nil, false) }) gm.memberSync(gid, targetid, tid, newdoc, false) gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ Target: "@" + targetid.Hex(), Body: Invitation{ GroupID: gid, TicketID: tid, Inviter: inviterDoc, ExpireAtUTC: newdoc.InviteExpire.Unix(), }, Tag: []string{"Invitation"}, }) return nil } var errAlreayMember = errors.New("this target is already member") func (gm *groupInMemory) Invite(gid groupID, mid accountID, inviterDoc bson.M, inviteeDoc bson.M) (string, error) { targetid, ok := inviteeDoc["_mid"].(accountID) if !ok { return "", errInviteeDocMidMissing } if !gid.IsZero() { if gd := gm.groups.find(gid); gd != nil { if gd.InCharge != mid { // 이러면 안된다. // 초대는 InCharge만 할 수 있음 return "", nil } } } // gid는 미리 만들어 놔야함. // 초대하는 클라이언트가 아직 group을 소유하지 않고 있을 수 있다. // mid의 rconn이 이 호스트에 없더라도 gid는 이 request를 보낸 클라이언트가 받아야 하기 떄문 if gid.IsZero() { gid = primitive.NewObjectID() } rconn := gm.hasConn(mid) if rconn == nil { // mid가 있는 곳에서 처리를 해야 접속 끊겼을 때 콜백을 먼저 등록할 수 있다. // 콜백이 rconn에 먼저 등록되지 않으면 좀비 group이 생길 가능성이 생긴다. return gid.Hex(), rpc.Make(gm).To(targetid).Call(gid, mid, inviteeDoc, inviterDoc) } // 이제 여기는 mid가 InCharge이면서 rconn이 존재 gd := gm.groups.find(gid) if gd == nil { _, gd = gm.groups.createWithID(gid, bson.M{}) tid, newdoc := gd.addInCharge(mid, rconn, inviterDoc) rconn.registOnCloseFunc("member_remove", func() { // 내가 InCharge이므로 접속이 종료될 때 그룹을 해체한다. gm.groupDocSync(gid, nil) }) bt, err := bson.Marshal(gd.Body) if err != nil { return "", err } gm.groupDocSync(gid, bt) gm.memberSync(gid, mid, tid, newdoc, true) // 내가 wshandler room에 입장 gm.sendEnterRoomMessage(gid, mid) } else { // targetid가 이미 멤버인지 미리 확인 가능 if !gd.ticket(targetid).IsZero() { // 이미 멤버네 return "", errAlreayMember } } return gid.Hex(), gm.InviteImplement(gid, mid, inviteeDoc, inviterDoc) } func (gm *groupInMemory) CancelInvitation(gid groupID, tid ticketID) error { return nil } func (gm *groupInMemory) AcceptInvitation(gid groupID, mid accountID, tid ticketID, member bson.M) (groupID, error) { gd := gm.groups.find(gid) if gd == nil { return primitive.NilObjectID, errGroupNotExist } rconn := gm.hasConn(mid) if rconn == nil { return gid, rpc.Make(gm).To(mid).Call(gid, mid, tid, member) } oldFunc := rconn.unregistOnCloseFunc("member_remove") if oldFunc != nil { // 기존 멤버였으면 탈퇴 처리 oldFunc() } inviteFunc := rconn.unregistOnCloseFunc("member_remove_invite") rconn.registOnCloseFunc("member_remove", inviteFunc) result, isNew := gd.addMember(mid, &tid, member) if result != nil { gm.sendEnterRoomMessage(gid, mid) return gid, gm.memberSync(gid, mid, tid, result, isNew) } // 실패 return primitive.NilObjectID, nil } func (gm *groupInMemory) DenyInvitation(gid groupID, mid accountID, tid ticketID) error { gd := gm.groups.find(gid) if gd == nil { return errGroupNotExist } rconn := gm.hasConn(mid) if rconn == nil { return rpc.Make(gm).To(mid).Call(gid, mid, tid) } inviteFunc := rconn.unregistOnCloseFunc("member_remove_invite") if inviteFunc != nil { inviteFunc() // removeMember는 여기에 들어있다. return nil } gd.removeMember(mid, &tid) return gm.memberSync(gid, mid, tid, nil, false) } func (gm *groupInMemory) QueryInvitations(mid accountID, after primitive.Timestamp) ([]bson.M, error) { return nil, nil } func (gm *groupInMemory) Exist(gid groupID, filter bson.M) (bool, error) { return false, nil } func (gm *groupInMemory) FindAll(filter bson.M, projection string, after primitive.Timestamp) ([]bson.M, error) { return nil, nil } func (gm *groupInMemory) FindOne(gid groupID, projection string) (bson.M, error) { return nil, nil } func (gm *groupInMemory) DropPausedMember(gid primitive.ObjectID, mid primitive.ObjectID) error { gd := gm.groups.find(gid) if gd == nil { return errGroupNotExist } tid, memdoc := gd.memberByAccount(mid) if memdoc == nil { return errNotMember } if _, ok := memdoc.Body["paused"]; ok { // 드랍해야 한다. if gd.InCharge == mid { // 내가 방장인 경우 return gm.groupDocSync(gid, nil) } else { // 내가 방장이 아닌 경우 gd.removeMember(mid, &tid) return gm.memberSync(gid, mid, tid, nil, false) } } return nil } func (gm *groupInMemory) PauseMember(gid primitive.ObjectID, mid primitive.ObjectID) error { rconn := gm.hasConn(mid) if rconn == nil { return rpc.Make(gm).To(mid).Call(gid, mid) } // 접속은 끊기지만 그룹에서 제거하지는 않는 상태 rconn.unregistOnCloseFunc("member_remove") rconn.unregistOnCloseFunc("member_remove_invite") gm.sendCloseMessage(mid, "pause") gd := gm.groups.find(gid) if gd == nil { return errGroupNotExist } tid := primitive.NilObjectID newdoc := gd.modifyMemberDocument(mid, &tid, func(memdoc *memberDoc) { memdoc.Body["paused"] = true memdoc.rconn = nil }) return gm.memberSync(gid, mid, tid, newdoc, false) } func (gm *groupInMemory) QueryMembers(gid groupID, reqID accountID, projection string, after primitive.Timestamp) (map[string]bson.M, error) { gd := gm.groups.find(gid) if gd == nil { return nil, errGroupNotExist } if gd.InCharge != reqID { return nil, errGroupNotExist } outdocs := make(map[string]bson.M) if len(projection) > 0 { projkeys := map[string]bool{} for _, p := range strings.Split(projection, ",") { if p[0] == '+' { projkeys[strings.TrimSpace(p[1:])] = true } else { projkeys[strings.TrimSpace(p)] = true } } gd.iterateMembers(func(tid ticketID, memdoc *memberDoc) { outdoc := bson.M{} for k := range projkeys { if k[0] == '_' { outdoc[k] = memdoc.Hidden[k] } else { outdoc[k] = memdoc.Body[k] } } outdocs[memdoc.Mid.Hex()] = outdoc }) } else { gd.iterateMembers(func(tid ticketID, memdoc *memberDoc) { outdoc := bson.M{} for k, v := range memdoc.Hidden { outdoc[k] = v } for k, v := range memdoc.Body { outdoc[k] = v } outdocs[memdoc.Mid.Hex()] = outdoc }) } return outdocs, nil } func (gm *groupInMemory) QueryMember(gid groupID, mid accountID, tid ticketID, projection string) (bson.M, error) { return nil, nil } var errHaveNoAuthority = errors.New("cannot kick other member") var errNotMember = errors.New("ticket is not in this group") func (gm *groupInMemory) Leave(gid groupID, mid accountID, tid ticketID) error { gd := gm.groups.find(gid) if gd == nil { return errGroupNotExist } // mid가 InCharge인 경우에는 tid가 누구든 내쫓고, // mid가 InCharge가 아닌 경우는 tid가 mid일 경우에만 나갈 수 있다. memdoc := gd.member(tid) if memdoc == nil { return errNotMember } targetmid := memdoc.Mid // 내가 방장이면 아무나 내보낼 수 있다. if gd.InCharge != mid && targetmid != mid { // targetmid와 mid가 같아야 한다. 방장이 아니므로 나는 나만 내보낼 수 있다. return errHaveNoAuthority } // targetmid의 "member_remove" 함수를 등록 해제해야 하므로 rconn이 있는 곳에서 하자 rconn := gm.hasConn(targetmid) if rconn == nil { return rpc.Make(gm).To(targetmid).Call(gid, mid, tid) } if oldfunc := rconn.unregistOnCloseFunc("member_remove"); oldfunc != nil { oldfunc() // 이 안에 다 있다. } // 나한테는 빈 FullGroupDoc을 보낸다. gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ Target: "@" + mid.Hex(), Body: FullGroupDoc{Gid: gid}, Tag: []string{"FullGroupDoc", gid.Hex()}, }) gm.sendLeaveRoomMessage(gid, targetmid) return nil } func (gm *groupInMemory) UpdateMemberDocument(gid groupID, mid accountID, doc bson.M) error { gd := gm.groups.find(gid) if gd == nil { return errGroupNotExist } tid := gd.ticket(mid) bt, _ := json.Marshal(doc) personalized := []byte(fmt.Sprintf(`{"%s":%s}`, tid.Hex(), string(bt))) gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ Target: "#" + gid.Hex(), Body: gd.updateBodyWithJson(personalized), Tag: []string{"GroupDocBody"}, }) return nil } func (gm *groupInMemory) Dismiss(gid groupID) error { return nil } func (gm *groupInMemory) UpdateGroupDocument(gid groupID, body []byte) error { gd := gm.groups.find(gid) if gd == nil { return errGroupNotExist } newbody, err := gd.updateBodyWithBson(body) if err != nil { return err } return gm.groupDocSync(gid, newbody) } func (gm *groupInMemory) TargetExists(target primitive.ObjectID) bool { return gm.hasConn(target) != nil } var devflag = flagx.Bool("dev", false, "") func (cfg *groupConfig) prepareInMemory(ctx context.Context, typename string, sub *subTavern) (group, error) { // group document // member document region := sub.region wsh := sub.wsh groupDocSyncChanName := fmt.Sprintf("d_mgc_%s_%s", region, typename) memberSyncChanName := fmt.Sprintf("m_mgc_%s_%s", region, typename) rpcChanName := fmt.Sprintf("r_mgc_%s_%s", region, typename) toHashHex := func(name string) string { hash := md5.New() hash.Write([]byte(name)) if *devflag { hn, _ := os.Hostname() hash.Write([]byte(hn)) } return hex.EncodeToString(hash.Sum(nil)[:8]) } groupDocSyncChanName = toHashHex(groupDocSyncChanName) memberSyncChanName = toHashHex(memberSyncChanName) rpcChanName = toHashHex(rpcChanName) // 여기서는 subscribe channel // 각 함수에서는 publish gm := &groupInMemory{ groupConfig: cfg, groupDocSync: func(gid groupID, newbody []byte) error { bt := []byte(fmt.Sprintf("%s%s", config.macAddr, gid.Hex())) bt = append(bt, newbody...) _, err := wsh.RedisSync.Publish(ctx, groupDocSyncChanName, bt).Result() return err }, memberSync: func(gid groupID, mid accountID, tid ticketID, doc *memberDoc, newmember bool) error { var payload string if doc != nil { bt, _ := json.Marshal(doc) newmemberflag := func() string { if newmember { return "t" } else { return "f" } }() payload = fmt.Sprintf("%s%s%s%s%s%s", config.macAddr, gid.Hex(), mid.Hex(), tid.Hex(), newmemberflag, string(bt)) } else { payload = fmt.Sprintf("%s%s%s%s", config.macAddr, gid.Hex(), mid.Hex(), tid.Hex()) } _, err := wsh.RedisSync.Publish(ctx, memberSyncChanName, payload).Result() return err }, rpcCall: func(bt []byte) error { _, err := wsh.RedisSync.Publish(ctx, rpcChanName, bt).Result() return err }, hasConn: func(t accountID) *connection { return sub.cm.get(t) }, groups: groupContainer{ groupDocs: make(map[groupID]*groupDoc), }, sendUpstreamMessage: func(msg *wshandler.UpstreamMessage) { wsh.SendUpstreamMessage(region, msg) }, sendEnterRoomMessage: func(gid groupID, accid accountID) { wsh.EnterRoom(region, gid.Hex(), accid) }, sendLeaveRoomMessage: func(gid groupID, accid accountID) { wsh.LeaveRoom(region, gid.Hex(), accid) }, sendCloseMessage: func(target accountID, text string) { wsh.SendCloseMessage(region, target.Hex(), text) }, } rpc.RegistReceiver(gm) processChannelMessage := func(gm *groupInMemory, pubsub *redis.PubSub) *redis.PubSub { defer func() { r := recover() if r != nil { logger.Error(r) } }() for msg := range pubsub.Channel() { if msg == nil { pubsub = nil break } switch msg.Channel { case groupDocSyncChanName: // 호스트들간 그룹 정보 동기화 채널 payload := []byte(msg.Payload) if len(payload) < len(config.macAddr) { break } senderHost, remain := payload[:len(config.macAddr)], payload[len(config.macAddr):] if len(remain) < 24 { break } idstr, remain := remain[:24], remain[24:] gid, _ := primitive.ObjectIDFromHex(string(idstr)) gd := gm.groups.find(gid) if gd != nil { if len(remain) == 0 { // gid 그룹 삭제 // 그룹 안에 있는 멤버에게 알림 gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ Target: "#" + gid.Hex(), Body: FullGroupDoc{Gid: gid}, Tag: []string{"FullGroupDoc"}, }) gm.groups.delete(gid) } else if string(senderHost) != config.macAddr { gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ Target: "#" + gid.Hex(), Body: gd.updateBodyBsonToJson(remain), Tag: []string{"GroupDocBody"}, }) } } else if string(senderHost) != config.macAddr { var newDoc groupDoc if err := newDoc.updateBody(remain); err != nil { logger.Error("groupDocSyncChanName message decode failed :", remain, err) } else { gm.groups.add(gid, &newDoc) } } case memberSyncChanName: // 호스트들간 멤버 정보 동기화 채널 if len(msg.Payload) < len(config.macAddr) { break } senderHost, remain := msg.Payload[:len(config.macAddr)], msg.Payload[len(config.macAddr):] if len(remain) < 24 { break } idstr, remain := remain[:24], remain[24:] gid, _ := primitive.ObjectIDFromHex(idstr) gd := gm.groups.find(gid) if gd == nil { // 미리 그룹을 없애고 싱크 메시지를 보낸후 받은 것일 수 있다. break } idstr, remain = remain[:24], remain[24:] mid, _ := primitive.ObjectIDFromHex(idstr) idstr, remain = remain[:24], remain[24:] tid, _ := primitive.ObjectIDFromHex(idstr) isNewMember := false if len(remain) > 0 { idstr, remain = remain[:1], remain[1:] isNewMember = idstr == "t" } var updated *memberDoc rconn := gm.hasConn(mid) if senderHost != config.macAddr { // 내가 보낸 메시지가 아니면 멤버 도큐먼트 업데이트 하고 브로드캐스팅 if len(remain) == 0 { // mid 삭제 gd.removeMember(mid, &tid) updated = nil } else { updated = gd.overwriteMemberDocument(mid, &tid, []byte(remain)) } } else { updated = gd.member(tid) } if updated == nil { // 멤버 삭제 알림 if rconn != nil { // gid에 이미 다른 값이 있을 수 있다. // 정확하게 이 값이면 제거하고, 아니면 넘어간다. rconn.removeTag("gid", fmt.Sprintf("%s@%s", gid.Hex(), gm.Name)) } gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ Target: "#" + gid.Hex(), Body: PublicMemberDoc{Tid: tid}, Tag: []string{"PublicMemberDoc"}, }) } else { if isNewMember && updated.rconn == nil && rconn != nil { updated.rconn = rconn } // 업데이트 된 플레이어(새로 들어온 플레이어 포함)를 모두에게 알려준다. 본인 포함, invitee 제외 gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ Target: "#" + gid.Hex(), Body: PublicMemberDoc{ Tid: tid, memberDocCommon: updated.memberDocCommon, }, Tag: []string{"PublicMemberDoc"}, }) } if isNewMember { if rconn != nil { // 새 멤버이므로 기존 멤버를 다 보내준다. rconn.addTag("gid", fmt.Sprintf("%s@%s", gid.Hex(), gm.Name)) gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ Target: "@" + mid.Hex(), Body: gd.serializeFull(gid), Tag: []string{"FullGroupDoc"}, }) } } default: logger.Println("unknown channel") } } return pubsub } go func() { defer func() { r := recover() if r != nil { logger.Error(r) } }() var pubsub *redis.PubSub for { if pubsub == nil { pubsub = wsh.RedisSync.Subscribe(ctx, groupDocSyncChanName, memberSyncChanName, rpcChanName) } if pubsub == nil { time.Sleep(time.Second) continue } pubsub = processChannelMessage(gm, pubsub) } }() return gm, nil }