package core import ( "context" "errors" "flag" "io" "net" "net/http" "reflect" "strings" common "repositories.action2quare.com/ayo/gocommon" "repositories.action2quare.com/ayo/gocommon/logger" "repositories.action2quare.com/ayo/gocommon/wshandler" "repositories.action2quare.com/ayo/tavern/core/rpc" "github.com/go-redis/redis/v8" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/bsonrw" "go.mongodb.org/mongo-driver/bson/primitive" ) const ( defaultMaxMemory = 32 << 10 // 32 KB ) func writeBsonArr(w io.Writer, src []bson.M) error { return writeBsonDoc(w, bson.M{ "r": src, }) } func onlineGroupQueryKey(prefix string) string { return prefix + "_olg" } func writeBsonDoc[T any](w io.Writer, src T) error { rw, err := bsonrw.NewBSONValueWriter(w) if err != nil { return err } enc, err := bson.NewEncoder(rw) if err != nil { return err } return enc.Encode(src) } func readBsonDoc(r io.Reader, src any) error { body, err := io.ReadAll(r) if err != nil { return err } if len(body) == 0 { return nil } decoder, err := bson.NewDecoder(bsonrw.NewBSONDocumentReader(body)) if err != nil { return err } err = decoder.Decode(src) if err != nil { return err } return nil } type rpcCallDomain[T any] struct { rpcCallChanName string caller rpc.RpcCaller callee rpc.RpcCallee[T] methods map[string]reflect.Method } func createRpcCallDomain[CalleeType any](syncConn *redis.Client, creator func(*wshandler.Richconn) *CalleeType) rpcCallDomain[CalleeType] { var tmp *CalleeType methods := make(map[string]reflect.Method) tp := reflect.TypeOf(tmp) for i := 0; i < tp.NumMethod(); i++ { method := tp.Method(i) methods[method.Name] = method } rpcChanName := "conn_rpc_channel_" + tp.Name() publishFunc := func(bt []byte) error { _, err := syncConn.Publish(context.Background(), rpcChanName, bt).Result() return err } return rpcCallDomain[CalleeType]{ rpcCallChanName: rpcChanName, caller: rpc.NewRpcCaller(publishFunc), callee: rpc.NewRpcCallee(creator), methods: methods, } } type TavernConfig struct { common.RegionStorageConfig `json:",inline"` GroupTypes map[string]*groupConfig `json:"tavern_group_types"` MaingateApiToken string `json:"maingate_api_token"` macAddr string } var config TavernConfig type Tavern struct { subTaverns []*subTavern wsh *wshandler.WebsocketHandler } type subTavern struct { mongoClient common.MongoClient wsh *wshandler.WebsocketHandler region string groups map[string]group methods map[string]reflect.Method wshRpc rpcCallDomain[richConnOuter] } func getMacAddr() (string, error) { ifas, err := net.Interfaces() if err != nil { return "", err } for _, ifa := range ifas { a := ifa.HardwareAddr.String() if a != "" { a = strings.ReplaceAll(a, ":", "") return a, nil } } return "", errors.New("no net interface") } // New : func New(context context.Context, wsh *wshandler.WebsocketHandler, inconfig *TavernConfig) (*Tavern, error) { if !flag.Parsed() { flag.Parse() } if inconfig == nil { var loaded TavernConfig if err := common.LoadConfig(&loaded); err != nil { return nil, err } inconfig = &loaded } config = *inconfig macaddr, err := getMacAddr() if err != nil { return nil, err } config.macAddr = macaddr tv := Tavern{ wsh: wsh, } if err = tv.prepare(context); err != nil { logger.Println("tavern prepare() failed :", err) return nil, err } return &tv, nil } func (tv *Tavern) Destructor() { tv.wsh.Destructor() for _, st := range tv.subTaverns { st.mongoClient.Close() } } type groupPipelineDocument struct { OperationType string `bson:"operationType"` FullDocument map[string]any `bson:"fullDocument"` DocumentKey struct { Id primitive.ObjectID `bson:"_id"` } `bson:"documentKey"` UpdateDescription struct { UpdatedFields bson.M `bson:"updatedFields"` RemovedFileds bson.A `bson:"removedFields"` TruncatedArrays bson.A `bson:"truncatedArrays"` } `bson:"updateDescription"` } func (tv *Tavern) prepare(ctx context.Context) error { for region, url := range config.RegionStorage { var dbconn common.MongoClient var err error var groupinstance group if err := rpc.IsCallerCalleeMethodMatch[richConnOuter](); err != nil { return err } var tmp *subTavern methods := make(map[string]reflect.Method) tp := reflect.TypeOf(tmp) for i := 0; i < tp.NumMethod(); i++ { method := tp.Method(i) methods[method.Name] = method } sub := &subTavern{ wsh: tv.wsh, mongoClient: dbconn, region: region, methods: methods, } sub.wshRpc = createRpcCallDomain(tv.wsh.RedisSync, func(rc *wshandler.Richconn) *richConnOuter { return &richConnOuter{wsh: sub.wsh, rc: rc} }) groups := make(map[string]group) for typename, cfg := range config.GroupTypes { cfg.Name = typename if cfg.Transient { groupinstance, err = cfg.prepareInMemory(ctx, region, typename, tv.wsh) } else { if !dbconn.Connected() { dbconn, err = common.NewMongoClient(ctx, url.Mongo, region) if err != nil { return err } } groupinstance, err = cfg.preparePersistent(ctx, region, dbconn, tv.wsh) } if err != nil { return err } groups[typename] = groupinstance } sub.groups = groups tv.subTaverns = append(tv.subTaverns, sub) } return nil } func (tv *Tavern) RegisterHandlers(ctx context.Context, serveMux *http.ServeMux, prefix string) error { // request는 항상 서비스 서버를 거쳐서 들어온다. [client] <--tls--> [service server] <--http--> tavern // 클라이언트는 tavern으로부터 메시지를 수신할 뿐, 송신하지 못한다. // 단, 요청은 https 서비스 서버를 통해 들어오고 클라이언트는 ws으로 수신만 한다는 원칙이 유지되어야 한다.(채팅 메시지는 예외?) for _, sub := range tv.subTaverns { var pattern string if sub.region == "default" { pattern = common.MakeHttpHandlerPattern(prefix, "api") } else { pattern = common.MakeHttpHandlerPattern(prefix, sub.region, "api") } serveMux.HandleFunc(pattern, sub.api) deliveryChan := tv.wsh.DeliveryChannel(sub.region) go sub.deliveryMessageHandler(deliveryChan) } return nil } func (sub *subTavern) api(w http.ResponseWriter, r *http.Request) { defer func() { s := recover() if s != nil { logger.Error(s) } io.Copy(io.Discard, r.Body) r.Body.Close() }() // 서버에서 오는 요청만 처리 apitoken := r.Header.Get("MG-X-API-TOKEN") if apitoken != config.MaingateApiToken { // 서버가 보내는 쿼리만 허용 logger.Println("MG-X-API-TOKEN is missing") w.WriteHeader(http.StatusBadRequest) return } operation := r.URL.Query().Get("operation") if len(operation) == 0 { w.WriteHeader(http.StatusBadRequest) return } method, ok := sub.methods[operation] if !ok { // 없는 operation logger.Println("fail to call api. operation is not valid :", operation) w.WriteHeader(http.StatusBadRequest) return } if r.PostForm == nil { r.ParseMultipartForm(defaultMaxMemory) } args := []reflect.Value{ reflect.ValueOf(sub), reflect.ValueOf(w), reflect.ValueOf(r), } method.Func.Call(args) }