안쓰는 코드 정리 및 bson을 json으로 통일. 추후 gob으로 변경 예정
This commit is contained in:
@ -18,8 +18,6 @@ import (
|
||||
|
||||
"repositories.action2quare.com/ayo/gocommon/flagx"
|
||||
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
)
|
||||
|
||||
var linkupdate = flagx.String("updatelink", "", "")
|
||||
@ -239,17 +237,3 @@ func ReplyUpdateComplete() {
|
||||
|
||||
// return (uint32(b[0]) << 0) | (uint32(b[1]) << 8) | (uint32(b[2]) << 16) | (uint32(b[3]) << 24)
|
||||
// }
|
||||
|
||||
type BsonMarshaler[T any] struct {
|
||||
val T
|
||||
}
|
||||
|
||||
func NewBsonMarshaler[T any](val T) *BsonMarshaler[T] {
|
||||
return &BsonMarshaler[T]{
|
||||
val: val,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *BsonMarshaler[T]) MarshalBinary() (data []byte, err error) {
|
||||
return bson.Marshal(m.val)
|
||||
}
|
||||
|
||||
218
rpc/rpc.go
218
rpc/rpc.go
@ -1,218 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"encoding/gob"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"path"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||
)
|
||||
|
||||
type Receiver interface {
|
||||
TargetExists(primitive.ObjectID) bool
|
||||
}
|
||||
|
||||
type receiverManifest struct {
|
||||
r Receiver
|
||||
methods map[string]reflect.Method
|
||||
}
|
||||
|
||||
type rpcEngine struct {
|
||||
receivers map[string]receiverManifest
|
||||
publish func([]byte) error
|
||||
}
|
||||
|
||||
var engine = rpcEngine{
|
||||
receivers: make(map[string]receiverManifest),
|
||||
}
|
||||
|
||||
func RegistReceiver(ptr Receiver) {
|
||||
rname := reflect.TypeOf(ptr).Elem().Name()
|
||||
rname = fmt.Sprintf("(*%s)", rname)
|
||||
|
||||
methods := make(map[string]reflect.Method)
|
||||
for i := 0; i < reflect.TypeOf(ptr).NumMethod(); i++ {
|
||||
method := reflect.TypeOf(ptr).Method(i)
|
||||
methods[method.Name] = method
|
||||
}
|
||||
engine.receivers[rname] = receiverManifest{
|
||||
r: ptr,
|
||||
methods: methods,
|
||||
}
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, redisClient *redis.Client) {
|
||||
if engine.publish != nil {
|
||||
return
|
||||
}
|
||||
|
||||
hash := md5.New()
|
||||
for k, manifest := range engine.receivers {
|
||||
hash.Write([]byte(k))
|
||||
for m, r := range manifest.methods {
|
||||
hash.Write([]byte(m))
|
||||
hash.Write([]byte(r.Name))
|
||||
for i := 0; i < r.Type.NumIn(); i++ {
|
||||
inName := r.Type.In(i).Name()
|
||||
hash.Write([]byte(inName))
|
||||
}
|
||||
}
|
||||
hash.Write([]byte(fmt.Sprintf("%d", redisClient.Options().DB)))
|
||||
}
|
||||
|
||||
pubsubName := hex.EncodeToString(hash.Sum(nil))[:16]
|
||||
|
||||
engine.publish = func(s []byte) error {
|
||||
_, err := redisClient.Publish(ctx, pubsubName, s).Result()
|
||||
return err
|
||||
}
|
||||
|
||||
go engine.loop(ctx, redisClient, pubsubName)
|
||||
}
|
||||
|
||||
func (re *rpcEngine) callFromMessage(msg *redis.Message) {
|
||||
defer func() {
|
||||
r := recover()
|
||||
if r != nil {
|
||||
logger.Error(r)
|
||||
}
|
||||
}()
|
||||
|
||||
encoded := []byte(msg.Payload)
|
||||
var target primitive.ObjectID
|
||||
copy(target[:], encoded[:12])
|
||||
|
||||
encoded = encoded[12:]
|
||||
for i, c := range encoded {
|
||||
if c == ')' {
|
||||
if manifest, ok := re.receivers[string(encoded[:i+1])]; ok {
|
||||
// 리시버 찾음
|
||||
if manifest.r.TargetExists(target) {
|
||||
// 이 리시버가 타겟을 가지고 있음
|
||||
encoded = encoded[i+1:]
|
||||
decoder := gob.NewDecoder(bytes.NewBuffer(encoded))
|
||||
var params []any
|
||||
if decoder.Decode(¶ms) == nil {
|
||||
method := manifest.methods[params[0].(string)]
|
||||
args := []reflect.Value{
|
||||
reflect.ValueOf(manifest.r),
|
||||
}
|
||||
for _, arg := range params[1:] {
|
||||
args = append(args, reflect.ValueOf(arg))
|
||||
}
|
||||
method.Func.Call(args)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (re *rpcEngine) loop(ctx context.Context, redisClient *redis.Client, chanName string) {
|
||||
defer func() {
|
||||
r := recover()
|
||||
if r != nil {
|
||||
logger.Error(r)
|
||||
}
|
||||
}()
|
||||
|
||||
pubsub := redisClient.Subscribe(ctx, chanName)
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if pubsub == nil {
|
||||
pubsub = redisClient.Subscribe(ctx, chanName)
|
||||
}
|
||||
|
||||
msg, err := pubsub.ReceiveMessage(ctx)
|
||||
|
||||
if err != nil {
|
||||
if err == redis.ErrClosed {
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
pubsub = nil
|
||||
} else {
|
||||
re.callFromMessage(msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var errNoReceiver = errors.New("no receiver")
|
||||
|
||||
type CallContext struct {
|
||||
r Receiver
|
||||
t primitive.ObjectID
|
||||
}
|
||||
|
||||
var ErrCanExecuteHere = errors.New("go ahead")
|
||||
|
||||
func (c *CallContext) Call(args ...any) error {
|
||||
if c.r.TargetExists(c.t) {
|
||||
// 여기 있네?
|
||||
return ErrCanExecuteHere
|
||||
}
|
||||
|
||||
pc := make([]uintptr, 1)
|
||||
n := runtime.Callers(3, pc[:])
|
||||
if n < 1 {
|
||||
return errNoReceiver
|
||||
}
|
||||
|
||||
frame, _ := runtime.CallersFrames(pc).Next()
|
||||
fullname := path.Base(frame.Function)
|
||||
prf := strings.Split(fullname, ".")
|
||||
rname := prf[1]
|
||||
funcname := prf[2]
|
||||
|
||||
serialized, err := encode(c.t, rname, funcname, args...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return engine.publish(serialized)
|
||||
}
|
||||
|
||||
func Make(r Receiver) *CallContext {
|
||||
return &CallContext{
|
||||
r: r,
|
||||
}
|
||||
}
|
||||
|
||||
func (cc *CallContext) To(target primitive.ObjectID) *CallContext {
|
||||
cc.t = target
|
||||
return cc
|
||||
}
|
||||
|
||||
func encode(target primitive.ObjectID, receiver string, funcname string, args ...any) ([]byte, error) {
|
||||
buff := new(bytes.Buffer)
|
||||
|
||||
// 타겟을 가장 먼저 기록
|
||||
buff.Write(target[:])
|
||||
|
||||
// receiver
|
||||
buff.Write([]byte(receiver))
|
||||
|
||||
// 다음 call context 기록
|
||||
m := append([]any{funcname}, args...)
|
||||
encoder := gob.NewEncoder(buff)
|
||||
err := encoder.Encode(m)
|
||||
if err != nil {
|
||||
logger.Error("rpcCallContext.send err :", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return buff.Bytes(), nil
|
||||
}
|
||||
@ -1,48 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"repositories.action2quare.com/ayo/gocommon"
|
||||
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||
)
|
||||
|
||||
type testReceiver struct {
|
||||
}
|
||||
|
||||
func (tr *testReceiver) TargetExists(tid primitive.ObjectID) bool {
|
||||
logger.Println(tid.Hex())
|
||||
return tid[0] >= 10
|
||||
}
|
||||
|
||||
func (tr *testReceiver) TestFunc(a string, b string, c int) {
|
||||
target := primitive.NewObjectID()
|
||||
target[0] = byte(rand.Intn(2) * 20)
|
||||
if Make(tr).To(target).Call(a, b, c) != ErrCanExecuteHere {
|
||||
return
|
||||
}
|
||||
|
||||
logger.Println(" ", a, b, target[0])
|
||||
}
|
||||
|
||||
func TestRpc(t *testing.T) {
|
||||
var tr testReceiver
|
||||
RegistReceiver(&tr)
|
||||
myctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
redisClient, _ := gocommon.NewRedisClient("redis://192.168.8.94:6379")
|
||||
go func() {
|
||||
for {
|
||||
tr.TestFunc("aaaa", "bbbb", 333)
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}()
|
||||
|
||||
Start(myctx, redisClient)
|
||||
<-myctx.Done()
|
||||
cancel()
|
||||
}
|
||||
11
server.go
11
server.go
@ -2,6 +2,7 @@ package gocommon
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/gob"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
@ -26,7 +27,6 @@ import (
|
||||
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||
|
||||
"github.com/pires/go-proxyproto"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
)
|
||||
|
||||
@ -430,12 +430,17 @@ func ReadStringFormValue(r url.Values, key string) (string, bool) {
|
||||
return strval, len(strval) > 0
|
||||
}
|
||||
|
||||
func ReadBsonDocumentFromBody[T any](r io.Reader, out *T) error {
|
||||
func DecodeGob[T any](r io.Reader, out *T) error {
|
||||
dec := gob.NewDecoder(r)
|
||||
return dec.Decode(out)
|
||||
}
|
||||
|
||||
func ReadJsonDocumentFromBody[T any](r io.Reader, out *T) error {
|
||||
bt, err := io.ReadAll(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return bson.Unmarshal(bt, out)
|
||||
return json.Unmarshal(bt, out)
|
||||
}
|
||||
|
||||
func DotStringToTimestamp(tv string) primitive.Timestamp {
|
||||
|
||||
@ -2,11 +2,11 @@ package session
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"repositories.action2quare.com/ayo/gocommon"
|
||||
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||
@ -43,7 +43,7 @@ func newProviderWithRedis(ctx context.Context, redisUrl string, ttl time.Duratio
|
||||
}
|
||||
|
||||
func (p *provider_redis) New(input *Authorization) (string, error) {
|
||||
bt, err := bson.Marshal(input)
|
||||
bt, err := json.Marshal(input)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@ -82,7 +82,7 @@ func (p *provider_redis) Query(pk string) (Authorization, error) {
|
||||
}
|
||||
|
||||
var auth Authorization
|
||||
if err := bson.Unmarshal([]byte(payload), &auth); err != nil {
|
||||
if err := json.Unmarshal([]byte(payload), &auth); err != nil {
|
||||
return Authorization{}, err
|
||||
}
|
||||
|
||||
@ -197,7 +197,7 @@ func (c *consumer_redis) query_internal(sk storagekey) (*sessionRedis, bool, err
|
||||
}
|
||||
|
||||
var auth Authorization
|
||||
if err := bson.Unmarshal([]byte(payload), &auth); err != nil {
|
||||
if err := json.Unmarshal([]byte(payload), &auth); err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user