ws peer 를 제네릭으로 변경

This commit is contained in:
2023-12-20 10:02:49 +09:00
parent b2ee40249a
commit 00f4cab992
3 changed files with 89 additions and 144 deletions

View File

@ -6,71 +6,40 @@ import (
"io" "io"
"reflect" "reflect"
"strings" "strings"
"unsafe"
"github.com/gorilla/websocket"
"go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/bson/primitive"
"repositories.action2quare.com/ayo/gocommon/logger" "repositories.action2quare.com/ayo/gocommon/logger"
) )
type peerApiFuncType func(any, io.Reader) (any, error) type PeerInterface interface {
type peerConnFuncType func(any, *websocket.Conn) ClientDisconnected(string)
type peerDisconnFuncType func(any, string) }
type peerApiFuncType[T PeerInterface] func(T, io.Reader) (any, error)
type WebsocketPeerApiHandler struct { type WebsocketPeerApiHandler[T PeerInterface] struct {
methods map[string]peerApiFuncType methods map[string]peerApiFuncType[T]
connfunc peerConnFuncType
disconnfunc peerDisconnFuncType
originalReceiverName string originalReceiverName string
} }
func MakeWebsocketPeerApiHandler[T any](receiverName string) WebsocketPeerApiHandler { func MakeWebsocketPeerApiHandler[T PeerInterface](receiverName string) WebsocketPeerApiHandler[T] {
methods := make(map[string]peerApiFuncType) methods := make(map[string]peerApiFuncType[T])
var archetype *T var archetype T
tp := reflect.TypeOf(archetype) tp := reflect.TypeOf(archetype)
if len(receiverName) == 0 { if len(receiverName) == 0 {
receiverName = tp.Elem().Name() receiverName = tp.Elem().Name()
} }
var connfunc peerConnFuncType
var disconnfunc peerDisconnFuncType
for i := 0; i < tp.NumMethod(); i++ { for i := 0; i < tp.NumMethod(); i++ {
method := tp.Method(i) method := tp.Method(i)
if method.Type.In(0) != tp { if method.Type.In(0) != tp {
continue continue
} }
if method.Name == ClientConnected { if method.Name == ClientDisconnected {
if method.Type.NumIn() != 2 {
continue continue
} }
if method.Type.In(1) != reflect.TypeOf((*websocket.Conn)(nil)) {
continue
}
funcptr := method.Func.Pointer()
p1 := unsafe.Pointer(&funcptr)
p2 := unsafe.Pointer(&p1)
connfuncptr := (*func(*T, *websocket.Conn))(p2)
connfunc = func(r any, c *websocket.Conn) {
(*connfuncptr)(r.(*T), c)
}
} else if method.Name == ClientDisconnected {
if method.Type.NumIn() != 2 {
continue
}
if method.Type.In(1) != reflect.TypeOf("") {
continue
}
funcptr := method.Func.Pointer()
p1 := unsafe.Pointer(&funcptr)
p2 := unsafe.Pointer(&p1)
disconnfuncptr := (*func(*T, string))(p2)
disconnfunc = func(r any, msg string) {
(*disconnfuncptr)(r.(*T), msg)
}
} else {
var intypes []reflect.Type var intypes []reflect.Type
for i := 1; i < method.Type.NumIn(); i++ { for i := 1; i < method.Type.NumIn(); i++ {
intypes = append(intypes, method.Type.In(i)) intypes = append(intypes, method.Type.In(i))
@ -101,7 +70,7 @@ func MakeWebsocketPeerApiHandler[T any](receiverName string) WebsocketPeerApiHan
} }
} }
methods[receiverName+"."+method.Name] = func(recv any, r io.Reader) (any, error) { methods[receiverName+"."+method.Name] = func(recv T, r io.Reader) (any, error) {
decoder := json.NewDecoder(r) decoder := json.NewDecoder(r)
inargs := make([]any, len(intypes)) inargs := make([]any, len(intypes))
@ -124,26 +93,21 @@ func MakeWebsocketPeerApiHandler[T any](receiverName string) WebsocketPeerApiHan
return outconv(method.Func.Call(reflectargs)) return outconv(method.Func.Call(reflectargs))
} }
} }
}
return WebsocketPeerApiHandler{ return WebsocketPeerApiHandler[T]{
methods: methods, methods: methods,
connfunc: connfunc,
disconnfunc: disconnfunc,
originalReceiverName: tp.Elem().Name(), originalReceiverName: tp.Elem().Name(),
} }
} }
type WebsocketPeerApiBroker struct { type WebsocketPeerApiBroker[T PeerInterface] struct {
methods map[string]peerApiFuncType methods map[string]peerApiFuncType[T]
connFuncs []peerConnFuncType CreatePeer func(primitive.ObjectID) T
disconnFuncs []peerDisconnFuncType
CreatePeer func(primitive.ObjectID) any
} }
func (hc *WebsocketPeerApiBroker) AddHandler(receiver WebsocketPeerApiHandler) { func (hc *WebsocketPeerApiBroker[T]) AddHandler(receiver WebsocketPeerApiHandler[T]) {
if hc.methods == nil { if hc.methods == nil {
hc.methods = make(map[string]peerApiFuncType) hc.methods = make(map[string]peerApiFuncType[T])
} }
for k, v := range receiver.methods { for k, v := range receiver.methods {
@ -151,32 +115,9 @@ func (hc *WebsocketPeerApiBroker) AddHandler(receiver WebsocketPeerApiHandler) {
logger.Printf("ws api registered : %s.%s -> %s\n", receiver.originalReceiverName, ab[1], k) logger.Printf("ws api registered : %s.%s -> %s\n", receiver.originalReceiverName, ab[1], k)
hc.methods[k] = v hc.methods[k] = v
} }
if receiver.connfunc != nil {
logger.Printf("ws api registered : %s.ClientConnected\n", receiver.originalReceiverName)
hc.connFuncs = append(hc.connFuncs, receiver.connfunc)
}
if receiver.disconnfunc != nil {
// disconnfunc은 역순
logger.Printf("ws api registered : %s.ClientDisconnected\n", receiver.originalReceiverName)
hc.disconnFuncs = append([]peerDisconnFuncType{receiver.disconnfunc}, hc.disconnFuncs...)
}
} }
func (hc *WebsocketPeerApiBroker) ClientConnected(recv any, c *websocket.Conn) { func (hc *WebsocketPeerApiBroker[T]) Call(recv T, funcname string, r io.Reader) (any, error) {
for _, v := range hc.connFuncs {
v(recv, c)
}
}
func (hc *WebsocketPeerApiBroker) ClientDisconnected(recv any, reason string) {
for _, v := range hc.disconnFuncs {
v(recv, reason)
}
}
func (hc *WebsocketPeerApiBroker) Call(recv any, funcname string, r io.Reader) (any, error) {
if found := hc.methods[funcname]; found != nil { if found := hc.methods[funcname]; found != nil {
return found(recv, r) return found(recv, r)
} }

View File

@ -60,24 +60,27 @@ func TestUnmarshalToken(t *testing.T) {
} }
type peerHandler struct { type testpeer struct {
id string id string
} }
func (ph *peerHandler) ApiFunc1(arg1 string, arg2 int) error { func (ph *testpeer) ApiFunc1(arg1 string, arg2 int) error {
fmt.Println("ApiFunc1", ph.id, arg1, arg2) fmt.Println("ApiFunc1", ph.id, arg1, arg2)
return errors.New("fake") return errors.New("fake")
} }
func (ph *peerHandler) ApiFunc2(arg1 string, arg2 map[string]int) (string, error) { func (ph *testpeer) ApiFunc2(arg1 string, arg2 map[string]int) (string, error) {
fmt.Println("ApiFunc2", ph.id, arg1, arg2) fmt.Println("ApiFunc2", ph.id, arg1, arg2)
return "success", nil return "success", nil
} }
func (ph *peerHandler) ApiFunc3(arg1 float64, arg2 []int) { func (ph *testpeer) ApiFunc3(arg1 float64, arg2 []int) {
fmt.Println("ApiFunc3", ph.id, arg1, arg2) fmt.Println("ApiFunc3", ph.id, arg1, arg2)
} }
func (ph *testpeer) ClientDisconnected(reason string) {
}
type dummySessionConsumer struct { type dummySessionConsumer struct {
} }
@ -89,19 +92,19 @@ func (dsc *dummySessionConsumer) Touch(string) (session.Authorization, error) {
} }
func TestPeerApiBroker(t *testing.T) { func TestPeerApiBroker(t *testing.T) {
handler := MakeWebsocketPeerApiHandler[peerHandler]("test") handler := MakeWebsocketPeerApiHandler[*testpeer]("test")
ws := NewWebsocketPeerHandler(&dummySessionConsumer{}) ws := NewWebsocketPeerHandler[*testpeer](&dummySessionConsumer{})
ws.AddHandler(handler) ws.AddHandler(handler)
peer := &peerHandler{ tp := &testpeer{
id: "onlyone", id: "onlyone",
} }
func1args, _ := json.Marshal([]any{string("arg1"), int(100)}) func1args, _ := json.Marshal([]any{string("arg1"), int(100)})
ws.Call(peer, "test.ApiFunc1", bytes.NewBuffer(func1args)) ws.Call(tp, "test.ApiFunc1", bytes.NewBuffer(func1args))
func1args, _ = json.Marshal([]any{string("arg1"), map[string]int{"arg2.key": 99}}) func1args, _ = json.Marshal([]any{string("arg1"), map[string]int{"arg2.key": 99}})
ws.Call(peer, "test.ApiFunc2", bytes.NewBuffer(func1args)) ws.Call(tp, "test.ApiFunc2", bytes.NewBuffer(func1args))
func1args, _ = json.Marshal([]any{float64(111.1), []int{99, 98}}) func1args, _ = json.Marshal([]any{float64(111.1), []int{99, 98}})
ws.Call(peer, "test.ApiFunc3", bytes.NewBuffer(func1args)) ws.Call(tp, "test.ApiFunc3", bytes.NewBuffer(func1args))
} }

View File

@ -18,18 +18,18 @@ import (
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )
type WebsocketPeerHandler struct { type WebsocketPeerHandler[T PeerInterface] struct {
WebsocketPeerApiBroker WebsocketPeerApiBroker[T]
sessionConsumer session.Consumer sessionConsumer session.Consumer
} }
func NewWebsocketPeerHandler(consumer session.Consumer) WebsocketPeerHandler { func NewWebsocketPeerHandler[T PeerInterface](consumer session.Consumer) WebsocketPeerHandler[T] {
return WebsocketPeerHandler{ return WebsocketPeerHandler[T]{
sessionConsumer: consumer, sessionConsumer: consumer,
} }
} }
func (ws *WebsocketPeerHandler) RegisterHandlers(serveMux *http.ServeMux, prefix string) error { func (ws *WebsocketPeerHandler[T]) RegisterHandlers(serveMux *http.ServeMux, prefix string) error {
url := gocommon.MakeHttpHandlerPattern(prefix, "ws") url := gocommon.MakeHttpHandlerPattern(prefix, "ws")
if *noAuthFlag { if *noAuthFlag {
serveMux.HandleFunc(url, ws.upgrade_nosession) serveMux.HandleFunc(url, ws.upgrade_nosession)
@ -40,13 +40,15 @@ func (ws *WebsocketPeerHandler) RegisterHandlers(serveMux *http.ServeMux, prefix
return nil return nil
} }
func (ws *WebsocketPeerHandler) upgrade_core(conn *websocket.Conn, accid primitive.ObjectID, nonce uint32) { func (ws *WebsocketPeerHandler[T]) upgrade_core(conn *websocket.Conn, accid primitive.ObjectID, nonce uint32) {
go func(c *websocket.Conn, accid primitive.ObjectID) { go func(c *websocket.Conn, accid primitive.ObjectID) {
peer := ws.CreatePeer(accid) peer := ws.CreatePeer(accid)
ws.ClientConnected(peer, c)
var closeReason string var closeReason string
defer func() {
peer.ClientDisconnected(closeReason)
}()
response := make([]byte, 255) response := make([]byte, 255)
for { for {
response = response[:5] response = response[:5]
@ -112,11 +114,10 @@ func (ws *WebsocketPeerHandler) upgrade_core(conn *websocket.Conn, accid primiti
} }
} }
} }
ws.ClientDisconnected(peer, closeReason)
}(conn, accid) }(conn, accid)
} }
func (ws *WebsocketPeerHandler) upgrade_nosession(w http.ResponseWriter, r *http.Request) { func (ws *WebsocketPeerHandler[T]) upgrade_nosession(w http.ResponseWriter, r *http.Request) {
// 클라이언트 접속 // 클라이언트 접속
defer func() { defer func() {
s := recover() s := recover()
@ -166,7 +167,7 @@ func (ws *WebsocketPeerHandler) upgrade_nosession(w http.ResponseWriter, r *http
ws.upgrade_core(conn, accid, nonce) ws.upgrade_core(conn, accid, nonce)
} }
func (ws *WebsocketPeerHandler) upgrade(w http.ResponseWriter, r *http.Request) { func (ws *WebsocketPeerHandler[T]) upgrade(w http.ResponseWriter, r *http.Request) {
// 클라이언트 접속 // 클라이언트 접속
defer func() { defer func() {
s := recover() s := recover()