diff --git a/azure/misc.go b/azure/misc.go index f28e24b..28cd833 100644 --- a/azure/misc.go +++ b/azure/misc.go @@ -18,8 +18,6 @@ import ( "repositories.action2quare.com/ayo/gocommon/flagx" "repositories.action2quare.com/ayo/gocommon/logger" - - "go.mongodb.org/mongo-driver/bson" ) var linkupdate = flagx.String("updatelink", "", "") @@ -239,17 +237,3 @@ func ReplyUpdateComplete() { // return (uint32(b[0]) << 0) | (uint32(b[1]) << 8) | (uint32(b[2]) << 16) | (uint32(b[3]) << 24) // } - -type BsonMarshaler[T any] struct { - val T -} - -func NewBsonMarshaler[T any](val T) *BsonMarshaler[T] { - return &BsonMarshaler[T]{ - val: val, - } -} - -func (m *BsonMarshaler[T]) MarshalBinary() (data []byte, err error) { - return bson.Marshal(m.val) -} diff --git a/rpc/rpc.go b/rpc/rpc.go deleted file mode 100644 index 6a62991..0000000 --- a/rpc/rpc.go +++ /dev/null @@ -1,218 +0,0 @@ -package rpc - -import ( - "bytes" - "context" - "crypto/md5" - "encoding/gob" - "encoding/hex" - "errors" - "fmt" - "path" - "reflect" - "runtime" - "strings" - "time" - - "github.com/go-redis/redis/v8" - "go.mongodb.org/mongo-driver/bson/primitive" - "repositories.action2quare.com/ayo/gocommon/logger" -) - -type Receiver interface { - TargetExists(primitive.ObjectID) bool -} - -type receiverManifest struct { - r Receiver - methods map[string]reflect.Method -} - -type rpcEngine struct { - receivers map[string]receiverManifest - publish func([]byte) error -} - -var engine = rpcEngine{ - receivers: make(map[string]receiverManifest), -} - -func RegistReceiver(ptr Receiver) { - rname := reflect.TypeOf(ptr).Elem().Name() - rname = fmt.Sprintf("(*%s)", rname) - - methods := make(map[string]reflect.Method) - for i := 0; i < reflect.TypeOf(ptr).NumMethod(); i++ { - method := reflect.TypeOf(ptr).Method(i) - methods[method.Name] = method - } - engine.receivers[rname] = receiverManifest{ - r: ptr, - methods: methods, - } -} - -func Start(ctx context.Context, redisClient *redis.Client) { - if engine.publish != nil { - return - } - - hash := md5.New() - for k, manifest := range engine.receivers { - hash.Write([]byte(k)) - for m, r := range manifest.methods { - hash.Write([]byte(m)) - hash.Write([]byte(r.Name)) - for i := 0; i < r.Type.NumIn(); i++ { - inName := r.Type.In(i).Name() - hash.Write([]byte(inName)) - } - } - hash.Write([]byte(fmt.Sprintf("%d", redisClient.Options().DB))) - } - - pubsubName := hex.EncodeToString(hash.Sum(nil))[:16] - - engine.publish = func(s []byte) error { - _, err := redisClient.Publish(ctx, pubsubName, s).Result() - return err - } - - go engine.loop(ctx, redisClient, pubsubName) -} - -func (re *rpcEngine) callFromMessage(msg *redis.Message) { - defer func() { - r := recover() - if r != nil { - logger.Error(r) - } - }() - - encoded := []byte(msg.Payload) - var target primitive.ObjectID - copy(target[:], encoded[:12]) - - encoded = encoded[12:] - for i, c := range encoded { - if c == ')' { - if manifest, ok := re.receivers[string(encoded[:i+1])]; ok { - // 리시버 찾음 - if manifest.r.TargetExists(target) { - // 이 리시버가 타겟을 가지고 있음 - encoded = encoded[i+1:] - decoder := gob.NewDecoder(bytes.NewBuffer(encoded)) - var params []any - if decoder.Decode(¶ms) == nil { - method := manifest.methods[params[0].(string)] - args := []reflect.Value{ - reflect.ValueOf(manifest.r), - } - for _, arg := range params[1:] { - args = append(args, reflect.ValueOf(arg)) - } - method.Func.Call(args) - } - } - } - } - } -} - -func (re *rpcEngine) loop(ctx context.Context, redisClient *redis.Client, chanName string) { - defer func() { - r := recover() - if r != nil { - logger.Error(r) - } - }() - - pubsub := redisClient.Subscribe(ctx, chanName) - for { - if ctx.Err() != nil { - return - } - - if pubsub == nil { - pubsub = redisClient.Subscribe(ctx, chanName) - } - - msg, err := pubsub.ReceiveMessage(ctx) - - if err != nil { - if err == redis.ErrClosed { - time.Sleep(time.Second) - } - pubsub = nil - } else { - re.callFromMessage(msg) - } - } -} - -var errNoReceiver = errors.New("no receiver") - -type CallContext struct { - r Receiver - t primitive.ObjectID -} - -var ErrCanExecuteHere = errors.New("go ahead") - -func (c *CallContext) Call(args ...any) error { - if c.r.TargetExists(c.t) { - // 여기 있네? - return ErrCanExecuteHere - } - - pc := make([]uintptr, 1) - n := runtime.Callers(3, pc[:]) - if n < 1 { - return errNoReceiver - } - - frame, _ := runtime.CallersFrames(pc).Next() - fullname := path.Base(frame.Function) - prf := strings.Split(fullname, ".") - rname := prf[1] - funcname := prf[2] - - serialized, err := encode(c.t, rname, funcname, args...) - if err != nil { - return err - } - - return engine.publish(serialized) -} - -func Make(r Receiver) *CallContext { - return &CallContext{ - r: r, - } -} - -func (cc *CallContext) To(target primitive.ObjectID) *CallContext { - cc.t = target - return cc -} - -func encode(target primitive.ObjectID, receiver string, funcname string, args ...any) ([]byte, error) { - buff := new(bytes.Buffer) - - // 타겟을 가장 먼저 기록 - buff.Write(target[:]) - - // receiver - buff.Write([]byte(receiver)) - - // 다음 call context 기록 - m := append([]any{funcname}, args...) - encoder := gob.NewEncoder(buff) - err := encoder.Encode(m) - if err != nil { - logger.Error("rpcCallContext.send err :", err) - return nil, err - } - - return buff.Bytes(), nil -} diff --git a/rpc/rpc_test.go b/rpc/rpc_test.go deleted file mode 100644 index 765354f..0000000 --- a/rpc/rpc_test.go +++ /dev/null @@ -1,48 +0,0 @@ -package rpc - -import ( - "context" - "math/rand" - "testing" - "time" - - "go.mongodb.org/mongo-driver/bson/primitive" - "repositories.action2quare.com/ayo/gocommon" - "repositories.action2quare.com/ayo/gocommon/logger" -) - -type testReceiver struct { -} - -func (tr *testReceiver) TargetExists(tid primitive.ObjectID) bool { - logger.Println(tid.Hex()) - return tid[0] >= 10 -} - -func (tr *testReceiver) TestFunc(a string, b string, c int) { - target := primitive.NewObjectID() - target[0] = byte(rand.Intn(2) * 20) - if Make(tr).To(target).Call(a, b, c) != ErrCanExecuteHere { - return - } - - logger.Println(" ", a, b, target[0]) -} - -func TestRpc(t *testing.T) { - var tr testReceiver - RegistReceiver(&tr) - myctx, cancel := context.WithCancel(context.Background()) - - redisClient, _ := gocommon.NewRedisClient("redis://192.168.8.94:6379") - go func() { - for { - tr.TestFunc("aaaa", "bbbb", 333) - time.Sleep(time.Second) - } - }() - - Start(myctx, redisClient) - <-myctx.Done() - cancel() -} diff --git a/server.go b/server.go index eeb9b60..d3f98f8 100644 --- a/server.go +++ b/server.go @@ -2,6 +2,7 @@ package gocommon import ( "context" + "encoding/gob" "encoding/json" "errors" "fmt" @@ -26,7 +27,6 @@ import ( "repositories.action2quare.com/ayo/gocommon/logger" "github.com/pires/go-proxyproto" - "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" ) @@ -430,12 +430,17 @@ func ReadStringFormValue(r url.Values, key string) (string, bool) { return strval, len(strval) > 0 } -func ReadBsonDocumentFromBody[T any](r io.Reader, out *T) error { +func DecodeGob[T any](r io.Reader, out *T) error { + dec := gob.NewDecoder(r) + return dec.Decode(out) +} + +func ReadJsonDocumentFromBody[T any](r io.Reader, out *T) error { bt, err := io.ReadAll(r) if err != nil { return err } - return bson.Unmarshal(bt, out) + return json.Unmarshal(bt, out) } func DotStringToTimestamp(tv string) primitive.Timestamp { diff --git a/session/impl_redis.go b/session/impl_redis.go index 3604167..fa5a775 100644 --- a/session/impl_redis.go +++ b/session/impl_redis.go @@ -2,11 +2,11 @@ package session import ( "context" + "encoding/json" "fmt" "time" "github.com/go-redis/redis/v8" - "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "repositories.action2quare.com/ayo/gocommon" "repositories.action2quare.com/ayo/gocommon/logger" @@ -43,7 +43,7 @@ func newProviderWithRedis(ctx context.Context, redisUrl string, ttl time.Duratio } func (p *provider_redis) New(input *Authorization) (string, error) { - bt, err := bson.Marshal(input) + bt, err := json.Marshal(input) if err != nil { return "", err } @@ -82,7 +82,7 @@ func (p *provider_redis) Query(pk string) (Authorization, error) { } var auth Authorization - if err := bson.Unmarshal([]byte(payload), &auth); err != nil { + if err := json.Unmarshal([]byte(payload), &auth); err != nil { return Authorization{}, err } @@ -197,7 +197,7 @@ func (c *consumer_redis) query_internal(sk storagekey) (*sessionRedis, bool, err } var auth Authorization - if err := bson.Unmarshal([]byte(payload), &auth); err != nil { + if err := json.Unmarshal([]byte(payload), &auth); err != nil { return nil, false, err }