package opensearch import ( "context" "crypto/hmac" "crypto/sha256" "encoding/base64" "encoding/json" "fmt" "io" "net" "net/http" "slices" "strings" "time" osg "github.com/opensearch-project/opensearch-go/v4" osapi "github.com/opensearch-project/opensearch-go/v4/opensearchapi" "repositories.action2quare.com/ayo/gocommon/logger" ) const logbulksize = 512 * 1024 type Config struct { osg.Config `json:",inline"` IndexPrefix string `json:"IndexPrefix"` SigningKey string `json:"SigningKey"` } type Client struct { *osg.Client cfg Config signingKey []byte indexTemplatePattern string bulkHeader http.Header singleHeader http.Header bulkChan chan *LogDocument singleLogPrepend []byte singleLogMidpend []byte singleLogAppend []byte singleLogFixedSize int } type LogDocument struct { Type string `json:"type"` Body any `json:"body"` Timestamp string `json:"@timestamp"` Country string `json:"country"` Ip string `json:"ip"` Uid string `json:"uid"` Auth struct { Type string `json:"type"` Id string `json:"id"` } `json:"auth"` } func NewLogDocument(logType string, body any) *LogDocument { return &LogDocument{ Type: strings.ToLower(logType), Timestamp: time.Now().UTC().Format("2006-01-02T15:04:05Z"), Body: body, } } func (c *Client) Send(ld *LogDocument) { if c.Client == nil { return } c.bulkChan <- ld } func (c *Client) SendBulk(ds map[string]*LogDocument) { if c == nil { return } for _, d := range ds { c.bulkChan <- d } } type singleLogMarshaller struct { singleLogPrepend []byte singleLogMidpend []byte singleLogAppend []byte logtype []byte content []byte length int } type logSliceReader struct { src []*singleLogMarshaller cursor int } func newLogSliceReader(in []singleLogMarshaller) *logSliceReader { src := make([]*singleLogMarshaller, len(in)) for i, v := range in { copylog := new(singleLogMarshaller) *copylog = v src[i] = copylog } return &logSliceReader{ src: src, cursor: 0, } } func (b *logSliceReader) Read(p []byte) (n int, err error) { n = 0 err = nil advance := func(in []byte) []byte { if len(in) == 0 { return in } copied := copy(p, in) p = p[copied:] n += copied return in[copied:] } for b.cursor < len(b.src) { sbt := b.src[b.cursor] if sbt.singleLogPrepend = advance(sbt.singleLogPrepend); len(sbt.singleLogPrepend) > 0 { return } if sbt.logtype = advance(sbt.logtype); len(sbt.logtype) > 0 { return } if sbt.singleLogMidpend = advance(sbt.singleLogMidpend); len(sbt.singleLogMidpend) > 0 { return } if sbt.content = advance(sbt.content); len(sbt.content) > 0 { return } if sbt.singleLogAppend = advance(sbt.singleLogAppend); len(sbt.singleLogAppend) > 0 { return } b.cursor++ } err = io.EOF return } func (b *logSliceReader) printSent() { for _, r := range b.src { fmt.Print(string(r.content)) } fmt.Print("\n") } func (c *Client) sendLoop(ctx context.Context) { defer func() { r := recover() if r != nil { logger.Error(r) } }() failChan := make(chan []singleLogMarshaller) var logMarshallers []singleLogMarshaller sendTick := time.After(time.Minute) sendfunc := func(logs []singleLogMarshaller) { if len(logs) == 0 { return } defer func() { r := recover() if r != nil { logger.Println(r) } }() reader := newLogSliceReader(logs) req := osapi.BulkReq{ Body: reader, Header: c.bulkHeader, } resp, err := c.Do(context.Background(), req, nil) if err != nil { if netoperr, ok := err.(*net.OpError); ok && netoperr.Op == "dial" { // 접속 안됨. 재시도 안함 logger.Println("[LogStream] send bulk failed. no retry :", err) reader.printSent() } else { // 재시도 logger.Println("[LogStream] send bulk failed. retry :", err) failChan <- logs } return } if resp.Body == nil { return } defer resp.Body.Close() var respbody struct { Errors bool `json:"errors"` Items []struct { Create struct { Status int `json:"status"` } `json:"create"` } `json:"items"` } decoder := json.NewDecoder(resp.Body) if err := decoder.Decode(&respbody); err != nil { errbody, _ := io.ReadAll(decoder.Buffered()) logger.Println("[LogStream] decode response body failed and retry :", err, string(errbody), len(logs)) // 전체 재시도 필요 failChan <- logs return } if !respbody.Errors { // 성공 return } var retry []singleLogMarshaller for i, item := range respbody.Items { if item.Create.Status < 300 { continue } if item.Create.Status == 429 || item.Create.Status >= 500 { logger.Println("[LogStream] send bulk failed but retry. status :", item.Create.Status) retry = append(retry, logs[i]) } else if item.Create.Status == 400 { // 구문 오류. 재시도 불가 if i < len(logs) { logger.Println("[LogStream] send bulk failed. status 400 :", string(logs[i].content)) } else { logger.Println("[LogStream] send bulk failed. status 400 but out of index :", i, len(logs)) } } else { // 일단 로그만 logger.Println("[LogStream] send bulk failed but no retry. status :", item.Create.Status) } } if len(retry) > 0 { failChan <- retry } } totalsize := 0 appendLog := func(newlog singleLogMarshaller) bool { if totalsize+newlog.length > logbulksize { go sendfunc(logMarshallers) totalsize = newlog.length logMarshallers = []singleLogMarshaller{newlog} return true } totalsize += newlog.length logMarshallers = append(logMarshallers, newlog) return false } for { select { case <-ctx.Done(): return case ret := <-failChan: // 순서는 중요하지 않음. sent := false for _, newlog := range ret { sent = sent || appendLog(newlog) } if sent { sendTick = time.After(time.Minute) } case <-sendTick: if len(logMarshallers) > 0 { go sendfunc(logMarshallers) totalsize = 0 logMarshallers = nil } else { sendTick = time.After(time.Minute) } case logDoc := <-c.bulkChan: b, _ := json.Marshal(logDoc) logtype := []byte(logDoc.Type) if appendLog(singleLogMarshaller{ singleLogPrepend: c.singleLogPrepend, singleLogMidpend: c.singleLogMidpend, singleLogAppend: c.singleLogAppend, logtype: logtype, content: b, length: len(logtype) + len(b) + c.singleLogFixedSize, }) { sendTick = time.After(time.Minute) } } } } var jwtHeader string var encoding = base64.RawURLEncoding func init() { src := []byte(`{"alg": "HS256","typ": "JWT"}`) dst := make([]byte, len(src)*2) encoding.Encode(dst, src) enclen := encoding.EncodedLen(len(src)) jwtHeader = string(dst[:enclen]) } func (c *Client) MakeJWT(subject string, role string, ttl time.Duration) string { if len(c.signingKey) == 0 { return "" } now := time.Now().Add(ttl).Unix() src := fmt.Appendf(nil, `{"exp":%d,"sub":"%s","roles":"%s"}`, now, subject, role) payload := make([]byte, encoding.EncodedLen(len(src))) encoding.Encode(payload, src) encoded := jwtHeader + "." + string(payload) mac := hmac.New(sha256.New, c.signingKey) mac.Write([]byte(encoded)) signature := mac.Sum(nil) sigenc := make([]byte, encoding.EncodedLen(len(signature))) encoding.Encode(sigenc, signature) return encoded + "." + string(sigenc) } func (c *Client) VerifyJWT(token string) (subject string, role string) { dot := strings.LastIndex(token, ".") if dot < 0 { return } encoded := token[:dot] sigenc := token[dot+1:] signature := make([]byte, encoding.DecodedLen(len(sigenc))) encoding.Decode(signature, []byte(sigenc)) mac := hmac.New(sha256.New, c.signingKey) mac.Write([]byte(encoded)) calsig := mac.Sum(nil) if slices.Compare(calsig, signature) != 0 { return } _, payload, ok := strings.Cut(encoded, ".") if !ok { return } srcjson, err := encoding.DecodeString(payload) if err != nil { return } var src struct { Exp int64 `json:"exp"` Sub string `json:"sub"` Roles string `json:"roles"` } if json.Unmarshal([]byte(srcjson), &src) != nil { return } if src.Exp < time.Now().Unix() { return } return src.Sub, src.Roles } func NewClient(ctx context.Context, cfg Config) (Client, error) { if len(cfg.Addresses) == 0 { return Client{}, nil } // retry는 수동으로 cfg.Config.DisableRetry = true client, err := osg.NewClient(cfg.Config) if err != nil { return Client{}, err } var signingKey []byte if len(cfg.SigningKey) > 0 { dst := make([]byte, len(cfg.SigningKey)*2) dstlen, _ := base64.StdEncoding.Decode(dst, []byte(cfg.SigningKey)) signingKey = dst[:dstlen] } indexPrefix := cfg.IndexPrefix if !strings.HasSuffix(indexPrefix, "-") && len(indexPrefix) > 0 { indexPrefix += "-" } if !strings.HasSuffix(indexPrefix, "ds-logs-") { indexPrefix = "ds-logs-" + indexPrefix } logger.Println("[LogStream] stream indexPrefix :", indexPrefix) bulkHeader := make(http.Header) singleHeader := make(http.Header) if len(cfg.Username) > 0 && len(cfg.Password) > 0 { authHeader := fmt.Sprintf("Basic %s", base64.RawURLEncoding.EncodeToString(fmt.Appendf(nil, "%s:%s", cfg.Username, cfg.Password))) bulkHeader.Set("Authorization", authHeader) singleHeader.Set("Authorization", authHeader) } singleLogPrepend := fmt.Appendf(nil, `{"create":{"_index":"%s`, indexPrefix) singleLogMidpend := []byte("\"}}\n") singleLogAppend := []byte("\n") singleLogFixedSize := len(singleLogPrepend) + len(singleLogMidpend) + len(singleLogAppend) out := Client{ Client: client, cfg: cfg, signingKey: signingKey, indexTemplatePattern: indexPrefix, bulkHeader: bulkHeader, singleHeader: singleHeader, bulkChan: make(chan *LogDocument, 1000), singleLogPrepend: singleLogPrepend, singleLogMidpend: singleLogMidpend, singleLogAppend: singleLogAppend, singleLogFixedSize: singleLogFixedSize, } go func() { for { out.sendLoop(ctx) if ctx.Err() != nil { return } } }() return out, nil }