Compare commits

23 Commits

Author SHA1 Message Date
b6e187a0a7 Merge pull request 'opensearch gocommon으로 이동' (#1) from ds into master
Reviewed-on: #1
2025-06-25 16:59:07 +09:00
5c7e4e4df2 opensearch gocommon으로 이동 2025-06-25 16:47:08 +09:00
1170be24c0 logger에 RecoverAndErrorSmallStack 추가 2025-05-14 16:14:35 +09:00
e53963d51e 로그 파일 설정 제거 2024-08-22 10:53:32 +09:00
f93d789905 로그에 실행 인자 남김 2024-08-22 09:14:48 +09:00
4dfc070891 헤더에 세션키 없으면 실패 2024-08-13 16:30:42 +09:00
d5fa86b378 nosession에도 option 적용 2024-08-11 20:46:51 +09:00
6d8d2e3078 NewWebsocketHandler에 옵션을 받음 2024-08-11 20:44:54 +09:00
5b3ad3a40c no space 2024-08-08 23:46:18 +09:00
9b1d250cd7 server Start 로그 2024-08-08 23:42:29 +09:00
13124b7903 로그 제거 2024-08-08 14:06:53 +09:00
86fac6bbc0 Merge branch 'master' of https://repositories.action2quare.com/ayo/gocommon 2024-08-08 11:42:52 +09:00
70d3b2507c 로그 추가 2024-08-08 11:42:49 +09:00
ca5632031c [오승석] whitelist key 변경
- Email -> Alias
2024-08-06 20:58:38 +09:00
38c5e05d4c 헤더 사이즈 늘림 2024-08-05 16:27:54 +09:00
7928e69c60 default 404 handler 등록 2024-08-01 13:44:30 +09:00
899bae335e ReadStringsFormValue 추가 2024-07-31 10:51:07 +09:00
8e3d6c28f0 metric 중복 등록 에러는 무시 2024-07-29 17:49:47 +09:00
ae98abe61d RegisterHandlers 시그니쳐 변경 2024-07-29 10:15:04 +09:00
013c89e58d watch err 처리 - modifyChangeStreams has not been run for this collection/database/cluster 2024-07-25 12:47:31 +09:00
dd4928c822 Merge branch 'new_conn' 2024-07-24 18:43:52 +09:00
2dafadf949 logprefix flag 추가 2024-07-24 17:49:45 +09:00
f0a97c4701 logprefix 추가 2024-07-24 17:12:54 +09:00
11 changed files with 395 additions and 36 deletions

5
go.mod
View File

@ -1,6 +1,8 @@
module repositories.action2quare.com/ayo/gocommon
go 1.20
go 1.22
toolchain go1.22.4
require (
github.com/awa/go-iap v1.32.0
@ -34,6 +36,7 @@ require (
github.com/klauspost/compress v1.13.6 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/opensearch-project/opensearch-go/v4 v4.5.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect
github.com/prometheus/common v0.44.0 // indirect

2
go.sum
View File

@ -90,6 +90,8 @@ github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJ
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/opensearch-project/opensearch-go/v4 v4.5.0 h1:26XckmmF6MhlXt91Bu1yY6R51jy1Ns/C3XgIfvyeTRo=
github.com/opensearch-project/opensearch-go/v4 v4.5.0/go.mod h1:VmFc7dqOEM3ZtLhrpleOzeq+cqUgNabqQG5gX0xId64=
github.com/pires/go-proxyproto v0.7.0 h1:IukmRewDQFWC7kfnb66CSomk2q/seBuilHBYFwyq0Hs=
github.com/pires/go-proxyproto v0.7.0/go.mod h1:Vz/1JPY/OACxWGQNIRY2BeyDmpoaWmEP40O9LbuiFR4=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=

View File

@ -2,43 +2,40 @@ package logger
import (
"fmt"
"io"
"log"
"os"
"path"
"runtime"
"runtime/debug"
"strconv"
"strings"
"repositories.action2quare.com/ayo/gocommon/flagx"
)
var stdlogger *log.Logger
var UseLogFile = flagx.Bool("logfile", false, "")
var _ = flagx.Int("logprefix", 3, "0 : no_prefix, 1 : date, 2 : time, 3 : datetime")
func init() {
binpath, _ := os.Executable()
binname := path.Base(strings.ReplaceAll(binpath, "\\", "/"))
outWriter := os.Stdout
var outWriter io.Writer
outWriter = os.Stdout
if *UseLogFile {
ext := path.Ext(binname)
if len(ext) > 0 {
binname = binname[:len(binname)-len(ext)]
args := os.Args
logprefix := 3
for _, arg := range args {
if strings.HasPrefix(arg, "-logprefix=") {
logprefix, _ = strconv.Atoi(arg[11:])
break
}
logFile, err := os.Create(fmt.Sprintf("%s.log", binname))
if err != nil {
os.Stdout.Write([]byte(err.Error()))
panic(err)
}
outWriter = io.MultiWriter(outWriter, logFile)
}
stdlogger = log.New(outWriter, "", log.LstdFlags)
pid := fmt.Sprintf("[%d]", os.Getpid())
outWriter.Write([]byte(strings.Join(append([]string{pid, binpath}, args...), " ")))
if logprefix < 4 {
stdlogger = log.New(outWriter, "", logprefix)
} else {
stdlogger = log.New(outWriter, "", log.LstdFlags)
}
}
func Println(v ...interface{}) {
@ -130,3 +127,33 @@ func ErrorWithCallStack(err error) error {
frames: frames,
}
}
func ErrorSmallStack() {
buf := make([]byte, 1024)
n := runtime.Stack(buf, false)
if n < len(buf) {
buf = buf[:n]
}
Error(string(buf))
}
func RecoverAndErrorSmallStack(r any) any {
if r != nil {
pc := make([]uintptr, 10)
runtime.Callers(1, pc)
curframes := runtime.CallersFrames(pc)
var out []string
for {
frame, more := curframes.Next()
out = append(out, fmt.Sprintf("%s\n\t%s:%d", frame.Function, frame.File, frame.Line))
if !more {
break
}
}
Error(strings.Join(out, "\n"))
}
return r
}

View File

@ -135,7 +135,11 @@ func (pe *prometheusExporter) loop(ctx context.Context) {
}
if err := prometheus.Register(nextcollector); err != nil {
logger.Error("prometheus register err :", *nm, err)
if _, ok := err.(prometheus.AlreadyRegisteredError); ok {
// 이미 등록된 metric. child process를 여럿 실행하면 발생됨
} else {
logger.Error("prometheus register err :", *nm, err)
}
} else {
collector = nextcollector
}

View File

@ -154,10 +154,36 @@ func (mc *MongoClient) DropIndex(coll CollectionName, name string) error {
}
func (mc *MongoClient) Watch(coll CollectionName, pipeline mongo.Pipeline, opts ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error) {
// mc.db.RunCommand()
if len(opts) == 0 {
opts = []*options.ChangeStreamOptions{options.ChangeStream().SetFullDocument(options.UpdateLookup).SetMaxAwaitTime(0)}
}
return mc.Collection(coll).Watch(mc.ctx, pipeline, opts...)
stream, err := mc.Collection(coll).Watch(mc.ctx, pipeline, opts...)
if err != nil {
if mongoErr, ok := err.(mongo.CommandError); ok {
logger.Println("MongoClient Watch return err code :", mongoErr, mongoErr.Code)
if mongoErr.Code == 40573 {
adminDb := mc.db.Client().Database("admin")
result := adminDb.RunCommand(mc.ctx, bson.D{
{Key: "modifyChangeStreams", Value: 1},
{Key: "database", Value: mc.db.Name()},
{Key: "collection", Value: coll},
{Key: "enable", Value: true},
})
if result.Err() != nil {
logger.Println("mc.db.RunCommand failed :", result.Err(), mc.db.Name(), coll)
} else {
return mc.Collection(coll).Watch(mc.ctx, pipeline, opts...)
}
}
}
logger.Fatal(err)
}
return stream, err
}
func (mc *MongoClient) Collection(collname CollectionName) *mongo.Collection {

184
opensearch/client.go Normal file
View File

@ -0,0 +1,184 @@
package opensearch
import (
"bytes"
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"
osg "github.com/opensearch-project/opensearch-go/v4"
osapi "github.com/opensearch-project/opensearch-go/v4/opensearchapi"
"repositories.action2quare.com/ayo/gocommon/logger"
)
type Config struct {
osg.Config `json:",inline"`
IndexPrefix string `json:"IndexPrefix"`
SigningKey string `json:"SigningKey"`
}
type Client struct {
*osg.Client
cfg Config
signingKey []byte
indexTemplatePattern string
bulkHeader http.Header
singleHeader http.Header
}
type LogDocument struct {
Type string `json:"type"`
Body any `json:"body"`
Timestamp string `json:"@timestamp"`
Country string `json:"country"`
Ip string `json:"ip"`
Uid string `json:"uid"`
Auth struct {
Type string `json:"type"`
Id string `json:"id"`
} `json:"auth"`
}
func NewLogDocument(logType string, body any) *LogDocument {
return &LogDocument{
Type: strings.ToLower(logType),
Timestamp: time.Now().UTC().Format("2006-01-02T15:04:05Z"),
Body: body,
}
}
func (c *Client) Send(ld *LogDocument) error {
if c.Client == nil {
return nil
}
serialized, _ := json.Marshal(ld)
reader := bytes.NewBuffer(serialized)
req := osapi.IndexReq{
Index: c.indexTemplatePattern + ld.Type,
Body: reader,
Header: c.singleHeader,
}
logger.Println("LogSend", req)
resp, err := c.Do(context.Background(), req, nil)
logger.Println(resp)
if err != nil {
return err
}
defer resp.Body.Close()
r, err2 := io.ReadAll(resp.Body)
if err2 != nil {
logger.Println("LogSend resp read error :", err2)
} else {
logger.Println("LogSend resp :", string(r))
}
return nil
}
func (c *Client) SendBulk(ds map[string]*LogDocument) error {
var contents string
for _, d := range ds {
b, _ := json.Marshal(d)
contents += fmt.Sprintf(`{"create":{"_index":"%s%s"}}`+"\n"+`%s`+"\n", c.indexTemplatePattern, d.Type, string(b))
}
reader := bytes.NewBuffer([]byte(contents))
req := osapi.BulkReq{
Body: reader,
Header: c.bulkHeader,
}
resp, err := c.Do(context.Background(), req, nil)
if err != nil {
return err
}
logger.Println(resp)
defer resp.Body.Close()
return nil
}
var jwtHeader string
var encoding = base64.RawURLEncoding
func init() {
src := []byte(`{"alg": "HS256","typ": "JWT"}`)
dst := make([]byte, len(src)*2)
encoding.Encode(dst, src)
enclen := encoding.EncodedLen(len(src))
jwtHeader = string(dst[:enclen])
}
func (c *Client) MakeJWT(subject string, ttl time.Duration) string {
if len(c.signingKey) == 0 {
return ""
}
now := time.Now().UTC().Add(ttl).Add(time.Hour).Unix()
src := []byte(fmt.Sprintf(`{"exp":%d,"sub":"%s","roles":"ds_client_full_access"}`, now, subject))
payload := make([]byte, encoding.EncodedLen(len(src)))
encoding.Encode(payload, src)
encoded := jwtHeader + "." + string(payload)
mac := hmac.New(sha256.New, c.signingKey)
mac.Write([]byte(encoded))
signature := mac.Sum(nil)
sigenc := make([]byte, encoding.EncodedLen(len(signature)))
encoding.Encode(sigenc, signature)
return encoded + "." + string(sigenc)
}
func NewClient(cfg Config) (Client, error) {
if len(cfg.Addresses) == 0 {
return Client{}, nil
}
client, err := osg.NewClient(cfg.Config)
if err != nil {
return Client{}, err
}
var signingKey []byte
if len(cfg.SigningKey) > 0 {
dst := make([]byte, len(cfg.SigningKey)*2)
dstlen, _ := base64.StdEncoding.Decode(dst, []byte(cfg.SigningKey))
signingKey = dst[:dstlen]
}
indexPrefix := cfg.IndexPrefix
if !strings.HasSuffix(indexPrefix, "-") {
indexPrefix += "-"
}
if !strings.HasSuffix(indexPrefix, "ds-logs-") {
indexPrefix = "ds-logs-" + indexPrefix
}
authHeader := fmt.Sprintf("Basic %s", base64.RawURLEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%s", cfg.Username, cfg.Password))))
bulkHeader := make(http.Header)
bulkHeader.Set("Authorization", authHeader)
singleHeader := make(http.Header)
singleHeader.Set("Authorization", authHeader)
return Client{
Client: client,
cfg: cfg,
signingKey: signingKey,
indexTemplatePattern: indexPrefix,
bulkHeader: bulkHeader,
singleHeader: singleHeader,
}, nil
}

33
opensearch/client_test.go Normal file
View File

@ -0,0 +1,33 @@
package opensearch
import (
"testing"
)
func TestNewClient(t *testing.T) {
// var cfg Config
// cfg.Addresses = []string{"http://localhost:9200"}
// client, err := NewClient(cfg)
// if err != nil {
// t.Errorf("NewClient() error = %v", err)
// return
// }
// for i := 0; i < 10; i++ {
// MakeActorLog("LOGIN", "test_user", "stalkername").Write(&client, bson.M{
// "country": "kr",
// "ip": "127.0.0.1",
// })
// time.Sleep(time.Second)
// }
// for i := 0; i < 10; i++ {
// MakeActorLog("Match", "test_user", "stalkername").Write(&client, bson.M{
// "server": "kr001",
// "mode": "pvp",
// "address": "server address",
// })
// time.Sleep(time.Second)
// }
}

View File

@ -14,6 +14,7 @@ import (
"net/url"
"os"
"os/signal"
"path"
"reflect"
"runtime"
"strconv"
@ -40,6 +41,13 @@ func init() {
gob.Register([]any{})
}
type ServerMuxInterface interface {
http.Handler
HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request))
Handle(pattern string, handler http.Handler)
}
const (
// HTTPStatusReloginRequired : http status를 이걸 받으면 클라이언트는 로그아웃하고 로그인 화면으로 돌아가야 한다.
HTTPStatusReloginRequired = 599
@ -139,8 +147,19 @@ func isTlsEnabled(fileout ...*string) bool {
return true
}
func registUnhandledPattern(serveMux ServerMuxInterface) {
defer func() {
recover()
}()
serveMux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
logger.Println("page not found :", r.URL.Path)
w.WriteHeader(http.StatusNotFound)
})
}
// NewHTTPServer :
func NewHTTPServerWithPort(serveMux *http.ServeMux, port int) *Server {
func NewHTTPServerWithPort(serveMux ServerMuxInterface, port int) *Server {
if isTlsEnabled() && port == 80 {
port = 443
}
@ -148,16 +167,21 @@ func NewHTTPServerWithPort(serveMux *http.ServeMux, port int) *Server {
serveMux.HandleFunc(MakeHttpHandlerPattern("welcome"), welcomeHandler)
serveMux.HandleFunc(MakeHttpHandlerPattern("lb_health_chceck"), healthCheckHandler)
serveMux.HandleFunc(MakeHttpHandlerPattern("lb_health_check"), healthCheckHandler)
registUnhandledPattern(serveMux)
server := &Server{
httpserver: &http.Server{Addr: addr, Handler: serveMux},
httpserver: &http.Server{
Addr: addr,
Handler: serveMux,
MaxHeaderBytes: 2 << 20, // 2 MB
},
}
server.httpserver.SetKeepAlivesEnabled(true)
return server
}
func NewHTTPServer(serveMux *http.ServeMux) *Server {
func NewHTTPServer(serveMux ServerMuxInterface) *Server {
// 시작시 자동으로 enable됨
if isTlsEnabled() && *portptr == 80 {
@ -206,7 +230,12 @@ func (server *Server) Stop() {
}
// Start :
func (server *Server) Start() error {
func (server *Server) Start(name ...string) error {
if len(name) == 0 {
exepath, _ := os.Executable()
name = []string{path.Base(exepath)}
}
if server.httpserver != nil {
ln, r := net.Listen("tcp", server.httpserver.Addr)
if r != nil {
@ -235,6 +264,7 @@ func (server *Server) Start() error {
err = server.httpserver.ServeTLS(proxyListener, crtfile, keyfile)
} else {
logger.Println("tls disabled")
logger.Println(strings.Join(name, ", "), "started")
err = server.httpserver.Serve(proxyListener)
}
@ -481,6 +511,13 @@ func ReadStringFormValue(r url.Values, key string) (string, bool) {
return strval, len(strval) > 0
}
func ReadStringsFormValue(r url.Values, key string) ([]string, bool) {
if r.Has(key) {
return (map[string][]string)(r)[key], true
}
return nil, false
}
type encoder interface {
Encode(any) error
}

View File

@ -19,7 +19,7 @@ type Authorization struct {
// by authorization provider
Platform string `bson:"p" json:"p"`
Uid string `bson:"u" json:"u"`
Email string `bson:"em" json:"em"`
Alias string `bson:"al" json:"al"`
}
func (auth *Authorization) ToStrings() []string {
@ -27,7 +27,7 @@ func (auth *Authorization) ToStrings() []string {
"a", auth.Account.Hex(),
"p", auth.Platform,
"u", auth.Uid,
"em", auth.Email,
"al", auth.Alias,
"inv", auth.invalidated,
}
}
@ -42,7 +42,7 @@ func MakeAuthrizationFromStringMap(src map[string]string) Authorization {
Account: accid,
Platform: src["p"],
Uid: src["u"],
Email: src["em"],
Alias: src["al"],
invalidated: src["inv"],
}
}

View File

@ -188,6 +188,7 @@ type websocketHandlerBase struct {
type WebsocketHandler struct {
WebsocketApiBroker
websocketHandlerBase
opt *WebsocketHandlerOption
}
type wsConfig struct {
@ -205,7 +206,28 @@ func init() {
gob.Register([]any{})
}
func NewWebsocketHandler(consumer session.Consumer, redisUrl string) (*WebsocketHandler, error) {
type WebsocketHandlerOption struct {
ReadBufferSize, WriteBufferSize int
}
func Option() *WebsocketHandlerOption {
return &WebsocketHandlerOption{
ReadBufferSize: 0,
WriteBufferSize: 0,
}
}
func (opt *WebsocketHandlerOption) SetReadBufferSize(size int) *WebsocketHandlerOption {
opt.ReadBufferSize = size
return opt
}
func (opt *WebsocketHandlerOption) SetWriteBufferSize(size int) *WebsocketHandlerOption {
opt.WriteBufferSize = size
return opt
}
func NewWebsocketHandler(consumer session.Consumer, redisUrl string, opts ...*WebsocketHandlerOption) (*WebsocketHandler, error) {
var config wsConfig
if err := gocommon.LoadConfig(&config); err != nil {
return nil, err
@ -240,6 +262,14 @@ func NewWebsocketHandler(consumer session.Consumer, redisUrl string) (*Websocket
}
}()
var opt *WebsocketHandlerOption
if len(opts) > 0 {
// TODO : opts merge
opt = opts[0]
} else {
opt = Option()
}
ws := &WebsocketHandler{
websocketHandlerBase: websocketHandlerBase{
redisMsgChanName: fmt.Sprintf("_wsh_msg_%d", redisSync.Options().DB),
@ -252,6 +282,7 @@ func NewWebsocketHandler(consumer session.Consumer, redisUrl string) (*Websocket
sendMsgChan: sendchan,
sessionConsumer: consumer,
},
opt: opt,
}
consumer.RegisterOnSessionInvalidated(ws.onSessionInvalidated)
return ws, nil
@ -266,7 +297,7 @@ func (ws *WebsocketHandler) Cleanup() {
ws.connWaitGroup.Wait()
}
func (ws *WebsocketHandler) RegisterHandlers(serveMux *http.ServeMux, prefix string) error {
func (ws *WebsocketHandler) RegisterHandlers(serveMux gocommon.ServerMuxInterface, prefix string) error {
url := gocommon.MakeHttpHandlerPattern(prefix, "ws")
if *noAuthFlag {
serveMux.HandleFunc(url, ws.upgrade_nosession)
@ -665,7 +696,10 @@ func (ws *WebsocketHandler) upgrade_nosession(w http.ResponseWriter, r *http.Req
return
}
var upgrader = websocket.Upgrader{} // use default options
var upgrader = websocket.Upgrader{
ReadBufferSize: ws.opt.ReadBufferSize,
WriteBufferSize: ws.opt.WriteBufferSize,
} // use default options
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
@ -695,6 +729,11 @@ func (ws *WebsocketHandler) upgrade(w http.ResponseWriter, r *http.Request) {
}()
sk := r.Header.Get("AS-X-SESSION")
if len(sk) == 0 {
w.WriteHeader(http.StatusUnauthorized)
return
}
authinfo, err := ws.sessionConsumer.Query(sk)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
@ -712,7 +751,10 @@ func (ws *WebsocketHandler) upgrade(w http.ResponseWriter, r *http.Request) {
return
}
var upgrader = websocket.Upgrader{} // use default options
var upgrader = websocket.Upgrader{
ReadBufferSize: ws.opt.ReadBufferSize,
WriteBufferSize: ws.opt.WriteBufferSize,
} // use default options
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)

View File

@ -11,6 +11,7 @@ import (
"time"
"go.mongodb.org/mongo-driver/bson/primitive"
"repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/session"
@ -18,7 +19,7 @@ import (
)
type WebsocketPeerHandler interface {
RegisterHandlers(serveMux *http.ServeMux, prefix string) error
RegisterHandlers(serveMux gocommon.ServerMuxInterface, prefix string) error
}
type peerCtorChannelValue struct {
@ -164,7 +165,7 @@ func NewWebsocketPeerHandler[T PeerInterface](consumer session.Consumer, creator
return wsh
}
func (ws *websocketPeerHandler[T]) RegisterHandlers(serveMux *http.ServeMux, prefix string) error {
func (ws *websocketPeerHandler[T]) RegisterHandlers(serveMux gocommon.ServerMuxInterface, prefix string) error {
if *noAuthFlag {
serveMux.HandleFunc(prefix, ws.upgrade_noauth)
} else {