From fb3f0385067a65c9cde6e091719166c99e318eab Mon Sep 17 00:00:00 2001 From: mountain Date: Tue, 5 Aug 2025 21:40:37 +0900 Subject: [PATCH] =?UTF-8?q?=EB=A1=9C=EA=B7=B8=20=EC=A0=84=EC=86=A1?= =?UTF-8?q?=EC=9D=84=20=EB=B9=84=EB=8F=99=EA=B8=B0=EB=A1=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- opensearch/client.go | 87 ++++++++++++++++++++++++++------------------ 1 file changed, 51 insertions(+), 36 deletions(-) diff --git a/opensearch/client.go b/opensearch/client.go index 3633afa..938dbef 100644 --- a/opensearch/client.go +++ b/opensearch/client.go @@ -8,9 +8,9 @@ import ( "encoding/base64" "encoding/json" "fmt" - "io" "net/http" "strings" + "sync/atomic" "time" osg "github.com/opensearch-project/opensearch-go/v4" @@ -31,6 +31,7 @@ type Client struct { indexTemplatePattern string bulkHeader http.Header singleHeader http.Header + sendingCount int32 } type LogDocument struct { @@ -55,38 +56,43 @@ func NewLogDocument(logType string, body any) *LogDocument { } } -func (c *Client) Send(ld *LogDocument) error { +func (c *Client) Send(ld *LogDocument) { if c.Client == nil { - return nil + return } serialized, _ := json.Marshal(ld) - reader := bytes.NewBuffer(serialized) - req := osapi.IndexReq{ - Index: c.indexTemplatePattern + ld.Type, - Body: reader, - Header: c.singleHeader, - } - logger.Println("LogSend", req) + go func(serialized []byte) { + sending := atomic.AddInt32(&c.sendingCount, 1) + atomic.AddInt32(&c.sendingCount, -1) - resp, err := c.Do(context.Background(), req, nil) - logger.Println(resp) - if err != nil { - return err - } - defer resp.Body.Close() + if sending > 100 { + logger.Println("sending log bottleneck :", sending) + logger.Println(string(serialized)) + return + } - r, err2 := io.ReadAll(resp.Body) - if err2 != nil { - logger.Println("LogSend resp read error :", err2) - } else { - logger.Println("LogSend resp :", string(r)) - } + reader := bytes.NewBuffer(serialized) + req := osapi.IndexReq{ + Index: c.indexTemplatePattern + ld.Type, + Body: reader, + Header: c.singleHeader, + } - return nil + resp, err := c.Do(context.Background(), req, nil) + if err != nil { + logger.Println("log send failed :", err) + return + } + resp.Body.Close() + }(serialized) } -func (c *Client) SendBulk(ds map[string]*LogDocument) error { +func (c *Client) SendBulk(ds map[string]*LogDocument) { + if c == nil { + return + } + var contents string for _, d := range ds { @@ -94,19 +100,28 @@ func (c *Client) SendBulk(ds map[string]*LogDocument) error { contents += fmt.Sprintf(`{"create":{"_index":"%s%s"}}`+"\n"+`%s`+"\n", c.indexTemplatePattern, d.Type, string(b)) } - reader := bytes.NewBuffer([]byte(contents)) - req := osapi.BulkReq{ - Body: reader, - Header: c.bulkHeader, - } - resp, err := c.Do(context.Background(), req, nil) - if err != nil { - return err - } - logger.Println(resp) - defer resp.Body.Close() + go func(contents string) { + sending := atomic.AddInt32(&c.sendingCount, 1) + atomic.AddInt32(&c.sendingCount, -1) - return nil + if sending > 100 { + logger.Println("sending log bottleneck :", sending) + logger.Println(contents) + return + } + + reader := bytes.NewBuffer([]byte(contents)) + req := osapi.BulkReq{ + Body: reader, + Header: c.bulkHeader, + } + resp, err := c.Do(context.Background(), req, nil) + if err != nil { + logger.Println("log send bulk failed :", err) + return + } + resp.Body.Close() + }(contents) } var jwtHeader string