2023-07-10 09:42:34 +09:00
|
|
|
package rpc
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"bytes"
|
|
|
|
|
"context"
|
2023-07-10 11:43:52 +09:00
|
|
|
"crypto/md5"
|
2023-07-10 09:42:34 +09:00
|
|
|
"encoding/gob"
|
2023-07-10 11:43:52 +09:00
|
|
|
"encoding/hex"
|
2023-07-10 09:42:34 +09:00
|
|
|
"errors"
|
|
|
|
|
"fmt"
|
2023-07-10 10:53:49 +09:00
|
|
|
"path"
|
2023-07-10 09:42:34 +09:00
|
|
|
"reflect"
|
|
|
|
|
"runtime"
|
|
|
|
|
"strings"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/go-redis/redis/v8"
|
|
|
|
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
|
|
|
|
"repositories.action2quare.com/ayo/gocommon/logger"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type Receiver interface {
|
|
|
|
|
TargetExists(primitive.ObjectID) bool
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type receiverManifest struct {
|
|
|
|
|
r Receiver
|
|
|
|
|
methods map[string]reflect.Method
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type rpcEngine struct {
|
|
|
|
|
receivers map[string]receiverManifest
|
|
|
|
|
publish func([]byte) error
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var engine = rpcEngine{
|
|
|
|
|
receivers: make(map[string]receiverManifest),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func RegistReceiver(ptr Receiver) {
|
|
|
|
|
rname := reflect.TypeOf(ptr).Elem().Name()
|
|
|
|
|
rname = fmt.Sprintf("(*%s)", rname)
|
|
|
|
|
|
|
|
|
|
methods := make(map[string]reflect.Method)
|
|
|
|
|
for i := 0; i < reflect.TypeOf(ptr).NumMethod(); i++ {
|
|
|
|
|
method := reflect.TypeOf(ptr).Method(i)
|
|
|
|
|
methods[method.Name] = method
|
|
|
|
|
}
|
|
|
|
|
engine.receivers[rname] = receiverManifest{
|
|
|
|
|
r: ptr,
|
|
|
|
|
methods: methods,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func Start(ctx context.Context, redisClient *redis.Client) {
|
|
|
|
|
if engine.publish != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
2023-07-10 11:43:52 +09:00
|
|
|
hash := md5.New()
|
2023-07-10 12:13:51 +09:00
|
|
|
for k, manifest := range engine.receivers {
|
2023-07-10 11:43:52 +09:00
|
|
|
hash.Write([]byte(k))
|
2023-07-10 12:13:51 +09:00
|
|
|
for m, r := range manifest.methods {
|
|
|
|
|
hash.Write([]byte(m))
|
|
|
|
|
hash.Write([]byte(r.Name))
|
|
|
|
|
for i := 0; i < r.Type.NumIn(); i++ {
|
|
|
|
|
inName := r.Type.In(i).Name()
|
|
|
|
|
hash.Write([]byte(inName))
|
|
|
|
|
}
|
|
|
|
|
}
|
2023-07-10 11:43:52 +09:00
|
|
|
}
|
2023-07-10 12:13:51 +09:00
|
|
|
|
2023-07-10 11:43:52 +09:00
|
|
|
pubsubName := hex.EncodeToString(hash.Sum(nil))[:16]
|
|
|
|
|
|
2023-07-10 09:42:34 +09:00
|
|
|
engine.publish = func(s []byte) error {
|
|
|
|
|
_, err := redisClient.Publish(ctx, pubsubName, s).Result()
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
go engine.loop(ctx, redisClient, pubsubName)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (re *rpcEngine) callFromMessage(msg *redis.Message) {
|
|
|
|
|
defer func() {
|
|
|
|
|
r := recover()
|
|
|
|
|
if r != nil {
|
|
|
|
|
logger.Error(r)
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
encoded := []byte(msg.Payload)
|
|
|
|
|
var target primitive.ObjectID
|
|
|
|
|
copy(target[:], encoded[:12])
|
|
|
|
|
|
|
|
|
|
encoded = encoded[12:]
|
|
|
|
|
for i, c := range encoded {
|
|
|
|
|
if c == ')' {
|
|
|
|
|
if manifest, ok := re.receivers[string(encoded[:i+1])]; ok {
|
|
|
|
|
// 리시버 찾음
|
|
|
|
|
if manifest.r.TargetExists(target) {
|
|
|
|
|
// 이 리시버가 타겟을 가지고 있음
|
|
|
|
|
encoded = encoded[i+1:]
|
|
|
|
|
decoder := gob.NewDecoder(bytes.NewBuffer(encoded))
|
|
|
|
|
var params []any
|
|
|
|
|
if decoder.Decode(¶ms) == nil {
|
|
|
|
|
method := manifest.methods[params[0].(string)]
|
|
|
|
|
args := []reflect.Value{
|
|
|
|
|
reflect.ValueOf(manifest.r),
|
|
|
|
|
}
|
|
|
|
|
for _, arg := range params[1:] {
|
|
|
|
|
args = append(args, reflect.ValueOf(arg))
|
|
|
|
|
}
|
|
|
|
|
method.Func.Call(args)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (re *rpcEngine) loop(ctx context.Context, redisClient *redis.Client, chanName string) {
|
|
|
|
|
defer func() {
|
|
|
|
|
r := recover()
|
|
|
|
|
if r != nil {
|
|
|
|
|
logger.Error(r)
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
pubsub := redisClient.Subscribe(ctx, chanName)
|
|
|
|
|
for {
|
|
|
|
|
if ctx.Err() != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if pubsub == nil {
|
|
|
|
|
pubsub = redisClient.Subscribe(ctx, chanName)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
msg, err := pubsub.ReceiveMessage(ctx)
|
|
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
if err == redis.ErrClosed {
|
|
|
|
|
time.Sleep(time.Second)
|
|
|
|
|
}
|
|
|
|
|
pubsub = nil
|
|
|
|
|
} else {
|
|
|
|
|
re.callFromMessage(msg)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var errNoReceiver = errors.New("no receiver")
|
|
|
|
|
|
2023-07-10 10:29:16 +09:00
|
|
|
type callContext struct {
|
2023-07-10 09:42:34 +09:00
|
|
|
r Receiver
|
|
|
|
|
t primitive.ObjectID
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var ErrCanExecuteHere = errors.New("go ahead")
|
|
|
|
|
|
2023-07-10 10:29:16 +09:00
|
|
|
func (c callContext) call(args ...any) error {
|
2023-07-10 09:42:34 +09:00
|
|
|
if c.r.TargetExists(c.t) {
|
|
|
|
|
// 여기 있네?
|
|
|
|
|
return ErrCanExecuteHere
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pc := make([]uintptr, 1)
|
2023-07-10 10:53:49 +09:00
|
|
|
n := runtime.Callers(3, pc[:])
|
2023-07-10 09:42:34 +09:00
|
|
|
if n < 1 {
|
|
|
|
|
return errNoReceiver
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
frame, _ := runtime.CallersFrames(pc).Next()
|
2023-07-10 11:12:45 +09:00
|
|
|
fullname := path.Base(frame.Function)
|
|
|
|
|
prf := strings.Split(fullname, ".")
|
|
|
|
|
rname := prf[1]
|
|
|
|
|
funcname := prf[2]
|
2023-07-10 09:42:34 +09:00
|
|
|
|
|
|
|
|
serialized, err := encode(c.t, rname, funcname, args...)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return engine.publish(serialized)
|
|
|
|
|
}
|
|
|
|
|
|
2023-07-10 10:29:16 +09:00
|
|
|
func CallOrGo(r Receiver, target primitive.ObjectID, args ...any) error {
|
|
|
|
|
cc := callContext{
|
|
|
|
|
r: r,
|
|
|
|
|
t: target,
|
|
|
|
|
}
|
|
|
|
|
return cc.call(args...)
|
|
|
|
|
}
|
|
|
|
|
|
2023-07-10 09:42:34 +09:00
|
|
|
func encode(target primitive.ObjectID, receiver string, funcname string, args ...any) ([]byte, error) {
|
|
|
|
|
buff := new(bytes.Buffer)
|
|
|
|
|
|
|
|
|
|
// 타겟을 가장 먼저 기록
|
|
|
|
|
buff.Write(target[:])
|
|
|
|
|
|
|
|
|
|
// receiver
|
|
|
|
|
buff.Write([]byte(receiver))
|
|
|
|
|
|
|
|
|
|
// 다음 call context 기록
|
|
|
|
|
m := append([]any{funcname}, args...)
|
|
|
|
|
encoder := gob.NewEncoder(buff)
|
|
|
|
|
err := encoder.Encode(m)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("rpcCallContext.send err :", err)
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return buff.Bytes(), nil
|
|
|
|
|
}
|