From 289594716c751793f1c9d38f8866ce69ea588913 Mon Sep 17 00:00:00 2001 From: mountain Date: Thu, 11 Sep 2025 19:59:43 +0900 Subject: [PATCH] =?UTF-8?q?=EC=8B=A4=ED=8C=A8=20=EB=A1=9C=EA=B7=B8=20?= =?UTF-8?q?=EC=9E=AC=EC=A0=84=EC=86=A1=20=EB=A1=9C=EC=A7=81=20=EC=88=98?= =?UTF-8?q?=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- opensearch/client.go | 36 +++++++++++++++++++++++++----------- 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/opensearch/client.go b/opensearch/client.go index fa29ea9..163d6b0 100644 --- a/opensearch/client.go +++ b/opensearch/client.go @@ -90,12 +90,26 @@ type singleLogMarshaller struct { length int } -type stringSliceReader struct { +type logSliceReader struct { src []*singleLogMarshaller cursor int } -func (b *stringSliceReader) Read(p []byte) (n int, err error) { +func newLogSliceReader(in []singleLogMarshaller) *logSliceReader { + src := make([]*singleLogMarshaller, len(in)) + for i, v := range in { + copylog := new(singleLogMarshaller) + *copylog = v + src[i] = copylog + } + + return &logSliceReader{ + src: src, + cursor: 0, + } +} + +func (b *logSliceReader) Read(p []byte) (n int, err error) { n = 0 err = nil @@ -140,7 +154,7 @@ func (b *stringSliceReader) Read(p []byte) (n int, err error) { return } -func (b *stringSliceReader) printSent() { +func (b *logSliceReader) printSent() { for _, r := range b.src { fmt.Print(string(r.content)) } @@ -155,11 +169,11 @@ func (c *Client) sendLoop(ctx context.Context) { } }() - failChan := make(chan []*singleLogMarshaller) - var logMarshallers []*singleLogMarshaller + failChan := make(chan []singleLogMarshaller) + var logMarshallers []singleLogMarshaller sendTick := time.After(time.Minute) - sendfunc := func(logs []*singleLogMarshaller) { + sendfunc := func(logs []singleLogMarshaller) { if len(logs) == 0 { return } @@ -171,7 +185,7 @@ func (c *Client) sendLoop(ctx context.Context) { } }() - reader := &stringSliceReader{src: logs, cursor: 0} + reader := newLogSliceReader(logs) req := osapi.BulkReq{ Body: reader, Header: c.bulkHeader, @@ -218,7 +232,7 @@ func (c *Client) sendLoop(ctx context.Context) { return } - var retry []*singleLogMarshaller + var retry []singleLogMarshaller for i, item := range respbody.Items { if item.Create.Status < 300 { continue @@ -246,11 +260,11 @@ func (c *Client) sendLoop(ctx context.Context) { } totalsize := 0 - appendLog := func(newlog *singleLogMarshaller) bool { + appendLog := func(newlog singleLogMarshaller) bool { if totalsize+newlog.length > logbulksize { go sendfunc(logMarshallers) totalsize = newlog.length - logMarshallers = []*singleLogMarshaller{newlog} + logMarshallers = []singleLogMarshaller{newlog} return true } @@ -288,7 +302,7 @@ func (c *Client) sendLoop(ctx context.Context) { b, _ := json.Marshal(logDoc) logtype := []byte(logDoc.Type) - if appendLog(&singleLogMarshaller{ + if appendLog(singleLogMarshaller{ singleLogPrepend: c.singleLogPrepend, singleLogMidpend: c.singleLogMidpend, singleLogAppend: c.singleLogAppend,