From e06828dce49266b1f515bc5a328d483ab2e74d56 Mon Sep 17 00:00:00 2001 From: mountain Date: Thu, 11 Sep 2025 17:56:18 +0900 Subject: [PATCH] =?UTF-8?q?logstream=20=EB=A1=9C=EA=B7=B8=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- opensearch/client.go | 166 ++++++++++++++++++++++++------------------- 1 file changed, 93 insertions(+), 73 deletions(-) diff --git a/opensearch/client.go b/opensearch/client.go index 8d3534c..e7db124 100644 --- a/opensearch/client.go +++ b/opensearch/client.go @@ -19,6 +19,8 @@ import ( "repositories.action2quare.com/ayo/gocommon/logger" ) +const logbulksize = 512 * 1024 + type Config struct { osg.Config `json:",inline"` IndexPrefix string `json:"IndexPrefix"` @@ -157,84 +159,91 @@ func (c *Client) sendLoop(ctx context.Context) { var logMarshallers []*singleLogMarshaller sendTick := time.After(time.Minute) - sendfunc := func() { - // 2mb가 넘지 않게 조절. - // 실패한 로그가 다시 되돌아 오면 contents가 커질 수 있다. - sendingSize := 0 - cut := 0 - for ; cut < len(logMarshallers); cut++ { - // 2메가가 넘더라도 최소한 하나는 보내자. - if cut > 0 && sendingSize+logMarshallers[cut].length > 2*1024*1024 { - break - } - sendingSize += logMarshallers[cut].length + sendfunc := func(logs []*singleLogMarshaller) { + if len(logs) == 0 { + return } - sending := logMarshallers[:cut] - logMarshallers = logMarshallers[cut:] - sendTick = time.After(time.Minute) - go func(sending []*singleLogMarshaller) { - defer func() { - r := recover() - if r != nil { - logger.Println(r) - } - }() + defer func() { + r := recover() + if r != nil { + logger.Println(r) + } + }() - reader := &stringSliceReader{src: sending, cursor: 0} - req := osapi.BulkReq{ - Body: reader, - Header: c.bulkHeader, - } - resp, err := c.Do(context.Background(), req, nil) + reader := &stringSliceReader{src: logs, cursor: 0} + req := osapi.BulkReq{ + Body: reader, + Header: c.bulkHeader, + } + resp, err := c.Do(context.Background(), req, nil) - if err != nil { - if netoperr, ok := err.(*net.OpError); ok && netoperr.Op == "dial" { - // 접속 안됨. 재시도 안함 - logger.Println("[LogStream] send bulk failed. no retry :", err) - reader.printSent() - } else { - // 재시도 - logger.Println("[LogStream] send bulk failed. retry :", err) - failChan <- sending - } - return + if err != nil { + if netoperr, ok := err.(*net.OpError); ok && netoperr.Op == "dial" { + // 접속 안됨. 재시도 안함 + logger.Println("[LogStream] send bulk failed. no retry :", err) + reader.printSent() + } else { + // 재시도 + logger.Println("[LogStream] send bulk failed. retry :", err) + failChan <- logs } - if resp.Body == nil { - return - } - defer resp.Body.Close() + return + } + if resp.Body == nil { + return + } + defer resp.Body.Close() - var respbody struct { - Errors bool `json:"errors"` - Items []struct { - Create struct { - Status int `json:"status"` - } `json:"create"` - } `json:"items"` - } - if err := json.NewDecoder(resp.Body).Decode(&respbody); err != nil { - logger.Println("[LogStream] decode response body failed :", err) - return - } + var respbody struct { + Errors bool `json:"errors"` + Items []struct { + Create struct { + Status int `json:"status"` + } `json:"create"` + } `json:"items"` + } - if !respbody.Errors { - return - } + decoder := json.NewDecoder(resp.Body) + if err := decoder.Decode(&respbody); err != nil { + errbody, _ := io.ReadAll(decoder.Buffered()) + logger.Println("[LogStream] decode response body failed and retry :", err, string(errbody), len(logs)) + // 전체 재시도 필요 + failChan <- logs + return + } - var retry []*singleLogMarshaller - for i, item := range respbody.Items { - if item.Create.Status < 400 { - // 재시도 - retry = append(retry, sending[i]) - } - } + if !respbody.Errors { + // 성공 + return + } - logger.Println("[LogStream] send bulk failed. retry :", len(retry)) - if len(retry) > 0 { - failChan <- retry + var retry []*singleLogMarshaller + for i, item := range respbody.Items { + if item.Create.Status < 400 { + // 재시도 + retry = append(retry, logs[i]) } - }(sending) + } + + logger.Println("[LogStream] send bulk failed. retry :", len(retry), len(logs), respbody.Items) + if len(retry) > 0 { + failChan <- retry + } + } + + totalsize := 0 + appendLog := func(newlog *singleLogMarshaller) bool { + if totalsize+newlog.length > logbulksize { + go sendfunc(logMarshallers) + totalsize = newlog.length + logMarshallers = []*singleLogMarshaller{newlog} + return true + } + + totalsize += newlog.length + logMarshallers = append(logMarshallers, newlog) + return false } for { @@ -244,12 +253,20 @@ func (c *Client) sendLoop(ctx context.Context) { case ret := <-failChan: // 순서는 중요하지 않음. - logMarshallers = append(logMarshallers, ret...) - sendfunc() + sent := false + for _, newlog := range ret { + sent = sent || appendLog(newlog) + } + + if sent { + sendTick = time.After(time.Minute) + } case <-sendTick: if len(logMarshallers) > 0 { - sendfunc() + go sendfunc(logMarshallers) + totalsize = 0 + logMarshallers = nil } else { sendTick = time.After(time.Minute) } @@ -257,14 +274,17 @@ func (c *Client) sendLoop(ctx context.Context) { case logDoc := <-c.bulkChan: b, _ := json.Marshal(logDoc) logtype := []byte(logDoc.Type) - logMarshallers = append(logMarshallers, &singleLogMarshaller{ + + if appendLog(&singleLogMarshaller{ singleLogPrepend: c.singleLogPrepend, singleLogMidpend: c.singleLogMidpend, singleLogAppend: c.singleLogAppend, logtype: logtype, content: b, length: len(logtype) + len(b) + c.singleLogFixedSize, - }) + }) { + sendTick = time.After(time.Minute) + } } } }