Compare commits

2 Commits

Author SHA1 Message Date
2dafadf949 logprefix flag 추가 2024-07-24 17:49:45 +09:00
f0a97c4701 logprefix 추가 2024-07-24 17:12:54 +09:00
4 changed files with 53 additions and 109 deletions

View File

@ -8,6 +8,7 @@ import (
"path" "path"
"runtime" "runtime"
"runtime/debug" "runtime/debug"
"strconv"
"strings" "strings"
"repositories.action2quare.com/ayo/gocommon/flagx" "repositories.action2quare.com/ayo/gocommon/flagx"
@ -15,6 +16,7 @@ import (
var stdlogger *log.Logger var stdlogger *log.Logger
var UseLogFile = flagx.Bool("logfile", false, "") var UseLogFile = flagx.Bool("logfile", false, "")
var _ = flagx.Int("logprefix", 3, "0 : no_prefix, 1 : date, 2 : time, 3 : datetime")
func init() { func init() {
binpath, _ := os.Executable() binpath, _ := os.Executable()
@ -23,7 +25,29 @@ func init() {
var outWriter io.Writer var outWriter io.Writer
outWriter = os.Stdout outWriter = os.Stdout
if *UseLogFile { args := os.Args
useLogFile := false
for _, arg := range args {
if strings.HasPrefix(arg, "-logfile=") {
useLogFile, _ = strconv.ParseBool(arg[9:])
break
}
if arg == "-logfile" {
useLogFile = true
break
}
}
logprefix := 3
for _, arg := range args {
if strings.HasPrefix(arg, "-logprefix=") {
logprefix, _ = strconv.Atoi(arg[11:])
break
}
}
if useLogFile {
ext := path.Ext(binname) ext := path.Ext(binname)
if len(ext) > 0 { if len(ext) > 0 {
binname = binname[:len(binname)-len(ext)] binname = binname[:len(binname)-len(ext)]
@ -38,7 +62,11 @@ func init() {
outWriter = io.MultiWriter(outWriter, logFile) outWriter = io.MultiWriter(outWriter, logFile)
} }
stdlogger = log.New(outWriter, "", log.LstdFlags) if logprefix < 4 {
stdlogger = log.New(outWriter, "", logprefix)
} else {
stdlogger = log.New(outWriter, "", log.LstdFlags)
}
} }
func Println(v ...interface{}) { func Println(v ...interface{}) {

View File

@ -7,6 +7,7 @@ import (
"strings" "strings"
"unsafe" "unsafe"
"github.com/gorilla/websocket"
"repositories.action2quare.com/ayo/gocommon/logger" "repositories.action2quare.com/ayo/gocommon/logger"
) )
@ -16,7 +17,7 @@ const (
) )
type apiFuncType func(ApiCallContext) type apiFuncType func(ApiCallContext)
type connFuncType func(*Conn, *Sender) type connFuncType func(*websocket.Conn, *Sender)
type disconnFuncType func(string, *Sender) type disconnFuncType func(string, *Sender)
type WebsocketApiHandler struct { type WebsocketApiHandler struct {
@ -52,7 +53,7 @@ func MakeWebsocketApiHandler[T any](receiver *T, receiverName string) WebsocketA
if method.Type.NumIn() != 3 { if method.Type.NumIn() != 3 {
continue continue
} }
if method.Type.In(1) != reflect.TypeOf((*Conn)(nil)) { if method.Type.In(1) != reflect.TypeOf((*websocket.Conn)(nil)) {
continue continue
} }
if method.Type.In(2) != reflect.TypeOf((*Sender)(nil)) { if method.Type.In(2) != reflect.TypeOf((*Sender)(nil)) {
@ -61,9 +62,9 @@ func MakeWebsocketApiHandler[T any](receiver *T, receiverName string) WebsocketA
funcptr := method.Func.Pointer() funcptr := method.Func.Pointer()
p1 := unsafe.Pointer(&funcptr) p1 := unsafe.Pointer(&funcptr)
p2 := unsafe.Pointer(&p1) p2 := unsafe.Pointer(&p1)
connfuncptr := (*func(*T, *Conn, *Sender))(p2) connfuncptr := (*func(*T, *websocket.Conn, *Sender))(p2)
connfunc = func(c *Conn, s *Sender) { connfunc = func(c *websocket.Conn, s *Sender) {
(*connfuncptr)(receiver, c, s) (*connfuncptr)(receiver, c, s)
} }
} else if method.Name == ClientDisconnected { } else if method.Name == ClientDisconnected {

View File

@ -10,11 +10,9 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"net"
"net/http" "net/http"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/bson/primitive"
@ -29,94 +27,12 @@ import (
var noAuthFlag = flagx.Bool("noauth", false, "") var noAuthFlag = flagx.Bool("noauth", false, "")
type Conn struct {
innerConn *websocket.Conn
spinLock int32
}
func makeConn(conn *websocket.Conn) *Conn {
return &Conn{
innerConn: conn,
spinLock: 0,
}
}
type wsconn struct { type wsconn struct {
*Conn *websocket.Conn
sender *Sender sender *Sender
closeMessage string closeMessage string
} }
type noCopy struct{}
type websocketWriter struct {
_ noCopy
innerConn *websocket.Conn
spinLock *int32
fingerprint int32
}
func (c websocketWriter) writeImpl(vf func() error) error {
defer atomic.StoreInt32(c.spinLock, 0)
for i := int64(0); ; i++ {
if atomic.CompareAndSwapInt32(c.spinLock, 0, c.fingerprint) {
return vf()
}
time.Sleep(time.Microsecond)
if i >= int64(time.Second/time.Microsecond) && i&int64(time.Second/time.Microsecond) == 0 {
// 1초동안 락 실패
logger.Println("websocket write lock failed : ", i/int64(time.Second/time.Microsecond))
}
}
}
func (c websocketWriter) WriteJSON(v interface{}) error {
return c.writeImpl(func() error { return c.innerConn.WriteJSON(v) })
}
func (c websocketWriter) WriteMessage(messageType int, data []byte) error {
return c.writeImpl(func() error { return c.innerConn.WriteMessage(messageType, data) })
}
func (c websocketWriter) WritePreparedMessage(pm *websocket.PreparedMessage) error {
return c.writeImpl(func() error { return c.innerConn.WritePreparedMessage(pm) })
}
func (c websocketWriter) WriteControl(messageType int, data []byte, deadline time.Time) error {
return c.writeImpl(func() error { return c.innerConn.WriteControl(messageType, data, deadline) })
}
func (c *Conn) SetReadDeadline(t time.Time) error {
return c.innerConn.SetReadDeadline(t)
}
func (c *Conn) Close() error {
return c.innerConn.Close()
}
func (c *Conn) ReadMessage() (messageType int, p []byte, err error) {
return c.innerConn.ReadMessage()
}
func (c *Conn) RemoteAddr() net.Addr {
return c.innerConn.RemoteAddr()
}
func (c *Conn) NextReader() (messageType int, r io.Reader, err error) {
return c.innerConn.NextReader()
}
var websocketWriterSeq = int32(1)
func (c *Conn) MakeWriter() websocketWriter {
return websocketWriter{
innerConn: c.innerConn,
spinLock: &c.spinLock,
fingerprint: atomic.AddInt32(&websocketWriterSeq, 1),
}
}
type UpstreamMessage struct { type UpstreamMessage struct {
Alias string Alias string
Accid primitive.ObjectID Accid primitive.ObjectID
@ -166,7 +82,7 @@ type EventReceiver interface {
} }
type send_msg_queue_elem struct { type send_msg_queue_elem struct {
to *Conn to *websocket.Conn
pmsg *websocket.PreparedMessage pmsg *websocket.PreparedMessage
//msg []byte //msg []byte
} }
@ -232,7 +148,7 @@ func NewWebsocketHandler(consumer session.Consumer, redisUrl string) (*Websocket
return return
} }
elem.to.MakeWriter().WritePreparedMessage(elem.pmsg) elem.to.WritePreparedMessage(elem.pmsg)
} }
for elem := range sendchan { for elem := range sendchan {
@ -281,7 +197,7 @@ func (ws *WebsocketHandler) SendUpstreamMessage(msg *UpstreamMessage) {
ws.localDeliveryChan <- msg ws.localDeliveryChan <- msg
} }
func (ws *WebsocketHandler) WriteDirectMessage(c *Conn, messageType int, data []byte) { func (ws *WebsocketHandler) WriteDirectMessage(c *websocket.Conn, messageType int, data []byte) {
pmsg, _ := websocket.NewPreparedMessage(messageType, data) pmsg, _ := websocket.NewPreparedMessage(messageType, data)
ws.sendMsgChan <- send_msg_queue_elem{ ws.sendMsgChan <- send_msg_queue_elem{
to: c, to: c,
@ -563,13 +479,13 @@ func (ws *WebsocketHandler) mainLoop(ctx context.Context) {
case accid := <-ws.forceCloseChan: case accid := <-ws.forceCloseChan:
if conn := entireConns[accid.Hex()]; conn != nil { if conn := entireConns[accid.Hex()]; conn != nil {
conn.MakeWriter().WriteControl(websocket.CloseMessage, unauthdata, time.Time{}) conn.WriteControl(websocket.CloseMessage, unauthdata, time.Time{})
} }
} }
} }
} }
func upgrade_core(ws *WebsocketHandler, conn *Conn, accid primitive.ObjectID, alias string) { func upgrade_core(ws *WebsocketHandler, conn *websocket.Conn, accid primitive.ObjectID, alias string) {
newconn := &wsconn{ newconn := &wsconn{
Conn: conn, Conn: conn,
sender: &Sender{ sender: &Sender{
@ -587,7 +503,7 @@ func upgrade_core(ws *WebsocketHandler, conn *Conn, accid primitive.ObjectID, al
}() }()
for { for {
messageType, r, err := c.innerConn.NextReader() messageType, r, err := c.NextReader()
if err != nil { if err != nil {
if ce, ok := err.(*websocket.CloseError); ok { if ce, ok := err.(*websocket.CloseError); ok {
c.closeMessage = ce.Text c.closeMessage = ce.Text
@ -680,7 +596,7 @@ func (ws *WebsocketHandler) upgrade_nosession(w http.ResponseWriter, r *http.Req
alias = accid.Hex() alias = accid.Hex()
} }
upgrade_core(ws, makeConn(conn), accid, alias) upgrade_core(ws, conn, accid, alias)
} }
func (ws *WebsocketHandler) upgrade(w http.ResponseWriter, r *http.Request) { func (ws *WebsocketHandler) upgrade(w http.ResponseWriter, r *http.Request) {
@ -727,5 +643,5 @@ func (ws *WebsocketHandler) upgrade(w http.ResponseWriter, r *http.Request) {
alias = authinfo.Account.Hex() alias = authinfo.Account.Hex()
} }
upgrade_core(ws, makeConn(conn), authinfo.Account, alias) upgrade_core(ws, conn, authinfo.Account, alias)
} }

View File

@ -23,7 +23,7 @@ type WebsocketPeerHandler interface {
type peerCtorChannelValue struct { type peerCtorChannelValue struct {
accid primitive.ObjectID accid primitive.ObjectID
conn *Conn conn *websocket.Conn
} }
type peerDtorChannelValue struct { type peerDtorChannelValue struct {
@ -42,7 +42,7 @@ type websocketPeerHandler[T PeerInterface] struct {
type PeerInterface interface { type PeerInterface interface {
ClientDisconnected(string) ClientDisconnected(string)
ClientConnected(*Conn) ClientConnected(*websocket.Conn)
} }
type peerApiFuncType[T PeerInterface] func(T, io.Reader) (any, error) type peerApiFuncType[T PeerInterface] func(T, io.Reader) (any, error)
@ -182,7 +182,7 @@ func (ws *websocketPeerHandler[T]) onSessionInvalidated(accid primitive.ObjectID
} }
func (ws *websocketPeerHandler[T]) sessionMonitoring() { func (ws *websocketPeerHandler[T]) sessionMonitoring() {
all := make(map[primitive.ObjectID]*Conn) all := make(map[primitive.ObjectID]*websocket.Conn)
unauthdata := []byte{0x03, 0xec} unauthdata := []byte{0x03, 0xec}
unauthdata = append(unauthdata, []byte("unauthorized")...) unauthdata = append(unauthdata, []byte("unauthorized")...)
for { for {
@ -191,7 +191,7 @@ func (ws *websocketPeerHandler[T]) sessionMonitoring() {
all[estVal.accid] = estVal.conn all[estVal.accid] = estVal.conn
case disVal := <-ws.peerDtorChannel: case disVal := <-ws.peerDtorChannel:
if c := all[disVal.accid]; c != nil { if c := all[disVal.accid]; c != nil {
c.MakeWriter().WriteControl(websocket.CloseMessage, unauthdata, time.Time{}) c.WriteControl(websocket.CloseMessage, unauthdata, time.Time{})
delete(all, disVal.accid) delete(all, disVal.accid)
} }
@ -203,8 +203,8 @@ func (ws *websocketPeerHandler[T]) sessionMonitoring() {
} }
} }
func (ws *websocketPeerHandler[T]) upgrade_core(conn *Conn, accid primitive.ObjectID, sk string) { func (ws *websocketPeerHandler[T]) upgrade_core(conn *websocket.Conn, accid primitive.ObjectID, sk string) {
go func(c *Conn, accid primitive.ObjectID, sk string) { go func(c *websocket.Conn, accid primitive.ObjectID, sk string) {
peer := ws.createPeer(accid) peer := ws.createPeer(accid)
var closeReason string var closeReason string
@ -217,7 +217,6 @@ func (ws *websocketPeerHandler[T]) upgrade_core(conn *Conn, accid primitive.Obje
}() }()
response := make([]byte, 255) response := make([]byte, 255)
writer := c.MakeWriter()
for { for {
response = response[:5] response = response[:5]
messageType, r, err := c.NextReader() messageType, r, err := c.NextReader()
@ -278,7 +277,7 @@ func (ws *websocketPeerHandler[T]) upgrade_core(conn *Conn, accid primitive.Obje
if err != nil { if err != nil {
logger.Println("websocket.NewPreparedMessage failed :", err) logger.Println("websocket.NewPreparedMessage failed :", err)
} else { } else {
writer.WritePreparedMessage(pmsg) c.WritePreparedMessage(pmsg)
} }
} else { } else {
cmd := make([]byte, flag[0]) cmd := make([]byte, flag[0])
@ -347,7 +346,7 @@ func (ws *websocketPeerHandler[T]) upgrade_noauth(w http.ResponseWriter, r *http
// alias = accid.Hex() // alias = accid.Hex()
// } // }
ws.upgrade_core(&Conn{innerConn: conn}, accid, sk) ws.upgrade_core(conn, accid, sk)
} }
func (ws *websocketPeerHandler[T]) upgrade(w http.ResponseWriter, r *http.Request) { func (ws *websocketPeerHandler[T]) upgrade(w http.ResponseWriter, r *http.Request) {
@ -388,5 +387,5 @@ func (ws *websocketPeerHandler[T]) upgrade(w http.ResponseWriter, r *http.Reques
// } else { // } else {
// alias = authinfo.Account.Hex() // alias = authinfo.Account.Hex()
// } // }
ws.upgrade_core(makeConn(conn), authinfo.Account, sk) ws.upgrade_core(conn, authinfo.Account, sk)
} }