Compare commits
2 Commits
new_conn
...
2dafadf949
| Author | SHA1 | Date | |
|---|---|---|---|
| 2dafadf949 | |||
| f0a97c4701 |
@ -8,6 +8,7 @@ import (
|
|||||||
"path"
|
"path"
|
||||||
"runtime"
|
"runtime"
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"repositories.action2quare.com/ayo/gocommon/flagx"
|
"repositories.action2quare.com/ayo/gocommon/flagx"
|
||||||
@ -15,6 +16,7 @@ import (
|
|||||||
|
|
||||||
var stdlogger *log.Logger
|
var stdlogger *log.Logger
|
||||||
var UseLogFile = flagx.Bool("logfile", false, "")
|
var UseLogFile = flagx.Bool("logfile", false, "")
|
||||||
|
var _ = flagx.Int("logprefix", 3, "0 : no_prefix, 1 : date, 2 : time, 3 : datetime")
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
binpath, _ := os.Executable()
|
binpath, _ := os.Executable()
|
||||||
@ -23,7 +25,29 @@ func init() {
|
|||||||
var outWriter io.Writer
|
var outWriter io.Writer
|
||||||
outWriter = os.Stdout
|
outWriter = os.Stdout
|
||||||
|
|
||||||
if *UseLogFile {
|
args := os.Args
|
||||||
|
useLogFile := false
|
||||||
|
for _, arg := range args {
|
||||||
|
if strings.HasPrefix(arg, "-logfile=") {
|
||||||
|
useLogFile, _ = strconv.ParseBool(arg[9:])
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if arg == "-logfile" {
|
||||||
|
useLogFile = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
logprefix := 3
|
||||||
|
for _, arg := range args {
|
||||||
|
if strings.HasPrefix(arg, "-logprefix=") {
|
||||||
|
logprefix, _ = strconv.Atoi(arg[11:])
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if useLogFile {
|
||||||
ext := path.Ext(binname)
|
ext := path.Ext(binname)
|
||||||
if len(ext) > 0 {
|
if len(ext) > 0 {
|
||||||
binname = binname[:len(binname)-len(ext)]
|
binname = binname[:len(binname)-len(ext)]
|
||||||
@ -38,8 +62,12 @@ func init() {
|
|||||||
outWriter = io.MultiWriter(outWriter, logFile)
|
outWriter = io.MultiWriter(outWriter, logFile)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if logprefix < 4 {
|
||||||
|
stdlogger = log.New(outWriter, "", logprefix)
|
||||||
|
} else {
|
||||||
stdlogger = log.New(outWriter, "", log.LstdFlags)
|
stdlogger = log.New(outWriter, "", log.LstdFlags)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func Println(v ...interface{}) {
|
func Println(v ...interface{}) {
|
||||||
stdlogger.Output(2, fmt.Sprintln(v...))
|
stdlogger.Output(2, fmt.Sprintln(v...))
|
||||||
|
|||||||
@ -7,6 +7,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"unsafe"
|
"unsafe"
|
||||||
|
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
"repositories.action2quare.com/ayo/gocommon/logger"
|
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -16,7 +17,7 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type apiFuncType func(ApiCallContext)
|
type apiFuncType func(ApiCallContext)
|
||||||
type connFuncType func(*Conn, *Sender)
|
type connFuncType func(*websocket.Conn, *Sender)
|
||||||
type disconnFuncType func(string, *Sender)
|
type disconnFuncType func(string, *Sender)
|
||||||
|
|
||||||
type WebsocketApiHandler struct {
|
type WebsocketApiHandler struct {
|
||||||
@ -52,7 +53,7 @@ func MakeWebsocketApiHandler[T any](receiver *T, receiverName string) WebsocketA
|
|||||||
if method.Type.NumIn() != 3 {
|
if method.Type.NumIn() != 3 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if method.Type.In(1) != reflect.TypeOf((*Conn)(nil)) {
|
if method.Type.In(1) != reflect.TypeOf((*websocket.Conn)(nil)) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if method.Type.In(2) != reflect.TypeOf((*Sender)(nil)) {
|
if method.Type.In(2) != reflect.TypeOf((*Sender)(nil)) {
|
||||||
@ -61,9 +62,9 @@ func MakeWebsocketApiHandler[T any](receiver *T, receiverName string) WebsocketA
|
|||||||
funcptr := method.Func.Pointer()
|
funcptr := method.Func.Pointer()
|
||||||
p1 := unsafe.Pointer(&funcptr)
|
p1 := unsafe.Pointer(&funcptr)
|
||||||
p2 := unsafe.Pointer(&p1)
|
p2 := unsafe.Pointer(&p1)
|
||||||
connfuncptr := (*func(*T, *Conn, *Sender))(p2)
|
connfuncptr := (*func(*T, *websocket.Conn, *Sender))(p2)
|
||||||
|
|
||||||
connfunc = func(c *Conn, s *Sender) {
|
connfunc = func(c *websocket.Conn, s *Sender) {
|
||||||
(*connfuncptr)(receiver, c, s)
|
(*connfuncptr)(receiver, c, s)
|
||||||
}
|
}
|
||||||
} else if method.Name == ClientDisconnected {
|
} else if method.Name == ClientDisconnected {
|
||||||
|
|||||||
@ -10,11 +10,9 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||||
@ -29,94 +27,12 @@ import (
|
|||||||
|
|
||||||
var noAuthFlag = flagx.Bool("noauth", false, "")
|
var noAuthFlag = flagx.Bool("noauth", false, "")
|
||||||
|
|
||||||
type Conn struct {
|
|
||||||
innerConn *websocket.Conn
|
|
||||||
spinLock int32
|
|
||||||
}
|
|
||||||
|
|
||||||
func makeConn(conn *websocket.Conn) *Conn {
|
|
||||||
return &Conn{
|
|
||||||
innerConn: conn,
|
|
||||||
spinLock: 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type wsconn struct {
|
type wsconn struct {
|
||||||
*Conn
|
*websocket.Conn
|
||||||
sender *Sender
|
sender *Sender
|
||||||
closeMessage string
|
closeMessage string
|
||||||
}
|
}
|
||||||
|
|
||||||
type noCopy struct{}
|
|
||||||
type websocketWriter struct {
|
|
||||||
_ noCopy
|
|
||||||
innerConn *websocket.Conn
|
|
||||||
spinLock *int32
|
|
||||||
fingerprint int32
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c websocketWriter) writeImpl(vf func() error) error {
|
|
||||||
defer atomic.StoreInt32(c.spinLock, 0)
|
|
||||||
|
|
||||||
for i := int64(0); ; i++ {
|
|
||||||
if atomic.CompareAndSwapInt32(c.spinLock, 0, c.fingerprint) {
|
|
||||||
return vf()
|
|
||||||
}
|
|
||||||
|
|
||||||
time.Sleep(time.Microsecond)
|
|
||||||
if i >= int64(time.Second/time.Microsecond) && i&int64(time.Second/time.Microsecond) == 0 {
|
|
||||||
// 1초동안 락 실패
|
|
||||||
logger.Println("websocket write lock failed : ", i/int64(time.Second/time.Microsecond))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c websocketWriter) WriteJSON(v interface{}) error {
|
|
||||||
return c.writeImpl(func() error { return c.innerConn.WriteJSON(v) })
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c websocketWriter) WriteMessage(messageType int, data []byte) error {
|
|
||||||
return c.writeImpl(func() error { return c.innerConn.WriteMessage(messageType, data) })
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c websocketWriter) WritePreparedMessage(pm *websocket.PreparedMessage) error {
|
|
||||||
return c.writeImpl(func() error { return c.innerConn.WritePreparedMessage(pm) })
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c websocketWriter) WriteControl(messageType int, data []byte, deadline time.Time) error {
|
|
||||||
return c.writeImpl(func() error { return c.innerConn.WriteControl(messageType, data, deadline) })
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Conn) SetReadDeadline(t time.Time) error {
|
|
||||||
return c.innerConn.SetReadDeadline(t)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Conn) Close() error {
|
|
||||||
return c.innerConn.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Conn) ReadMessage() (messageType int, p []byte, err error) {
|
|
||||||
return c.innerConn.ReadMessage()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Conn) RemoteAddr() net.Addr {
|
|
||||||
return c.innerConn.RemoteAddr()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Conn) NextReader() (messageType int, r io.Reader, err error) {
|
|
||||||
return c.innerConn.NextReader()
|
|
||||||
}
|
|
||||||
|
|
||||||
var websocketWriterSeq = int32(1)
|
|
||||||
|
|
||||||
func (c *Conn) MakeWriter() websocketWriter {
|
|
||||||
return websocketWriter{
|
|
||||||
innerConn: c.innerConn,
|
|
||||||
spinLock: &c.spinLock,
|
|
||||||
fingerprint: atomic.AddInt32(&websocketWriterSeq, 1),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type UpstreamMessage struct {
|
type UpstreamMessage struct {
|
||||||
Alias string
|
Alias string
|
||||||
Accid primitive.ObjectID
|
Accid primitive.ObjectID
|
||||||
@ -166,7 +82,7 @@ type EventReceiver interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type send_msg_queue_elem struct {
|
type send_msg_queue_elem struct {
|
||||||
to *Conn
|
to *websocket.Conn
|
||||||
pmsg *websocket.PreparedMessage
|
pmsg *websocket.PreparedMessage
|
||||||
//msg []byte
|
//msg []byte
|
||||||
}
|
}
|
||||||
@ -232,7 +148,7 @@ func NewWebsocketHandler(consumer session.Consumer, redisUrl string) (*Websocket
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
elem.to.MakeWriter().WritePreparedMessage(elem.pmsg)
|
elem.to.WritePreparedMessage(elem.pmsg)
|
||||||
}
|
}
|
||||||
|
|
||||||
for elem := range sendchan {
|
for elem := range sendchan {
|
||||||
@ -281,7 +197,7 @@ func (ws *WebsocketHandler) SendUpstreamMessage(msg *UpstreamMessage) {
|
|||||||
ws.localDeliveryChan <- msg
|
ws.localDeliveryChan <- msg
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ws *WebsocketHandler) WriteDirectMessage(c *Conn, messageType int, data []byte) {
|
func (ws *WebsocketHandler) WriteDirectMessage(c *websocket.Conn, messageType int, data []byte) {
|
||||||
pmsg, _ := websocket.NewPreparedMessage(messageType, data)
|
pmsg, _ := websocket.NewPreparedMessage(messageType, data)
|
||||||
ws.sendMsgChan <- send_msg_queue_elem{
|
ws.sendMsgChan <- send_msg_queue_elem{
|
||||||
to: c,
|
to: c,
|
||||||
@ -563,13 +479,13 @@ func (ws *WebsocketHandler) mainLoop(ctx context.Context) {
|
|||||||
|
|
||||||
case accid := <-ws.forceCloseChan:
|
case accid := <-ws.forceCloseChan:
|
||||||
if conn := entireConns[accid.Hex()]; conn != nil {
|
if conn := entireConns[accid.Hex()]; conn != nil {
|
||||||
conn.MakeWriter().WriteControl(websocket.CloseMessage, unauthdata, time.Time{})
|
conn.WriteControl(websocket.CloseMessage, unauthdata, time.Time{})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func upgrade_core(ws *WebsocketHandler, conn *Conn, accid primitive.ObjectID, alias string) {
|
func upgrade_core(ws *WebsocketHandler, conn *websocket.Conn, accid primitive.ObjectID, alias string) {
|
||||||
newconn := &wsconn{
|
newconn := &wsconn{
|
||||||
Conn: conn,
|
Conn: conn,
|
||||||
sender: &Sender{
|
sender: &Sender{
|
||||||
@ -587,7 +503,7 @@ func upgrade_core(ws *WebsocketHandler, conn *Conn, accid primitive.ObjectID, al
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
messageType, r, err := c.innerConn.NextReader()
|
messageType, r, err := c.NextReader()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if ce, ok := err.(*websocket.CloseError); ok {
|
if ce, ok := err.(*websocket.CloseError); ok {
|
||||||
c.closeMessage = ce.Text
|
c.closeMessage = ce.Text
|
||||||
@ -680,7 +596,7 @@ func (ws *WebsocketHandler) upgrade_nosession(w http.ResponseWriter, r *http.Req
|
|||||||
alias = accid.Hex()
|
alias = accid.Hex()
|
||||||
}
|
}
|
||||||
|
|
||||||
upgrade_core(ws, makeConn(conn), accid, alias)
|
upgrade_core(ws, conn, accid, alias)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ws *WebsocketHandler) upgrade(w http.ResponseWriter, r *http.Request) {
|
func (ws *WebsocketHandler) upgrade(w http.ResponseWriter, r *http.Request) {
|
||||||
@ -727,5 +643,5 @@ func (ws *WebsocketHandler) upgrade(w http.ResponseWriter, r *http.Request) {
|
|||||||
alias = authinfo.Account.Hex()
|
alias = authinfo.Account.Hex()
|
||||||
}
|
}
|
||||||
|
|
||||||
upgrade_core(ws, makeConn(conn), authinfo.Account, alias)
|
upgrade_core(ws, conn, authinfo.Account, alias)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -23,7 +23,7 @@ type WebsocketPeerHandler interface {
|
|||||||
|
|
||||||
type peerCtorChannelValue struct {
|
type peerCtorChannelValue struct {
|
||||||
accid primitive.ObjectID
|
accid primitive.ObjectID
|
||||||
conn *Conn
|
conn *websocket.Conn
|
||||||
}
|
}
|
||||||
|
|
||||||
type peerDtorChannelValue struct {
|
type peerDtorChannelValue struct {
|
||||||
@ -42,7 +42,7 @@ type websocketPeerHandler[T PeerInterface] struct {
|
|||||||
|
|
||||||
type PeerInterface interface {
|
type PeerInterface interface {
|
||||||
ClientDisconnected(string)
|
ClientDisconnected(string)
|
||||||
ClientConnected(*Conn)
|
ClientConnected(*websocket.Conn)
|
||||||
}
|
}
|
||||||
type peerApiFuncType[T PeerInterface] func(T, io.Reader) (any, error)
|
type peerApiFuncType[T PeerInterface] func(T, io.Reader) (any, error)
|
||||||
|
|
||||||
@ -182,7 +182,7 @@ func (ws *websocketPeerHandler[T]) onSessionInvalidated(accid primitive.ObjectID
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ws *websocketPeerHandler[T]) sessionMonitoring() {
|
func (ws *websocketPeerHandler[T]) sessionMonitoring() {
|
||||||
all := make(map[primitive.ObjectID]*Conn)
|
all := make(map[primitive.ObjectID]*websocket.Conn)
|
||||||
unauthdata := []byte{0x03, 0xec}
|
unauthdata := []byte{0x03, 0xec}
|
||||||
unauthdata = append(unauthdata, []byte("unauthorized")...)
|
unauthdata = append(unauthdata, []byte("unauthorized")...)
|
||||||
for {
|
for {
|
||||||
@ -191,7 +191,7 @@ func (ws *websocketPeerHandler[T]) sessionMonitoring() {
|
|||||||
all[estVal.accid] = estVal.conn
|
all[estVal.accid] = estVal.conn
|
||||||
case disVal := <-ws.peerDtorChannel:
|
case disVal := <-ws.peerDtorChannel:
|
||||||
if c := all[disVal.accid]; c != nil {
|
if c := all[disVal.accid]; c != nil {
|
||||||
c.MakeWriter().WriteControl(websocket.CloseMessage, unauthdata, time.Time{})
|
c.WriteControl(websocket.CloseMessage, unauthdata, time.Time{})
|
||||||
delete(all, disVal.accid)
|
delete(all, disVal.accid)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -203,8 +203,8 @@ func (ws *websocketPeerHandler[T]) sessionMonitoring() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ws *websocketPeerHandler[T]) upgrade_core(conn *Conn, accid primitive.ObjectID, sk string) {
|
func (ws *websocketPeerHandler[T]) upgrade_core(conn *websocket.Conn, accid primitive.ObjectID, sk string) {
|
||||||
go func(c *Conn, accid primitive.ObjectID, sk string) {
|
go func(c *websocket.Conn, accid primitive.ObjectID, sk string) {
|
||||||
peer := ws.createPeer(accid)
|
peer := ws.createPeer(accid)
|
||||||
var closeReason string
|
var closeReason string
|
||||||
|
|
||||||
@ -217,7 +217,6 @@ func (ws *websocketPeerHandler[T]) upgrade_core(conn *Conn, accid primitive.Obje
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
response := make([]byte, 255)
|
response := make([]byte, 255)
|
||||||
writer := c.MakeWriter()
|
|
||||||
for {
|
for {
|
||||||
response = response[:5]
|
response = response[:5]
|
||||||
messageType, r, err := c.NextReader()
|
messageType, r, err := c.NextReader()
|
||||||
@ -278,7 +277,7 @@ func (ws *websocketPeerHandler[T]) upgrade_core(conn *Conn, accid primitive.Obje
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Println("websocket.NewPreparedMessage failed :", err)
|
logger.Println("websocket.NewPreparedMessage failed :", err)
|
||||||
} else {
|
} else {
|
||||||
writer.WritePreparedMessage(pmsg)
|
c.WritePreparedMessage(pmsg)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
cmd := make([]byte, flag[0])
|
cmd := make([]byte, flag[0])
|
||||||
@ -347,7 +346,7 @@ func (ws *websocketPeerHandler[T]) upgrade_noauth(w http.ResponseWriter, r *http
|
|||||||
// alias = accid.Hex()
|
// alias = accid.Hex()
|
||||||
// }
|
// }
|
||||||
|
|
||||||
ws.upgrade_core(&Conn{innerConn: conn}, accid, sk)
|
ws.upgrade_core(conn, accid, sk)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ws *websocketPeerHandler[T]) upgrade(w http.ResponseWriter, r *http.Request) {
|
func (ws *websocketPeerHandler[T]) upgrade(w http.ResponseWriter, r *http.Request) {
|
||||||
@ -388,5 +387,5 @@ func (ws *websocketPeerHandler[T]) upgrade(w http.ResponseWriter, r *http.Reques
|
|||||||
// } else {
|
// } else {
|
||||||
// alias = authinfo.Account.Hex()
|
// alias = authinfo.Account.Hex()
|
||||||
// }
|
// }
|
||||||
ws.upgrade_core(makeConn(conn), authinfo.Account, sk)
|
ws.upgrade_core(conn, authinfo.Account, sk)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user