session consumer로 교체

This commit is contained in:
2023-08-31 21:02:19 +09:00
parent cb5cd280b9
commit fd1502e52a
8 changed files with 106 additions and 110 deletions

View File

@ -11,9 +11,10 @@
} }
} }
}, },
"maingate_mongodb_url": "mongodb://192.168.8.94:27017/?replicaSet=repl01&retrywrites=false", "maingate_session_storage": "redis://192.168.8.94:6380/1",
"maingate_service_url": "http://localhost/maingate", "maingate_session_ttl" : 3600,
"maingate_api_token": "63d08aa34f0162622c11284b", "maingate_api_token": "63d08aa34f0162622c11284b",
"tavern_service_url": "http://localhost/tavern", "tavern_service_url": "http://localhost/tavern",
"tavern_group_types": { "tavern_group_types": {
"party": { "party": {

View File

@ -53,6 +53,6 @@ func (afc *apiFuncsContainer) call(fn string, w http.ResponseWriter, r *http.Req
type configDocument map[string]any type configDocument map[string]any
type group interface { type group interface {
Initialize(*subTavern, configDocument) error Initialize(*Tavern, configDocument) error
ClientMessageReceived(*wshandler.Sender, wshandler.WebSocketMessageType, any) ClientMessageReceived(*wshandler.Sender, wshandler.WebSocketMessageType, any)
} }

View File

@ -45,28 +45,28 @@ type groupChat struct {
var accidHex func(primitive.ObjectID) string var accidHex func(primitive.ObjectID) string
var accidstrHex func(string) string var accidstrHex func(string) string
func (gc *groupChat) Initialize(sub *subTavern, cfg configDocument) error { func (gc *groupChat) Initialize(tv *Tavern, cfg configDocument) error {
rem, _ := json.Marshal(cfg) rem, _ := json.Marshal(cfg)
if err := json.Unmarshal(rem, &gc.chatConfig); err != nil { if err := json.Unmarshal(rem, &gc.chatConfig); err != nil {
return err return err
} }
gc.enterRoom = func(chanid channelID, accid accountID) { gc.enterRoom = func(chanid channelID, accid accountID) {
sub.wsh.EnterRoom(sub.region, string(chanid), accid) tv.wsh.EnterRoom(string(chanid), accid)
} }
gc.leaveRoom = func(chanid channelID, accid accountID) { gc.leaveRoom = func(chanid channelID, accid accountID) {
sub.wsh.LeaveRoom(sub.region, string(chanid), accid) tv.wsh.LeaveRoom(string(chanid), accid)
} }
gc.sendUpstreamMessage = func(msg *wshandler.UpstreamMessage) { gc.sendUpstreamMessage = func(msg *wshandler.UpstreamMessage) {
sub.wsh.SendUpstreamMessage(sub.region, msg) tv.wsh.SendUpstreamMessage(msg)
} }
gc.rh = sub.redison gc.rh = tv.redison
sub.apiFuncs.registApiFunction("FetchChattingChannels", gc.FetchChattingChannels) tv.apiFuncs.registApiFunction("FetchChattingChannels", gc.FetchChattingChannels)
sub.apiFuncs.registApiFunction("BroadcastMessageOnChannel", gc.BroadcastMessageOnChannel) tv.apiFuncs.registApiFunction("BroadcastMessageOnChannel", gc.BroadcastMessageOnChannel)
sub.apiFuncs.registApiFunction("QueryPlayerChattingChannel", gc.QueryPlayerChattingChannel) tv.apiFuncs.registApiFunction("QueryPlayerChattingChannel", gc.QueryPlayerChattingChannel)
sub.apiFuncs.registApiFunction("SendMessageOnChannel", gc.SendMessageOnChannel) tv.apiFuncs.registApiFunction("SendMessageOnChannel", gc.SendMessageOnChannel)
for name, cfg := range gc.chatConfig.Channels { for name, cfg := range gc.chatConfig.Channels {
if cfg.Capacity == 0 { if cfg.Capacity == 0 {
cfg.Capacity = gc.chatConfig.DefaultCapacity cfg.Capacity = gc.chatConfig.DefaultCapacity

View File

@ -255,33 +255,33 @@ type groupParty struct {
rh *gocommon.RedisonHandler rh *gocommon.RedisonHandler
} }
func (gp *groupParty) Initialize(sub *subTavern, cfg configDocument) error { func (gp *groupParty) Initialize(tv *Tavern, cfg configDocument) error {
rem, _ := json.Marshal(cfg) rem, _ := json.Marshal(cfg)
err := json.Unmarshal(rem, &gp.partyConfig) err := json.Unmarshal(rem, &gp.partyConfig)
if err != nil { if err != nil {
return err return err
} }
gp.rh = sub.redison gp.rh = tv.redison
gp.sendUpstreamMessage = func(msg *wshandler.UpstreamMessage) { gp.sendUpstreamMessage = func(msg *wshandler.UpstreamMessage) {
sub.wsh.SendUpstreamMessage(sub.region, msg) tv.wsh.SendUpstreamMessage(msg)
} }
gp.enterRoom = func(gid groupID, accid accountID) { gp.enterRoom = func(gid groupID, accid accountID) {
sub.wsh.EnterRoom(sub.region, gid.Hex(), accid) tv.wsh.EnterRoom(gid.Hex(), accid)
} }
gp.leaveRoom = func(gid groupID, accid accountID) { gp.leaveRoom = func(gid groupID, accid accountID) {
sub.wsh.LeaveRoom(sub.region, gid.Hex(), accid) tv.wsh.LeaveRoom(gid.Hex(), accid)
} }
sub.apiFuncs.registApiFunction("JoinParty", gp.JoinParty) tv.apiFuncs.registApiFunction("JoinParty", gp.JoinParty)
sub.apiFuncs.registApiFunction("InviteToParty", gp.InviteToParty) tv.apiFuncs.registApiFunction("InviteToParty", gp.InviteToParty)
sub.apiFuncs.registApiFunction("AcceptPartyInvitation", gp.AcceptPartyInvitation) tv.apiFuncs.registApiFunction("AcceptPartyInvitation", gp.AcceptPartyInvitation)
sub.apiFuncs.registApiFunction("DenyPartyInvitation", gp.DenyPartyInvitation) tv.apiFuncs.registApiFunction("DenyPartyInvitation", gp.DenyPartyInvitation)
sub.apiFuncs.registApiFunction("QueryPartyMemberState", gp.QueryPartyMemberState) tv.apiFuncs.registApiFunction("QueryPartyMemberState", gp.QueryPartyMemberState)
sub.apiFuncs.registApiFunction("LeaveParty", gp.LeaveParty) tv.apiFuncs.registApiFunction("LeaveParty", gp.LeaveParty)
sub.apiFuncs.registApiFunction("UpdatePartyMemberDocument", gp.UpdatePartyMemberDocument) tv.apiFuncs.registApiFunction("UpdatePartyMemberDocument", gp.UpdatePartyMemberDocument)
sub.apiFuncs.registApiFunction("UpdatePartyDocument", gp.UpdatePartyDocument) tv.apiFuncs.registApiFunction("UpdatePartyDocument", gp.UpdatePartyDocument)
sub.apiFuncs.registApiFunction("QueryPartyMembers", gp.QueryPartyMembers) tv.apiFuncs.registApiFunction("QueryPartyMembers", gp.QueryPartyMembers)
return nil return nil
} }

View File

@ -66,7 +66,10 @@ func readBsonDoc(r io.Reader, src any) error {
} }
type TavernConfig struct { type TavernConfig struct {
gocommon.RegionStorageConfig `json:",inline"` gocommon.StorageAddr `json:"storage"`
SessionTTL int64 `json:"maingate_session_ttl"`
SessionStorage string `json:"maingate_session_storage"`
Group map[string]configDocument `json:"tavern_group_types"` Group map[string]configDocument `json:"tavern_group_types"`
MaingateApiToken string `json:"maingate_api_token"` MaingateApiToken string `json:"maingate_api_token"`
@ -77,15 +80,9 @@ type TavernConfig struct {
var config TavernConfig var config TavernConfig
type Tavern struct { type Tavern struct {
subTaverns []*subTavern wsh *wshandler.WebsocketHandler
wsh *wshandler.WebsocketHandler
}
type subTavern struct {
mongoClient gocommon.MongoClient mongoClient gocommon.MongoClient
redison *gocommon.RedisonHandler redison *gocommon.RedisonHandler
wsh *wshandler.WebsocketHandler
region string
groups map[string]group groups map[string]group
apiFuncs *apiFuncsContainer apiFuncs *apiFuncsContainer
} }
@ -124,6 +121,10 @@ func New(context context.Context, wsh *wshandler.WebsocketHandler, inconfig *Tav
config.macAddr = macaddr config.macAddr = macaddr
tv := &Tavern{ tv := &Tavern{
wsh: wsh, wsh: wsh,
apiFuncs: &apiFuncsContainer{
normfuncs: make(map[string]apiFuncType),
funcs: make(map[string][]apiFuncType),
},
} }
if err = tv.prepare(context); err != nil { if err = tv.prepare(context); err != nil {
@ -135,92 +136,65 @@ func New(context context.Context, wsh *wshandler.WebsocketHandler, inconfig *Tav
} }
func (tv *Tavern) Cleanup() { func (tv *Tavern) Cleanup() {
for _, st := range tv.subTaverns { tv.mongoClient.Close()
st.mongoClient.Close()
}
} }
func (tv *Tavern) prepare(ctx context.Context) error { func (tv *Tavern) prepare(ctx context.Context) error {
for region, addr := range config.RegionStorage { redisClient, err := gocommon.NewRedisClient(config.StorageAddr.Redis["tavern"])
var dbconn gocommon.MongoClient if err != nil {
var err error return err
}
redisClient, err := gocommon.NewRedisClient(addr.Redis["tavern"]) tv.redison = gocommon.NewRedisonHandler(redisClient.Context(), redisClient)
if err != nil {
groups := make(map[string]group)
for typename, cfg := range config.Group {
gtype, ok := groupTypeContainer()[typename]
if !ok {
return fmt.Errorf("%s group type is not valid", typename)
}
if !gtype.Implements(reflect.TypeOf((*group)(nil)).Elem()) {
return fmt.Errorf("%s is not implement proper interface", typename)
}
ptrvalue := reflect.New(gtype.Elem())
instance := ptrvalue.Interface().(group)
if err := instance.Initialize(tv, cfg); err != nil {
return err return err
} }
groups[typename] = instance
redison := gocommon.NewRedisonHandler(redisClient.Context(), redisClient)
sub := &subTavern{
wsh: tv.wsh,
mongoClient: dbconn,
redison: redison,
region: region,
apiFuncs: &apiFuncsContainer{
normfuncs: make(map[string]apiFuncType),
funcs: make(map[string][]apiFuncType),
},
}
groups := make(map[string]group)
for typename, cfg := range config.Group {
gtype, ok := groupTypeContainer()[typename]
if !ok {
return fmt.Errorf("%s group type is not valid", typename)
}
if !gtype.Implements(reflect.TypeOf((*group)(nil)).Elem()) {
return fmt.Errorf("%s is not implement proper interface", typename)
}
ptrvalue := reflect.New(gtype.Elem())
instance := ptrvalue.Interface().(group)
if err := instance.Initialize(sub, cfg); err != nil {
return err
}
groups[typename] = instance
}
sub.groups = groups
sub.apiFuncs.normalize()
tv.subTaverns = append(tv.subTaverns, sub)
} }
tv.groups = groups
tv.apiFuncs.normalize()
return nil return nil
} }
func (tv *Tavern) RegisterHandlers(ctx context.Context, serveMux *http.ServeMux, prefix string) error { func (tv *Tavern) RegisterHandlers(ctx context.Context, serveMux *http.ServeMux, prefix string) error {
for _, sub := range tv.subTaverns { tv.wsh.RegisterReceiver(tv)
tv.wsh.RegisterReceiver(sub.region, sub) pattern := gocommon.MakeHttpHandlerPattern(prefix, "api")
var pattern string serveMux.HandleFunc(pattern, tv.api)
if sub.region == "default" {
pattern = gocommon.MakeHttpHandlerPattern(prefix, "api")
} else {
pattern = gocommon.MakeHttpHandlerPattern(prefix, sub.region, "api")
}
serveMux.HandleFunc(pattern, sub.api)
}
return nil return nil
} }
func (sub *subTavern) OnClientMessageReceived(sender *wshandler.Sender, messageType wshandler.WebSocketMessageType, body io.Reader) { func (tv *Tavern) OnClientMessageReceived(sender *wshandler.Sender, messageType wshandler.WebSocketMessageType, body io.Reader) {
if messageType == wshandler.Connected { if messageType == wshandler.Connected {
logger.Println("OnClientMessageReceived : connected ", sender.Accid.Hex()) logger.Println("OnClientMessageReceived : connected ", sender.Accid.Hex())
sub.redison.HSet(sub.redison.Context(), sender.Accid.Hex(), "_ts", time.Now().UTC().Unix()).Result() tv.redison.HSet(tv.redison.Context(), sender.Accid.Hex(), "_ts", time.Now().UTC().Unix()).Result()
for _, gt := range sub.groups { for _, gt := range tv.groups {
gt.ClientMessageReceived(sender, messageType, nil) gt.ClientMessageReceived(sender, messageType, nil)
} }
} else if messageType == wshandler.Disconnected { } else if messageType == wshandler.Disconnected {
var rooms []string var rooms []string
dec := json.NewDecoder(body) dec := json.NewDecoder(body)
if err := dec.Decode(&rooms); err == nil { if err := dec.Decode(&rooms); err == nil {
for _, gt := range sub.groups { for _, gt := range tv.groups {
gt.ClientMessageReceived(sender, messageType, rooms) gt.ClientMessageReceived(sender, messageType, rooms)
} }
} }
sub.redison.Del(sub.redison.Context(), sender.Accid.Hex()).Result() tv.redison.Del(tv.redison.Context(), sender.Accid.Hex()).Result()
logger.Println("OnClientMessageReceived : disconnected ", sender.Accid.Hex()) logger.Println("OnClientMessageReceived : disconnected ", sender.Accid.Hex())
} else if messageType == wshandler.BinaryMessage { } else if messageType == wshandler.BinaryMessage {
var commandline []any var commandline []any
@ -230,13 +204,13 @@ func (sub *subTavern) OnClientMessageReceived(sender *wshandler.Sender, messageT
args := commandline[1:] args := commandline[1:]
switch cmd { switch cmd {
case "EnterChannel": case "EnterChannel":
sub.wsh.EnterRoom(sub.region, args[0].(string), sender.Accid) tv.wsh.EnterRoom(args[0].(string), sender.Accid)
case "LeaveChannel": case "LeaveChannel":
sub.wsh.LeaveRoom(sub.region, args[0].(string), sender.Accid) tv.wsh.LeaveRoom(args[0].(string), sender.Accid)
default: default:
for _, gt := range sub.groups { for _, gt := range tv.groups {
gt.ClientMessageReceived(sender, messageType, commandline) gt.ClientMessageReceived(sender, messageType, commandline)
} }
} }
@ -244,29 +218,29 @@ func (sub *subTavern) OnClientMessageReceived(sender *wshandler.Sender, messageT
} }
} }
func (sub *subTavern) OnRoomCreated(region, name string) { func (tv *Tavern) OnRoomCreated(name string) {
cnt, err := sub.redison.IncrBy(sub.redison.Context(), "_ref_"+name, 1).Result() cnt, err := tv.redison.IncrBy(tv.redison.Context(), "_ref_"+name, 1).Result()
if err != nil && !errors.Is(err, redis.Nil) { if err != nil && !errors.Is(err, redis.Nil) {
logger.Println("OnRoomCreated JSONSet failed :", err) logger.Println("OnRoomCreated JSONSet failed :", err)
return return
} }
if cnt == 1 { if cnt == 1 {
sub.redison.JSONSet(name, "$", map[string]any{}, gocommon.RedisonSetOptionNX) tv.redison.JSONSet(name, "$", map[string]any{}, gocommon.RedisonSetOptionNX)
} }
} }
func (sub *subTavern) OnRoomDestroyed(region, name string) { func (tv *Tavern) OnRoomDestroyed(name string) {
cnt, err := sub.redison.IncrBy(sub.redison.Context(), "_ref_"+name, -1).Result() cnt, err := tv.redison.IncrBy(tv.redison.Context(), "_ref_"+name, -1).Result()
if err != nil { if err != nil {
logger.Println("OnRoomDestroyed JSONNumIncrBy failed :", err) logger.Println("OnRoomDestroyed JSONNumIncrBy failed :", err)
} else if cnt == 0 { } else if cnt == 0 {
sub.redison.Del(sub.redison.Context(), "_ref_"+name) tv.redison.Del(tv.redison.Context(), "_ref_"+name)
sub.redison.JSONDel(name, "$") tv.redison.JSONDel(name, "$")
} }
} }
func (sub *subTavern) api(w http.ResponseWriter, r *http.Request) { func (tv *Tavern) api(w http.ResponseWriter, r *http.Request) {
defer func() { defer func() {
s := recover() s := recover()
if s != nil { if s != nil {
@ -294,5 +268,5 @@ func (sub *subTavern) api(w http.ResponseWriter, r *http.Request) {
return return
} }
sub.apiFuncs.call(operation, w, r) tv.apiFuncs.call(operation, w, r)
} }

2
go.mod
View File

@ -5,7 +5,7 @@ go 1.20
require ( require (
github.com/go-redis/redis/v8 v8.11.5 github.com/go-redis/redis/v8 v8.11.5
go.mongodb.org/mongo-driver v1.11.7 go.mongodb.org/mongo-driver v1.11.7
repositories.action2quare.com/ayo/gocommon v0.0.0-20230823140214-6bae78a282fd repositories.action2quare.com/ayo/gocommon v0.0.0-20230831115717-58de9a3f0cc2
) )
require ( require (

6
go.sum
View File

@ -144,3 +144,9 @@ repositories.action2quare.com/ayo/gocommon v0.0.0-20230823134414-400c7f644333 h1
repositories.action2quare.com/ayo/gocommon v0.0.0-20230823134414-400c7f644333/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw= repositories.action2quare.com/ayo/gocommon v0.0.0-20230823134414-400c7f644333/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230823140214-6bae78a282fd h1:ie8yHPe1PlWRDxZJb9VSKJiw+r7yjCt4mZM8GWcOzDs= repositories.action2quare.com/ayo/gocommon v0.0.0-20230823140214-6bae78a282fd h1:ie8yHPe1PlWRDxZJb9VSKJiw+r7yjCt4mZM8GWcOzDs=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230823140214-6bae78a282fd/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw= repositories.action2quare.com/ayo/gocommon v0.0.0-20230823140214-6bae78a282fd/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230831053308-cde46e6a5fdb h1:F7BxLeUeJoBnE+5VCMuKimceSYmhdH2dQSzmyBzc4+M=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230831053308-cde46e6a5fdb/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230831113900-57b518562eac h1:LFWGF8pTmYujUil9RNyRcZP+lY+w54na0NfSDLGiz6U=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230831113900-57b518562eac/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230831115717-58de9a3f0cc2 h1:/StJeWqAY94RJ5SWgShQEsHCsfi0zX4IAAS+qLSocJc=
repositories.action2quare.com/ayo/gocommon v0.0.0-20230831115717-58de9a3f0cc2/go.mod h1:PdpZ16O1czKKxCxn+0AFNaEX/0kssYwC3G8jR0V7ybw=

23
main.go
View File

@ -4,13 +4,15 @@ package main
import ( import (
"context" "context"
"net/http" "net/http"
"time"
"repositories.action2quare.com/ayo/gocommon/flagx" "repositories.action2quare.com/ayo/gocommon/flagx"
"repositories.action2quare.com/ayo/gocommon/wshandler" "repositories.action2quare.com/ayo/gocommon/wshandler"
"repositories.action2quare.com/ayo/tavern/core" "repositories.action2quare.com/ayo/tavern/core"
common "repositories.action2quare.com/ayo/gocommon" "repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/logger" "repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/session"
) )
var prefix = flagx.String("prefix", "", "") var prefix = flagx.String("prefix", "", "")
@ -20,11 +22,24 @@ func main() {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
var config core.TavernConfig var config core.TavernConfig
if err := common.LoadConfig(&config); err != nil { if err := gocommon.LoadConfig(&config); err != nil {
panic(err) panic(err)
} }
wsh, err := wshandler.NewWebsocketHandler() if len(config.SessionStorage) == 0 {
panic("maingate_session_storage is missing")
}
if config.SessionTTL == 0 {
config.SessionTTL = 3600
}
consumer, err := session.NewConsumer(ctx, config.SessionStorage, time.Duration(config.SessionTTL)*time.Second)
if err != nil {
panic(err)
}
wsh, err := wshandler.NewWebsocketHandler(consumer)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -35,7 +50,7 @@ func main() {
serveMux := http.NewServeMux() serveMux := http.NewServeMux()
wsh.RegisterHandlers(serveMux, *prefix) wsh.RegisterHandlers(serveMux, *prefix)
tv.RegisterHandlers(ctx, serveMux, *prefix) tv.RegisterHandlers(ctx, serveMux, *prefix)
server := common.NewHTTPServer(serveMux) server := gocommon.NewHTTPServer(serveMux)
logger.Println("tavern is started") logger.Println("tavern is started")
wsh.Start(ctx) wsh.Start(ctx)
server.Start() server.Start()