wshandler peer 추가
This commit is contained in:
187
wshandler/api_handler_peer.go
Normal file
187
wshandler/api_handler_peer.go
Normal file
@ -0,0 +1,187 @@
|
|||||||
|
package wshandler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"reflect"
|
||||||
|
"strings"
|
||||||
|
"unsafe"
|
||||||
|
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||||
|
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
type peerApiFuncType func(any, []byte) (any, error)
|
||||||
|
type peerConnFuncType func(any, *websocket.Conn)
|
||||||
|
type peerDisconnFuncType func(any, string)
|
||||||
|
|
||||||
|
type WebsocketPeerApiHandler struct {
|
||||||
|
methods map[string]peerApiFuncType
|
||||||
|
connfunc peerConnFuncType
|
||||||
|
disconnfunc peerDisconnFuncType
|
||||||
|
originalReceiverName string
|
||||||
|
}
|
||||||
|
|
||||||
|
func MakeWebsocketPeerApiHandler[T any](receiverName string) WebsocketPeerApiHandler {
|
||||||
|
methods := make(map[string]peerApiFuncType)
|
||||||
|
|
||||||
|
var archetype *T
|
||||||
|
tp := reflect.TypeOf(archetype)
|
||||||
|
if len(receiverName) == 0 {
|
||||||
|
receiverName = tp.Elem().Name()
|
||||||
|
}
|
||||||
|
|
||||||
|
var connfunc peerConnFuncType
|
||||||
|
var disconnfunc peerDisconnFuncType
|
||||||
|
|
||||||
|
for i := 0; i < tp.NumMethod(); i++ {
|
||||||
|
method := tp.Method(i)
|
||||||
|
if method.Type.In(0) != tp {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if method.Name == ClientConnected {
|
||||||
|
if method.Type.NumIn() != 2 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if method.Type.In(1) != reflect.TypeOf((*websocket.Conn)(nil)) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
funcptr := method.Func.Pointer()
|
||||||
|
p1 := unsafe.Pointer(&funcptr)
|
||||||
|
p2 := unsafe.Pointer(&p1)
|
||||||
|
connfuncptr := (*func(*T, *websocket.Conn))(p2)
|
||||||
|
connfunc = func(r any, c *websocket.Conn) {
|
||||||
|
(*connfuncptr)(r.(*T), c)
|
||||||
|
}
|
||||||
|
} else if method.Name == ClientDisconnected {
|
||||||
|
if method.Type.NumIn() != 2 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if method.Type.In(1) != reflect.TypeOf("") {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
funcptr := method.Func.Pointer()
|
||||||
|
p1 := unsafe.Pointer(&funcptr)
|
||||||
|
p2 := unsafe.Pointer(&p1)
|
||||||
|
disconnfuncptr := (*func(*T, string))(p2)
|
||||||
|
disconnfunc = func(r any, msg string) {
|
||||||
|
(*disconnfuncptr)(r.(*T), msg)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
var intypes []reflect.Type
|
||||||
|
for i := 1; i < method.Type.NumIn(); i++ {
|
||||||
|
intypes = append(intypes, method.Type.In(i))
|
||||||
|
}
|
||||||
|
|
||||||
|
var outconv func([]reflect.Value) (any, error)
|
||||||
|
if method.Type.NumOut() == 0 {
|
||||||
|
outconv = func([]reflect.Value) (any, error) { return nil, nil }
|
||||||
|
} else if method.Type.NumOut() == 1 {
|
||||||
|
if method.Type.Out(0).Implements(reflect.TypeOf((*error)(nil)).Elem()) {
|
||||||
|
outconv = func(out []reflect.Value) (any, error) {
|
||||||
|
if out[0].Interface() == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
return nil, out[0].Interface().(error)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
outconv = func(out []reflect.Value) (any, error) {
|
||||||
|
return out[0].Interface(), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if method.Type.NumOut() == 2 && method.Type.Out(1).Implements(reflect.TypeOf((*error)(nil)).Elem()) {
|
||||||
|
outconv = func(out []reflect.Value) (any, error) {
|
||||||
|
if out[1].Interface() == nil {
|
||||||
|
return out[0].Interface(), nil
|
||||||
|
}
|
||||||
|
return out[0].Interface(), out[1].Interface().(error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
methods[receiverName+"."+method.Name] = func(recv any, buff []byte) (any, error) {
|
||||||
|
decoder := json.NewDecoder(bytes.NewBuffer(buff))
|
||||||
|
inargs := make([]any, len(intypes))
|
||||||
|
|
||||||
|
for i, intype := range intypes {
|
||||||
|
zerovalueptr := reflect.New(intype)
|
||||||
|
inargs[i] = zerovalueptr.Interface()
|
||||||
|
}
|
||||||
|
|
||||||
|
err := decoder.Decode(&inargs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
reflectargs := make([]reflect.Value, 0, len(inargs)+1)
|
||||||
|
reflectargs = append(reflectargs, reflect.ValueOf(recv))
|
||||||
|
for _, p := range inargs {
|
||||||
|
reflectargs = append(reflectargs, reflect.ValueOf(p).Elem())
|
||||||
|
}
|
||||||
|
|
||||||
|
return outconv(method.Func.Call(reflectargs))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return WebsocketPeerApiHandler{
|
||||||
|
methods: methods,
|
||||||
|
connfunc: connfunc,
|
||||||
|
disconnfunc: disconnfunc,
|
||||||
|
originalReceiverName: tp.Elem().Name(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type WebsocketPeerApiBroker struct {
|
||||||
|
methods map[string]peerApiFuncType
|
||||||
|
connFuncs []peerConnFuncType
|
||||||
|
disconnFuncs []peerDisconnFuncType
|
||||||
|
CreatePeer func(primitive.ObjectID) any
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hc *WebsocketPeerApiBroker) AddHandler(receiver WebsocketPeerApiHandler) {
|
||||||
|
if hc.methods == nil {
|
||||||
|
hc.methods = make(map[string]peerApiFuncType)
|
||||||
|
}
|
||||||
|
|
||||||
|
for k, v := range receiver.methods {
|
||||||
|
ab := strings.Split(k, ".")
|
||||||
|
logger.Printf("ws api registered : %s.%s -> %s\n", receiver.originalReceiverName, ab[1], k)
|
||||||
|
hc.methods[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
if receiver.connfunc != nil {
|
||||||
|
logger.Printf("ws api registered : %s.ClientConnected\n", receiver.originalReceiverName)
|
||||||
|
hc.connFuncs = append(hc.connFuncs, receiver.connfunc)
|
||||||
|
}
|
||||||
|
|
||||||
|
if receiver.disconnfunc != nil {
|
||||||
|
// disconnfunc은 역순
|
||||||
|
logger.Printf("ws api registered : %s.ClientDisconnected\n", receiver.originalReceiverName)
|
||||||
|
hc.disconnFuncs = append([]peerDisconnFuncType{receiver.disconnfunc}, hc.disconnFuncs...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hc *WebsocketPeerApiBroker) ClientConnected(recv any, c *wsconn) {
|
||||||
|
for _, v := range hc.connFuncs {
|
||||||
|
v(recv, c.Conn)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hc *WebsocketPeerApiBroker) ClientDisconnected(recv any, c *wsconn) {
|
||||||
|
for _, v := range hc.disconnFuncs {
|
||||||
|
v(recv, c.closeMessage)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hc *WebsocketPeerApiBroker) Call(recv any, funcname string, buff []byte) {
|
||||||
|
if found := hc.methods[funcname]; found != nil {
|
||||||
|
_, err := found(recv, buff)
|
||||||
|
if err != nil {
|
||||||
|
logger.Println("api call is failed. err :", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logger.Println("api is not found :", funcname)
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -2,8 +2,14 @@
|
|||||||
package wshandler
|
package wshandler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"repositories.action2quare.com/ayo/gocommon/session"
|
||||||
)
|
)
|
||||||
|
|
||||||
type TestReceiver struct {
|
type TestReceiver struct {
|
||||||
@ -17,7 +23,32 @@ func (tr *TestReceiver) Func2(args []any) {
|
|||||||
fmt.Println(args...)
|
fmt.Println(args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestExpTable(t *testing.T) {
|
func TestUnmarshalToken(t *testing.T) {
|
||||||
|
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7
|
||||||
|
// 012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890
|
||||||
|
insrc := []byte(`["string value 1",200,["inner string value 1","inner string value 2"],{"inner map key 1":"inner map string value 1","inner map key 2":"inner map string value 2"},500.1]`)
|
||||||
|
|
||||||
|
dec := json.NewDecoder(bytes.NewBuffer(insrc))
|
||||||
|
dec.Token()
|
||||||
|
|
||||||
|
for {
|
||||||
|
token_start := dec.InputOffset()
|
||||||
|
tok, _ := dec.Token()
|
||||||
|
token_end := dec.InputOffset()
|
||||||
|
|
||||||
|
var string_val_1 string
|
||||||
|
stringtype := reflect.TypeOf(string_val_1)
|
||||||
|
stringvalue := reflect.New(stringtype)
|
||||||
|
castptr := stringvalue.Interface()
|
||||||
|
|
||||||
|
err := json.Unmarshal(insrc[token_start:token_end], castptr)
|
||||||
|
fmt.Println(err)
|
||||||
|
|
||||||
|
if tok == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
fmt.Println(tok, dec.InputOffset())
|
||||||
|
}
|
||||||
// src := []any{"a", 1, false}
|
// src := []any{"a", 1, false}
|
||||||
// payload, _ := json.Marshal(src)
|
// payload, _ := json.Marshal(src)
|
||||||
|
|
||||||
@ -28,3 +59,53 @@ func TestExpTable(t *testing.T) {
|
|||||||
// con.AddHandler(receiver)
|
// con.AddHandler(receiver)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type peerHandler struct {
|
||||||
|
id string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ph *peerHandler) ApiFunc1(arg1 string, arg2 int) error {
|
||||||
|
fmt.Println("ApiFunc1", ph.id, arg1, arg2)
|
||||||
|
return errors.New("fake")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ph *peerHandler) ApiFunc2(arg1 string, arg2 map[string]int) (string, error) {
|
||||||
|
fmt.Println("ApiFunc2", ph.id, arg1, arg2)
|
||||||
|
return "success", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ph *peerHandler) ApiFunc3(arg1 float64, arg2 []int) {
|
||||||
|
fmt.Println("ApiFunc3", ph.id, arg1, arg2)
|
||||||
|
}
|
||||||
|
|
||||||
|
type dummySessionConsumer struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dsc *dummySessionConsumer) Query(string) (session.Authorization, error) {
|
||||||
|
return session.Authorization{}, nil
|
||||||
|
}
|
||||||
|
func (dsc *dummySessionConsumer) Touch(string) (session.Authorization, error) {
|
||||||
|
return session.Authorization{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPeerApiBroker(t *testing.T) {
|
||||||
|
handler := MakeWebsocketPeerApiHandler[peerHandler]("test")
|
||||||
|
ws, err := NewWebsocketPeerHandler(&dummySessionConsumer{}, "redis://192.168.8.94:6380/4")
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ws.AddHandler(handler)
|
||||||
|
|
||||||
|
peer := &peerHandler{
|
||||||
|
id: "onlyone",
|
||||||
|
}
|
||||||
|
func1args, _ := json.Marshal([]any{string("arg1"), int(100)})
|
||||||
|
ws.Call(peer, "test.ApiFunc1", func1args)
|
||||||
|
|
||||||
|
func1args, _ = json.Marshal([]any{string("arg1"), map[string]int{"arg2.key": 99}})
|
||||||
|
ws.Call(peer, "test.ApiFunc2", func1args)
|
||||||
|
|
||||||
|
func1args, _ = json.Marshal([]any{float64(111.1), []int{99, 98}})
|
||||||
|
ws.Call(peer, "test.ApiFunc3", func1args)
|
||||||
|
}
|
||||||
|
|||||||
1
wshandler/config.json
Normal file
1
wshandler/config.json
Normal file
@ -0,0 +1 @@
|
|||||||
|
{}
|
||||||
@ -86,9 +86,7 @@ type send_msg_queue_elem struct {
|
|||||||
msg []byte
|
msg []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
type WebsocketHandler struct {
|
type websocketHandlerBase struct {
|
||||||
WebsocketApiBroker
|
|
||||||
|
|
||||||
redisMsgChanName string
|
redisMsgChanName string
|
||||||
redisCmdChanName string
|
redisCmdChanName string
|
||||||
redisSync *redis.Client
|
redisSync *redis.Client
|
||||||
@ -101,6 +99,16 @@ type WebsocketHandler struct {
|
|||||||
sessionConsumer session.Consumer
|
sessionConsumer session.Consumer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type WebsocketHandler struct {
|
||||||
|
WebsocketApiBroker
|
||||||
|
*websocketHandlerBase
|
||||||
|
}
|
||||||
|
|
||||||
|
type WebsocketPeerHandler struct {
|
||||||
|
WebsocketPeerApiBroker
|
||||||
|
*websocketHandlerBase
|
||||||
|
}
|
||||||
|
|
||||||
type wsConfig struct {
|
type wsConfig struct {
|
||||||
gocommon.StorageAddr `json:"storage"`
|
gocommon.StorageAddr `json:"storage"`
|
||||||
}
|
}
|
||||||
@ -116,7 +124,7 @@ func init() {
|
|||||||
gob.Register([]any{})
|
gob.Register([]any{})
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWebsocketHandler(consumer session.Consumer, redisUrl string) (*WebsocketHandler, error) {
|
func makeWebsocketHandlerBase(consumer session.Consumer, redisUrl string) (*websocketHandlerBase, error) {
|
||||||
var config wsConfig
|
var config wsConfig
|
||||||
if err := gocommon.LoadConfig(&config); err != nil {
|
if err := gocommon.LoadConfig(&config); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -151,7 +159,7 @@ func NewWebsocketHandler(consumer session.Consumer, redisUrl string) (*Websocket
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return &WebsocketHandler{
|
return &websocketHandlerBase{
|
||||||
redisMsgChanName: fmt.Sprintf("_wsh_msg_%d", redisSync.Options().DB),
|
redisMsgChanName: fmt.Sprintf("_wsh_msg_%d", redisSync.Options().DB),
|
||||||
redisCmdChanName: fmt.Sprintf("_wsh_cmd_%d", redisSync.Options().DB),
|
redisCmdChanName: fmt.Sprintf("_wsh_cmd_%d", redisSync.Options().DB),
|
||||||
redisSync: redisSync,
|
redisSync: redisSync,
|
||||||
@ -163,6 +171,28 @@ func NewWebsocketHandler(consumer session.Consumer, redisUrl string) (*Websocket
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewWebsocketPeerHandler(consumer session.Consumer, redisUrl string) (*WebsocketPeerHandler, error) {
|
||||||
|
base, err := makeWebsocketHandlerBase(consumer, redisUrl)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &WebsocketPeerHandler{
|
||||||
|
websocketHandlerBase: base,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewWebsocketHandler(consumer session.Consumer, redisUrl string) (*WebsocketHandler, error) {
|
||||||
|
base, err := makeWebsocketHandlerBase(consumer, redisUrl)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &WebsocketHandler{
|
||||||
|
websocketHandlerBase: base,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (ws *WebsocketHandler) Start(ctx context.Context) {
|
func (ws *WebsocketHandler) Start(ctx context.Context) {
|
||||||
ws.connWaitGroup.Add(1)
|
ws.connWaitGroup.Add(1)
|
||||||
go ws.mainLoop(ctx)
|
go ws.mainLoop(ctx)
|
||||||
|
|||||||
Reference in New Issue
Block a user