Compare commits

43 Commits

Author SHA1 Message Date
bb3a7fc957 Merge pull request 'Authorization에 Create Time 추가' (#2) from ds into master
Reviewed-on: #2
2025-08-19 00:22:54 +09:00
d00aaae839 Authorization에 Create Time 추가 2025-08-19 00:20:31 +09:00
40baa86bd6 os.Expand시 없는 변수는 그대로 리턴 2025-08-17 14:46:43 +09:00
c24d387761 session consumer query함수 리턴 값의 애매함을 제거 2025-08-15 23:55:50 +09:00
c0ab2afcf4 revoke된 세션 처리 추가 2025-08-15 13:23:04 +09:00
d26b3b9295 재접속시 세션 재확인 2025-08-15 12:06:38 +09:00
c449bae5ef makejwt ttl 수정 2025-08-13 22:32:01 +09:00
1f9eb75e41 osg VerifyJWT 추가 2025-08-13 22:06:06 +09:00
77397bd6bc 로그 제거 2025-08-12 17:40:03 +09:00
54cb3e818f MakeJWT 시그니처 수정 2025-08-07 15:01:03 +09:00
38a3da271a defer 빼먹음;;; 2025-08-05 21:48:12 +09:00
fb3f038506 로그 전송을 비동기로 2025-08-05 21:40:37 +09:00
e4e0d49ace logstream indexprefix 비었을 때 처리 2025-07-29 17:56:47 +09:00
b801be6aca string을 objectid로 변환하는 케이스 추가 2025-07-23 12:21:00 +09:00
49f2bd077d 로그 추가 2025-07-23 11:58:12 +09:00
3f2ce41c2a nil 체크 추가 2025-07-02 11:54:05 +09:00
d74ece6596 metric 생성자 수정 - pipe 추가 2025-07-02 11:30:41 +09:00
d77fa2108a metric 관련 코드 수정 2025-07-01 18:50:03 +09:00
219e627539 Revert "metric 제거"
This reverts commit 1885e675b2.
2025-06-27 01:11:54 +09:00
1885e675b2 metric 제거 2025-06-26 14:47:25 +09:00
b6e187a0a7 Merge pull request 'opensearch gocommon으로 이동' (#1) from ds into master
Reviewed-on: #1
2025-06-25 16:59:07 +09:00
5c7e4e4df2 opensearch gocommon으로 이동 2025-06-25 16:47:08 +09:00
1170be24c0 logger에 RecoverAndErrorSmallStack 추가 2025-05-14 16:14:35 +09:00
e53963d51e 로그 파일 설정 제거 2024-08-22 10:53:32 +09:00
f93d789905 로그에 실행 인자 남김 2024-08-22 09:14:48 +09:00
4dfc070891 헤더에 세션키 없으면 실패 2024-08-13 16:30:42 +09:00
d5fa86b378 nosession에도 option 적용 2024-08-11 20:46:51 +09:00
6d8d2e3078 NewWebsocketHandler에 옵션을 받음 2024-08-11 20:44:54 +09:00
5b3ad3a40c no space 2024-08-08 23:46:18 +09:00
9b1d250cd7 server Start 로그 2024-08-08 23:42:29 +09:00
13124b7903 로그 제거 2024-08-08 14:06:53 +09:00
86fac6bbc0 Merge branch 'master' of https://repositories.action2quare.com/ayo/gocommon 2024-08-08 11:42:52 +09:00
70d3b2507c 로그 추가 2024-08-08 11:42:49 +09:00
ca5632031c [오승석] whitelist key 변경
- Email -> Alias
2024-08-06 20:58:38 +09:00
38c5e05d4c 헤더 사이즈 늘림 2024-08-05 16:27:54 +09:00
7928e69c60 default 404 handler 등록 2024-08-01 13:44:30 +09:00
899bae335e ReadStringsFormValue 추가 2024-07-31 10:51:07 +09:00
8e3d6c28f0 metric 중복 등록 에러는 무시 2024-07-29 17:49:47 +09:00
ae98abe61d RegisterHandlers 시그니쳐 변경 2024-07-29 10:15:04 +09:00
013c89e58d watch err 처리 - modifyChangeStreams has not been run for this collection/database/cluster 2024-07-25 12:47:31 +09:00
dd4928c822 Merge branch 'new_conn' 2024-07-24 18:43:52 +09:00
2dafadf949 logprefix flag 추가 2024-07-24 17:49:45 +09:00
f0a97c4701 logprefix 추가 2024-07-24 17:12:54 +09:00
16 changed files with 644 additions and 216 deletions

5
go.mod
View File

@ -1,6 +1,8 @@
module repositories.action2quare.com/ayo/gocommon
go 1.20
go 1.22
toolchain go1.22.4
require (
github.com/awa/go-iap v1.32.0
@ -34,6 +36,7 @@ require (
github.com/klauspost/compress v1.13.6 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/opensearch-project/opensearch-go/v4 v4.5.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect
github.com/prometheus/common v0.44.0 // indirect

2
go.sum
View File

@ -90,6 +90,8 @@ github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJ
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/opensearch-project/opensearch-go/v4 v4.5.0 h1:26XckmmF6MhlXt91Bu1yY6R51jy1Ns/C3XgIfvyeTRo=
github.com/opensearch-project/opensearch-go/v4 v4.5.0/go.mod h1:VmFc7dqOEM3ZtLhrpleOzeq+cqUgNabqQG5gX0xId64=
github.com/pires/go-proxyproto v0.7.0 h1:IukmRewDQFWC7kfnb66CSomk2q/seBuilHBYFwyq0Hs=
github.com/pires/go-proxyproto v0.7.0/go.mod h1:Vz/1JPY/OACxWGQNIRY2BeyDmpoaWmEP40O9LbuiFR4=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=

View File

@ -2,43 +2,40 @@ package logger
import (
"fmt"
"io"
"log"
"os"
"path"
"runtime"
"runtime/debug"
"strconv"
"strings"
"repositories.action2quare.com/ayo/gocommon/flagx"
)
var stdlogger *log.Logger
var UseLogFile = flagx.Bool("logfile", false, "")
var _ = flagx.Int("logprefix", 3, "0 : no_prefix, 1 : date, 2 : time, 3 : datetime")
func init() {
binpath, _ := os.Executable()
binname := path.Base(strings.ReplaceAll(binpath, "\\", "/"))
outWriter := os.Stdout
var outWriter io.Writer
outWriter = os.Stdout
if *UseLogFile {
ext := path.Ext(binname)
if len(ext) > 0 {
binname = binname[:len(binname)-len(ext)]
args := os.Args
logprefix := 3
for _, arg := range args {
if strings.HasPrefix(arg, "-logprefix=") {
logprefix, _ = strconv.Atoi(arg[11:])
break
}
logFile, err := os.Create(fmt.Sprintf("%s.log", binname))
if err != nil {
os.Stdout.Write([]byte(err.Error()))
panic(err)
}
outWriter = io.MultiWriter(outWriter, logFile)
}
stdlogger = log.New(outWriter, "", log.LstdFlags)
pid := fmt.Sprintf("[%d]", os.Getpid())
outWriter.Write([]byte(strings.Join(append([]string{pid, binpath}, args...), " ")))
if logprefix < 4 {
stdlogger = log.New(outWriter, "", logprefix)
} else {
stdlogger = log.New(outWriter, "", log.LstdFlags)
}
}
func Println(v ...interface{}) {
@ -130,3 +127,33 @@ func ErrorWithCallStack(err error) error {
frames: frames,
}
}
func ErrorSmallStack() {
buf := make([]byte, 1024)
n := runtime.Stack(buf, false)
if n < len(buf) {
buf = buf[:n]
}
Error(string(buf))
}
func RecoverAndErrorSmallStack(r any) any {
if r != nil {
pc := make([]uintptr, 10)
runtime.Callers(1, pc)
curframes := runtime.CallersFrames(pc)
var out []string
for {
frame, more := curframes.Next()
out = append(out, fmt.Sprintf("%s\n\t%s:%d", frame.Function, frame.File, frame.Line))
if !more {
break
}
}
Error(strings.Join(out, "\n"))
}
return r
}

View File

@ -2,13 +2,12 @@ package metric
import (
"crypto/md5"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"math"
"os"
"path"
"runtime"
"sort"
"strings"
"sync/atomic"
@ -16,8 +15,6 @@ import (
"repositories.action2quare.com/ayo/gocommon/logger"
)
const metric_value_line_size = 19
type MetricDescription struct {
Key string
Type MetricType
@ -29,7 +26,35 @@ type MetricDescription struct {
type Exporter interface {
RegisterMetric(*MetricDescription)
UpdateMetric(string, float64)
Shutdown()
}
type MetricPipe struct {
pipe *os.File
}
func (mp MetricPipe) Close() {
if mp.pipe != nil {
mp.pipe.Close()
mp.pipe = nil
}
}
func (mp MetricPipe) writeLine(line string) {
mp.pipe.WriteString(line + "\n")
}
func NewMetricPipe(pipeName string) MetricPipe {
switch runtime.GOOS {
case "linux":
pipeName = "/tmp/" + pipeName
case "windows":
pipeName = `\\.\pipe\` + pipeName
}
f, _ := os.Open(pipeName)
return MetricPipe{
pipe: f,
}
}
type MetricWriter interface {
@ -45,13 +70,14 @@ func (mw *metric_empty) Add(int64) {}
var MetricWriterNil = MetricWriter(&metric_empty{})
type metric_int64 struct {
key string
valptr *int64
buff [metric_value_line_size]byte
pipe MetricPipe
}
func (mw *metric_int64) printOut() {
binary.LittleEndian.PutUint64(mw.buff[9:], math.Float64bits(float64(atomic.LoadInt64(mw.valptr))))
os.Stdout.Write(mw.buff[:])
loaded := atomic.LoadInt64(mw.valptr)
mw.pipe.writeLine(fmt.Sprintf("%s:%d", mw.key, loaded))
}
func (mw *metric_int64) Set(newval int64) {
@ -64,11 +90,16 @@ func (mw *metric_int64) Add(inc int64) {
mw.printOut()
}
func NewMetric(mt MetricType, name string, help string, constLabels map[string]string) (writer MetricWriter) {
func NewMetric(pipe MetricPipe, mt MetricType, name string, help string, constLabels map[string]string) (writer MetricWriter) {
if !metricEnabled {
return MetricWriterNil
}
if constLabels == nil {
constLabels = map[string]string{}
}
constLabels["pid"] = fmt.Sprintf("%d", os.Getpid())
var disorder []struct {
k string
v string
@ -101,35 +132,17 @@ func NewMetric(mt MetricType, name string, help string, constLabels map[string]s
})
impl := &metric_int64{
key: key,
valptr: new(int64),
pipe: pipe,
}
impl.buff[0] = METRIC_HEAD_INLINE
impl.buff[17] = METRIC_TAIL_INLINE
impl.buff[18] = '\n'
copy(impl.buff[1:], []byte(key))
output := append([]byte{METRIC_HEAD_INLINE}, temp...)
output = append(output, METRIC_TAIL_INLINE, '\n')
os.Stdout.Write(output)
pipe.writeLine(string(temp))
// writer
return impl
}
func ReadMetricValue(line []byte) (string, float64) {
if len(line) < 16 {
return "", 0
}
key := string(line[0:8])
valbits := binary.LittleEndian.Uint64(line[8:])
val := math.Float64frombits(valbits)
return key, val
}
var metricEnabled = false
func init() {

View File

@ -1,7 +1,7 @@
package metric
import (
"context"
"maps"
"math"
"sync/atomic"
@ -34,11 +34,6 @@ func convertValueType(in MetricType) prometheus.ValueType {
return prometheus.UntypedValue
}
type writeRequest struct {
key string
val float64
}
type prometheusMetricDesc struct {
*prometheus.Desc
valueType prometheus.ValueType
@ -46,112 +41,68 @@ type prometheusMetricDesc struct {
key string
}
type prometheusExporter struct {
writerChan chan *writeRequest
registerChan chan *prometheusMetricDesc
namespace string
cancel context.CancelFunc
type PrometheusCollector struct {
namespace string
metrics map[string]*prometheusMetricDesc
registry *prometheus.Registry
}
func (pe *prometheusExporter) RegisterMetric(nm *MetricDescription) {
pe.registerChan <- &prometheusMetricDesc{
Desc: prometheus.NewDesc(prometheus.BuildFQName(pe.namespace, "", nm.Name), nm.Help, nil, nm.ConstLabels),
valueType: convertValueType(nm.Type),
valptr: new(uint64),
key: nm.Key,
}
}
func (pe *prometheusExporter) UpdateMetric(key string, val float64) {
pe.writerChan <- &writeRequest{key: key, val: val}
}
func (pe *prometheusExporter) Shutdown() {
if pe.cancel != nil {
pe.cancel()
}
}
type prometheusCollector struct {
metrics map[string]*prometheusMetricDesc
}
func (pc *prometheusCollector) Describe(ch chan<- *prometheus.Desc) {
func (pc *PrometheusCollector) Describe(ch chan<- *prometheus.Desc) {
for _, v := range pc.metrics {
logger.Println("collector describe :", v.Desc.String())
ch <- v.Desc
}
}
func (pc *prometheusCollector) Collect(ch chan<- prometheus.Metric) {
func (pc *PrometheusCollector) Collect(ch chan<- prometheus.Metric) {
for _, v := range pc.metrics {
cm, err := prometheus.NewConstMetric(v.Desc, v.valueType, math.Float64frombits(atomic.LoadUint64(v.valptr)))
value := atomic.LoadUint64(v.valptr)
cm, err := prometheus.NewConstMetric(v.Desc, v.valueType, math.Float64frombits(value))
if err == nil {
ch <- cm
}
}
}
func (pe *prometheusExporter) loop(ctx context.Context) {
defer func() {
r := recover()
if r != nil {
logger.Error(r)
}
}()
func (pc *PrometheusCollector) RegisterMetric(md *MetricDescription) *PrometheusCollector {
nm := &prometheusMetricDesc{
Desc: prometheus.NewDesc(prometheus.BuildFQName("ou", "", md.Name), md.Help, nil, md.ConstLabels),
valueType: convertValueType(md.Type),
valptr: new(uint64),
key: md.Key,
}
var collector *prometheusCollector
defer func() {
if collector != nil {
prometheus.Unregister(collector)
}
}()
next := NewPrometheusCollector(pc.namespace, pc.registry)
maps.Copy(next.metrics, pc.metrics)
next.metrics[nm.key] = nm
for {
select {
case <-ctx.Done():
return
pc.registry.Unregister(pc)
pc.registry.Register(next)
case req := <-pe.writerChan:
if collector != nil {
if m := collector.metrics[req.key]; m != nil {
atomic.StoreUint64(m.valptr, math.Float64bits(req.val))
}
}
return next
}
case nm := <-pe.registerChan:
var nextmetrics map[string]*prometheusMetricDesc
if collector != nil {
nextmetrics = collector.metrics
prometheus.Unregister(collector)
nextmetrics[nm.key] = nm
} else {
nextmetrics = map[string]*prometheusMetricDesc{
nm.key: nm,
}
}
nextcollector := &prometheusCollector{
metrics: nextmetrics,
}
if err := prometheus.Register(nextcollector); err != nil {
logger.Error("prometheus register err :", *nm, err)
} else {
collector = nextcollector
}
}
func (pc *PrometheusCollector) UpdateMetric(key string, val float64) {
if m := pc.metrics[key]; m != nil {
atomic.StoreUint64(m.valptr, math.Float64bits(val))
}
}
func NewPrometheusExport(namespace string) Exporter {
ctx, cancel := context.WithCancel(context.Background())
exp := &prometheusExporter{
registerChan: make(chan *prometheusMetricDesc, 10),
writerChan: make(chan *writeRequest, 100),
namespace: namespace,
cancel: cancel,
}
func (pc *PrometheusCollector) UnregisterMetric(key string) *PrometheusCollector {
next := NewPrometheusCollector(pc.namespace, pc.registry)
maps.Copy(next.metrics, pc.metrics)
delete(next.metrics, key)
go exp.loop(ctx)
return exp
pc.registry.Unregister(pc)
pc.registry.Register(next)
return next
}
func NewPrometheusCollector(namespace string, registry *prometheus.Registry) *PrometheusCollector {
return &PrometheusCollector{
namespace: namespace,
metrics: make(map[string]*prometheusMetricDesc),
registry: registry,
}
}

View File

@ -154,10 +154,36 @@ func (mc *MongoClient) DropIndex(coll CollectionName, name string) error {
}
func (mc *MongoClient) Watch(coll CollectionName, pipeline mongo.Pipeline, opts ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error) {
// mc.db.RunCommand()
if len(opts) == 0 {
opts = []*options.ChangeStreamOptions{options.ChangeStream().SetFullDocument(options.UpdateLookup).SetMaxAwaitTime(0)}
}
return mc.Collection(coll).Watch(mc.ctx, pipeline, opts...)
stream, err := mc.Collection(coll).Watch(mc.ctx, pipeline, opts...)
if err != nil {
if mongoErr, ok := err.(mongo.CommandError); ok {
logger.Println("MongoClient Watch return err code :", mongoErr, mongoErr.Code)
if mongoErr.Code == 40573 {
adminDb := mc.db.Client().Database("admin")
result := adminDb.RunCommand(mc.ctx, bson.D{
{Key: "modifyChangeStreams", Value: 1},
{Key: "database", Value: mc.db.Name()},
{Key: "collection", Value: coll},
{Key: "enable", Value: true},
})
if result.Err() != nil {
logger.Println("mc.db.RunCommand failed :", result.Err(), mc.db.Name(), coll)
} else {
return mc.Collection(coll).Watch(mc.ctx, pipeline, opts...)
}
}
}
logger.Fatal(err)
}
return stream, err
}
func (mc *MongoClient) Collection(collname CollectionName) *mongo.Collection {

243
opensearch/client.go Normal file
View File

@ -0,0 +1,243 @@
package opensearch
import (
"bytes"
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
"slices"
"strings"
"sync/atomic"
"time"
osg "github.com/opensearch-project/opensearch-go/v4"
osapi "github.com/opensearch-project/opensearch-go/v4/opensearchapi"
"repositories.action2quare.com/ayo/gocommon/logger"
)
type Config struct {
osg.Config `json:",inline"`
IndexPrefix string `json:"IndexPrefix"`
SigningKey string `json:"SigningKey"`
}
type Client struct {
*osg.Client
cfg Config
signingKey []byte
indexTemplatePattern string
bulkHeader http.Header
singleHeader http.Header
sendingCount int32
}
type LogDocument struct {
Type string `json:"type"`
Body any `json:"body"`
Timestamp string `json:"@timestamp"`
Country string `json:"country"`
Ip string `json:"ip"`
Uid string `json:"uid"`
Auth struct {
Type string `json:"type"`
Id string `json:"id"`
} `json:"auth"`
}
func NewLogDocument(logType string, body any) *LogDocument {
return &LogDocument{
Type: strings.ToLower(logType),
Timestamp: time.Now().UTC().Format("2006-01-02T15:04:05Z"),
Body: body,
}
}
func (c *Client) Send(ld *LogDocument) {
if c.Client == nil {
return
}
serialized, _ := json.Marshal(ld)
go func(serialized []byte) {
sending := atomic.AddInt32(&c.sendingCount, 1)
defer atomic.AddInt32(&c.sendingCount, -1)
if sending > 100 {
logger.Println("sending log bottleneck :", sending)
logger.Println(string(serialized))
return
}
reader := bytes.NewBuffer(serialized)
req := osapi.IndexReq{
Index: c.indexTemplatePattern + ld.Type,
Body: reader,
Header: c.singleHeader,
}
resp, err := c.Do(context.Background(), req, nil)
if err != nil {
logger.Println("log send failed :", err)
return
}
resp.Body.Close()
}(serialized)
}
func (c *Client) SendBulk(ds map[string]*LogDocument) {
if c == nil {
return
}
var contents string
for _, d := range ds {
b, _ := json.Marshal(d)
contents += fmt.Sprintf(`{"create":{"_index":"%s%s"}}`+"\n"+`%s`+"\n", c.indexTemplatePattern, d.Type, string(b))
}
go func(contents string) {
sending := atomic.AddInt32(&c.sendingCount, 1)
defer atomic.AddInt32(&c.sendingCount, -1)
if sending > 100 {
logger.Println("sending log bottleneck :", sending)
logger.Println(contents)
return
}
reader := bytes.NewBuffer([]byte(contents))
req := osapi.BulkReq{
Body: reader,
Header: c.bulkHeader,
}
resp, err := c.Do(context.Background(), req, nil)
if err != nil {
logger.Println("log send bulk failed :", err)
return
}
resp.Body.Close()
}(contents)
}
var jwtHeader string
var encoding = base64.RawURLEncoding
func init() {
src := []byte(`{"alg": "HS256","typ": "JWT"}`)
dst := make([]byte, len(src)*2)
encoding.Encode(dst, src)
enclen := encoding.EncodedLen(len(src))
jwtHeader = string(dst[:enclen])
}
func (c *Client) MakeJWT(subject string, role string, ttl time.Duration) string {
if len(c.signingKey) == 0 {
return ""
}
now := time.Now().Add(ttl).Unix()
src := []byte(fmt.Sprintf(`{"exp":%d,"sub":"%s","roles":"%s"}`, now, subject, role))
payload := make([]byte, encoding.EncodedLen(len(src)))
encoding.Encode(payload, src)
encoded := jwtHeader + "." + string(payload)
mac := hmac.New(sha256.New, c.signingKey)
mac.Write([]byte(encoded))
signature := mac.Sum(nil)
sigenc := make([]byte, encoding.EncodedLen(len(signature)))
encoding.Encode(sigenc, signature)
return encoded + "." + string(sigenc)
}
func (c *Client) VerifyJWT(token string) (subject string, role string) {
dot := strings.LastIndex(token, ".")
if dot < 0 {
return
}
encoded := token[:dot]
sigenc := token[dot+1:]
signature := make([]byte, encoding.DecodedLen(len(sigenc)))
encoding.Decode(signature, []byte(sigenc))
mac := hmac.New(sha256.New, c.signingKey)
mac.Write([]byte(encoded))
calsig := mac.Sum(nil)
if slices.Compare(calsig, signature) != 0 {
return
}
_, payload, ok := strings.Cut(encoded, ".")
if !ok {
return
}
srcjson, err := encoding.DecodeString(payload)
if err != nil {
return
}
var src struct {
Exp int64 `json:"exp"`
Sub string `json:"sub"`
Roles string `json:"roles"`
}
if json.Unmarshal([]byte(srcjson), &src) != nil {
return
}
if src.Exp < time.Now().Unix() {
return
}
return src.Sub, src.Roles
}
func NewClient(cfg Config) (Client, error) {
if len(cfg.Addresses) == 0 {
return Client{}, nil
}
client, err := osg.NewClient(cfg.Config)
if err != nil {
return Client{}, err
}
var signingKey []byte
if len(cfg.SigningKey) > 0 {
dst := make([]byte, len(cfg.SigningKey)*2)
dstlen, _ := base64.StdEncoding.Decode(dst, []byte(cfg.SigningKey))
signingKey = dst[:dstlen]
}
indexPrefix := cfg.IndexPrefix
if !strings.HasSuffix(indexPrefix, "-") && len(indexPrefix) > 0 {
indexPrefix += "-"
}
if !strings.HasSuffix(indexPrefix, "ds-logs-") {
indexPrefix = "ds-logs-" + indexPrefix
}
authHeader := fmt.Sprintf("Basic %s", base64.RawURLEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%s", cfg.Username, cfg.Password))))
bulkHeader := make(http.Header)
bulkHeader.Set("Authorization", authHeader)
singleHeader := make(http.Header)
singleHeader.Set("Authorization", authHeader)
return Client{
Client: client,
cfg: cfg,
signingKey: signingKey,
indexTemplatePattern: indexPrefix,
bulkHeader: bulkHeader,
singleHeader: singleHeader,
}, nil
}

78
opensearch/client_test.go Normal file
View File

@ -0,0 +1,78 @@
package opensearch
import (
"encoding/base64"
"testing"
"time"
"go.mongodb.org/mongo-driver/bson/primitive"
)
func TestNewClient(t *testing.T) {
// var cfg Config
// cfg.Addresses = []string{"http://localhost:9200"}
// client, err := NewClient(cfg)
// if err != nil {
// t.Errorf("NewClient() error = %v", err)
// return
// }
// for i := 0; i < 10; i++ {
// MakeActorLog("LOGIN", "test_user", "stalkername").Write(&client, bson.M{
// "country": "kr",
// "ip": "127.0.0.1",
// })
// time.Sleep(time.Second)
// }
// for i := 0; i < 10; i++ {
// MakeActorLog("Match", "test_user", "stalkername").Write(&client, bson.M{
// "server": "kr001",
// "mode": "pvp",
// "address": "server address",
// })
// time.Sleep(time.Second)
// }
}
func TestClient_MakeJWT(t *testing.T) {
sk := "UGdiOTdLVjFBTWtndTRNRiZmVjdwMDdCRW1lSSUxTnA="
dst := make([]byte, len(sk)*2)
dstlen, _ := base64.StdEncoding.Decode(dst, []byte(sk))
signingKey := dst[:dstlen]
uid := primitive.NewObjectID().Hex()
type args struct {
subject string
role string
ttl time.Duration
}
tests := []struct {
name string
c *Client
args args
want string
}{
// TODO: Add test cases.
{
c: &Client{
signingKey: signingKey,
},
args: args{
subject: uid,
role: "ds_client",
ttl: time.Minute,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := tt.c.MakeJWT(tt.args.subject, tt.args.role, tt.args.ttl)
subj, role := tt.c.VerifyJWT(got)
if subj != tt.args.subject || role != tt.args.role {
t.Errorf("Client.MakeJWT() = %v, %v, want %v, %v", subj, role, tt.args.subject, tt.args.role)
}
})
}
}

View File

@ -116,7 +116,13 @@ func LoadConfig[T any](outptr *T) error {
}
}
return json.Unmarshal([]byte(os.ExpandEnv(string(configContents))), outptr)
return json.Unmarshal([]byte(os.Expand(string(configContents), func(in string) string {
envval := os.Getenv(in)
if len(envval) == 0 {
return "$" + in
}
return envval
})), outptr)
}
type StorageAddr struct {

View File

@ -14,6 +14,7 @@ import (
"net/url"
"os"
"os/signal"
"path"
"reflect"
"runtime"
"strconv"
@ -40,6 +41,13 @@ func init() {
gob.Register([]any{})
}
type ServerMuxInterface interface {
http.Handler
HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request))
Handle(pattern string, handler http.Handler)
}
const (
// HTTPStatusReloginRequired : http status를 이걸 받으면 클라이언트는 로그아웃하고 로그인 화면으로 돌아가야 한다.
HTTPStatusReloginRequired = 599
@ -139,8 +147,19 @@ func isTlsEnabled(fileout ...*string) bool {
return true
}
func registUnhandledPattern(serveMux ServerMuxInterface) {
defer func() {
recover()
}()
serveMux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
logger.Println("page not found :", r.URL.Path)
w.WriteHeader(http.StatusNotFound)
})
}
// NewHTTPServer :
func NewHTTPServerWithPort(serveMux *http.ServeMux, port int) *Server {
func NewHTTPServerWithPort(serveMux ServerMuxInterface, port int) *Server {
if isTlsEnabled() && port == 80 {
port = 443
}
@ -148,16 +167,21 @@ func NewHTTPServerWithPort(serveMux *http.ServeMux, port int) *Server {
serveMux.HandleFunc(MakeHttpHandlerPattern("welcome"), welcomeHandler)
serveMux.HandleFunc(MakeHttpHandlerPattern("lb_health_chceck"), healthCheckHandler)
serveMux.HandleFunc(MakeHttpHandlerPattern("lb_health_check"), healthCheckHandler)
registUnhandledPattern(serveMux)
server := &Server{
httpserver: &http.Server{Addr: addr, Handler: serveMux},
httpserver: &http.Server{
Addr: addr,
Handler: serveMux,
MaxHeaderBytes: 2 << 20, // 2 MB
},
}
server.httpserver.SetKeepAlivesEnabled(true)
return server
}
func NewHTTPServer(serveMux *http.ServeMux) *Server {
func NewHTTPServer(serveMux ServerMuxInterface) *Server {
// 시작시 자동으로 enable됨
if isTlsEnabled() && *portptr == 80 {
@ -206,7 +230,12 @@ func (server *Server) Stop() {
}
// Start :
func (server *Server) Start() error {
func (server *Server) Start(name ...string) error {
if len(name) == 0 {
exepath, _ := os.Executable()
name = []string{path.Base(exepath)}
}
if server.httpserver != nil {
ln, r := net.Listen("tcp", server.httpserver.Addr)
if r != nil {
@ -235,6 +264,7 @@ func (server *Server) Start() error {
err = server.httpserver.ServeTLS(proxyListener, crtfile, keyfile)
} else {
logger.Println("tls disabled")
logger.Println(strings.Join(name, ", "), "started")
err = server.httpserver.Serve(proxyListener)
}
@ -324,6 +354,12 @@ func ConvertInterface(from interface{}, toType reflect.Type) reflect.Value {
case reflect.Bool:
val, _ := strconv.ParseBool(from.(string))
return reflect.ValueOf(val)
case reflect.String:
if toType == reflect.TypeOf(primitive.ObjectID{}) {
objid, _ := primitive.ObjectIDFromHex(from.(string))
return reflect.ValueOf(objid)
}
}
return fromrv.Convert(toType)
@ -481,6 +517,13 @@ func ReadStringFormValue(r url.Values, key string) (string, bool) {
return strval, len(strval) > 0
}
func ReadStringsFormValue(r url.Values, key string) ([]string, bool) {
if r.Has(key) {
return (map[string][]string)(r)[key], true
}
return nil, false
}
type encoder interface {
Encode(any) error
}

View File

@ -17,9 +17,10 @@ type Authorization struct {
invalidated string
// by authorization provider
Platform string `bson:"p" json:"p"`
Uid string `bson:"u" json:"u"`
Email string `bson:"em" json:"em"`
Platform string `bson:"p" json:"p"`
Uid string `bson:"u" json:"u"`
Alias string `bson:"al" json:"al"`
CreatedTime primitive.DateTime `bson:"ct" json:"ct"`
}
func (auth *Authorization) ToStrings() []string {
@ -27,13 +28,13 @@ func (auth *Authorization) ToStrings() []string {
"a", auth.Account.Hex(),
"p", auth.Platform,
"u", auth.Uid,
"em", auth.Email,
"al", auth.Alias,
"inv", auth.invalidated,
}
}
func (auth *Authorization) Invalidated() bool {
return len(auth.invalidated) > 0
func (auth *Authorization) Valid() bool {
return len(auth.invalidated) == 0 && !auth.Account.IsZero()
}
func MakeAuthrizationFromStringMap(src map[string]string) Authorization {
@ -42,7 +43,7 @@ func MakeAuthrizationFromStringMap(src map[string]string) Authorization {
Account: accid,
Platform: src["p"],
Uid: src["u"],
Email: src["em"],
Alias: src["al"],
invalidated: src["inv"],
}
}
@ -55,7 +56,7 @@ type Provider interface {
}
type Consumer interface {
Query(string) (Authorization, error)
Query(string) Authorization
Touch(string) (Authorization, error)
IsRevoked(primitive.ObjectID) bool
Revoke(string)
@ -74,10 +75,6 @@ func make_storagekey(acc primitive.ObjectID) storagekey {
return storagekey(acc.Hex() + hex.EncodeToString(bs[2:]))
}
func AccountToSessionKey(acc primitive.ObjectID) string {
return string(make_storagekey(acc))
}
func storagekey_to_publickey(sk storagekey) publickey {
bs, _ := hex.DecodeString(string(sk))

View File

@ -263,25 +263,25 @@ func (c *consumer_mongo) query_internal(sk storagekey) (*sessionMongo, bool, err
return nil, false, nil
}
func (c *consumer_mongo) Query(pk string) (Authorization, error) {
func (c *consumer_mongo) Query(pk string) Authorization {
c.lock.Lock()
defer c.lock.Unlock()
sk := publickey_to_storagekey(publickey(pk))
si, _, err := c.query_internal(sk)
if err != nil {
return Authorization{}, err
return Authorization{}
}
if si == nil {
return Authorization{}, nil
return Authorization{}
}
if time.Now().After(si.Ts.Time().Add(c.ttl)) {
return Authorization{}, nil
return Authorization{}
}
return *si.Auth, nil
return *si.Auth
}
func (c *consumer_mongo) Touch(pk string) (Authorization, error) {

View File

@ -2,6 +2,7 @@ package session
import (
"context"
"errors"
"fmt"
"time"
@ -243,37 +244,49 @@ func (c *consumer_redis) query_internal(sk storagekey) (*sessionRedis, error) {
expireAt: time.Now().Add(ttl),
}
if auth.Invalidated() {
c.stages[0].deleted[sk] = si
} else {
if auth.Valid() {
c.add_internal(sk, si)
} else {
c.stages[0].deleted[sk] = si
}
return si, nil
}
func (c *consumer_redis) Query(pk string) (Authorization, error) {
var errRevoked = errors.New("session revoked")
var errExpired = errors.New("session expired")
func (c *consumer_redis) Query(pk string) Authorization {
c.lock.Lock()
defer c.lock.Unlock()
sk := publickey_to_storagekey(publickey(pk))
if _, deleted := c.stages[0].deleted[sk]; deleted {
return Authorization{}
}
if _, deleted := c.stages[1].deleted[sk]; deleted {
return Authorization{}
}
si, err := c.query_internal(sk)
if err != nil {
logger.Println("session consumer query :", pk, err)
return Authorization{}, err
return Authorization{}
}
if si == nil {
logger.Println("session consumer query(si nil) :", pk, nil)
return Authorization{}, nil
return Authorization{}
}
if time.Now().After(si.expireAt) {
logger.Println("session consumer query(expired):", pk, nil)
return Authorization{}, nil
return Authorization{}
}
return *si.Authorization, nil
return *si.Authorization
}
func (c *consumer_redis) Touch(pk string) (Authorization, error) {

View File

@ -60,11 +60,11 @@ func TestExpTable(t *testing.T) {
go func() {
for {
q1, err := cs.Query(sk1)
logger.Println("query :", q1, err)
q1 := cs.Query(sk1)
logger.Println("query :", q1)
q2, err := cs.Query(sk2)
logger.Println("query :", q2, err)
q2 := cs.Query(sk2)
logger.Println("query :", q2)
time.Sleep(time.Second)
}
}()
@ -87,7 +87,7 @@ func TestExpTable(t *testing.T) {
t.Error(err)
}
q2, err := cs2.Query(sk2)
logger.Println("queryf :", q2, err)
q2 := cs2.Query(sk2)
logger.Println("queryf :", q2)
time.Sleep(20 * time.Second)
}

View File

@ -188,6 +188,7 @@ type websocketHandlerBase struct {
type WebsocketHandler struct {
WebsocketApiBroker
websocketHandlerBase
opt *WebsocketHandlerOption
}
type wsConfig struct {
@ -205,7 +206,28 @@ func init() {
gob.Register([]any{})
}
func NewWebsocketHandler(consumer session.Consumer, redisUrl string) (*WebsocketHandler, error) {
type WebsocketHandlerOption struct {
ReadBufferSize, WriteBufferSize int
}
func Option() *WebsocketHandlerOption {
return &WebsocketHandlerOption{
ReadBufferSize: 0,
WriteBufferSize: 0,
}
}
func (opt *WebsocketHandlerOption) SetReadBufferSize(size int) *WebsocketHandlerOption {
opt.ReadBufferSize = size
return opt
}
func (opt *WebsocketHandlerOption) SetWriteBufferSize(size int) *WebsocketHandlerOption {
opt.WriteBufferSize = size
return opt
}
func NewWebsocketHandler(consumer session.Consumer, redisUrl string, opts ...*WebsocketHandlerOption) (*WebsocketHandler, error) {
var config wsConfig
if err := gocommon.LoadConfig(&config); err != nil {
return nil, err
@ -240,6 +262,14 @@ func NewWebsocketHandler(consumer session.Consumer, redisUrl string) (*Websocket
}
}()
var opt *WebsocketHandlerOption
if len(opts) > 0 {
// TODO : opts merge
opt = opts[0]
} else {
opt = Option()
}
ws := &WebsocketHandler{
websocketHandlerBase: websocketHandlerBase{
redisMsgChanName: fmt.Sprintf("_wsh_msg_%d", redisSync.Options().DB),
@ -252,6 +282,7 @@ func NewWebsocketHandler(consumer session.Consumer, redisUrl string) (*Websocket
sendMsgChan: sendchan,
sessionConsumer: consumer,
},
opt: opt,
}
consumer.RegisterOnSessionInvalidated(ws.onSessionInvalidated)
return ws, nil
@ -266,7 +297,7 @@ func (ws *WebsocketHandler) Cleanup() {
ws.connWaitGroup.Wait()
}
func (ws *WebsocketHandler) RegisterHandlers(serveMux *http.ServeMux, prefix string) error {
func (ws *WebsocketHandler) RegisterHandlers(serveMux gocommon.ServerMuxInterface, prefix string) error {
url := gocommon.MakeHttpHandlerPattern(prefix, "ws")
if *noAuthFlag {
serveMux.HandleFunc(url, ws.upgrade_nosession)
@ -333,14 +364,15 @@ func (ws *WebsocketHandler) mainLoop(ctx context.Context) {
buffer := bytes.NewBuffer([]byte(raw.Payload))
dec := gob.NewDecoder(buffer)
if raw.Channel == ws.redisMsgChanName {
switch raw.Channel {
case ws.redisMsgChanName:
var msg UpstreamMessage
if err := dec.Decode(&msg); err == nil {
ws.deliveryChan <- &msg
} else {
logger.Println("decode UpstreamMessage failed :", err)
}
} else if raw.Channel == ws.redisCmdChanName {
case ws.redisCmdChanName:
var cmd commandMessage
if err := dec.Decode(&cmd); err == nil {
ws.deliveryChan <- &cmd
@ -556,6 +588,8 @@ func (ws *WebsocketHandler) mainLoop(ctx context.Context) {
if c.Conn == nil {
delete(entireConns, c.sender.Accid.Hex())
go ws.ClientDisconnected(c)
} else if ws.sessionConsumer.IsRevoked(c.sender.Accid) {
c.Conn.MakeWriter().WriteControl(websocket.CloseMessage, unauthdata, time.Time{})
} else {
entireConns[c.sender.Accid.Hex()] = c
go ws.ClientConnected(c)
@ -649,9 +683,9 @@ func (ws *WebsocketHandler) upgrade_nosession(w http.ResponseWriter, r *http.Req
accid := primitive.ObjectID(*raw)
sk := r.Header.Get("AS-X-SESSION")
authinfo, err := ws.sessionConsumer.Query(sk)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
authinfo := ws.sessionConsumer.Query(sk)
if !authinfo.Valid() {
w.WriteHeader(http.StatusUnauthorized)
return
}
@ -660,12 +694,10 @@ func (ws *WebsocketHandler) upgrade_nosession(w http.ResponseWriter, r *http.Req
return
}
if authinfo.Invalidated() {
w.WriteHeader(http.StatusUnauthorized)
return
}
var upgrader = websocket.Upgrader{} // use default options
var upgrader = websocket.Upgrader{
ReadBufferSize: ws.opt.ReadBufferSize,
WriteBufferSize: ws.opt.WriteBufferSize,
} // use default options
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
@ -695,24 +727,21 @@ func (ws *WebsocketHandler) upgrade(w http.ResponseWriter, r *http.Request) {
}()
sk := r.Header.Get("AS-X-SESSION")
authinfo, err := ws.sessionConsumer.Query(sk)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
logger.Error("authorize query failed :", err)
return
}
if authinfo.Account.IsZero() {
if len(sk) == 0 {
w.WriteHeader(http.StatusUnauthorized)
return
}
if authinfo.Invalidated() {
authinfo := ws.sessionConsumer.Query(sk)
if !authinfo.Valid() {
w.WriteHeader(http.StatusUnauthorized)
return
}
var upgrader = websocket.Upgrader{} // use default options
var upgrader = websocket.Upgrader{
ReadBufferSize: ws.opt.ReadBufferSize,
WriteBufferSize: ws.opt.WriteBufferSize,
} // use default options
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)

View File

@ -11,6 +11,7 @@ import (
"time"
"go.mongodb.org/mongo-driver/bson/primitive"
"repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/session"
@ -18,7 +19,7 @@ import (
)
type WebsocketPeerHandler interface {
RegisterHandlers(serveMux *http.ServeMux, prefix string) error
RegisterHandlers(serveMux gocommon.ServerMuxInterface, prefix string) error
}
type peerCtorChannelValue struct {
@ -164,7 +165,7 @@ func NewWebsocketPeerHandler[T PeerInterface](consumer session.Consumer, creator
return wsh
}
func (ws *websocketPeerHandler[T]) RegisterHandlers(serveMux *http.ServeMux, prefix string) error {
func (ws *websocketPeerHandler[T]) RegisterHandlers(serveMux gocommon.ServerMuxInterface, prefix string) error {
if *noAuthFlag {
serveMux.HandleFunc(prefix, ws.upgrade_noauth)
} else {
@ -304,10 +305,12 @@ func (ws *websocketPeerHandler[T]) upgrade_noauth(w http.ResponseWriter, r *http
sk := r.Header.Get("AS-X-SESSION")
var accid primitive.ObjectID
if len(sk) > 0 {
authinfo, err := ws.sessionConsumer.Query(sk)
if err == nil {
accid = authinfo.Account
authinfo := ws.sessionConsumer.Query(sk)
if !authinfo.Valid() {
w.WriteHeader(http.StatusUnauthorized)
return
}
accid = authinfo.Account
}
if accid.IsZero() {
@ -362,14 +365,8 @@ func (ws *websocketPeerHandler[T]) upgrade(w http.ResponseWriter, r *http.Reques
}()
sk := r.Header.Get("AS-X-SESSION")
authinfo, err := ws.sessionConsumer.Query(sk)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
logger.Error("authorize query failed :", err)
return
}
if authinfo.Account.IsZero() || authinfo.Invalidated() {
authinfo := ws.sessionConsumer.Query(sk)
if !authinfo.Valid() {
w.WriteHeader(http.StatusUnauthorized)
return
}