Files
tavern/core/group_chat.go

311 lines
8.6 KiB
Go

package core
import (
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
"github.com/go-redis/redis/v8"
"github.com/gorilla/websocket"
"go.mongodb.org/mongo-driver/bson/primitive"
"repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/wshandler"
)
// channelID is an alias for string used where a value identifies a chat
// channel (see the enterRoom/leaveRoom callbacks on groupChat).
type channelID = string
// channelConfig is the configuration and Redis document shape of one public
// chat channel. The exported fields are serialized to JSON; the unexported
// ones are runtime-only state filled in by groupChat.Initialize.
type channelConfig struct {
// Capacity is the member limit compared against the length of "$.members"
// when a client enters. A value of 0 is replaced by the default capacity
// during Initialize.
Capacity int64 `json:"capacity"`
// Size is reset to 0 during Initialize; the leave paths decrement "$.size"
// in Redis.
Size int64 `json:"size"`
// Key is set to the channel's name (its key in chatConfig.Channels) during
// Initialize.
Key string `json:"key"`
// Members is reset to an empty map during Initialize. In Redis, entries are
// written as "$.members.<alias>" = 1 when a client enters.
Members map[string]int32 `json:"members"`
// emptyJson is the channel's JSON, wrapped in "[" and "]", captured during
// Initialize and served by FetchChattingChannels when Redis has no document.
emptyJson string
// inoutChan receives "+<alias>" / "-<alias>" markers that a per-channel
// goroutine batches into periodic upstream messages.
inoutChan chan string
}
// chatConfig is the JSON-decoded chat section of the service configuration.
type chatConfig struct {
// DefaultCapacity is applied to every channel whose Capacity is 0.
DefaultCapacity int64 `json:"default_capacity"`
// Channels maps a channel name to its configuration.
Channels map[string]*channelConfig `json:"channels"`
}
// groupChat implements public and private chat channels on top of Redis JSON
// documents and the websocket handler. All fields are populated by Initialize.
type groupChat struct {
// chatConfig is embedded so the decoded configuration is reachable directly.
chatConfig
// rh is the Redis JSON handler taken from Tavern.redison.
rh *gocommon.RedisonHandler
// enterRoom and leaveRoom forward to the websocket handler's EnterRoom and
// LeaveRoom.
enterRoom func(channelID, accountID)
leaveRoom func(channelID, accountID)
// sendUpstreamMessage forwards to the websocket handler's
// SendUpstreamMessage.
sendUpstreamMessage func(msg *wshandler.UpstreamMessage)
}
// Initialize decodes the chat configuration from cfg, wires the websocket
// handler callbacks of tv into gc, and registers every configured channel in
// Redis.
//
// For each channel it applies the default capacity when none is configured,
// resets the size and member map, stores the document under the channel name
// and starts a goroutine that batches enter/leave notifications for that
// channel. It returns the first error met while converting the configuration
// or writing a channel document.
func (gc *groupChat) Initialize(tv *Tavern, cfg configDocument) error {
	// Round-trip through JSON to copy the generic document into chatConfig.
	raw, err := json.Marshal(cfg)
	if err != nil {
		return fmt.Errorf("encoding chat config: %w", err)
	}
	if err := json.Unmarshal(raw, &gc.chatConfig); err != nil {
		return fmt.Errorf("decoding chat config: %w", err)
	}

	gc.enterRoom = func(chanid channelID, accid accountID) {
		tv.wsh.EnterRoom(string(chanid), accid)
	}
	gc.leaveRoom = func(chanid channelID, accid accountID) {
		tv.wsh.LeaveRoom(string(chanid), accid)
	}
	gc.sendUpstreamMessage = func(msg *wshandler.UpstreamMessage) {
		tv.wsh.SendUpstreamMessage(msg)
	}
	gc.rh = tv.redison

	for name, ch := range gc.chatConfig.Channels {
		// A `"name": null` entry decodes to a nil pointer; report it instead
		// of panicking on the first field access.
		if ch == nil {
			return fmt.Errorf("chat channel %q has no configuration", name)
		}
		if ch.Capacity == 0 {
			ch.Capacity = gc.chatConfig.DefaultCapacity
		}
		ch.Key = name
		ch.Size = 0

		// The snapshot is taken before Members is reset, so it carries
		// whatever member value the configuration supplied.
		snapshot, err := json.Marshal(ch)
		if err != nil {
			return fmt.Errorf("encoding chat channel %q: %w", name, err)
		}
		ch.emptyJson = fmt.Sprintf("[%s]", string(snapshot))
		ch.Members = make(map[string]int32)

		_, err = gc.rh.JSONSet(name, "$", ch)
		if *devflag && err != nil {
			// In dev mode, drop the existing key and retry once.
			// NOTE(review): presumably this recovers from a stale key left by
			// an older schema — confirm.
			gc.rh.JSONDel(name, "$")
			_, err = gc.rh.JSONSet(name, "$", ch)
		}
		if err != nil {
			return fmt.Errorf("registering chat channel %q: %w", name, err)
		}

		ch.inoutChan = make(chan string, 10)
		go gc.runInOutBroadcaster(name, ch.inoutChan)
	}
	return nil
}

// runInOutBroadcaster accumulates the markers received on events and, every
// three seconds, sends the non-empty batch as one upstream message targeted at
// "#"+chanid. It never returns; the goroutine lives as long as the process.
func (gc *groupChat) runInOutBroadcaster(chanid string, events <-chan string) {
	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()

	var pending []string
	for {
		select {
		case <-ticker.C:
			if len(pending) == 0 {
				continue
			}
			gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
				Target: "#" + chanid,
				Body:   map[string]any{"inout": pending},
				Tag:    []string{"ChattingChannelProperties"},
			})
			// Start a fresh slice: the sent one is still referenced by the
			// message body.
			pending = nil
		case m := <-events:
			pending = append(pending, m)
		}
	}
}
// ClientConnected resets the "$.channel" entry of the caller's account
// document to an empty object. conn is not used.
func (gc *groupChat) ClientConnected(conn *websocket.Conn, callby *wshandler.Sender) {
	accountKey := callby.Accid.Hex()
	noChannels := map[string]any{}
	gc.rh.JSONSet(accountKey, "$.channel", noChannels)
}
// ClientDisconnected takes a disconnected client out of every channel listed
// under "$.channel" in its account document. msg is not used.
//
// For the "public" entry the client's alias is removed from the channel's
// member map and a "-<alias>" marker is queued for the channel's batched
// notification. For every other entry a "<type>.LeavePrivateChannel" message
// is sent to the channel.
func (gc *groupChat) ClientDisconnected(msg string, callby *wshandler.Sender) {
	docs, err := gc.rh.JSONGetDocuments(callby.Accid.Hex(), "$.channel")
	if err != nil && err != redis.Nil {
		logger.Println("ClientDisconnected failed. JSONGetDocuments returns err :", callby.Accid.Hex(), err)
	}
	if len(docs) == 0 {
		return
	}

	for typename, v := range docs[0] {
		chanid, ok := v.(string)
		if !ok {
			// A malformed entry must not panic the disconnect path.
			logger.Println("ClientDisconnected skipped. channel id is not a string :", callby.Accid.Hex(), typename, v)
			continue
		}
		gc.leaveRoom(chanid, callby.Accid)

		if typename != "public" {
			gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
				Target: "#" + chanid,
				Body:   map[string]any{"sender": callby.Alias},
				Tag:    []string{typename + ".LeavePrivateChannel"},
			})
			continue
		}

		// EnterPublicChannel counts "$.members" for the capacity check and
		// the roster, so the alias has to be removed here; otherwise departed
		// clients keep occupying slots.
		if _, err := gc.rh.JSONDel(chanid, "$.members."+callby.Alias); err != nil {
			logger.Println("ClientDisconnected failed. JSONDel $.members returns err :", chanid, callby.Alias, err)
		}
		// NOTE(review): "$.size" is never incremented in this file; the
		// decrement is kept in case another component relies on it — confirm.
		gc.rh.JSONNumIncrBy(chanid, "$.size", -1)
		if cfg, ok := gc.chatConfig.Channels[chanid]; ok {
			cfg.inoutChan <- "-" + callby.Alias
		}
	}
}
// EnterPublicChannel adds the caller to the public channel named by
// ctx.Arguments[0] (a string).
//
// The request is logged and dropped when the channel is not configured, when
// the member count cannot be read, or when the channel already holds Capacity
// members. On success the caller's alias is stored under the channel's
// "$.members", the caller joins the room, the channel id is recorded under
// "$.channel.public" of the account document, a "+<alias>" marker is queued
// for the batched notification, and the current member list is sent back to
// the caller.
func (gc *groupChat) EnterPublicChannel(ctx wshandler.ApiCallContext) {
	chanid := ctx.Arguments[0].(string)
	cfg, ok := gc.chatConfig.Channels[chanid]
	if !ok {
		logger.Println("chatting channel not valid :", chanid)
		return
	}

	size, err := gc.rh.JSONObjLen(chanid, "$.members")
	if err != nil {
		logger.Println("JSONObjLen failed :", chanid, err)
		return
	}
	if len(size) > 0 && size[0] >= cfg.Capacity {
		// Channel is full.
		logger.Println("chatting channel is full :", chanid, size, cfg.Capacity)
		return
	}

	// Enter the channel.
	if _, err := gc.rh.JSONSet(chanid, "$.members."+ctx.CallBy.Alias, 1); err != nil {
		logger.Println("JSONSet $.members failed :", chanid, *ctx.CallBy, err)
		logger.Println(gc.rh.JSONGet(chanid, "$"))
		return
	}
	gc.enterRoom(chanid, ctx.CallBy.Accid)
	gc.rh.JSONSet(ctx.CallBy.Accid.Hex(), "$.channel.public", chanid)
	cfg.inoutChan <- "+" + ctx.CallBy.Alias

	members, err := gc.rh.JSONGetDocuments(chanid, "$.members")
	if err != nil {
		// The caller has already joined; only the roster reply is skipped.
		logger.Println("JSONGetDocuments failed :", chanid, err)
		return
	}
	// Guard the index: an empty result previously caused an out-of-range
	// panic on members[0].
	toarr := make([]string, 0)
	if len(members) > 0 {
		toarr = make([]string, 0, len(members[0]))
		for alias := range members[0] {
			toarr = append(toarr, alias)
		}
	}
	gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
		Target: ctx.CallBy.Accid.Hex(),
		Body:   map[string]any{"members": toarr},
		Tag:    []string{"ChattingChannelProperties"},
	})
}
// LeavePublicChannel removes the caller from the public channel named by
// ctx.Arguments[0] (a string).
//
// Nothing happens unless the account document actually had a
// "$.channel.public" entry to delete. When it did, the caller leaves the
// room, its alias is removed from the channel's member map, and a "-<alias>"
// marker is queued for the channel's batched notification.
//
// NOTE(review): the channel id comes from the request, not from the value
// stored under "$.channel.public"; a mismatching id is not detected here.
func (gc *groupChat) LeavePublicChannel(ctx wshandler.ApiCallContext) {
	chanid := ctx.Arguments[0].(string)
	cnt, err := gc.rh.JSONDel(ctx.CallBy.Accid.Hex(), "$.channel.public")
	if err != nil {
		logger.Println("LeavePublicChannel failed. JSONDel returns err :", ctx.CallBy.Accid.Hex(), err)
	}
	if cnt > 0 {
		gc.leaveRoom(chanid, ctx.CallBy.Accid)
		// EnterPublicChannel counts "$.members" for the capacity check and
		// the roster, so the alias has to be removed here; otherwise departed
		// clients keep occupying slots.
		if _, err := gc.rh.JSONDel(chanid, "$.members."+ctx.CallBy.Alias); err != nil {
			logger.Println("LeavePublicChannel failed. JSONDel $.members returns err :", chanid, ctx.CallBy.Alias, err)
		}
		// NOTE(review): "$.size" is never incremented in this file; the
		// decrement is kept in case another component relies on it — confirm.
		gc.rh.JSONNumIncrBy(chanid, "$.size", -1)
		if cfg, ok := gc.chatConfig.Channels[chanid]; ok {
			cfg.inoutChan <- "-" + ctx.CallBy.Alias
		}
	}
}
// TextMessage relays a chat line to a channel. ctx.Arguments[0] is the channel
// id and ctx.Arguments[1] the text, both strings; the message is sent to
// "#"+channel with the caller's alias as sender.
func (gc *groupChat) TextMessage(ctx wshandler.ApiCallContext) {
	target := "#" + ctx.Arguments[0].(string)
	text := ctx.Arguments[1].(string)

	payload := map[string]any{
		"sender": ctx.CallBy.Alias,
		"msg":    text,
	}
	out := wshandler.UpstreamMessage{
		Target: target,
		Body:   payload,
		Tag:    []string{"TextMessage"},
	}
	gc.sendUpstreamMessage(&out)
}
// EnterPrivateChannel handles a private-channel entry request.
// ctx.Arguments[0] is the channel type name, ctx.Arguments[1] the channel id,
// and the optional ctx.Arguments[2] a reason string.
//
// When a non-empty reason is given, the channel id is stored under
// "$.channel.<type>" of the caller's account document with the NX option and
// the caller joins the room; if that write fails or is refused, the request
// is logged and dropped. In every other case a "<type>.EnterPrivateChannel"
// message carrying the caller's alias and the reason is sent to the channel.
func (gc *groupChat) EnterPrivateChannel(ctx wshandler.ApiCallContext) {
	kind := ctx.Arguments[0].(string)
	room := ctx.Arguments[1].(string)

	reason := ""
	if len(ctx.Arguments) > 2 {
		reason = ctx.Arguments[2].(string)
	}

	if reason != "" {
		// Accepting the invitation.
		accountKey := ctx.CallBy.Accid.Hex()
		stored, err := gc.rh.JSONSet(accountKey, "$.channel."+kind, room, gocommon.RedisonSetOptionNX)
		if err != nil || !stored {
			// Already participating in another private channel.
			logger.Println("EnterPrivateChannel failed. HSetNX return err :", err, ctx.CallBy.Accid.Hex(), kind, room)
			return
		}
		gc.enterRoom(room, ctx.CallBy.Accid)
	}

	payload := map[string]any{
		"sender": ctx.CallBy.Alias,
		"msg":    reason,
	}
	gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
		Target: "#" + room,
		Body:   payload,
		Tag:    []string{kind + ".EnterPrivateChannel"},
	})
}
// LeavePrivateChannel removes the caller from a private channel.
// ctx.Arguments[0] is the channel type name and ctx.Arguments[1] the channel
// id, both strings. Only when "$.channel.<type>" was actually deleted from the
// account document does the caller leave the room and the channel receive a
// "<type>.LeavePrivateChannel" message.
func (gc *groupChat) LeavePrivateChannel(ctx wshandler.ApiCallContext) {
	kind := ctx.Arguments[0].(string)
	room := ctx.Arguments[1].(string)

	removed, _ := gc.rh.JSONDel(ctx.CallBy.Accid.Hex(), "$.channel."+kind)
	if !(removed > 0) {
		return
	}

	gc.leaveRoom(room, ctx.CallBy.Accid)
	notice := wshandler.UpstreamMessage{
		Target: "#" + room,
		Body:   map[string]any{"sender": ctx.CallBy.Alias},
		Tag:    []string{kind + ".LeavePrivateChannel"},
	}
	gc.sendUpstreamMessage(&notice)
}
// FetchChattingChannels answers with the JSON documents of every configured
// channel whose name starts with the prefix decoded from the request body.
//
// It responds 500 when the body cannot be decoded or Redis reports an error
// other than redis.Nil, and 400 when the prefix is empty. Channels without a
// stored document contribute their emptyJson snapshot instead.
func (gc *groupChat) FetchChattingChannels(w http.ResponseWriter, r *http.Request) {
	var prefix string
	decodeErr := gocommon.MakeDecoder(r).Decode(&prefix)
	if decodeErr != nil {
		logger.Println("FetchChattingChannels failed. ReadJsonDocumentFromBody returns err :", decodeErr)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	if prefix == "" {
		logger.Println("FetchChattingChannel failed. prefix is missing")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	var rows []string
	for name, cfg := range gc.chatConfig.Channels {
		// prefix is known to be non-empty here.
		if !strings.HasPrefix(name, prefix) {
			continue
		}

		doc, err := gc.rh.JSONGet(name, "$")
		switch {
		case err != nil && err != redis.Nil:
			logger.Println("FetchChattingChannel failed. HGetAll return err :", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		case err == redis.Nil, doc == nil:
			// NOTE(review): emptyJson keeps its surrounding "[" "]" while
			// stored documents are trimmed below — confirm this is intended.
			rows = append(rows, cfg.emptyJson)
		default:
			// The result comes back as a JSON array; strip the brackets.
			rows = append(rows, strings.Trim(doc.(string), "[]"))
		}
	}

	gocommon.MakeEncoder(w, r).Encode(rows)
}
// QueryPlayerChattingChannel answers with the "$.channel" entry of the account
// whose ObjectID is decoded from the request body. It responds 500 when the
// body cannot be decoded or the lookup fails, and writes no body when the
// lookup yields no result.
func (gc *groupChat) QueryPlayerChattingChannel(w http.ResponseWriter, r *http.Request) {
	var accid primitive.ObjectID
	decodeErr := gocommon.MakeDecoder(r).Decode(&accid)
	if decodeErr != nil {
		logger.Println("QueryPlayerChattingChannel failed. ReadJsonDocumentFromBody returns err :", decodeErr)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	channels, lookupErr := gc.rh.JSONGetDocuments(accid.Hex(), "$.channel")
	if lookupErr != nil {
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	if len(channels) == 0 {
		return
	}
	gocommon.MakeEncoder(w, r).Encode(channels[0])
}
// SendMessageOnChannel decodes an upstream message from the request body and
// forwards it unchanged. It responds 400 when the body cannot be decoded.
func (gc *groupChat) SendMessageOnChannel(w http.ResponseWriter, r *http.Request) {
	outgoing := new(wshandler.UpstreamMessage)
	decodeErr := gocommon.MakeDecoder(r).Decode(outgoing)
	if decodeErr != nil {
		logger.Println("SendMessageOnChannel failed. ReadJsonDocumentFromBody return err :", decodeErr)
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	gc.sendUpstreamMessage(outgoing)
}