opensearch client 로그 전송 실패 처리

This commit is contained in:
2025-09-03 17:20:34 +09:00
parent 626819209f
commit 2681c7313b

View File

@ -1,17 +1,17 @@
package opensearch
import (
"bytes"
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"slices"
"strings"
"sync/atomic"
"time"
osg "github.com/opensearch-project/opensearch-go/v4"
@ -33,6 +33,7 @@ type Client struct {
bulkHeader http.Header
singleHeader http.Header
sendingCount int32
bulkChan chan *LogDocument
}
type LogDocument struct {
@ -62,67 +63,157 @@ func (c *Client) Send(ld *LogDocument) {
return
}
serialized, _ := json.Marshal(ld)
go func(serialized []byte) {
sending := atomic.AddInt32(&c.sendingCount, 1)
defer atomic.AddInt32(&c.sendingCount, -1)
if sending > 100 {
logger.Println("sending log bottleneck :", sending)
logger.Println(string(serialized))
return
}
reader := bytes.NewBuffer(serialized)
req := osapi.IndexReq{
Index: c.indexTemplatePattern + ld.Type,
Body: reader,
Header: c.singleHeader,
}
resp, err := c.Do(context.Background(), req, nil)
if err != nil {
logger.Println("log send failed :", err)
return
}
resp.Body.Close()
}(serialized)
c.bulkChan <- ld
}
// SendBulk queues every document in ds for delivery by pushing it onto
// c.bulkChan, which sendLoop drains and serializes into bulk requests.
//
// The map keys are not used. The call is a no-op when c is nil or when the
// client has no queue (NewClient returns a zero Client when no addresses are
// configured; sending on its nil channel would block forever). Nil documents
// are skipped because sendLoop dereferences each document it receives.
//
// Each enqueue is a plain channel send, so the caller blocks while the
// channel buffer is full.
func (c *Client) SendBulk(ds map[string]*LogDocument) {
	if c == nil || c.bulkChan == nil {
		return
	}
	for _, d := range ds {
		if d == nil {
			continue
		}
		// Serialization happens in sendLoop; building the payload here as
		// well would be wasted work.
		c.bulkChan <- d
	}
}
// stringSliceReader is an io.Reader that streams a list of byte chunks back
// to back, as if they had been concatenated, without first copying them into
// a single buffer. It never modifies src or the chunks it refers to, so the
// caller may reuse them after reading.
type stringSliceReader struct {
	src     [][]byte // chunks to emit, in order
	pointer int      // index of the chunk currently being read
	offset  int      // number of bytes of src[pointer] already delivered
}

// Read implements io.Reader.
//
// It copies as many bytes as fit into p, continuing across chunk boundaries
// and resuming in the middle of a chunk when p is smaller than the chunk.
// It returns the number of bytes written to p. Once every chunk has been
// fully delivered, the next call returns 0 and io.EOF.
func (b *stringSliceReader) Read(p []byte) (n int, err error) {
	for n < len(p) && b.pointer < len(b.src) {
		chunk := b.src[b.pointer][b.offset:]
		copied := copy(p[n:], chunk)
		n += copied
		if copied < len(chunk) {
			// p is full; remember where to resume inside this chunk.
			b.offset += copied
			break
		}
		b.pointer++
		b.offset = 0
	}
	if n == 0 && b.pointer >= len(b.src) {
		return 0, io.EOF
	}
	return n, nil
}
// sendLoop batches log documents received on c.bulkChan into bulk-API
// "create" entries and hands the batches to goroutines that issue the bulk
// request through c.Do. It returns when ctx is done. A panic raised in this
// function is recovered and logged, after which the function returns.
//
// NOTE(review): as shown here the body is interleaved with lines that look
// like the previous SendBulk implementation (a diff-view artifact); those
// spans are marked below and should be confirmed against the real file.
func (c *Client) sendLoop(ctx context.Context) {
defer func() {
r := recover()
if r != nil {
logger.Error(r)
}
}()
// failChan is unbuffered; sender goroutines use it to hand entries back for
// another attempt.
// NOTE(review): a sender blocks on failChan until this loop receives, so it
// would stay blocked if the loop has already returned — confirm.
failChan := make(chan [][]byte)
// contentsSize tracks the total byte length of the entries held in contents.
contentsSize := 0
var contents [][]byte
// sendTick fires one minute after it was last re-armed.
sendTick := time.After(time.Minute)
// sendfunc removes a leading run of entries from contents and sends it in a
// new goroutine, then re-arms sendTick.
sendfunc := func() {
// Keep each request at or under 2 MiB.
// contents can grow when failed entries come back for retry.
sendingSize := 0
cut := 0
// NOTE(review): if the first entry alone exceeds 2 MiB, cut stays 0, so an
// empty batch is sent and that entry remains at the head — confirm intent.
for ; cut < len(contents); cut++ {
thisSize := len(contents[cut])
if thisSize+sendingSize > 2*1024*1024 {
break
}
sendingSize += thisSize
}
// NOTE(review): sending shares its backing array with the remaining contents
// and keeps the full capacity — see the append in the failChan case below.
sending := contents[:cut]
contents = contents[cut:]
contentsSize -= sendingSize
sendTick = time.After(time.Minute)
go func(sending [][]byte) {
// A panic in the sender goroutine is recovered and logged.
defer func() {
r := recover()
if r != nil {
logger.Println(r)
}
}()
reader := &stringSliceReader{src: sending, pointer: 0}
req := osapi.BulkReq{
Body: reader,
Header: c.bulkHeader,
}
resp, err := c.Do(context.Background(), req, nil)
if err != nil {
// NOTE(review): this is a direct type assertion; if c.Do returns a wrapped
// error (for example inside *url.Error) it will not match — confirm.
if netoperr, ok := err.(*net.OpError); ok && netoperr.Op == "dial" {
// Cannot connect: record the entries through the logger and stop.
logger.Println("log send bulk failed :", err)
for _, e := range sending {
logger.Println(string(e))
}
} else {
// Retry.
failChan <- sending
}
return
}
if resp.Body == nil {
return
}
defer resp.Body.Close()
// Only the fields needed to find per-item results are decoded.
var respbody struct {
Errors bool `json:"errors"`
Items []struct {
Create struct {
Status int `json:"status"`
} `json:"create"`
} `json:"items"`
}
// The decode error is ignored; on failure respbody keeps its zero value and
// the batch is treated as having no errors.
json.NewDecoder(resp.Body).Decode(&respbody)
if !respbody.Errors {
return
}
var retry [][]byte
// NOTE(review): this selects items whose status is below 400 for retry,
// which looks inverted (failed items would have status >= 400) — confirm.
// sending[i] also assumes Items has no more elements than sending.
for i, item := range respbody.Items {
if item.Create.Status < 400 {
// Retry.
retry = append(retry, sending[i])
}
}
if len(retry) > 0 {
failChan <- retry
}
}(sending)
}
// NOTE(review): the next six lines look like the start of the removed
// SendBulk goroutine left in by the diff view, not part of sendLoop — confirm.
go func(contents string) {
sending := atomic.AddInt32(&c.sendingCount, 1)
defer atomic.AddInt32(&c.sendingCount, -1)
if sending > 100 {
logger.Println("sending log bottleneck :", sending)
logger.Println(contents)
// Main loop: react to cancellation, returned entries, the flush timer, and
// newly queued documents.
for {
select {
case <-ctx.Done():
return
// NOTE(review): the next five lines (closing brace through Header) also look
// like removed SendBulk lines — confirm.
}
reader := bytes.NewBuffer([]byte(contents))
req := osapi.BulkReq{
Body: reader,
Header: c.bulkHeader,
case ret := <-failChan:
// Ordering does not matter.
for _, r := range ret {
contentsSize += len(r)
}
// Returned entries go in front of the pending ones and are sent at once.
// NOTE(review): when ret is a sending slice taken from contents[:cut], this
// append may write into the array still used by other batches — confirm.
contents = append(ret, contents...)
sendfunc()
case <-sendTick:
// Timer flush: send whatever is buffered, otherwise just re-arm the timer.
if len(contents) > 0 {
sendfunc()
} else {
sendTick = time.After(time.Minute)
}
case logDoc := <-c.bulkChan:
// Build one bulk entry: a "create" action line naming the index
// (indexTemplatePattern followed by the document Type), then the JSON
// document, each ending in a newline. The Marshal error is ignored.
b, _ := json.Marshal(logDoc)
e := fmt.Appendf(nil, `{"create":{"_index":"%s%s"}}`+"\n"+`%s`+"\n", c.indexTemplatePattern, logDoc.Type, string(b))
contentsSize += len(e)
contents = append(contents, e)
// Flush once more than 1 MiB is buffered.
if contentsSize > 1024*1024 {
sendfunc()
}
}
// NOTE(review): the lines from here through "}(contents)" look like the
// tail of the removed SendBulk goroutine — confirm.
resp, err := c.Do(context.Background(), req, nil)
if err != nil {
logger.Println("log send bulk failed :", err)
return
}
resp.Body.Close()
}(contents)
}
}
var jwtHeader string
@ -199,7 +290,7 @@ func (c *Client) VerifyJWT(token string) (subject string, role string) {
return src.Sub, src.Roles
}
func NewClient(cfg Config) (Client, error) {
func NewClient(ctx context.Context, cfg Config) (Client, error) {
if len(cfg.Addresses) == 0 {
return Client{}, nil
}
@ -232,12 +323,23 @@ func NewClient(cfg Config) (Client, error) {
singleHeader := make(http.Header)
singleHeader.Set("Authorization", authHeader)
return Client{
out := Client{
Client: client,
cfg: cfg,
signingKey: signingKey,
indexTemplatePattern: indexPrefix,
bulkHeader: bulkHeader,
singleHeader: singleHeader,
}, nil
bulkChan: make(chan *LogDocument, 1000),
}
go func() {
for {
out.sendLoop(ctx)
if ctx.Err() != nil {
return
}
}
}()
return out, nil
}