Files
gocommon/opensearch/client.go
2025-08-13 22:06:06 +09:00

244 lines
5.5 KiB
Go

package opensearch
import (
"bytes"
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
"slices"
"strings"
"sync/atomic"
"time"
osg "github.com/opensearch-project/opensearch-go/v4"
osapi "github.com/opensearch-project/opensearch-go/v4/opensearchapi"
"repositories.action2quare.com/ayo/gocommon/logger"
)
type Config struct {
osg.Config `json:",inline"`
IndexPrefix string `json:"IndexPrefix"`
SigningKey string `json:"SigningKey"`
}
type Client struct {
*osg.Client
cfg Config
signingKey []byte
indexTemplatePattern string
bulkHeader http.Header
singleHeader http.Header
sendingCount int32
}
type LogDocument struct {
Type string `json:"type"`
Body any `json:"body"`
Timestamp string `json:"@timestamp"`
Country string `json:"country"`
Ip string `json:"ip"`
Uid string `json:"uid"`
Auth struct {
Type string `json:"type"`
Id string `json:"id"`
} `json:"auth"`
}
func NewLogDocument(logType string, body any) *LogDocument {
return &LogDocument{
Type: strings.ToLower(logType),
Timestamp: time.Now().UTC().Format("2006-01-02T15:04:05Z"),
Body: body,
}
}
func (c *Client) Send(ld *LogDocument) {
if c.Client == nil {
return
}
serialized, _ := json.Marshal(ld)
go func(serialized []byte) {
sending := atomic.AddInt32(&c.sendingCount, 1)
defer atomic.AddInt32(&c.sendingCount, -1)
if sending > 100 {
logger.Println("sending log bottleneck :", sending)
logger.Println(string(serialized))
return
}
reader := bytes.NewBuffer(serialized)
req := osapi.IndexReq{
Index: c.indexTemplatePattern + ld.Type,
Body: reader,
Header: c.singleHeader,
}
resp, err := c.Do(context.Background(), req, nil)
if err != nil {
logger.Println("log send failed :", err)
return
}
resp.Body.Close()
}(serialized)
}
func (c *Client) SendBulk(ds map[string]*LogDocument) {
if c == nil {
return
}
var contents string
for _, d := range ds {
b, _ := json.Marshal(d)
contents += fmt.Sprintf(`{"create":{"_index":"%s%s"}}`+"\n"+`%s`+"\n", c.indexTemplatePattern, d.Type, string(b))
}
go func(contents string) {
sending := atomic.AddInt32(&c.sendingCount, 1)
defer atomic.AddInt32(&c.sendingCount, -1)
if sending > 100 {
logger.Println("sending log bottleneck :", sending)
logger.Println(contents)
return
}
reader := bytes.NewBuffer([]byte(contents))
req := osapi.BulkReq{
Body: reader,
Header: c.bulkHeader,
}
resp, err := c.Do(context.Background(), req, nil)
if err != nil {
logger.Println("log send bulk failed :", err)
return
}
resp.Body.Close()
}(contents)
}
var jwtHeader string
var encoding = base64.RawURLEncoding
func init() {
src := []byte(`{"alg": "HS256","typ": "JWT"}`)
dst := make([]byte, len(src)*2)
encoding.Encode(dst, src)
enclen := encoding.EncodedLen(len(src))
jwtHeader = string(dst[:enclen])
}
func (c *Client) MakeJWT(subject string, role string, ttl time.Duration) string {
if len(c.signingKey) == 0 {
return ""
}
now := time.Now().UTC().Add(ttl).Add(time.Hour).Unix()
src := []byte(fmt.Sprintf(`{"exp":%d,"sub":"%s","roles":"%s"}`, now, subject, role))
payload := make([]byte, encoding.EncodedLen(len(src)))
encoding.Encode(payload, src)
encoded := jwtHeader + "." + string(payload)
mac := hmac.New(sha256.New, c.signingKey)
mac.Write([]byte(encoded))
signature := mac.Sum(nil)
sigenc := make([]byte, encoding.EncodedLen(len(signature)))
encoding.Encode(sigenc, signature)
return encoded + "." + string(sigenc)
}
func (c *Client) VerifyJWT(token string) (subject string, role string) {
dot := strings.LastIndex(token, ".")
if dot < 0 {
return
}
encoded := token[:dot]
sigenc := token[dot+1:]
signature := make([]byte, encoding.DecodedLen(len(sigenc)))
encoding.Decode(signature, []byte(sigenc))
mac := hmac.New(sha256.New, c.signingKey)
mac.Write([]byte(encoded))
calsig := mac.Sum(nil)
if slices.Compare(calsig, signature) != 0 {
return
}
_, payload, ok := strings.Cut(encoded, ".")
if !ok {
return
}
srcjson, err := encoding.DecodeString(payload)
if err != nil {
return
}
var src struct {
Exp int64 `json:"exp"`
Sub string `json:"sub"`
Roles string `json:"roles"`
}
if json.Unmarshal([]byte(srcjson), &src) != nil {
return
}
if src.Exp < time.Now().Unix() {
return
}
return src.Sub, src.Roles
}
func NewClient(cfg Config) (Client, error) {
if len(cfg.Addresses) == 0 {
return Client{}, nil
}
client, err := osg.NewClient(cfg.Config)
if err != nil {
return Client{}, err
}
var signingKey []byte
if len(cfg.SigningKey) > 0 {
dst := make([]byte, len(cfg.SigningKey)*2)
dstlen, _ := base64.StdEncoding.Decode(dst, []byte(cfg.SigningKey))
signingKey = dst[:dstlen]
}
indexPrefix := cfg.IndexPrefix
if !strings.HasSuffix(indexPrefix, "-") && len(indexPrefix) > 0 {
indexPrefix += "-"
}
if !strings.HasSuffix(indexPrefix, "ds-logs-") {
indexPrefix = "ds-logs-" + indexPrefix
}
authHeader := fmt.Sprintf("Basic %s", base64.RawURLEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%s", cfg.Username, cfg.Password))))
bulkHeader := make(http.Header)
bulkHeader.Set("Authorization", authHeader)
singleHeader := make(http.Header)
singleHeader.Set("Authorization", authHeader)
return Client{
Client: client,
cfg: cfg,
signingKey: signingKey,
indexTemplatePattern: indexPrefix,
bulkHeader: bulkHeader,
singleHeader: singleHeader,
}, nil
}