From c73ffda016330dba5af3476909f9c68cd9912026 Mon Sep 17 00:00:00 2001 From: mountain Date: Wed, 10 Sep 2025 16:05:54 +0900 Subject: [PATCH] =?UTF-8?q?=EB=B2=84=ED=8D=BC=20=EC=A4=91=EB=B3=B5=20?= =?UTF-8?q?=EB=B3=B5=EC=82=AC=20=EB=B0=A9=EC=A7=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- opensearch/client.go | 128 ++++++++++++++++++++++++++++--------------- 1 file changed, 85 insertions(+), 43 deletions(-) diff --git a/opensearch/client.go b/opensearch/client.go index 4a4c041..8d3534c 100644 --- a/opensearch/client.go +++ b/opensearch/client.go @@ -33,6 +33,10 @@ type Client struct { bulkHeader http.Header singleHeader http.Header bulkChan chan *LogDocument + singleLogPrepend []byte + singleLogMidpend []byte + singleLogAppend []byte + singleLogFixedSize int } type LogDocument struct { @@ -74,28 +78,60 @@ func (c *Client) SendBulk(ds map[string]*LogDocument) { } } +type singleLogMarshaller struct { + singleLogPrepend []byte + singleLogMidpend []byte + singleLogAppend []byte + + logtype []byte + content []byte + length int +} + type stringSliceReader struct { - src [][]byte - sent [][]byte + src []*singleLogMarshaller + cursor int } func (b *stringSliceReader) Read(p []byte) (n int, err error) { n = 0 err = nil - for len(b.src) > 0 { - sbt := b.src[0] - copied := copy(p, sbt) - n += copied - b.sent = append(b.sent, sbt[:copied]) + advance := func(in []byte) []byte { + if len(in) == 0 { + return in + } - if copied < len(sbt) { - b.src[0] = sbt[copied:] + copied := copy(p, in) + p = p[copied:] + n += copied + return in[copied:] + } + + for b.cursor < len(b.src) { + sbt := b.src[b.cursor] + + if sbt.singleLogPrepend = advance(sbt.singleLogPrepend); len(sbt.singleLogPrepend) > 0 { return } - p = p[copied:] - b.src = b.src[1:] + if sbt.logtype = advance(sbt.logtype); len(sbt.logtype) > 0 { + return + } + + if sbt.singleLogMidpend = advance(sbt.singleLogMidpend); len(sbt.singleLogMidpend) > 0 { + return + } + + if sbt.content = advance(sbt.content); len(sbt.content) > 0 { + return + } + + if sbt.singleLogAppend = advance(sbt.singleLogAppend); len(sbt.singleLogAppend) > 0 { + return + } + + b.cursor++ } err = io.EOF @@ -103,8 +139,8 @@ func (b *stringSliceReader) Read(p []byte) (n int, err error) { } func (b *stringSliceReader) printSent() { - for _, r := range b.sent { - fmt.Print(string(r)) + for _, r := range b.src { + fmt.Print(string(r.content)) } fmt.Print("\n") } @@ -117,9 +153,8 @@ func (c *Client) sendLoop(ctx context.Context) { } }() - failChan := make(chan [][]byte) - contentsSize := 0 - var contents [][]byte + failChan := make(chan []*singleLogMarshaller) + var logMarshallers []*singleLogMarshaller sendTick := time.After(time.Minute) sendfunc := func() { @@ -127,19 +162,18 @@ func (c *Client) sendLoop(ctx context.Context) { // 실패한 로그가 다시 되돌아 오면 contents가 커질 수 있다. sendingSize := 0 cut := 0 - for ; cut < len(contents); cut++ { - thisSize := len(contents[cut]) - if thisSize+sendingSize > 2*1024*1024 { + for ; cut < len(logMarshallers); cut++ { + // 2메가가 넘더라도 최소한 하나는 보내자. + if cut > 0 && sendingSize+logMarshallers[cut].length > 2*1024*1024 { break } - sendingSize += thisSize + sendingSize += logMarshallers[cut].length } - sending := contents[:cut] - contents = contents[cut:] - contentsSize -= sendingSize + sending := logMarshallers[:cut] + logMarshallers = logMarshallers[cut:] sendTick = time.After(time.Minute) - go func(sending [][]byte) { + go func(sending []*singleLogMarshaller) { defer func() { r := recover() if r != nil { @@ -147,14 +181,12 @@ func (c *Client) sendLoop(ctx context.Context) { } }() - reader := &stringSliceReader{src: sending} + reader := &stringSliceReader{src: sending, cursor: 0} req := osapi.BulkReq{ Body: reader, Header: c.bulkHeader, } - resp, err := c.Do(context.Background(), req, nil) - logger.Println("[LogStream] process BulkReq :", err) if err != nil { if netoperr, ok := err.(*net.OpError); ok && netoperr.Op == "dial" { @@ -169,7 +201,6 @@ func (c *Client) sendLoop(ctx context.Context) { return } if resp.Body == nil { - logger.Println("[LogStream] send bulk failed. empty response") return } defer resp.Body.Close() @@ -188,11 +219,10 @@ func (c *Client) sendLoop(ctx context.Context) { } if !respbody.Errors { - logger.Println("[LogStream] process BulkReq success", respbody) return } - var retry [][]byte + var retry []*singleLogMarshaller for i, item := range respbody.Items { if item.Create.Status < 400 { // 재시도 @@ -214,14 +244,11 @@ func (c *Client) sendLoop(ctx context.Context) { case ret := <-failChan: // 순서는 중요하지 않음. - for _, r := range ret { - contentsSize += len(r) - } - contents = append(ret, contents...) + logMarshallers = append(logMarshallers, ret...) sendfunc() case <-sendTick: - if len(contents) > 0 { + if len(logMarshallers) > 0 { sendfunc() } else { sendTick = time.After(time.Minute) @@ -229,12 +256,15 @@ func (c *Client) sendLoop(ctx context.Context) { case logDoc := <-c.bulkChan: b, _ := json.Marshal(logDoc) - e := fmt.Appendf(nil, `{"create":{"_index":"%s%s"}}`+"\n"+`%s`+"\n", c.indexTemplatePattern, logDoc.Type, string(b)) - contentsSize += len(e) - contents = append(contents, e) - if contentsSize > 1024*1024 { - sendfunc() - } + logtype := []byte(logDoc.Type) + logMarshallers = append(logMarshallers, &singleLogMarshaller{ + singleLogPrepend: c.singleLogPrepend, + singleLogMidpend: c.singleLogMidpend, + singleLogAppend: c.singleLogAppend, + logtype: logtype, + content: b, + length: len(logtype) + len(b) + c.singleLogFixedSize, + }) } } } @@ -256,7 +286,7 @@ func (c *Client) MakeJWT(subject string, role string, ttl time.Duration) string } now := time.Now().Add(ttl).Unix() - src := []byte(fmt.Sprintf(`{"exp":%d,"sub":"%s","roles":"%s"}`, now, subject, role)) + src := fmt.Appendf(nil, `{"exp":%d,"sub":"%s","roles":"%s"}`, now, subject, role) payload := make([]byte, encoding.EncodedLen(len(src))) encoding.Encode(payload, src) @@ -318,6 +348,8 @@ func NewClient(ctx context.Context, cfg Config) (Client, error) { return Client{}, nil } + // retry는 수동으로 + cfg.Config.DisableRetry = true client, err := osg.NewClient(cfg.Config) if err != nil { return Client{}, err @@ -342,11 +374,16 @@ func NewClient(ctx context.Context, cfg Config) (Client, error) { bulkHeader := make(http.Header) singleHeader := make(http.Header) if len(cfg.Username) > 0 && len(cfg.Password) > 0 { - authHeader := fmt.Sprintf("Basic %s", base64.RawURLEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%s", cfg.Username, cfg.Password)))) + authHeader := fmt.Sprintf("Basic %s", base64.RawURLEncoding.EncodeToString(fmt.Appendf(nil, "%s:%s", cfg.Username, cfg.Password))) bulkHeader.Set("Authorization", authHeader) singleHeader.Set("Authorization", authHeader) } + singleLogPrepend := fmt.Appendf(nil, `{"create":{"_index":"%s`, indexPrefix) + singleLogMidpend := []byte("\"}}\n") + singleLogAppend := []byte("\n") + singleLogFixedSize := len(singleLogPrepend) + len(singleLogMidpend) + len(singleLogAppend) + out := Client{ Client: client, cfg: cfg, @@ -355,6 +392,11 @@ func NewClient(ctx context.Context, cfg Config) (Client, error) { bulkHeader: bulkHeader, singleHeader: singleHeader, bulkChan: make(chan *LogDocument, 1000), + + singleLogPrepend: singleLogPrepend, + singleLogMidpend: singleLogMidpend, + singleLogAppend: singleLogAppend, + singleLogFixedSize: singleLogFixedSize, } go func() {