package core

import (
	"bytes"
	"context"
	"encoding/hex"
	"encoding/json"
	"net/http"
	"os"
	"path"
	"sync/atomic"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
	common "repositories.action2quare.com/ayo/gocommon"
	"repositories.action2quare.com/ayo/gocommon/logger"
)

// authPipelineDocument is the decoded shape of a change-stream event on the
// auth collection: the operation type, the key of the affected document and
// (when the event carries one) the full document.
type authPipelineDocument struct {
	OperationType string `bson:"operationType"`
	DocumentKey   struct {
		Id primitive.ObjectID `bson:"_id"`
	} `bson:"documentKey"`
	Authinfo *common.Authinfo `bson:"fullDocument"`
}

// servicePipelineDocument is the decoded shape of a change-stream event on
// the service collection.
type servicePipelineDocument struct {
	OperationType string `bson:"operationType"`
	DocumentKey   struct {
		Id primitive.ObjectID `bson:"_id"`
	} `bson:"documentKey"`
	Service *serviceDescription `bson:"fullDocument"`
}

// filePipelineDocument is the decoded shape of a change-stream event on the
// file collection.
type filePipelineDocument struct {
	OperationType string `bson:"operationType"`
	DocumentKey   struct {
		Id primitive.ObjectID `bson:"_id"`
	} `bson:"documentKey"`
	File *fileDocumentDesc `bson:"fullDocument"`
}

// whilelistPipelineDocument is the decoded shape of a change-stream event on
// the whitelist collection. (The type name keeps its historical spelling.)
type whilelistPipelineDocument struct {
	OperationType string `bson:"operationType"`
	DocumentKey   struct {
		Id primitive.ObjectID `bson:"_id"`
	} `bson:"documentKey"`
	Member *whitelistmember `bson:"fullDocument"`
}

// changeStreamPipeline builds the two-stage pipeline used by every watcher in
// this file: a $match on the given operation types followed by a $project that
// keeps operationType, documentKey and fullDocument.
//
// documentKey is always projected because delete events carry no
// fullDocument; the key is the only way to identify the removed document.
func changeStreamPipeline(operationTypes ...string) mongo.Pipeline {
	ops := make(bson.A, 0, len(operationTypes))
	for _, op := range operationTypes {
		ops = append(ops, op)
	}

	matchStage := bson.D{{
		Key: "$match",
		Value: bson.D{
			{Key: "operationType", Value: bson.D{{Key: "$in", Value: ops}}},
		},
	}}
	projectStage := bson.D{{
		Key: "$project",
		Value: bson.D{
			{Key: "operationType", Value: 1},
			{Key: "documentKey", Value: 1},
			{Key: "fullDocument", Value: 1},
		},
	}}

	return mongo.Pipeline{matchStage, projectStage}
}

// recoverAndLog is meant to be deferred directly by a watcher goroutine. It
// logs a panic value instead of letting the panic take the process down.
func recoverAndLog() {
	if r := recover(); r != nil {
		logger.Error(r)
	}
}

// watcherContext returns parent, or a non-nil placeholder context when the
// caller passed nil, so the watchers can always call Done/Err on it.
func watcherContext(parent context.Context) context.Context {
	if parent == nil {
		return context.TODO()
	}
	return parent
}

// waitOrCancelled blocks for d or until ctx is done, whichever comes first.
// It returns true when the full duration elapsed and false when ctx ended.
func waitOrCancelled(ctx context.Context, d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return false
	case <-timer.C:
		return true
	}
}

// runChangeStream is the polling loop shared by all collection watchers.
//
//   - name prefixes every log line (the watcher's function name).
//   - retryDelay is how long to wait after open fails before trying again.
//   - open (re)creates the change stream.
//   - handle is invoked once per available event; it is expected to decode
//     the event from the stream it is given.
//
// The loop returns when ctx is done. A stream that reports an error or has a
// zero ID is closed and reopened on the next iteration. Whatever stream is
// still open when the function leaves (including by panic in handle) is
// closed.
func runChangeStream(
	ctx context.Context,
	name string,
	retryDelay time.Duration,
	open func() (*mongo.ChangeStream, error),
	handle func(stream *mongo.ChangeStream),
) {
	var stream *mongo.ChangeStream
	defer func() {
		if stream != nil {
			// ctx may already be cancelled here, so close with a fresh context.
			stream.Close(context.Background())
		}
	}()

	for {
		if stream == nil {
			opened, err := open()
			if err != nil {
				logger.Error(name+" watch failed :", err)
				if !waitOrCancelled(ctx, retryDelay) {
					logger.Error(name+" cancelled while waiting to retry :", ctx.Err())
					return
				}
				continue
			}
			stream = opened
		}

		changed := stream.TryNext(ctx)
		if ctx.Err() != nil {
			logger.Error(name+" stream.TryNext failed. process should be restarted! :", ctx.Err().Error())
			return
		}

		switch {
		case changed:
			handle(stream)
		case stream.Err() != nil || stream.ID() == 0:
			logger.Error(name+" stream error :", stream.Err())
			stream.Close(ctx)
			stream = nil
		default:
			// Nothing to read yet. A cancellation during this wait is picked
			// up by the ctx.Err() check after the next TryNext.
			waitOrCancelled(ctx, time.Second)
		}
	}
}

// watchWhitelistCollection follows insert/update events on the whitelist
// collection and mirrors them into the whitelist of the owning service.
//
// It blocks until parentctx is done (a nil parentctx never ends) and is
// intended to run in its own goroutine; panics are recovered and logged.
func (mg *Maingate) watchWhitelistCollection(parentctx context.Context) {
	defer recoverAndLog()

	const name = "watchWhitelistCollection"
	ctx := watcherContext(parentctx)
	pipeline := changeStreamPipeline("update", "insert")

	open := func() (*mongo.ChangeStream, error) {
		return mg.mongoClient.Watch(CollectionWhitelist, pipeline)
	}

	handle := func(stream *mongo.ChangeStream) {
		var data whilelistPipelineDocument
		if err := stream.Decode(&data); err != nil {
			logger.Error(name+" stream.Decode failed :", err)
			return
		}

		// NOTE(review): no full-document option is passed to Watch here, so
		// whether update events carry a fullDocument depends on the Watch
		// wrapper's defaults - confirm. Without this guard a missing document
		// would panic and end the watcher.
		if data.Member == nil {
			logger.Error(name+" event has no fullDocument :", data.OperationType, data.DocumentKey.Id.Hex())
			return
		}

		svc := mg.services.get(data.Member.Service)
		if svc == nil {
			return
		}

		switch data.OperationType {
		case "insert":
			// New whitelist member.
			svc.wl.add(data.Member)

		case "update":
			if data.Member.Expired != 0 {
				logger.Println("whitelist member is removed :", *data.Member)
				svc.wl.remove(data.Member.Email)
			} else {
				logger.Println("whitelist member is updated :", *data.Member)
				svc.wl.add(data.Member)
			}
		}
	}

	runChangeStream(ctx, name, time.Minute, open, handle)
}

// watchFileCollection follows insert/delete events on the file collection.
// An inserted document is saved via its save method; a deleted document has
// its directory under "static" removed. The directory name is derived from
// the first eight bytes of the document id (two 4-byte hex components).
//
// serveMux and prefix are accepted for signature parity with the other
// watchers and are not used here. The function blocks until parentctx is
// done and recovers/logs panics.
func (mg *Maingate) watchFileCollection(parentctx context.Context, serveMux *http.ServeMux, prefix string) {
	defer recoverAndLog()

	const name = "watchFileCollection"
	ctx := watcherContext(parentctx)
	pipeline := changeStreamPipeline("delete", "insert")

	open := func() (*mongo.ChangeStream, error) {
		return mg.mongoClient.Watch(CollectionFile, pipeline,
			options.ChangeStream().SetFullDocument(options.UpdateLookup))
	}

	handle := func(stream *mongo.ChangeStream) {
		var data filePipelineDocument
		if err := stream.Decode(&data); err != nil {
			logger.Error(name+" stream.Decode failed :", err)
			return
		}

		switch data.OperationType {
		case "insert":
			if data.File == nil {
				logger.Error(name+" insert event has no fullDocument :", data.DocumentKey.Id.Hex())
				return
			}
			data.File.save()

		case "delete":
			subfolder := hex.EncodeToString(data.DocumentKey.Id[:4])
			rf := hex.EncodeToString(data.DocumentKey.Id[4:8])
			subpath := path.Join("static", subfolder, rf)
			if err := os.RemoveAll(subpath); err != nil {
				logger.Error(name+" cannot remove :", subpath, err)
			}
		}
	}

	// This watcher retries a failed Watch after one second; the others wait a minute.
	runChangeStream(ctx, name, time.Second, open, handle)
}

// watchServiceCollection follows insert/update/replace/delete events on the
// service collection and keeps mg.services, mg.apiTokenToService and the HTTP
// routes on serveMux in step with it.
//
//   - insert: the service is prepared, registered and mounted under prefix.
//   - update/replace: the live service entry is patched in place and every
//     division that is fully open or has maintenance info is notified with a
//     POST to <division url>/maingate. A service that is not registered yet is
//     registered and mounted unless it is closed.
//   - delete: the service is removed by document id and marked closed.
//
// The function blocks until parentctx is done and recovers/logs panics.
func (mg *Maingate) watchServiceCollection(parentctx context.Context, serveMux *http.ServeMux, prefix string) {
	defer recoverAndLog()

	const name = "watchServiceCollection"
	ctx := watcherContext(parentctx)
	pipeline := changeStreamPipeline("delete", "insert", "update", "replace")

	open := func() (*mongo.ChangeStream, error) {
		return mg.mongoClient.Watch(CollectionService, pipeline,
			options.ChangeStream().SetFullDocument(options.UpdateLookup))
	}

	handle := func(stream *mongo.ChangeStream) {
		var data servicePipelineDocument
		if err := stream.Decode(&data); err != nil {
			logger.Error(name+" stream.Decode failed :", err)
			return
		}

		switch data.OperationType {
		case "insert":
			// A new service was added.
			if data.Service == nil {
				logger.Error(name+" insert event has no fullDocument :", data.DocumentKey.Id.Hex())
				return
			}
			if err := data.Service.prepare(mg); err != nil {
				logger.Error("service cannot be prepared :", data.Service, err)
				return
			}

			logger.Println("service is on the board! :", data.Service)
			mg.services.add(data.Service)
			// NOTE(review): http.ServeMux.Handle panics when the pattern is
			// already registered - confirm a service code can never be
			// mounted twice (e.g. delete followed by re-insert).
			serveMux.Handle(common.MakeHttpHandlerPattern(prefix, data.Service.ServiceCode, "/"), data.Service)

		case "replace", "update":
			if data.Service == nil {
				logger.Error(name+" update event has no fullDocument :", data.DocumentKey.Id.Hex())
				return
			}
			// Same policy as the insert branch: do not publish a service that
			// failed to prepare.
			if err := data.Service.prepare(mg); err != nil {
				logger.Error("service cannot be prepared :", data.Service, err)
				return
			}

			old := mg.services.get(data.Service.ServiceName)
			if old == nil {
				if !data.Service.Closed {
					logger.Println("service is on the board! :", data.Service)
					mg.services.add(data.Service)
					serveMux.Handle(common.MakeHttpHandlerPattern(prefix, data.Service.ServiceCode, "/"), data.Service)
				}
				return
			}

			// Publish the freshly prepared serialized views on the live entry.
			atomic.SwapPointer(&old.divisionsForUsersSerialized, data.Service.divisionsForUsersSerialized)
			atomic.SwapPointer(&old.divisionsSerialized, data.Service.divisionsSerialized)
			atomic.SwapPointer(&old.apiUsers, data.Service.apiUsers)
			atomic.SwapPointer(&old.serviceSerialized, data.Service.serviceSerialized)
			atomic.SwapPointer(&old.serviceSummarySerialized, data.Service.serviceSummarySerialized)

			// Re-map API tokens: drop the ones the live entry knows about and
			// register the ones from the updated document.
			// NOTE(review): old.ServerApiTokens itself is not refreshed here,
			// so a later update removes the same stale list again - confirm
			// whether that is intended.
			for _, token := range old.ServerApiTokens {
				mg.apiTokenToService.remove(token.Hex())
			}
			for _, token := range data.Service.ServerApiTokens {
				mg.apiTokenToService.add(token.Hex(), data.Service.ServiceCode)
			}

			if data.Service.UseWhitelist {
				atomic.StoreInt32(&old.wl.working, 1)
			} else {
				atomic.StoreInt32(&old.wl.working, 0)
			}

			old.Closed = data.Service.Closed
			if old.Closed {
				atomic.StoreInt32(&old.closed, 1)
			} else {
				atomic.StoreInt32(&old.closed, 0)
			}

			atomic.SwapPointer(&old.wl.emailptr, data.Service.wl.emailptr)
			old.Divisions = data.Service.Divisions

			// The notification carries the first server API token; without
			// one there is nothing to authenticate with, so skip notifying
			// rather than indexing an empty list.
			if len(old.ServerApiTokens) == 0 {
				logger.Error(name+" service has no server api token, divisions are not notified :", data.Service.ServiceCode)
				return
			}
			apiToken := old.ServerApiTokens[0].Hex()

			for _, div := range old.Divisions {
				var req *http.Request
				var err error

				if div.State == DivisionState_FullOpen {
					req, err = http.NewRequestWithContext(ctx, "POST", div.Url+"/maingate", nil)
				} else if div.Maintenance != nil {
					bt, merr := json.Marshal(div.Maintenance)
					if merr != nil {
						logger.Error(name+" cannot marshal maintenance :", div.Url, merr)
						continue
					}
					req, err = http.NewRequestWithContext(ctx, "POST", div.Url+"/maingate", bytes.NewBuffer(bt))
				} else {
					continue
				}

				if err != nil || req == nil {
					logger.Error(name+" cannot build division request :", div.Url, err)
					continue
				}

				// MG-X-API-TOKEN
				req.Header.Add("MG-X-API-TOKEN", apiToken)
				resp, err := http.DefaultClient.Do(req)
				if err != nil {
					logger.Error(name+" division notification failed :", div.Url, err)
					continue
				}
				resp.Body.Close()
			}

		case "delete":
			// Delete events have no fullDocument; the service is identified
			// by the projected documentKey.
			if deleted := mg.services.remove(data.DocumentKey.Id); deleted != nil {
				logger.Println("service is closed :", deleted)
				atomic.AddInt32(&deleted.closed, 1)
			}
		}
	}

	runChangeStream(ctx, name, time.Minute, open, handle)
}

// watchAuthCollection follows insert/update/delete events on the auth
// collection and mirrors them into ac: inserted and updated documents are
// added via AddRaw, deleted ones are removed by document id.
//
// The function blocks until parentctx is done and recovers/logs panics.
func watchAuthCollection(parentctx context.Context, ac *common.AuthCollection, mongoClient common.MongoClient) {
	defer recoverAndLog()

	const name = "watchAuthCollection"
	ctx := watcherContext(parentctx)
	pipeline := changeStreamPipeline("delete", "insert", "update")

	open := func() (*mongo.ChangeStream, error) {
		return mongoClient.Watch(CollectionAuth, pipeline)
	}

	handle := func(stream *mongo.ChangeStream) {
		var data authPipelineDocument
		if err := stream.Decode(&data); err != nil {
			logger.Error(name+" stream.Decode failed :", err)
			return
		}

		switch data.OperationType {
		case "insert", "update":
			// NOTE(review): no full-document option is passed to Watch here;
			// whether update events carry a fullDocument depends on the Watch
			// wrapper's defaults - confirm.
			if data.Authinfo == nil {
				logger.Error(name+" event has no fullDocument :", data.OperationType, data.DocumentKey.Id.Hex())
				return
			}
			ac.AddRaw(&mongoAuthCell{src: data.Authinfo})

		case "delete":
			ac.RemoveByAccId(data.DocumentKey.Id)
		}
	}

	runChangeStream(ctx, name, time.Minute, open, handle)
}