Squashed commit of the following:
commit8e1b232d57Author: mountain <mountain@action2quare.com> Date: Wed Jul 19 09:37:02 2023 +0900 InMemory 그룹을 redis로 변경 commit01da5bb3a4Author: mountain <mountain@action2quare.com> Date: Tue Jul 18 01:31:39 2023 +0900 body를 marshaling하고 클라이언트에서 flatten함 commitba61a11659Author: mountain <mountain@action2quare.com> Date: Mon Jul 17 17:47:07 2023 +0900 gob 등록 commit67cca13326Author: mountain <mountain@action2quare.com> Date: Sun Jul 16 18:41:24 2023 +0900 모듈 업데이트 commit272c696c59Author: mountain <mountain@action2quare.com> Date: Sun Jul 16 17:29:21 2023 +0900 json value 다시 되돌림 commitaa568ec3faAuthor: mountain <mountain@action2quare.com> Date: Sun Jul 16 17:26:19 2023 +0900 SetOption 타입 변경 commitb9c4d8b21bAuthor: mountain <mountain@action2quare.com> Date: Sun Jul 16 17:15:08 2023 +0900 objvalue marshalling 수정 commit99834c1461Author: mountain <mountain@action2quare.com> Date: Sun Jul 16 17:01:06 2023 +0900 objlen 수정 commit592112219eAuthor: mountain <mountain@action2quare.com> Date: Sun Jul 16 16:38:05 2023 +0900 gocommon 업데이트 commit62485b6d54Author: mountain <mountain@action2quare.com> Date: Sun Jul 16 15:36:20 2023 +0900 redis json 마이그레이션 완료 commitd36dd13bb7Author: mountain <mountain@action2quare.com> Date: Sun Jul 16 02:51:41 2023 +0900 redis stack 사용
This commit is contained in:
104
core/tavern.go
104
core/tavern.go
@ -9,8 +9,9 @@ import (
|
||||
"net/http"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"repositories.action2quare.com/ayo/gocommon"
|
||||
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||
"repositories.action2quare.com/ayo/gocommon/wshandler"
|
||||
@ -30,10 +31,6 @@ func writeBsonArr(w io.Writer, src []bson.M) error {
|
||||
})
|
||||
}
|
||||
|
||||
func onlineGroupQueryKey(prefix string) string {
|
||||
return prefix + "_olg"
|
||||
}
|
||||
|
||||
func writeBsonDoc[T any](w io.Writer, src T) error {
|
||||
rw, err := bsonrw.NewBSONValueWriter(w)
|
||||
if err != nil {
|
||||
@ -82,43 +79,6 @@ type TavernConfig struct {
|
||||
|
||||
var config TavernConfig
|
||||
|
||||
type connectionMap struct {
|
||||
sync.Mutex
|
||||
conns map[primitive.ObjectID]*connection
|
||||
}
|
||||
|
||||
func (cm *connectionMap) add(accid accountID, alias string) {
|
||||
cm.Lock()
|
||||
defer cm.Unlock()
|
||||
|
||||
old := cm.conns[accid]
|
||||
if old != nil {
|
||||
old.cleanup()
|
||||
}
|
||||
cm.conns[accid] = &connection{
|
||||
alias: alias,
|
||||
onClose: make(map[string]func()),
|
||||
}
|
||||
}
|
||||
|
||||
func (cm *connectionMap) remove(accid accountID) {
|
||||
cm.Lock()
|
||||
defer cm.Unlock()
|
||||
|
||||
old := cm.conns[accid]
|
||||
if old != nil {
|
||||
delete(cm.conns, accid)
|
||||
old.cleanup()
|
||||
}
|
||||
}
|
||||
|
||||
func (cm *connectionMap) get(accid accountID) *connection {
|
||||
cm.Lock()
|
||||
defer cm.Unlock()
|
||||
|
||||
return cm.conns[accid]
|
||||
}
|
||||
|
||||
type Tavern struct {
|
||||
subTaverns []*subTavern
|
||||
wsh *wshandler.WebsocketHandler
|
||||
@ -126,11 +86,11 @@ type Tavern struct {
|
||||
|
||||
type subTavern struct {
|
||||
mongoClient gocommon.MongoClient
|
||||
redisClient *redis.Client
|
||||
wsh *wshandler.WebsocketHandler
|
||||
region string
|
||||
groups map[string]group
|
||||
methods map[string]reflect.Method
|
||||
cm connectionMap
|
||||
}
|
||||
|
||||
func getMacAddr() (string, error) {
|
||||
@ -183,25 +143,8 @@ func (tv *Tavern) Cleanup() {
|
||||
}
|
||||
}
|
||||
|
||||
// func (tv *Tavern) SocketMessageReceived(accid primitive.ObjectID, alias string, messageType wshandler.WebSocketMessageType, body io.Reader) {
|
||||
// switch messageType {
|
||||
// case wshandler.Connected:
|
||||
|
||||
// case wshandler.Disconnected:
|
||||
// }
|
||||
// // gidtype := msg.Conn.GetTag("gid")
|
||||
// // if len(gidtype) > 0 {
|
||||
// // tokens := strings.SplitN(gidtype, "@", 2)
|
||||
// // gidobj, _ := primitive.ObjectIDFromHex(tokens[0])
|
||||
// // gtype := tokens[1]
|
||||
// // group := sub.groups[gtype]
|
||||
// // if group != nil {
|
||||
// // group.PauseMember(gidobj, msg.Alias, msg.Conn)
|
||||
// // }
|
||||
// }
|
||||
|
||||
func (tv *Tavern) prepare(ctx context.Context) error {
|
||||
for region := range config.RegionStorage {
|
||||
for region, addr := range config.RegionStorage {
|
||||
var dbconn gocommon.MongoClient
|
||||
var err error
|
||||
var groupinstance group
|
||||
@ -214,14 +157,17 @@ func (tv *Tavern) prepare(ctx context.Context) error {
|
||||
methods[method.Name] = method
|
||||
}
|
||||
|
||||
redisClient, err := gocommon.NewRedisClient(addr.Redis["tavern"])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sub := &subTavern{
|
||||
wsh: tv.wsh,
|
||||
mongoClient: dbconn,
|
||||
redisClient: redisClient,
|
||||
region: region,
|
||||
methods: methods,
|
||||
cm: connectionMap{
|
||||
conns: make(map[primitive.ObjectID]*connection),
|
||||
},
|
||||
}
|
||||
|
||||
groups := make(map[string]group)
|
||||
@ -254,7 +200,7 @@ func (tv *Tavern) prepare(ctx context.Context) error {
|
||||
|
||||
func (tv *Tavern) RegisterHandlers(ctx context.Context, serveMux *http.ServeMux, prefix string) error {
|
||||
for _, sub := range tv.subTaverns {
|
||||
tv.wsh.RegisterReceiver(sub.region, sub.clientMessageReceived)
|
||||
tv.wsh.RegisterReceiver(sub.region, sub)
|
||||
var pattern string
|
||||
if sub.region == "default" {
|
||||
pattern = gocommon.MakeHttpHandlerPattern(prefix, "api")
|
||||
@ -267,11 +213,11 @@ func (tv *Tavern) RegisterHandlers(ctx context.Context, serveMux *http.ServeMux,
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sub *subTavern) clientMessageReceived(sender *wshandler.Sender, messageType wshandler.WebSocketMessageType, body io.Reader) {
|
||||
func (sub *subTavern) OnClientMessageReceived(sender *wshandler.Sender, messageType wshandler.WebSocketMessageType, body io.Reader) {
|
||||
if messageType == wshandler.Connected {
|
||||
sub.cm.add(sender.Accid, sender.Alias)
|
||||
logger.Println("OnClientMessageReceived : connected ", sender.Accid.Hex())
|
||||
} else if messageType == wshandler.Disconnected {
|
||||
sub.cm.remove(sender.Accid)
|
||||
logger.Println("OnClientMessageReceived : disconnected ", sender.Accid.Hex())
|
||||
} else if messageType == wshandler.BinaryMessage {
|
||||
var msg map[string][]any
|
||||
dec := json.NewDecoder(body)
|
||||
@ -291,12 +237,34 @@ func (sub *subTavern) clientMessageReceived(sender *wshandler.Sender, messageTyp
|
||||
if group := sub.groups[typename]; group != nil {
|
||||
group.UpdateMemberDocument(gidobj, sender.Accid, doc)
|
||||
}
|
||||
|
||||
case "UpdateGroupDocument":
|
||||
typename := args[0].(string)
|
||||
gidobj, _ := primitive.ObjectIDFromHex(args[1].(string))
|
||||
doc := args[2].(map[string]any)
|
||||
if group := sub.groups[typename]; group != nil {
|
||||
group.UpdateGroupDocument(gidobj, doc)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (sub *subTavern) OnRoomCreated(region, name string) {
|
||||
_, err := sub.redisClient.Persist(context.Background(), name).Result()
|
||||
if err != nil {
|
||||
logger.Println("OnRoomCreate Persist failed :", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (sub *subTavern) OnRoomDestroyed(region, name string) {
|
||||
_, err := sub.redisClient.Expire(context.Background(), name, 3600*time.Second).Result()
|
||||
if err != nil {
|
||||
logger.Println("OnRoomDestroyed Persist failed :", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (sub *subTavern) api(w http.ResponseWriter, r *http.Request) {
|
||||
defer func() {
|
||||
s := recover()
|
||||
|
||||
Reference in New Issue
Block a user