WebsocketApiHandler로 변경

This commit is contained in:
2023-09-08 11:37:50 +09:00
parent 4a51f7d433
commit ce50657734
7 changed files with 423 additions and 378 deletions

View File

@ -1,11 +1,3 @@
package core
import (
"repositories.action2quare.com/ayo/gocommon/wshandler"
)
type configDocument map[string]any
type group interface {
Initialize(*Tavern, configDocument) error
ClientMessageReceived(*wshandler.Sender, wshandler.WebSocketMessageType, any)
}

View File

@ -5,10 +5,8 @@ import (
"fmt"
"net/http"
"strings"
"time"
"github.com/go-redis/redis/v8"
"go.mongodb.org/mongo-driver/bson/primitive"
"repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/wshandler"
@ -35,9 +33,6 @@ type groupChat struct {
sendUpstreamMessage func(msg *wshandler.UpstreamMessage)
}
var accidHex func(primitive.ObjectID) string
var accidstrHex func(string) string
func (gc *groupChat) Initialize(tv *Tavern, cfg configDocument) error {
rem, _ := json.Marshal(cfg)
if err := json.Unmarshal(rem, &gc.chatConfig); err != nil {
@ -68,7 +63,7 @@ func (gc *groupChat) Initialize(tv *Tavern, cfg configDocument) error {
_, err := gc.rh.JSONSet(name, "$", cfg)
if *devflag && err != nil {
gc.rh.Del(gc.rh.Context(), name).Result()
gc.rh.JSONDel(name, "$")
_, err = gc.rh.JSONSet(name, "$", cfg)
}
@ -77,140 +72,245 @@ func (gc *groupChat) Initialize(tv *Tavern, cfg configDocument) error {
}
}
ts := fmt.Sprintf("%x-", time.Now().Unix())
if *devflag {
accidHex = func(accid primitive.ObjectID) string {
return ts + accid.Hex()
}
accidstrHex = func(accid string) string {
return ts + accid
}
} else {
accidHex = func(accid primitive.ObjectID) string {
return accid.Hex()
}
accidstrHex = func(accid string) string {
return accid
}
}
return nil
}
func (gc *groupChat) ClientMessageReceived(sender *wshandler.Sender, mt wshandler.WebSocketMessageType, message any) {
if mt == wshandler.Disconnected {
if _, err := gc.rh.Del(gc.rh.Context(), accidHex(sender.Accid)).Result(); err != nil {
logger.Println(err)
}
} else if mt == wshandler.BinaryMessage {
commandline := message.([]any)
cmd := commandline[0].(string)
args := commandline[1:]
switch cmd {
case "EnterPublicChannel":
chanid := args[0].(string)
if cfg, ok := gc.chatConfig.Channels[chanid]; ok {
size, err := gc.rh.JSONGetInt64(chanid, "$.size")
if err != nil || len(size) == 0 {
logger.Println("JSONGetInt64 failed :", chanid, err)
} else if size[0] < cfg.Capacity {
// 입장
newsize, err := gc.rh.JSONNumIncrBy(chanid, "$.size", 1)
if err == nil {
gc.enterRoom(chanid, sender.Accid)
sender.RegistDisconnectedCallback(chanid, func() {
size, err := gc.rh.JSONNumIncrBy(chanid, "$.size", -1)
if err == nil {
gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + chanid,
Body: map[string]any{"size": size},
Tag: []string{"ChattingChannelProperties"},
})
}
})
func (gc *groupChat) ClientConnected(ctx wshandler.ApiCallContext) {
gc.rh.JSONSet(ctx.CallBy.Accid.Hex(), "$.channel", map[string]any{})
}
gc.rh.HSet(gc.rh.Context(), accidHex(sender.Accid), "cc_pub", chanid)
gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + chanid,
Body: map[string]any{"size": newsize[0]},
Tag: []string{"ChattingChannelProperties"},
})
}
} else {
// 풀방
logger.Println("chatting channel is full :", chanid, size, cfg.Capacity)
}
func (gc *groupChat) ClientDisconnected(ctx wshandler.ApiCallContext) {
docs, _ := gc.rh.JSONGetDocuments(ctx.CallBy.Accid.Hex(), "$.channel")
if len(docs) > 0 {
for k, v := range docs[0] {
typename := k
chanid := v.(string)
gc.leaveRoom(chanid, ctx.CallBy.Accid)
if k == "public" {
gc.rh.JSONNumIncrBy(chanid, "$.size", -1)
} else {
logger.Println("chatting channel not valid :", chanid)
}
case "LeavePublicChannel":
chanid := args[0].(string)
gc.rh.HDel(gc.rh.Context(), accidHex(sender.Accid), "cc_pub")
gc.leaveRoom(chanid, sender.Accid)
if f := sender.PopDisconnectedCallback(chanid); f != nil {
f()
}
case "TextMessage":
chanid := args[0].(string)
msg := args[1].(string)
gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + chanid,
Body: map[string]any{"sender": sender.Alias, "msg": msg},
Tag: []string{"TextMessage"},
})
case "EnterPrivateChannel":
typename := args[0].(string)
channel := args[1].(string)
var reason string
if len(args) > 2 {
reason = args[2].(string)
}
if len(reason) > 0 {
// 수락
ok, err := gc.rh.HSetNX(gc.rh.Context(), accidHex(sender.Accid), "cc_"+typename, channel).Result()
if err != nil || !ok {
// 이미 다른 private channel 참여 중
logger.Println("EnterPrivateChannel failed. HSetNX return err :", err, sender.Accid.Hex(), typename, channel)
return
}
gc.enterRoom(channel, sender.Accid)
sender.RegistDisconnectedCallback(channel, func() {
gc.rh.JSONDel(channel, "$."+sender.Accid.Hex())
cnt, _ := gc.rh.HDel(gc.rh.Context(), accidHex(sender.Accid), "cc_"+typename).Result()
if cnt > 0 {
gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + channel,
Body: map[string]any{"sender": sender.Alias, "typename": typename},
Tag: []string{"LeavePrivateChannel"},
})
}
gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + chanid,
Body: map[string]any{"sender": ctx.CallBy.Alias, "typename": typename},
Tag: []string{"LeavePrivateChannel"},
})
}
gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + channel,
Body: map[string]any{
"sender": sender.Alias,
"msg": reason,
"typename": typename,
},
Tag: []string{"EnterPrivateChannel"},
})
case "LeavePrivateChannel":
channel := args[1].(string)
gc.leaveRoom(channel, sender.Accid)
if f := sender.PopDisconnectedCallback(channel); f != nil {
f()
}
}
}
}
func (gc *groupChat) EnterPublicChannel(ctx wshandler.ApiCallContext) {
chanid := ctx.Arguments[0].(string)
if cfg, ok := gc.chatConfig.Channels[chanid]; ok {
size, err := gc.rh.JSONGetInt64(chanid, "$.size")
if err != nil || len(size) == 0 {
logger.Println("JSONGetInt64 failed :", chanid, err)
} else if size[0] < cfg.Capacity {
// 입장
newsize, err := gc.rh.JSONNumIncrBy(chanid, "$.size", 1)
if err == nil {
gc.enterRoom(chanid, ctx.CallBy.Accid)
gc.rh.JSONSet(ctx.CallBy.Accid.Hex(), "$.channel.public", chanid)
gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + chanid,
Body: map[string]any{"size": newsize[0]},
Tag: []string{"ChattingChannelProperties"},
})
}
} else {
// 풀방
logger.Println("chatting channel is full :", chanid, size, cfg.Capacity)
}
} else {
logger.Println("chatting channel not valid :", chanid)
}
}
func (gc *groupChat) LeavePublicChannel(ctx wshandler.ApiCallContext) {
chanid := ctx.Arguments[0].(string)
cnt, _ := gc.rh.JSONDel(ctx.CallBy.Accid.Hex(), "$.channel.public")
if cnt > 0 {
gc.leaveRoom(chanid, ctx.CallBy.Accid)
gc.rh.JSONNumIncrBy(chanid, "$.size", -1)
}
}
func (gc *groupChat) TextMessage(ctx wshandler.ApiCallContext) {
chanid := ctx.Arguments[0].(string)
msg := ctx.Arguments[1].(string)
gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + chanid,
Body: map[string]any{"sender": ctx.CallBy.Alias, "msg": msg},
Tag: []string{"TextMessage"},
})
}
func (gc *groupChat) EnterPrivateChannel(ctx wshandler.ApiCallContext) {
typename := ctx.Arguments[0].(string)
channel := ctx.Arguments[1].(string)
var reason string
if len(ctx.Arguments) > 2 {
reason = ctx.Arguments[2].(string)
}
if len(reason) > 0 {
// 수락
ok, err := gc.rh.JSONSet(ctx.CallBy.Accid.Hex(), "$.channel."+typename, channel, gocommon.RedisonSetOptionNX)
if err != nil || !ok {
// 이미 다른 private channel 참여 중
logger.Println("EnterPrivateChannel failed. HSetNX return err :", err, ctx.CallBy.Accid.Hex(), typename, channel)
return
}
gc.enterRoom(channel, ctx.CallBy.Accid)
} else {
// 내가 이미 private channel에 있다는 것을 다른 사람들에게 알려주기 위함
}
gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + channel,
Body: map[string]any{
"sender": ctx.CallBy.Alias,
"msg": reason,
"typename": typename,
},
Tag: []string{"EnterPrivateChannel"},
})
}
func (gc *groupChat) LeavePrivateChannel(ctx wshandler.ApiCallContext) {
typename := ctx.Arguments[0].(string)
chanid := ctx.Arguments[1].(string)
cnt, _ := gc.rh.JSONDel(ctx.CallBy.Accid.Hex(), "$.channel."+typename)
if cnt > 0 {
gc.leaveRoom(chanid, ctx.CallBy.Accid)
gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + chanid,
Body: map[string]any{"sender": ctx.CallBy.Alias, "typename": typename},
Tag: []string{"LeavePrivateChannel"},
})
}
}
// func (gc *groupChat) ClientMessageReceived(sender *wshandler.Sender, mt wshandler.WebSocketMessageType, message any) {
// if mt == wshandler.Disconnected {
// if _, err := gc.rh.Del(gc.rh.Context(), accidHex(sender.Accid)).Result(); err != nil {
// logger.Println(err)
// }
// } else if mt == wshandler.BinaryMessage {
// commandline := message.([]any)
// cmd := commandline[0].(string)
// args := commandline[1:]
// switch cmd {
// case "EnterPublicChannel":
// chanid := args[0].(string)
// if cfg, ok := gc.chatConfig.Channels[chanid]; ok {
// size, err := gc.rh.JSONGetInt64(chanid, "$.size")
// if err != nil || len(size) == 0 {
// logger.Println("JSONGetInt64 failed :", chanid, err)
// } else if size[0] < cfg.Capacity {
// // 입장
// newsize, err := gc.rh.JSONNumIncrBy(chanid, "$.size", 1)
// if err == nil {
// gc.enterRoom(chanid, sender.Accid)
// sender.RegistDisconnectedCallback(chanid, func() {
// size, err := gc.rh.JSONNumIncrBy(chanid, "$.size", -1)
// if err == nil {
// gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
// Target: "#" + chanid,
// Body: map[string]any{"size": size},
// Tag: []string{"ChattingChannelProperties"},
// })
// }
// })
// gc.rh.HSet(gc.rh.Context(), accidHex(sender.Accid), "cc_pub", chanid)
// gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
// Target: "#" + chanid,
// Body: map[string]any{"size": newsize[0]},
// Tag: []string{"ChattingChannelProperties"},
// })
// }
// } else {
// // 풀방
// logger.Println("chatting channel is full :", chanid, size, cfg.Capacity)
// }
// } else {
// logger.Println("chatting channel not valid :", chanid)
// }
// case "LeavePublicChannel":
// chanid := args[0].(string)
// gc.rh.HDel(gc.rh.Context(), accidHex(sender.Accid), "cc_pub")
// gc.leaveRoom(chanid, sender.Accid)
// if f := sender.PopDisconnectedCallback(chanid); f != nil {
// f()
// }
// case "TextMessage":
// chanid := args[0].(string)
// msg := args[1].(string)
// gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
// Target: "#" + chanid,
// Body: map[string]any{"sender": sender.Alias, "msg": msg},
// Tag: []string{"TextMessage"},
// })
// case "EnterPrivateChannel":
// typename := args[0].(string)
// channel := args[1].(string)
// var reason string
// if len(args) > 2 {
// reason = args[2].(string)
// }
// if len(reason) > 0 {
// // 수락
// // 이거 HSet 하면 안되겠는데? JSONSet해야할 듯?
// ok, err := gc.rh.HSetNX(gc.rh.Context(), accidHex(sender.Accid), "cc_"+typename, channel).Result()
// if err != nil || !ok {
// // 이미 다른 private channel 참여 중
// logger.Println("EnterPrivateChannel failed. HSetNX return err :", err, sender.Accid.Hex(), typename, channel)
// return
// }
// gc.enterRoom(channel, sender.Accid)
// sender.RegistDisconnectedCallback(channel, func() {
// gc.rh.JSONDel(channel, "$."+sender.Accid.Hex())
// // 이거 HDel 하면 안되겠는데? JSONDel해야할 듯?
// cnt, _ := gc.rh.HDel(gc.rh.Context(), accidHex(sender.Accid), "cc_"+typename).Result()
// if cnt > 0 {
// gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
// Target: "#" + channel,
// Body: map[string]any{"sender": sender.Alias, "typename": typename},
// Tag: []string{"LeavePrivateChannel"},
// })
// }
// })
// } else {
// // 내가 이미 private channel에 있다는 것을 다른 사람들에게 알려주기 위함
// }
// gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
// Target: "#" + channel,
// Body: map[string]any{
// "sender": sender.Alias,
// "msg": reason,
// "typename": typename,
// },
// Tag: []string{"EnterPrivateChannel"},
// })
// case "LeavePrivateChannel":
// channel := args[1].(string)
// gc.leaveRoom(channel, sender.Accid)
// if f := sender.PopDisconnectedCallback(channel); f != nil {
// f()
// }
// }
// }
// }
func (gc *groupChat) FetchChattingChannels(w http.ResponseWriter, r *http.Request) {
var data struct {
Prefix string `bson:"prefix"`
@ -271,38 +371,23 @@ func (gc *groupChat) QueryPlayerChattingChannel(w http.ResponseWriter, r *http.R
Accid string `bson:"accid"`
Typename string `bson:"typename"`
}
if err := gocommon.ReadBsonDocumentFromBody(r.Body, &data); err != nil {
err := gocommon.ReadBsonDocumentFromBody(r.Body, &data)
if err != nil {
logger.Println("QueryPlayerChattingChannel failed. ReadBsonDocumentFromBody returns err :", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
accid := data.Accid
typename := data.Typename
var fields []string
if len(typename) == 0 {
fields = []string{"cc_pub"}
} else {
fields = []string{"cc_pub", "cc_" + typename}
}
chans, err := gc.rh.HMGet(gc.rh.Context(), accidstrHex(accid), fields...).Result()
sub, err := gc.rh.JSONGetDocuments(accid, "$.channel")
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
output := make(map[string]string)
if len(chans) > 0 && chans[0] != nil {
output["public"] = chans[0].(string)
if len(sub) > 0 {
enc := json.NewEncoder(w)
enc.Encode(sub[0])
}
if len(chans) > 1 && chans[1] != nil {
output[typename] = chans[1].(string)
}
enc := json.NewEncoder(w)
enc.Encode(output)
}
func (gc *groupChat) SendMessageOnChannel(w http.ResponseWriter, r *http.Request) {

View File

@ -174,7 +174,7 @@ func (gd *groupDoc) addMember(mid accountID, character bson.M) (bson.M, error) {
tid := gd.tid(mid)
prefix := "$._members." + tid
if _, err := gd.rh.JSONMerge(gd.strid(), prefix+"._body", character, gocommon.RedisonSetOptionXX); err != nil {
if _, err := gd.rh.JSONSet(gd.strid(), prefix+"._body", character, gocommon.RedisonSetOptionXX); err != nil {
return nil, err
}
@ -262,16 +262,6 @@ func (gp *groupParty) Initialize(tv *Tavern, cfg configDocument) error {
tv.wsh.LeaveRoom(gid.Hex(), accid)
}
// tv.apiFuncs.registApiFunction("JoinParty", gp.JoinParty)
// tv.apiFuncs.registApiFunction("InviteToParty", gp.InviteToParty)
// tv.apiFuncs.registApiFunction("AcceptPartyInvitation", gp.AcceptPartyInvitation)
// tv.apiFuncs.registApiFunction("DenyPartyInvitation", gp.DenyPartyInvitation)
// tv.apiFuncs.registApiFunction("QueryPartyMemberState", gp.QueryPartyMemberState)
// tv.apiFuncs.registApiFunction("LeaveParty", gp.LeaveParty)
// tv.apiFuncs.registApiFunction("UpdatePartyMemberDocument", gp.UpdatePartyMemberDocument)
// tv.apiFuncs.registApiFunction("UpdatePartyDocument", gp.UpdatePartyDocument)
// tv.apiFuncs.registApiFunction("QueryPartyMembers", gp.QueryPartyMembers)
return nil
}
@ -289,6 +279,7 @@ func (gp *groupParty) JoinParty(w http.ResponseWriter, r *http.Request) {
Gid primitive.ObjectID `bson:"gid"`
Mid primitive.ObjectID `bson:"mid"`
Character bson.M `bson:"character"`
First bool `bson:"first"`
}
if err := gocommon.ReadBsonDocumentFromBody(r.Body, &data); err != nil {
logger.Println("JoinParty failed. ReadBsonDocumentFromBody returns err :", err)
@ -296,7 +287,7 @@ func (gp *groupParty) JoinParty(w http.ResponseWriter, r *http.Request) {
return
}
doc := data.Character
character := data.Character
gid := data.Gid
mid := data.Mid
@ -319,31 +310,52 @@ func (gp *groupParty) JoinParty(w http.ResponseWriter, r *http.Request) {
}
// 내 정보 업데이트할 때에도 사용됨
if memdoc, err := gd.addMember(mid, doc["character"].(map[string]any)); err == nil {
// 기존 유저에게 새 유저 알림
if data.First {
if memdoc, err := gd.addMember(mid, character); err == nil {
// 기존 유저에게 새 유저 알림
gp.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + gid.Hex(),
Body: map[string]any{
gd.tid(mid): memdoc,
},
Tag: []string{"MemberDocFull"},
})
gp.enterRoom(gid, mid)
// 최초 입장이라면 새 멤버에 그룹 전체를 알림
gp.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: mid.Hex(),
Body: gd.loadFull(),
Tag: []string{"GroupDocFull"},
})
writeBsonDoc(w, map[string]string{
"gid": gid.Hex(),
"tid": gd.tid(mid),
})
} else if err != nil {
logger.Error("JoinParty failed :", err)
w.WriteHeader(http.StatusInternalServerError)
}
} else {
path := "$._members." + gd.tid(mid) + "._body"
if _, err := gd.rh.JSONSet(gd.strid(), path, character, gocommon.RedisonSetOptionXX); err != nil {
logger.Error("JoinParty failed :", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
// 기존 유저에게 캐릭터 알림
gp.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + gid.Hex(),
Body: map[string]any{
gd.tid(mid): memdoc,
gd.tid(mid): bson.M{
"_body": character,
},
},
Tag: []string{"MemberDocFull"},
Tag: []string{"MemberDocFragment"},
})
gp.enterRoom(gid, mid)
// 새 멤버에 그룹 전체를 알림
gp.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: mid.Hex(),
Body: gd.loadFull(),
Tag: []string{"GroupDocFull"},
})
writeBsonDoc(w, map[string]string{
"gid": gid.Hex(),
"tid": gd.tid(mid),
})
} else if err != nil {
logger.Error("JoinParty failed :", err)
w.WriteHeader(http.StatusInternalServerError)
}
}
@ -622,6 +634,8 @@ func (gp *groupParty) LeaveParty(w http.ResponseWriter, r *http.Request) {
err = gd.removeMemberByTid(tid)
} else {
err = gd.removeMember(mid)
// 내가 나갔다
gp.rh.JSONDel(mid.Hex(), "$.party.id")
}
if err != nil {
@ -630,8 +644,6 @@ func (gp *groupParty) LeaveParty(w http.ResponseWriter, r *http.Request) {
return
}
gp.rh.JSONDel(mid.Hex(), "$.party.id")
// mid한테는 빈 GroupDocFull을 보낸다. 그러면 지워짐
gp.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: mid.Hex(),
@ -676,30 +688,6 @@ func (gp *groupParty) updateMemberDocument(gid groupID, mid accountID, doc bson.
return nil
}
func (gp *groupParty) UpdatePartyMemberDocument(w http.ResponseWriter, r *http.Request) {
var data struct {
Gid primitive.ObjectID `bson:"gid"`
Mid primitive.ObjectID `bson:"mid"`
Doc bson.M `bson:"doc"`
}
if err := gocommon.ReadBsonDocumentFromBody(r.Body, &data); err != nil {
logger.Println("UpdatePartyMemberDocument failed. ReadBsonDocumentFromBody returns err :", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
mid := data.Mid
gid := data.Gid
updatedoc := data.Doc
if err := gp.updateMemberDocument(gid, mid, updatedoc); err != nil {
logger.Println("UpdatePartyMemberDocument failed :", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
}
func (gp *groupParty) updatePartyDocument(gid groupID, frag bson.M) error {
gd := groupDoc{
id: gid,
@ -834,26 +822,67 @@ func (gp *groupParty) memberDisconnected(room string, mid primitive.ObjectID) {
})
}
func (gp *groupParty) ClientMessageReceived(sender *wshandler.Sender, mt wshandler.WebSocketMessageType, message any) {
if mt == wshandler.Disconnected {
rooms := message.([]string)
for _, roomname := range rooms {
gp.memberDisconnected(roomname, sender.Accid)
}
} else if mt == wshandler.BinaryMessage {
commandline := message.([]any)
cmd := commandline[0].(string)
args := commandline[1:]
switch cmd {
case "UpdatePartyMemberDocument":
gidobj, _ := primitive.ObjectIDFromHex(args[0].(string))
doc := args[1].(map[string]any)
gp.updateMemberDocument(gidobj, sender.Accid, doc)
func (gp *groupParty) ClientDisconnected(ctx wshandler.ApiCallContext) {
gids, _ := gp.rh.JSONGetString(ctx.CallBy.Accid.Hex(), "$.party.id")
case "UpdatePartyDocument":
gidobj, _ := primitive.ObjectIDFromHex(args[0].(string))
doc := args[1].(map[string]any)
gp.updatePartyDocument(gidobj, doc)
}
if len(gids) > 0 && len(gids[0]) > 0 {
// mid한테는 빈 GroupDocFull을 보낸다. 그러면 지워짐
gidstr := gids[0]
gid, _ := primitive.ObjectIDFromHex(gidstr)
gp.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: ctx.CallBy.Accid.Hex(),
Body: bson.M{"gid": gid},
Tag: []string{"GroupDocFull", gidstr},
})
// gid에는 제거 메시지 보냄
gp.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + gidstr,
Body: bson.M{
makeTid(gid, ctx.CallBy.Accid): bson.M{},
},
Tag: []string{"MemberDocFull"},
})
gp.leaveRoom(gid, ctx.CallBy.Accid)
}
}
func (gp *groupParty) UpdatePartyMemberDocumentDirect(ctx wshandler.ApiCallContext) {
gidobj, _ := primitive.ObjectIDFromHex(ctx.Arguments[0].(string))
doc := ctx.Arguments[1].(map[string]any)
gp.updateMemberDocument(gidobj, ctx.CallBy.Accid, doc)
}
func (gp *groupParty) UpdatePartyDocumentDirect(ctx wshandler.ApiCallContext) {
gidobj, _ := primitive.ObjectIDFromHex(ctx.Arguments[0].(string))
doc := ctx.Arguments[1].(map[string]any)
gp.updatePartyDocument(gidobj, doc)
}
// func (gp *groupParty) ClientMessageReceived(sender *wshandler.Sender, mt wshandler.WebSocketMessageType, message any) {
// if mt == wshandler.Disconnected {
// rooms := message.([]string)
// for _, roomname := range rooms {
// gp.memberDisconnected(roomname, sender.Accid)
// }
// } else if mt == wshandler.BinaryMessage {
// commandline := message.([]any)
// cmd := commandline[0].(string)
// args := commandline[1:]
// switch cmd {
// case "UpdatePartyMemberDocument":
// gidobj, _ := primitive.ObjectIDFromHex(args[0].(string))
// doc := args[1].(map[string]any)
// gp.updateMemberDocument(gidobj, sender.Accid, doc)
// case "UpdatePartyDocument":
// gidobj, _ := primitive.ObjectIDFromHex(args[0].(string))
// doc := args[1].(map[string]any)
// gp.updatePartyDocument(gidobj, doc)
// }
// }
// }

View File

@ -2,7 +2,6 @@ package core
import (
"context"
"encoding/json"
"errors"
"io"
"net"
@ -39,22 +38,19 @@ func writeBsonDoc[T any](w io.Writer, src T) error {
type TavernConfig struct {
session.SessionConfig `json:",inline"`
gocommon.StorageAddr `json:"storage"`
Group map[string]configDocument `json:"tavern_group_types"`
MaingateApiToken string `json:"maingate_api_token"`
RedisURL string `json:"tavern_redis_url"`
macAddr string
Group map[string]configDocument `json:"tavern_group_types"`
MaingateApiToken string `json:"maingate_api_token"`
RedisURL string `json:"tavern_redis_url"`
macAddr string
}
var config TavernConfig
type Tavern struct {
wsh *wshandler.WebsocketHandler
mongoClient gocommon.MongoClient
redison *gocommon.RedisonHandler
groups map[string]group
apiReceivers gocommon.HttpApiHandlerContainer
wsh *wshandler.WebsocketHandler
mongoClient gocommon.MongoClient
redison *gocommon.RedisonHandler
httpApiBorker gocommon.HttpApiHandlerContainer
}
func getMacAddr() (string, error) {
@ -74,16 +70,11 @@ func getMacAddr() (string, error) {
}
// New :
func New(context context.Context, wsh *wshandler.WebsocketHandler, inconfig *TavernConfig) (*Tavern, error) {
if inconfig == nil {
var loaded TavernConfig
if err := gocommon.LoadConfig(&loaded); err != nil {
return nil, err
}
inconfig = &loaded
func New(context context.Context, wsh *wshandler.WebsocketHandler) (*Tavern, error) {
if err := gocommon.LoadConfig(&config); err != nil {
return nil, err
}
config = *inconfig
macaddr, err := getMacAddr()
if err != nil {
return nil, err
@ -106,21 +97,21 @@ func (tv *Tavern) Cleanup() {
}
func (tv *Tavern) prepare(ctx context.Context) error {
redisClient, err := gocommon.NewRedisClient(config.StorageAddr.Redis["tavern"])
redisClient, err := gocommon.NewRedisClient(config.RedisURL)
if err != nil {
return err
}
tv.redison = gocommon.NewRedisonHandler(redisClient.Context(), redisClient)
tv.wsh.RegisterApiHandler(wshandler.MakeWebsocketApiHandler(tv, "tv"))
groups := make(map[string]group)
if cfg, ok := config.Group["chat"]; ok {
chat := new(groupChat)
if err := chat.Initialize(tv, cfg); err != nil {
return err
}
tv.apiReceivers.RegistReceiver(gocommon.MakeHttpApiReceiver(chat, "chat"))
groups["chat"] = chat
tv.httpApiBorker.RegisterApiHandler(gocommon.MakeHttpApiHandler(chat, "chat"))
tv.wsh.RegisterApiHandler(wshandler.MakeWebsocketApiHandler(chat, "chat"))
}
if cfg, ok := config.Group["party"]; ok {
@ -128,66 +119,82 @@ func (tv *Tavern) prepare(ctx context.Context) error {
if err := party.Initialize(tv, cfg); err != nil {
return err
}
tv.apiReceivers.RegistReceiver(gocommon.MakeHttpApiReceiver(party, "party"))
groups["party"] = party
tv.httpApiBorker.RegisterApiHandler(gocommon.MakeHttpApiHandler(party, "party"))
tv.wsh.RegisterApiHandler(wshandler.MakeWebsocketApiHandler(party, "party"))
}
tv.groups = groups
return nil
}
func (tv *Tavern) RegisterHandlers(ctx context.Context, serveMux *http.ServeMux, prefix string) error {
tv.wsh.RegisterReceiver(tv)
// tv.wsh.RegisterReceiver(tv)
pattern := gocommon.MakeHttpHandlerPattern(prefix, "api")
serveMux.HandleFunc(pattern, tv.api)
return nil
}
func (tv *Tavern) OnClientMessageReceived(sender *wshandler.Sender, messageType wshandler.WebSocketMessageType, body io.Reader) {
if messageType == wshandler.Connected {
logger.Println("OnClientMessageReceived : connected ", sender.Accid.Hex())
tv.redison.Del(tv.redison.Context(), sender.Accid.Hex())
_, err := tv.redison.JSONSet(sender.Accid.Hex(), "$", bson.M{"_ts": time.Now().UTC().Unix()})
if err != nil {
logger.Println("OnClientMessageReceived HSet error :", err)
}
func (tv *Tavern) EnterChannel(ctx wshandler.ApiCallContext) {
tv.wsh.EnterRoom(ctx.Arguments[0].(string), ctx.CallBy.Accid)
}
for _, gt := range tv.groups {
gt.ClientMessageReceived(sender, messageType, nil)
}
} else if messageType == wshandler.Disconnected {
var rooms []string
dec := json.NewDecoder(body)
if err := dec.Decode(&rooms); err == nil {
for _, gt := range tv.groups {
gt.ClientMessageReceived(sender, messageType, rooms)
}
}
tv.redison.Del(tv.redison.Context(), sender.Accid.Hex()).Result()
logger.Println("OnClientMessageReceived : disconnected ", sender.Accid.Hex())
} else if messageType == wshandler.BinaryMessage {
var commandline []any
dec := json.NewDecoder(body)
if err := dec.Decode(&commandline); err == nil {
cmd := commandline[0].(string)
args := commandline[1:]
switch cmd {
case "EnterChannel":
tv.wsh.EnterRoom(args[0].(string), sender.Accid)
func (tv *Tavern) LeaveChannel(ctx wshandler.ApiCallContext) {
tv.wsh.LeaveRoom(ctx.Arguments[0].(string), ctx.CallBy.Accid)
}
case "LeaveChannel":
tv.wsh.LeaveRoom(args[0].(string), sender.Accid)
default:
for _, gt := range tv.groups {
gt.ClientMessageReceived(sender, messageType, commandline)
}
}
}
func (tv *Tavern) ClientConnected(ctx wshandler.ApiCallContext) {
logger.Println("ClientConnected :", ctx.CallBy.Alias)
tv.redison.Del(tv.redison.Context(), ctx.CallBy.Accid.Hex())
_, err := tv.redison.JSONSet(ctx.CallBy.Accid.Hex(), "$", bson.M{"_ts": time.Now().UTC().Unix()})
if err != nil {
logger.Println("OnClientMessageReceived HSet error :", err)
}
}
func (tv *Tavern) ClientDisconnected(ctx wshandler.ApiCallContext) {
tv.redison.Del(tv.redison.Context(), ctx.CallBy.Accid.Hex()).Result()
logger.Println("ClientDisconnected :", ctx.CallBy.Alias)
}
// func (tv *Tavern) OnClientMessageReceived(sender *wshandler.Sender, messageType wshandler.WebSocketMessageType, body io.Reader) {
// if messageType == wshandler.Connected {
// logger.Println("OnClientMessageReceived : connected ", sender.Accid.Hex())
// tv.redison.Del(tv.redison.Context(), sender.Accid.Hex())
// _, err := tv.redison.JSONSet(sender.Accid.Hex(), "$", bson.M{"_ts": time.Now().UTC().Unix()})
// if err != nil {
// logger.Println("OnClientMessageReceived HSet error :", err)
// }
// } else if messageType == wshandler.Disconnected {
// // TODO : 알려줘야하나???
// var rooms []string
// dec := json.NewDecoder(body)
// if err := dec.Decode(&rooms); err == nil {
// for _, gt := range tv.groups {
// gt.ClientMessageReceived(sender, messageType, rooms)
// }
// }
// tv.redison.Del(tv.redison.Context(), sender.Accid.Hex()).Result()
// logger.Println("OnClientMessageReceived : disconnected ", sender.Accid.Hex())
// } else if messageType == wshandler.BinaryMessage {
// var commandline []any
// dec := json.NewDecoder(body)
// if err := dec.Decode(&commandline); err == nil {
// cmd := commandline[0].(string)
// args := commandline[1:]
// switch cmd {
// case "EnterChannel":
// tv.wsh.EnterRoom(args[0].(string), sender.Accid)
// case "LeaveChannel":
// tv.wsh.LeaveRoom(args[0].(string), sender.Accid)
// }
// }
// }
// }
func (tv *Tavern) OnRoomCreated(name string) {
cnt, err := tv.redison.IncrBy(tv.redison.Context(), "_ref_"+name, 1).Result()
if err != nil && !errors.Is(err, redis.Nil) {
@ -236,5 +243,5 @@ func (tv *Tavern) api(w http.ResponseWriter, r *http.Request) {
return
}
tv.apiReceivers.Call(funcname, w, r)
tv.httpApiBorker.Call(funcname, w, r)
}