diff --git a/opensearch/client.go b/opensearch/client.go index 910775a..c2ba28b 100644 --- a/opensearch/client.go +++ b/opensearch/client.go @@ -32,7 +32,6 @@ type Client struct { indexTemplatePattern string bulkHeader http.Header singleHeader http.Header - sendingCount int32 bulkChan chan *LogDocument } @@ -76,28 +75,40 @@ func (c *Client) SendBulk(ds map[string]*LogDocument) { } type stringSliceReader struct { - src [][]byte + src [][]byte + sent [][]byte } func (b *stringSliceReader) Read(p []byte) (n int, err error) { + n = 0 + err = nil + for len(b.src) > 0 { sbt := b.src[0] - if len(p) < len(sbt) { - b.src[0] = sbt[len(p):] - sbt = sbt[:len(p)] - copy(p, sbt) - return n + len(sbt), nil + copied := copy(p, sbt) + n += copied + b.sent = append(b.sent, sbt[:copied]) + + if copied < len(sbt) { + b.src[0] = sbt[copied:] + return } - copy(p, sbt) - n += len(sbt) - p = p[len(sbt):] + p = p[copied:] b.src = b.src[1:] } + err = io.EOF return } +func (b *stringSliceReader) printSent() { + for _, r := range b.sent { + fmt.Print(string(r)) + } + fmt.Print("\n") +} + func (c *Client) sendLoop(ctx context.Context) { defer func() { r := recover() @@ -145,11 +156,9 @@ func (c *Client) sendLoop(ctx context.Context) { resp, err := c.Do(context.Background(), req, nil) if err != nil { if netoperr, ok := err.(*net.OpError); ok && netoperr.Op == "dial" { - // 접속 안됨. 파일로 남기고 끝 + // 접속 안됨. 재시도 안함 logger.Println("log send bulk failed. no retry :", err) - for _, e := range sending { - logger.Println(string(e)) - } + reader.printSent() } else { // 재시도 logger.Println("log send bulk failed. retry :", err)