로그 전송을 비동기로

This commit is contained in:
2025-08-05 21:40:37 +09:00
parent e4e0d49ace
commit fb3f038506

View File

@ -8,9 +8,9 @@ import (
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"net/http" "net/http"
"strings" "strings"
"sync/atomic"
"time" "time"
osg "github.com/opensearch-project/opensearch-go/v4" osg "github.com/opensearch-project/opensearch-go/v4"
@ -31,6 +31,7 @@ type Client struct {
indexTemplatePattern string indexTemplatePattern string
bulkHeader http.Header bulkHeader http.Header
singleHeader http.Header singleHeader http.Header
sendingCount int32
} }
type LogDocument struct { type LogDocument struct {
@ -55,38 +56,43 @@ func NewLogDocument(logType string, body any) *LogDocument {
} }
} }
func (c *Client) Send(ld *LogDocument) error { func (c *Client) Send(ld *LogDocument) {
if c.Client == nil { if c.Client == nil {
return nil return
} }
serialized, _ := json.Marshal(ld) serialized, _ := json.Marshal(ld)
go func(serialized []byte) {
sending := atomic.AddInt32(&c.sendingCount, 1)
atomic.AddInt32(&c.sendingCount, -1)
if sending > 100 {
logger.Println("sending log bottleneck :", sending)
logger.Println(string(serialized))
return
}
reader := bytes.NewBuffer(serialized) reader := bytes.NewBuffer(serialized)
req := osapi.IndexReq{ req := osapi.IndexReq{
Index: c.indexTemplatePattern + ld.Type, Index: c.indexTemplatePattern + ld.Type,
Body: reader, Body: reader,
Header: c.singleHeader, Header: c.singleHeader,
} }
logger.Println("LogSend", req)
resp, err := c.Do(context.Background(), req, nil) resp, err := c.Do(context.Background(), req, nil)
logger.Println(resp)
if err != nil { if err != nil {
return err logger.Println("log send failed :", err)
return
} }
defer resp.Body.Close() resp.Body.Close()
}(serialized)
r, err2 := io.ReadAll(resp.Body)
if err2 != nil {
logger.Println("LogSend resp read error :", err2)
} else {
logger.Println("LogSend resp :", string(r))
}
return nil
} }
func (c *Client) SendBulk(ds map[string]*LogDocument) error { func (c *Client) SendBulk(ds map[string]*LogDocument) {
if c == nil {
return
}
var contents string var contents string
for _, d := range ds { for _, d := range ds {
@ -94,6 +100,16 @@ func (c *Client) SendBulk(ds map[string]*LogDocument) error {
contents += fmt.Sprintf(`{"create":{"_index":"%s%s"}}`+"\n"+`%s`+"\n", c.indexTemplatePattern, d.Type, string(b)) contents += fmt.Sprintf(`{"create":{"_index":"%s%s"}}`+"\n"+`%s`+"\n", c.indexTemplatePattern, d.Type, string(b))
} }
go func(contents string) {
sending := atomic.AddInt32(&c.sendingCount, 1)
atomic.AddInt32(&c.sendingCount, -1)
if sending > 100 {
logger.Println("sending log bottleneck :", sending)
logger.Println(contents)
return
}
reader := bytes.NewBuffer([]byte(contents)) reader := bytes.NewBuffer([]byte(contents))
req := osapi.BulkReq{ req := osapi.BulkReq{
Body: reader, Body: reader,
@ -101,12 +117,11 @@ func (c *Client) SendBulk(ds map[string]*LogDocument) error {
} }
resp, err := c.Do(context.Background(), req, nil) resp, err := c.Do(context.Background(), req, nil)
if err != nil { if err != nil {
return err logger.Println("log send bulk failed :", err)
return
} }
logger.Println(resp) resp.Body.Close()
defer resp.Body.Close() }(contents)
return nil
} }
var jwtHeader string var jwtHeader string