websocket.Conn 을 래핑하는 구조체 추가 - 동시 write 오류 제거용
This commit is contained in:
@ -10,9 +10,11 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
@ -27,12 +29,94 @@ import (
|
||||
|
||||
var noAuthFlag = flagx.Bool("noauth", false, "")
|
||||
|
||||
type Conn struct {
|
||||
innerConn *websocket.Conn
|
||||
spinLock int32
|
||||
}
|
||||
|
||||
func makeConn(conn *websocket.Conn) *Conn {
|
||||
return &Conn{
|
||||
innerConn: conn,
|
||||
spinLock: 0,
|
||||
}
|
||||
}
|
||||
|
||||
type wsconn struct {
|
||||
*websocket.Conn
|
||||
*Conn
|
||||
sender *Sender
|
||||
closeMessage string
|
||||
}
|
||||
|
||||
type noCopy struct{}
|
||||
type websocketWriter struct {
|
||||
_ noCopy
|
||||
innerConn *websocket.Conn
|
||||
spinLock *int32
|
||||
fingerprint int32
|
||||
}
|
||||
|
||||
func (c websocketWriter) writeImpl(vf func() error) error {
|
||||
defer atomic.StoreInt32(c.spinLock, 0)
|
||||
|
||||
for i := int64(0); ; i++ {
|
||||
if atomic.CompareAndSwapInt32(c.spinLock, 0, c.fingerprint) {
|
||||
return vf()
|
||||
}
|
||||
|
||||
time.Sleep(time.Microsecond)
|
||||
if i >= int64(time.Second/time.Microsecond) && i&int64(time.Second/time.Microsecond) == 0 {
|
||||
// 1초동안 락 실패
|
||||
logger.Println("websocket write lock failed : ", i/int64(time.Second/time.Microsecond))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c websocketWriter) WriteJSON(v interface{}) error {
|
||||
return c.writeImpl(func() error { return c.innerConn.WriteJSON(v) })
|
||||
}
|
||||
|
||||
func (c websocketWriter) WriteMessage(messageType int, data []byte) error {
|
||||
return c.writeImpl(func() error { return c.innerConn.WriteMessage(messageType, data) })
|
||||
}
|
||||
|
||||
func (c websocketWriter) WritePreparedMessage(pm *websocket.PreparedMessage) error {
|
||||
return c.writeImpl(func() error { return c.innerConn.WritePreparedMessage(pm) })
|
||||
}
|
||||
|
||||
func (c websocketWriter) WriteControl(messageType int, data []byte, deadline time.Time) error {
|
||||
return c.writeImpl(func() error { return c.innerConn.WriteControl(messageType, data, deadline) })
|
||||
}
|
||||
|
||||
func (c *Conn) SetReadDeadline(t time.Time) error {
|
||||
return c.innerConn.SetReadDeadline(t)
|
||||
}
|
||||
|
||||
func (c *Conn) Close() error {
|
||||
return c.innerConn.Close()
|
||||
}
|
||||
|
||||
func (c *Conn) ReadMessage() (messageType int, p []byte, err error) {
|
||||
return c.innerConn.ReadMessage()
|
||||
}
|
||||
|
||||
func (c *Conn) RemoteAddr() net.Addr {
|
||||
return c.innerConn.RemoteAddr()
|
||||
}
|
||||
|
||||
func (c *Conn) NextReader() (messageType int, r io.Reader, err error) {
|
||||
return c.innerConn.NextReader()
|
||||
}
|
||||
|
||||
var websocketWriterSeq = int32(1)
|
||||
|
||||
func (c *Conn) MakeWriter() websocketWriter {
|
||||
return websocketWriter{
|
||||
innerConn: c.innerConn,
|
||||
spinLock: &c.spinLock,
|
||||
fingerprint: atomic.AddInt32(&websocketWriterSeq, 1),
|
||||
}
|
||||
}
|
||||
|
||||
type UpstreamMessage struct {
|
||||
Alias string
|
||||
Accid primitive.ObjectID
|
||||
@ -82,7 +166,7 @@ type EventReceiver interface {
|
||||
}
|
||||
|
||||
type send_msg_queue_elem struct {
|
||||
to *websocket.Conn
|
||||
to *Conn
|
||||
pmsg *websocket.PreparedMessage
|
||||
//msg []byte
|
||||
}
|
||||
@ -148,7 +232,7 @@ func NewWebsocketHandler(consumer session.Consumer, redisUrl string) (*Websocket
|
||||
return
|
||||
}
|
||||
|
||||
elem.to.WritePreparedMessage(elem.pmsg)
|
||||
elem.to.MakeWriter().WritePreparedMessage(elem.pmsg)
|
||||
}
|
||||
|
||||
for elem := range sendchan {
|
||||
@ -197,7 +281,7 @@ func (ws *WebsocketHandler) SendUpstreamMessage(msg *UpstreamMessage) {
|
||||
ws.localDeliveryChan <- msg
|
||||
}
|
||||
|
||||
func (ws *WebsocketHandler) WriteDirectMessage(c *websocket.Conn, messageType int, data []byte) {
|
||||
func (ws *WebsocketHandler) WriteDirectMessage(c *Conn, messageType int, data []byte) {
|
||||
pmsg, _ := websocket.NewPreparedMessage(messageType, data)
|
||||
ws.sendMsgChan <- send_msg_queue_elem{
|
||||
to: c,
|
||||
@ -479,13 +563,13 @@ func (ws *WebsocketHandler) mainLoop(ctx context.Context) {
|
||||
|
||||
case accid := <-ws.forceCloseChan:
|
||||
if conn := entireConns[accid.Hex()]; conn != nil {
|
||||
conn.WriteControl(websocket.CloseMessage, unauthdata, time.Time{})
|
||||
conn.MakeWriter().WriteControl(websocket.CloseMessage, unauthdata, time.Time{})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func upgrade_core(ws *WebsocketHandler, conn *websocket.Conn, accid primitive.ObjectID, alias string) {
|
||||
func upgrade_core(ws *WebsocketHandler, conn *Conn, accid primitive.ObjectID, alias string) {
|
||||
newconn := &wsconn{
|
||||
Conn: conn,
|
||||
sender: &Sender{
|
||||
@ -503,7 +587,7 @@ func upgrade_core(ws *WebsocketHandler, conn *websocket.Conn, accid primitive.Ob
|
||||
}()
|
||||
|
||||
for {
|
||||
messageType, r, err := c.NextReader()
|
||||
messageType, r, err := c.innerConn.NextReader()
|
||||
if err != nil {
|
||||
if ce, ok := err.(*websocket.CloseError); ok {
|
||||
c.closeMessage = ce.Text
|
||||
@ -596,7 +680,7 @@ func (ws *WebsocketHandler) upgrade_nosession(w http.ResponseWriter, r *http.Req
|
||||
alias = accid.Hex()
|
||||
}
|
||||
|
||||
upgrade_core(ws, conn, accid, alias)
|
||||
upgrade_core(ws, makeConn(conn), accid, alias)
|
||||
}
|
||||
|
||||
func (ws *WebsocketHandler) upgrade(w http.ResponseWriter, r *http.Request) {
|
||||
@ -643,5 +727,5 @@ func (ws *WebsocketHandler) upgrade(w http.ResponseWriter, r *http.Request) {
|
||||
alias = authinfo.Account.Hex()
|
||||
}
|
||||
|
||||
upgrade_core(ws, conn, authinfo.Account, alias)
|
||||
upgrade_core(ws, makeConn(conn), authinfo.Account, alias)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user