채팅 채널 입장 추가

This commit is contained in:
2023-07-27 17:45:51 +09:00
parent 310397dd2b
commit 90d0fd319d
7 changed files with 280 additions and 27 deletions

View File

@ -1 +0,0 @@
{}

View File

@ -51,5 +51,5 @@ func (afc *apiFuncsContainer) call(fn string, w http.ResponseWriter, r *http.Req
type configDocument map[string]any
type group interface {
Initialize(*subTavern, configDocument) error
ClientMessageReceved(*wshandler.Sender, wshandler.WebSocketMessageType, any)
ClientMessageReceived(*wshandler.Sender, wshandler.WebSocketMessageType, any)
}

View File

@ -1,8 +1,12 @@
package core
import (
"encoding/json"
"net/http"
"reflect"
"repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/wshandler"
)
@ -10,13 +14,209 @@ func init() {
groupTypeContainer()["chat"] = reflect.TypeOf(&groupChat{})
}
type groupChat struct {
type channelID = string
type chatConfig struct {
DefaultCapacity int64 `json:"default_capacity"`
Channels map[string]map[string]any `json:"channels"`
}
func (gc *groupChat) Initialize(*subTavern, configDocument) error {
type groupChat struct {
chatConfig
rh *gocommon.RedisonHandler
incSizeScript string
decSizeScript string
enterRoom func(channelID, accountID)
leaveRoom func(channelID, accountID)
sendUpstreamMessage func(msg *wshandler.UpstreamMessage)
}
var increaseSizeScript = `
local cap = redis.call("HGET", KEYS[1], "capacity")
local newseq = redis.call("HINCRBY", KEYS[1], "seq", 1)
local newsize = redis.call("HINCRBY", KEYS[1], "size", 1)
if tonumber(cap) < newsize then
redis.call("HINCRBY", KEYS[1], "size", -1)
return {err = "channel is full"}
end
redis.call("HSET", "_m_"..KEYS[1], KEYS[2], ARGV[1])
return {newsize, newseq}
`
var decreaseSizeScript = `
local newseq = redis.call("HINCRBY", KEYS[1], "seq", 1)
local newsize = redis.call("HINCRBY", KEYS[1], "size", -1)
redis.call("HDEL", "_m_"..KEYS[1], KEYS[2])
return {newsize, newseq}
`
func (gc *groupChat) Initialize(sub *subTavern, cfg configDocument) error {
incScript, err := sub.redisClient.ScriptLoad(sub.redisClient.Context(), increaseSizeScript).Result()
if err != nil {
return err
}
decScript, err := sub.redisClient.ScriptLoad(sub.redisClient.Context(), decreaseSizeScript).Result()
if err != nil {
return err
}
// newsize, err := sub.redisClient.EvalSha(sub.redisClient.Context(), incScript, []string{"myhash", "alias"}, "accid").Result()
// if err != nil {
// return err
// }
// logger.Println(newsize.([]any))
// newsize, err = sub.redisClient.EvalSha(sub.redisClient.Context(), decScript, []string{"myhash", "alias"}).Result()
// if err != nil {
// return err
// }
// logger.Println(newsize.([]any))
rem, _ := json.Marshal(cfg)
if err = json.Unmarshal(rem, &gc.chatConfig); err != nil {
return err
}
gc.enterRoom = func(chanid channelID, accid accountID) {
sub.wsh.EnterRoom(sub.region, string(chanid), accid)
}
gc.leaveRoom = func(chanid channelID, accid accountID) {
sub.wsh.LeaveRoom(sub.region, string(chanid), accid)
}
gc.sendUpstreamMessage = func(msg *wshandler.UpstreamMessage) {
sub.wsh.SendUpstreamMessage(sub.region, msg)
}
gc.rh = gocommon.NewRedisonHandler(sub.redisClient.Context(), sub.redisClient)
gc.incSizeScript = incScript
gc.decSizeScript = decScript
sub.apiFuncs.registApiFunction("CreateChattingChannel", gc.CreateChattingChannel)
sub.apiFuncs.registApiFunction("FetchChattingChannels", gc.FetchChattingChannels)
for name, cfg := range gc.chatConfig.Channels {
if _, ok := cfg["capacity"]; !ok {
cfg["capacity"] = gc.chatConfig.DefaultCapacity
}
cfg["key"] = name
_, err := gc.rh.HMSet(gc.rh.Context(), name, cfg).Result()
if err != nil {
return err
}
}
return nil
}
func (gc *groupChat) ClientMessageReceved(*wshandler.Sender, wshandler.WebSocketMessageType, any) {
func (gc *groupChat) ClientMessageReceived(sender *wshandler.Sender, mt wshandler.WebSocketMessageType, message any) {
if mt == wshandler.Disconnected {
rooms := message.([]string)
for _, chanid := range rooms {
sizeseq, err := gc.rh.EvalSha(gc.rh.Context(), gc.decSizeScript, []string{chanid, sender.Alias}).Result()
if err == nil {
gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + chanid,
Body: map[string]any{"size": sizeseq.([]any)[0], "seq": sizeseq.([]any)[1]},
Tag: []string{"ChattingChannelProperties"},
})
}
}
} else if mt == wshandler.BinaryMessage {
commandline := message.([]any)
cmd := commandline[0].(string)
args := commandline[1:]
switch cmd {
case "EnterChattingChannel":
chanid := args[0].(string)
sizeseq, err := gc.rh.EvalSha(gc.rh.Context(), gc.incSizeScript, []string{chanid, sender.Alias}, sender.Accid.Hex()).Result()
if err == nil {
gc.enterRoom(chanid, sender.Accid)
gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + chanid,
Body: map[string]any{"size": sizeseq.([]any)[0], "seq": sizeseq.([]any)[1]},
Tag: []string{"ChattingChannelProperties"},
})
} else {
// 입장 실패 알림
gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "@" + sender.Accid.Hex(),
Body: map[string]string{"id": chanid, "err": err.Error()},
Tag: []string{"EnterChattingChannelFailed"},
})
}
case "LeaveChattingChannel":
chanid := args[0].(string)
gc.leaveRoom(chanid, sender.Accid)
sizeseq, err := gc.rh.EvalSha(gc.rh.Context(), gc.decSizeScript, []string{chanid, sender.Alias}).Result()
if err == nil {
gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + chanid,
Body: map[string]any{"size": sizeseq.([]any)[0], "seq": sizeseq.([]any)[1]},
Tag: []string{"ChattingChannelProperties"},
})
}
case "TextMessage":
chanid := args[0].(string)
msg := args[1].(string)
gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + chanid,
Body: map[string]any{"sender": sender.Alias, "msg": msg},
Tag: []string{"TextMessage"},
})
}
}
}
func (gc *groupChat) CreateChattingChannel(w http.ResponseWriter, r *http.Request) {
chanstr, _ := gocommon.ReadStringFormValue(r.Form, "name")
if len(chanstr) == 0 {
logger.Println("CreateChattingChannel failed. name is missing")
w.WriteHeader(http.StatusBadRequest)
return
}
capacity, _ := gocommon.ReadIntegerFormValue(r.Form, "capacity")
if capacity == 0 {
capacity = gc.chatConfig.DefaultCapacity
}
_, err := gc.rh.HSetNX(gc.rh.Context(), chanstr, "capacity", capacity).Result()
if err != nil {
logger.Println("CreateChattingChannel failed. HSetNX returns err :", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
}
func (gc *groupChat) FetchChattingChannels(w http.ResponseWriter, r *http.Request) {
prefix, _ := gocommon.ReadStringFormValue(r.Form, "prefix")
if len(prefix) == 0 {
logger.Println("FetchChattingChannel failed. prefix is missing")
w.WriteHeader(http.StatusBadRequest)
return
}
keys, err := gc.rh.Keys(gc.rh.Context(), prefix+"*").Result()
if err != nil {
logger.Println("FetchChattingChannel failed. Keys return err :", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
var channels []map[string]string
for _, key := range keys {
onechan, err := gc.rh.HGetAll(gc.rh.Context(), key).Result()
if err != nil {
logger.Println("FetchChattingChannel failed. HGetAll return err :", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
channels = append(channels, onechan)
}
enc := json.NewEncoder(w)
enc.Encode(channels)
}

View File

@ -221,8 +221,8 @@ type partyConfig struct {
type groupParty struct {
partyConfig
sendUpstreamMessage func(*wshandler.UpstreamMessage)
sendEnterRoomMessage func(groupID, accountID)
sendLeaveRoomMessage func(groupID, accountID)
enterRoom func(groupID, accountID)
leaveRoom func(groupID, accountID)
rh *gocommon.RedisonHandler
}
@ -237,10 +237,10 @@ func (gp *groupParty) Initialize(sub *subTavern, cfg configDocument) error {
gp.sendUpstreamMessage = func(msg *wshandler.UpstreamMessage) {
sub.wsh.SendUpstreamMessage(sub.region, msg)
}
gp.sendEnterRoomMessage = func(gid groupID, accid accountID) {
gp.enterRoom = func(gid groupID, accid accountID) {
sub.wsh.EnterRoom(sub.region, gid.Hex(), accid)
}
gp.sendLeaveRoomMessage = func(gid groupID, accid accountID) {
gp.leaveRoom = func(gid groupID, accid accountID) {
sub.wsh.LeaveRoom(sub.region, gid.Hex(), accid)
}
@ -309,7 +309,7 @@ func (gp *groupParty) JoinParty(w http.ResponseWriter, r *http.Request) {
Tag: []string{"MemberDocFull"},
})
gp.sendEnterRoomMessage(gid, mid)
gp.enterRoom(gid, mid)
// 새 멤버에 그룹 전체를 알림
gp.sendUpstreamMessage(&wshandler.UpstreamMessage{
@ -398,7 +398,7 @@ func (gp *groupParty) InviteToParty(w http.ResponseWriter, r *http.Request) {
return
}
// 내가 wshandler room에 입장
gp.sendEnterRoomMessage(gid, mid)
gp.enterRoom(gid, mid)
gp.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "@" + mid.Hex(),
@ -469,7 +469,7 @@ func (gp *groupParty) AcceptPartyInvitation(w http.ResponseWriter, r *http.Reque
Tag: []string{"MemberDocFull"},
})
gp.sendEnterRoomMessage(gid, mid)
gp.enterRoom(gid, mid)
// 새 멤버에 그룹 전체를 알림
gp.sendUpstreamMessage(&wshandler.UpstreamMessage{
@ -503,8 +503,13 @@ func (gp *groupParty) QueryPartyMemberState(w http.ResponseWriter, r *http.Reque
return
}
state, _ := gp.rh.HGet(gp.rh.Context(), mid, "party_state").Result()
w.Write([]byte(state))
states, _ := gp.rh.HMGet(gp.rh.Context(), mid, "party_state", "_ts").Result()
if states[0] != nil && len(states[0].(string)) > 0 {
w.Write([]byte(states[0].(string)))
} else if states[1] != nil && len(states[1].(string)) > 0 {
w.Write([]byte("connected"))
}
}
// LeaveParty : 그룹에서 나감 or 내보냄
@ -541,7 +546,7 @@ func (gp *groupParty) LeaveParty(w http.ResponseWriter, r *http.Request) {
Body: bson.M{"gid": gid},
Tag: []string{"GroupDocFull", gid.Hex()},
})
gp.sendLeaveRoomMessage(gid, mid)
gp.leaveRoom(gid, mid)
}
func (gp *groupParty) updateMemberDocument(gid groupID, mid accountID, doc bson.M) error {
@ -731,15 +736,12 @@ func (gp *groupParty) memberDisconnected(room string, mid primitive.ObjectID) {
}
}
func (gp *groupParty) ClientMessageReceved(sender *wshandler.Sender, mt wshandler.WebSocketMessageType, message any) {
if mt == wshandler.Connected {
gp.rh.HSet(gp.rh.Context(), sender.Accid.Hex(), "party_state", "connected").Result()
} else if mt == wshandler.Disconnected {
func (gp *groupParty) ClientMessageReceived(sender *wshandler.Sender, mt wshandler.WebSocketMessageType, message any) {
if mt == wshandler.Disconnected {
rooms := message.([]string)
for _, roomname := range rooms {
gp.memberDisconnected(roomname, sender.Accid)
}
gp.rh.HDel(gp.rh.Context(), sender.Accid.Hex(), "party_state").Result()
} else if mt == wshandler.BinaryMessage {
commandline := message.([]any)
cmd := commandline[0].(string)

View File

@ -204,16 +204,15 @@ func (sub *subTavern) OnClientMessageReceived(sender *wshandler.Sender, messageT
if messageType == wshandler.Connected {
logger.Println("OnClientMessageReceived : connected ", sender.Accid.Hex())
sub.redisClient.HSet(sub.redisClient.Context(), sender.Accid.Hex(), "_ts", time.Now().UTC().Unix()).Result()
for _, gt := range sub.groups {
gt.ClientMessageReceved(sender, messageType, nil)
gt.ClientMessageReceived(sender, messageType, nil)
}
} else if messageType == wshandler.Disconnected {
var rooms []string
dec := json.NewDecoder(body)
if err := dec.Decode(&rooms); err == nil {
for _, gt := range sub.groups {
gt.ClientMessageReceved(sender, messageType, rooms)
gt.ClientMessageReceived(sender, messageType, rooms)
}
}
sub.redisClient.Del(sub.redisClient.Context(), sender.Accid.Hex()).Result()
@ -233,7 +232,7 @@ func (sub *subTavern) OnClientMessageReceived(sender *wshandler.Sender, messageT
default:
for _, gt := range sub.groups {
gt.ClientMessageReceved(sender, messageType, commandline)
gt.ClientMessageReceived(sender, messageType, commandline)
}
}
}
@ -250,7 +249,7 @@ func (sub *subTavern) OnRoomCreated(region, name string) {
func (sub *subTavern) OnRoomDestroyed(region, name string) {
_, err := sub.redisClient.Expire(context.Background(), name, 3600*time.Second).Result()
if err != nil {
logger.Println("OnRoomDestroyed Persist failed :", err)
logger.Println("OnRoomDestroyed Expire failed :", err)
}
}

View File

@ -3,6 +3,7 @@ package core
import (
"context"
"encoding/binary"
"fmt"
"testing"
"time"
@ -30,11 +31,46 @@ func TestPubSub(t *testing.T) {
msg, err := pubsub.ReceiveMessage(context.Background())
fmt.Println(msg.Payload, err)
}
func makeHash(chanName string, index uint32) string {
for len(chanName) < 12 {
chanName += chanName
}
left := chanName[:6]
right := chanName[len(chanName)-6:]
base := []byte(left + right)
for i := 0; i < 12; i++ {
base[i] += base[12-i-1]
}
bs := make([]byte, 4)
binary.LittleEndian.PutUint32(bs, index)
for i, c := range bs {
base[i] ^= c
}
var gid primitive.ObjectID
copy(gid[:], base)
return gid.Hex()
}
func TestNameHash(t *testing.T) {
for i := 0; i < 10; i++ {
makeHash("Urud", uint32(i))
fmt.Printf("Urud:%d - %s\n", i, makeHash("Urud", uint32(i)))
makeHash("Sheldon", uint32(i))
fmt.Printf("Sheldon:%d - %s\n", i, makeHash("Sheldon", uint32(i)))
}
}
func TestReJSON(t *testing.T) {
rc := redis.NewClient(&redis.Options{Addr: "192.168.8.94:6380"})
rh := gocommon.NewRedisonHandler(context.Background(), rc)
success, err := rc.HSetNX(context.Background(), "setnxtest", "cap", 100).Result()
fmt.Println(success, err)
success, err = rc.HSetNX(context.Background(), "setnxtest", "cap", 100).Result()
fmt.Println(success, err)
testDoc := map[string]any{
"members": map[string]any{
"mid2": map[string]any{
@ -64,7 +100,7 @@ func TestReJSON(t *testing.T) {
logger.Println(rh.JSONGetInt64("jsontest", "$.members..exp"))
logger.Println(rh.JSONObjKeys("jsontest", "$.members"))
err := rh.JSONMSet("jsontest", map[string]any{
err = rh.JSONMSet("jsontest", map[string]any{
"$.members.mid1.key": "newval",
"$.members.mid2.key": "newval",
})