wahandler peer 파일 분리
This commit is contained in:
@ -1,8 +1,8 @@
|
|||||||
package wshandler
|
package wshandler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"io"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
"unsafe"
|
"unsafe"
|
||||||
@ -12,7 +12,7 @@ import (
|
|||||||
"repositories.action2quare.com/ayo/gocommon/logger"
|
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
type peerApiFuncType func(any, []byte) (any, error)
|
type peerApiFuncType func(any, io.Reader) (any, error)
|
||||||
type peerConnFuncType func(any, *websocket.Conn)
|
type peerConnFuncType func(any, *websocket.Conn)
|
||||||
type peerDisconnFuncType func(any, string)
|
type peerDisconnFuncType func(any, string)
|
||||||
|
|
||||||
@ -100,8 +100,8 @@ func MakeWebsocketPeerApiHandler[T any](receiverName string) WebsocketPeerApiHan
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
methods[receiverName+"."+method.Name] = func(recv any, buff []byte) (any, error) {
|
methods[receiverName+"."+method.Name] = func(recv any, r io.Reader) (any, error) {
|
||||||
decoder := json.NewDecoder(bytes.NewBuffer(buff))
|
decoder := json.NewDecoder(r)
|
||||||
inargs := make([]any, len(intypes))
|
inargs := make([]any, len(intypes))
|
||||||
|
|
||||||
for i, intype := range intypes {
|
for i, intype := range intypes {
|
||||||
@ -163,21 +163,21 @@ func (hc *WebsocketPeerApiBroker) AddHandler(receiver WebsocketPeerApiHandler) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hc *WebsocketPeerApiBroker) ClientConnected(recv any, c *wsconn) {
|
func (hc *WebsocketPeerApiBroker) ClientConnected(recv any, c *websocket.Conn) {
|
||||||
for _, v := range hc.connFuncs {
|
for _, v := range hc.connFuncs {
|
||||||
v(recv, c.Conn)
|
v(recv, c)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hc *WebsocketPeerApiBroker) ClientDisconnected(recv any, c *wsconn) {
|
func (hc *WebsocketPeerApiBroker) ClientDisconnected(recv any, reason string) {
|
||||||
for _, v := range hc.disconnFuncs {
|
for _, v := range hc.disconnFuncs {
|
||||||
v(recv, c.closeMessage)
|
v(recv, reason)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hc *WebsocketPeerApiBroker) Call(recv any, funcname string, buff []byte) {
|
func (hc *WebsocketPeerApiBroker) Call(recv any, funcname string, r io.Reader) {
|
||||||
if found := hc.methods[funcname]; found != nil {
|
if found := hc.methods[funcname]; found != nil {
|
||||||
_, err := found(recv, buff)
|
_, err := found(recv, r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Println("api call is failed. err :", err)
|
logger.Println("api call is failed. err :", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -90,22 +90,18 @@ func (dsc *dummySessionConsumer) Touch(string) (session.Authorization, error) {
|
|||||||
|
|
||||||
func TestPeerApiBroker(t *testing.T) {
|
func TestPeerApiBroker(t *testing.T) {
|
||||||
handler := MakeWebsocketPeerApiHandler[peerHandler]("test")
|
handler := MakeWebsocketPeerApiHandler[peerHandler]("test")
|
||||||
ws, err := NewWebsocketPeerHandler(&dummySessionConsumer{}, "redis://192.168.8.94:6380/4")
|
ws := NewWebsocketPeerHandler(&dummySessionConsumer{})
|
||||||
if err != nil {
|
|
||||||
t.Error(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
ws.AddHandler(handler)
|
ws.AddHandler(handler)
|
||||||
|
|
||||||
peer := &peerHandler{
|
peer := &peerHandler{
|
||||||
id: "onlyone",
|
id: "onlyone",
|
||||||
}
|
}
|
||||||
func1args, _ := json.Marshal([]any{string("arg1"), int(100)})
|
func1args, _ := json.Marshal([]any{string("arg1"), int(100)})
|
||||||
ws.Call(peer, "test.ApiFunc1", func1args)
|
ws.Call(peer, "test.ApiFunc1", bytes.NewBuffer(func1args))
|
||||||
|
|
||||||
func1args, _ = json.Marshal([]any{string("arg1"), map[string]int{"arg2.key": 99}})
|
func1args, _ = json.Marshal([]any{string("arg1"), map[string]int{"arg2.key": 99}})
|
||||||
ws.Call(peer, "test.ApiFunc2", func1args)
|
ws.Call(peer, "test.ApiFunc2", bytes.NewBuffer(func1args))
|
||||||
|
|
||||||
func1args, _ = json.Marshal([]any{float64(111.1), []int{99, 98}})
|
func1args, _ = json.Marshal([]any{float64(111.1), []int{99, 98}})
|
||||||
ws.Call(peer, "test.ApiFunc3", func1args)
|
ws.Call(peer, "test.ApiFunc3", bytes.NewBuffer(func1args))
|
||||||
}
|
}
|
||||||
|
|||||||
@ -86,7 +86,9 @@ type send_msg_queue_elem struct {
|
|||||||
msg []byte
|
msg []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
type websocketHandlerBase struct {
|
type WebsocketHandler struct {
|
||||||
|
WebsocketApiBroker
|
||||||
|
|
||||||
redisMsgChanName string
|
redisMsgChanName string
|
||||||
redisCmdChanName string
|
redisCmdChanName string
|
||||||
redisSync *redis.Client
|
redisSync *redis.Client
|
||||||
@ -99,16 +101,6 @@ type websocketHandlerBase struct {
|
|||||||
sessionConsumer session.Consumer
|
sessionConsumer session.Consumer
|
||||||
}
|
}
|
||||||
|
|
||||||
type WebsocketHandler struct {
|
|
||||||
WebsocketApiBroker
|
|
||||||
*websocketHandlerBase
|
|
||||||
}
|
|
||||||
|
|
||||||
type WebsocketPeerHandler struct {
|
|
||||||
WebsocketPeerApiBroker
|
|
||||||
*websocketHandlerBase
|
|
||||||
}
|
|
||||||
|
|
||||||
type wsConfig struct {
|
type wsConfig struct {
|
||||||
gocommon.StorageAddr `json:"storage"`
|
gocommon.StorageAddr `json:"storage"`
|
||||||
}
|
}
|
||||||
@ -124,7 +116,7 @@ func init() {
|
|||||||
gob.Register([]any{})
|
gob.Register([]any{})
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeWebsocketHandlerBase(consumer session.Consumer, redisUrl string) (*websocketHandlerBase, error) {
|
func NewWebsocketHandler(consumer session.Consumer, redisUrl string) (*WebsocketHandler, error) {
|
||||||
var config wsConfig
|
var config wsConfig
|
||||||
if err := gocommon.LoadConfig(&config); err != nil {
|
if err := gocommon.LoadConfig(&config); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -159,7 +151,7 @@ func makeWebsocketHandlerBase(consumer session.Consumer, redisUrl string) (*webs
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return &websocketHandlerBase{
|
return &WebsocketHandler{
|
||||||
redisMsgChanName: fmt.Sprintf("_wsh_msg_%d", redisSync.Options().DB),
|
redisMsgChanName: fmt.Sprintf("_wsh_msg_%d", redisSync.Options().DB),
|
||||||
redisCmdChanName: fmt.Sprintf("_wsh_cmd_%d", redisSync.Options().DB),
|
redisCmdChanName: fmt.Sprintf("_wsh_cmd_%d", redisSync.Options().DB),
|
||||||
redisSync: redisSync,
|
redisSync: redisSync,
|
||||||
@ -171,28 +163,6 @@ func makeWebsocketHandlerBase(consumer session.Consumer, redisUrl string) (*webs
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWebsocketPeerHandler(consumer session.Consumer, redisUrl string) (*WebsocketPeerHandler, error) {
|
|
||||||
base, err := makeWebsocketHandlerBase(consumer, redisUrl)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &WebsocketPeerHandler{
|
|
||||||
websocketHandlerBase: base,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewWebsocketHandler(consumer session.Consumer, redisUrl string) (*WebsocketHandler, error) {
|
|
||||||
base, err := makeWebsocketHandlerBase(consumer, redisUrl)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &WebsocketHandler{
|
|
||||||
websocketHandlerBase: base,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ws *WebsocketHandler) Start(ctx context.Context) {
|
func (ws *WebsocketHandler) Start(ctx context.Context) {
|
||||||
ws.connWaitGroup.Add(1)
|
ws.connWaitGroup.Add(1)
|
||||||
go ws.mainLoop(ctx)
|
go ws.mainLoop(ctx)
|
||||||
|
|||||||
159
wshandler/wshandler_peer.go
Normal file
159
wshandler/wshandler_peer.go
Normal file
@ -0,0 +1,159 @@
|
|||||||
|
package wshandler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/base64"
|
||||||
|
"encoding/hex"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||||
|
"repositories.action2quare.com/ayo/gocommon"
|
||||||
|
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||||
|
"repositories.action2quare.com/ayo/gocommon/session"
|
||||||
|
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
)
|
||||||
|
|
||||||
|
type WebsocketPeerHandler struct {
|
||||||
|
WebsocketPeerApiBroker
|
||||||
|
sessionConsumer session.Consumer
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewWebsocketPeerHandler(consumer session.Consumer) WebsocketPeerHandler {
|
||||||
|
return WebsocketPeerHandler{
|
||||||
|
sessionConsumer: consumer,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ws *WebsocketPeerHandler) RegisterHandlers(serveMux *http.ServeMux, prefix string) error {
|
||||||
|
url := gocommon.MakeHttpHandlerPattern(prefix, "ws")
|
||||||
|
if *noAuthFlag {
|
||||||
|
serveMux.HandleFunc(url, ws.upgrade_nosession)
|
||||||
|
} else {
|
||||||
|
serveMux.HandleFunc(url, ws.upgrade)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ws *WebsocketPeerHandler) upgrade_core(conn *websocket.Conn, accid primitive.ObjectID, alias string) {
|
||||||
|
go func(c *websocket.Conn, accid primitive.ObjectID) {
|
||||||
|
peer := ws.CreatePeer(accid)
|
||||||
|
ws.ClientConnected(peer, c)
|
||||||
|
|
||||||
|
var closeReason string
|
||||||
|
for {
|
||||||
|
messageType, r, err := c.NextReader()
|
||||||
|
if err != nil {
|
||||||
|
if ce, ok := err.(*websocket.CloseError); ok {
|
||||||
|
closeReason = ce.Text
|
||||||
|
}
|
||||||
|
c.Close()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if messageType == websocket.CloseMessage {
|
||||||
|
closeMsg, _ := io.ReadAll(r)
|
||||||
|
closeReason = string(closeMsg)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if messageType == websocket.BinaryMessage {
|
||||||
|
var size [1]byte
|
||||||
|
r.Read(size[:])
|
||||||
|
cmd := make([]byte, size[0])
|
||||||
|
r.Read(cmd)
|
||||||
|
ws.Call(peer, string(cmd), r)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ws.ClientDisconnected(peer, closeReason)
|
||||||
|
}(conn, accid)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ws *WebsocketPeerHandler) upgrade_nosession(w http.ResponseWriter, r *http.Request) {
|
||||||
|
// 클라이언트 접속
|
||||||
|
defer func() {
|
||||||
|
s := recover()
|
||||||
|
if s != nil {
|
||||||
|
logger.Error(s)
|
||||||
|
}
|
||||||
|
io.Copy(io.Discard, r.Body)
|
||||||
|
r.Body.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
auth := strings.Split(r.Header.Get("Authorization"), " ")
|
||||||
|
if len(auth) != 2 {
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
temp, err := hex.DecodeString(auth[1])
|
||||||
|
if err != nil {
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(temp) != len(primitive.NilObjectID) {
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
raw := (*[12]byte)(temp)
|
||||||
|
accid := primitive.ObjectID(*raw)
|
||||||
|
|
||||||
|
var upgrader = websocket.Upgrader{} // use default options
|
||||||
|
conn, err := upgrader.Upgrade(w, r, nil)
|
||||||
|
if err != nil {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var alias string
|
||||||
|
if v := r.Header.Get("AS-X-ALIAS"); len(v) > 0 {
|
||||||
|
vt, _ := base64.StdEncoding.DecodeString(v)
|
||||||
|
alias = string(vt)
|
||||||
|
} else {
|
||||||
|
alias = accid.Hex()
|
||||||
|
}
|
||||||
|
|
||||||
|
ws.upgrade_core(conn, accid, alias)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ws *WebsocketPeerHandler) upgrade(w http.ResponseWriter, r *http.Request) {
|
||||||
|
// 클라이언트 접속
|
||||||
|
defer func() {
|
||||||
|
s := recover()
|
||||||
|
if s != nil {
|
||||||
|
logger.Error(s)
|
||||||
|
}
|
||||||
|
io.Copy(io.Discard, r.Body)
|
||||||
|
r.Body.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
sk := r.Header.Get("AS-X-SESSION")
|
||||||
|
logger.Println("WebsocketHandler.upgrade sk :", sk)
|
||||||
|
authinfo, err := ws.sessionConsumer.Query(sk)
|
||||||
|
if err != nil {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
logger.Error("authorize query failed :", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var upgrader = websocket.Upgrader{} // use default options
|
||||||
|
conn, err := upgrader.Upgrade(w, r, nil)
|
||||||
|
if err != nil {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var alias string
|
||||||
|
if v := r.Header.Get("AS-X-ALIAS"); len(v) > 0 {
|
||||||
|
vt, _ := base64.StdEncoding.DecodeString(v)
|
||||||
|
alias = string(vt)
|
||||||
|
} else {
|
||||||
|
alias = authinfo.Account.Hex()
|
||||||
|
}
|
||||||
|
|
||||||
|
ws.upgrade_core(conn, authinfo.Account, alias)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user