From 5c7e4e4df23a82120c59e75379fd93f1d5e26dfb Mon Sep 17 00:00:00 2001 From: mklee Date: Wed, 25 Jun 2025 16:47:08 +0900 Subject: [PATCH] =?UTF-8?q?opensearch=20gocommon=EC=9C=BC=EB=A1=9C=20?= =?UTF-8?q?=EC=9D=B4=EB=8F=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 5 +- go.sum | 2 + opensearch/client.go | 184 ++++++++++++++++++++++++++++++++++++++ opensearch/client_test.go | 33 +++++++ 4 files changed, 223 insertions(+), 1 deletion(-) create mode 100644 opensearch/client.go create mode 100644 opensearch/client_test.go diff --git a/go.mod b/go.mod index f6de603..1f2a879 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,8 @@ module repositories.action2quare.com/ayo/gocommon -go 1.20 +go 1.22 + +toolchain go1.22.4 require ( github.com/awa/go-iap v1.32.0 @@ -34,6 +36,7 @@ require ( github.com/klauspost/compress v1.13.6 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect + github.com/opensearch-project/opensearch-go/v4 v4.5.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect github.com/prometheus/common v0.44.0 // indirect diff --git a/go.sum b/go.sum index 2cf65cf..51edead 100644 --- a/go.sum +++ b/go.sum @@ -90,6 +90,8 @@ github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJ github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= +github.com/opensearch-project/opensearch-go/v4 v4.5.0 h1:26XckmmF6MhlXt91Bu1yY6R51jy1Ns/C3XgIfvyeTRo= +github.com/opensearch-project/opensearch-go/v4 v4.5.0/go.mod h1:VmFc7dqOEM3ZtLhrpleOzeq+cqUgNabqQG5gX0xId64= github.com/pires/go-proxyproto v0.7.0 h1:IukmRewDQFWC7kfnb66CSomk2q/seBuilHBYFwyq0Hs= github.com/pires/go-proxyproto v0.7.0/go.mod h1:Vz/1JPY/OACxWGQNIRY2BeyDmpoaWmEP40O9LbuiFR4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= diff --git a/opensearch/client.go b/opensearch/client.go new file mode 100644 index 0000000..20d7b3a --- /dev/null +++ b/opensearch/client.go @@ -0,0 +1,184 @@ +package opensearch + +import ( + "bytes" + "context" + "crypto/hmac" + "crypto/sha256" + "encoding/base64" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "time" + + osg "github.com/opensearch-project/opensearch-go/v4" + osapi "github.com/opensearch-project/opensearch-go/v4/opensearchapi" + "repositories.action2quare.com/ayo/gocommon/logger" +) + +type Config struct { + osg.Config `json:",inline"` + IndexPrefix string `json:"IndexPrefix"` + SigningKey string `json:"SigningKey"` +} + +type Client struct { + *osg.Client + cfg Config + signingKey []byte + indexTemplatePattern string + bulkHeader http.Header + singleHeader http.Header +} + +type LogDocument struct { + Type string `json:"type"` + Body any `json:"body"` + Timestamp string `json:"@timestamp"` + + Country string `json:"country"` + Ip string `json:"ip"` + Uid string `json:"uid"` + Auth struct { + Type string `json:"type"` + Id string `json:"id"` + } `json:"auth"` +} + +func NewLogDocument(logType string, body any) *LogDocument { + return &LogDocument{ + Type: strings.ToLower(logType), + Timestamp: time.Now().UTC().Format("2006-01-02T15:04:05Z"), + Body: body, + } +} + +func (c *Client) Send(ld *LogDocument) error { + if c.Client == nil { + return nil + } + + serialized, _ := json.Marshal(ld) + reader := bytes.NewBuffer(serialized) + req := osapi.IndexReq{ + Index: c.indexTemplatePattern + ld.Type, + Body: reader, + Header: c.singleHeader, + } + logger.Println("LogSend", req) + + resp, err := c.Do(context.Background(), req, nil) + logger.Println(resp) + if err != nil { + return err + } + defer resp.Body.Close() + + r, err2 := io.ReadAll(resp.Body) + if err2 != nil { + logger.Println("LogSend resp read error :", err2) + } else { + logger.Println("LogSend resp :", string(r)) + } + + return nil +} + +func (c *Client) SendBulk(ds map[string]*LogDocument) error { + var contents string + + for _, d := range ds { + b, _ := json.Marshal(d) + contents += fmt.Sprintf(`{"create":{"_index":"%s%s"}}`+"\n"+`%s`+"\n", c.indexTemplatePattern, d.Type, string(b)) + } + + reader := bytes.NewBuffer([]byte(contents)) + req := osapi.BulkReq{ + Body: reader, + Header: c.bulkHeader, + } + resp, err := c.Do(context.Background(), req, nil) + if err != nil { + return err + } + logger.Println(resp) + defer resp.Body.Close() + + return nil +} + +var jwtHeader string +var encoding = base64.RawURLEncoding + +func init() { + src := []byte(`{"alg": "HS256","typ": "JWT"}`) + dst := make([]byte, len(src)*2) + encoding.Encode(dst, src) + enclen := encoding.EncodedLen(len(src)) + jwtHeader = string(dst[:enclen]) +} + +func (c *Client) MakeJWT(subject string, ttl time.Duration) string { + if len(c.signingKey) == 0 { + return "" + } + + now := time.Now().UTC().Add(ttl).Add(time.Hour).Unix() + src := []byte(fmt.Sprintf(`{"exp":%d,"sub":"%s","roles":"ds_client_full_access"}`, now, subject)) + payload := make([]byte, encoding.EncodedLen(len(src))) + encoding.Encode(payload, src) + + encoded := jwtHeader + "." + string(payload) + mac := hmac.New(sha256.New, c.signingKey) + mac.Write([]byte(encoded)) + signature := mac.Sum(nil) + sigenc := make([]byte, encoding.EncodedLen(len(signature))) + encoding.Encode(sigenc, signature) + + return encoded + "." + string(sigenc) +} + +func NewClient(cfg Config) (Client, error) { + if len(cfg.Addresses) == 0 { + return Client{}, nil + } + + client, err := osg.NewClient(cfg.Config) + if err != nil { + return Client{}, err + } + + var signingKey []byte + if len(cfg.SigningKey) > 0 { + dst := make([]byte, len(cfg.SigningKey)*2) + dstlen, _ := base64.StdEncoding.Decode(dst, []byte(cfg.SigningKey)) + signingKey = dst[:dstlen] + } + + indexPrefix := cfg.IndexPrefix + if !strings.HasSuffix(indexPrefix, "-") { + indexPrefix += "-" + } + if !strings.HasSuffix(indexPrefix, "ds-logs-") { + indexPrefix = "ds-logs-" + indexPrefix + } + + authHeader := fmt.Sprintf("Basic %s", base64.RawURLEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%s", cfg.Username, cfg.Password)))) + + bulkHeader := make(http.Header) + bulkHeader.Set("Authorization", authHeader) + + singleHeader := make(http.Header) + singleHeader.Set("Authorization", authHeader) + + return Client{ + Client: client, + cfg: cfg, + signingKey: signingKey, + indexTemplatePattern: indexPrefix, + bulkHeader: bulkHeader, + singleHeader: singleHeader, + }, nil +} diff --git a/opensearch/client_test.go b/opensearch/client_test.go new file mode 100644 index 0000000..8ef76b5 --- /dev/null +++ b/opensearch/client_test.go @@ -0,0 +1,33 @@ +package opensearch + +import ( + "testing" +) + +func TestNewClient(t *testing.T) { + // var cfg Config + + // cfg.Addresses = []string{"http://localhost:9200"} + // client, err := NewClient(cfg) + // if err != nil { + // t.Errorf("NewClient() error = %v", err) + // return + // } + + // for i := 0; i < 10; i++ { + // MakeActorLog("LOGIN", "test_user", "stalkername").Write(&client, bson.M{ + // "country": "kr", + // "ip": "127.0.0.1", + // }) + // time.Sleep(time.Second) + // } + + // for i := 0; i < 10; i++ { + // MakeActorLog("Match", "test_user", "stalkername").Write(&client, bson.M{ + // "server": "kr001", + // "mode": "pvp", + // "address": "server address", + // }) + // time.Sleep(time.Second) + // } +}