package core

import (
	"bytes"
	"context"
	"encoding/hex"
	"encoding/json"
	"net/http"
	"os"
	"path"
	"sync/atomic"
	"time"
	"unsafe"

	common "repositories.action2quare.com/ayo/gocommon"
	"repositories.action2quare.com/ayo/gocommon/logger"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// divisionNotifyTimeout bounds each "/maingate" POST sent to a division so a
// single unresponsive division cannot stall the service watcher forever.
const divisionNotifyTimeout = 10 * time.Second

// authPipelineDocument is the change-stream event shape decoded by
// watchAuthCollection.
type authPipelineDocument struct {
	OperationType string `bson:"operationType"`
	DocumentKey   struct {
		Id primitive.ObjectID `bson:"_id"`
	} `bson:"documentKey"`
	Authinfo *common.Authinfo `bson:"fullDocument"`
}

// servicePipelineDocument is the change-stream event shape decoded by
// watchServiceCollection.
type servicePipelineDocument struct {
	OperationType string `bson:"operationType"`
	DocumentKey   struct {
		Id primitive.ObjectID `bson:"_id"`
	} `bson:"documentKey"`
	Service *serviceDescription `bson:"fullDocument"`
}

// filePipelineDocument is the change-stream event shape decoded by
// watchFileCollection.
type filePipelineDocument struct {
	OperationType string `bson:"operationType"`
	DocumentKey   struct {
		Id primitive.ObjectID `bson:"_id"`
	} `bson:"documentKey"`
	File *FileDocumentDesc `bson:"fullDocument"`
}

// whilelistPipelineDocument is the change-stream event shape decoded by
// watchWhitelistCollection. (The misspelt name is kept as-is because other
// files in the package may refer to it.)
type whilelistPipelineDocument struct {
	OperationType string `bson:"operationType"`
	DocumentKey   struct {
		Id primitive.ObjectID `bson:"_id"`
	} `bson:"documentKey"`
	Member *whitelistmember `bson:"fullDocument"`
}

// changeStreamPipeline builds the two-stage pipeline shared by every watcher:
// a $match on operationType ($in operations) followed by a $project that
// keeps only the listed fields.
func changeStreamPipeline(fields []string, operations ...string) mongo.Pipeline {
	ops := make(bson.A, 0, len(operations))
	for _, op := range operations {
		ops = append(ops, op)
	}
	projected := make(bson.D, 0, len(fields))
	for _, field := range fields {
		projected = append(projected, bson.E{Key: field, Value: 1})
	}

	matchStage := bson.D{{
		Key: "$match",
		Value: bson.D{
			{Key: "operationType", Value: bson.D{{Key: "$in", Value: ops}}},
		},
	}}
	projectStage := bson.D{{Key: "$project", Value: projected}}

	return mongo.Pipeline{matchStage, projectStage}
}

// runChangeStream is the poll/reconnect loop shared by every watcher.
//
// name prefixes every log line (it is the calling watcher's function name).
// open (re)creates the change stream; when it fails the loop sleeps for retry
// and tries again. handle is called once per available event and must decode
// and apply it; a non-nil return is logged as a decode failure and the loop
// carries on with the next event.
//
// When no event is pending the loop sleeps one second. If the stream reports
// an error or its cursor ID is 0, the stream is closed after one second and
// reopened on the next iteration.
//
// NOTE(review): the stream is polled with context.TODO(), so the ctx.Err()
// and ctx.Done() paths below cannot fire today and the loop never ends on its
// own. The watchers receive a parentctx that is currently unused; confirm
// with the callers whether cancellation should be wired in.
func runChangeStream(
	name string,
	retry time.Duration,
	open func() (*mongo.ChangeStream, error),
	handle func(stream *mongo.ChangeStream) error,
) {
	var stream *mongo.ChangeStream
	var ctx context.Context

	for {
		if stream == nil {
			opened, err := open()
			if err != nil {
				logger.Error(name+" watch failed :", err)
				time.Sleep(retry)
				continue
			}
			stream = opened
			ctx = context.TODO()
		}

		changed := stream.TryNext(ctx)
		if ctx.Err() != nil {
			logger.Error(name+" stream.TryNext failed. process should be restarted! :", ctx.Err().Error())
			return
		}

		if changed {
			if err := handle(stream); err != nil {
				logger.Error(name+" stream.Decode failed :", err)
			}
			continue
		}

		if stream.Err() == nil && stream.ID() != 0 {
			// Healthy stream, nothing to read yet.
			time.Sleep(time.Second)
			continue
		}

		select {
		case <-ctx.Done():
			logger.Println(name + " is done")
			stream.Close(ctx)
			return
		case <-time.After(time.Second):
			logger.Error(name+" stream error :", stream.Err())
			stream.Close(ctx)
			stream = nil
		}
	}
}

// watchWhitelistCollection follows insert/update events on CollectionWhitelist
// and mirrors them into the current service's whitelist: inserted members are
// added, updated members are removed when Expired is non-zero and re-added
// otherwise. A panic inside the loop is recovered and logged, which ends the
// watcher.
//
// NOTE(review): update events only carry fullDocument when the change stream
// is opened with the updateLookup option. No option is passed here, so verify
// that the Watch wrapper enables it by default; otherwise every update event
// is skipped by the nil guard below.
func (mg *Maingate) watchWhitelistCollection(parentctx context.Context) {
	defer func() {
		s := recover()
		if s != nil {
			logger.Error(s)
		}
	}()

	pipeline := changeStreamPipeline(
		[]string{"documentKey", "operationType", "fullDocument"},
		"update", "insert",
	)

	open := func() (*mongo.ChangeStream, error) {
		return mg.mongoClient.Watch(CollectionWhitelist, pipeline)
	}

	handle := func(stream *mongo.ChangeStream) error {
		var data whilelistPipelineDocument
		if err := stream.Decode(&data); err != nil {
			return err
		}
		if data.Member == nil {
			// Dereferencing a missing document used to panic and, through the
			// deferred recover, silently stop this watcher.
			logger.Error("watchWhitelistCollection event has no fullDocument :", data.OperationType, data.DocumentKey.Id.Hex())
			return nil
		}

		switch data.OperationType {
		case "insert":
			// New whitelist member.
			mg.service().wl.add(data.Member)
		case "update":
			if data.Member.Expired != 0 {
				logger.Println("whitelist member is removed :", *data.Member)
				mg.service().wl.remove(data.Member.Email)
			} else {
				logger.Println("whitelist member is updated :", *data.Member)
				mg.service().wl.add(data.Member)
			}
		}
		return nil
	}

	runChangeStream("watchWhitelistCollection", time.Minute, open, handle)
}

// watchFileCollection follows insert/delete events on CollectionFile. An
// inserted document is handed to its Save method; a deleted document has its
// directory under "static" removed, where the directory name is derived from
// the first eight bytes of the document's ObjectID. A panic inside the loop is
// recovered and logged, which ends the watcher.
//
// serveMux and prefix are accepted but not used by this watcher.
func (mg *Maingate) watchFileCollection(parentctx context.Context, serveMux *http.ServeMux, prefix string) {
	defer func() {
		s := recover()
		if s != nil {
			logger.Error(s)
		}
	}()

	pipeline := changeStreamPipeline(
		[]string{"operationType", "documentKey", "fullDocument"},
		"delete", "insert",
	)

	open := func() (*mongo.ChangeStream, error) {
		return mg.mongoClient.Watch(CollectionFile, pipeline,
			options.ChangeStream().SetFullDocument(options.UpdateLookup))
	}

	handle := func(stream *mongo.ChangeStream) error {
		var data filePipelineDocument
		if err := stream.Decode(&data); err != nil {
			return err
		}

		switch data.OperationType {
		case "insert":
			if data.File == nil {
				logger.Error("watchFileCollection insert event has no fullDocument :", data.DocumentKey.Id.Hex())
				return nil
			}
			data.File.Save()
		case "delete":
			subfolder := hex.EncodeToString(data.DocumentKey.Id[:4])
			rf := hex.EncodeToString(data.DocumentKey.Id[4:8])
			subpath := path.Join("static", subfolder, rf)
			if err := os.RemoveAll(subpath); err != nil {
				logger.Error("watchFileCollection cannot remove :", subpath, err)
			}
		}
		return nil
	}

	// The reconnect delay for this collection is one second, not one minute.
	runChangeStream("watchFileCollection", time.Second, open, handle)
}

// watchServiceCollection follows insert/update/replace events on
// CollectionService. Inserts publish the new service description and, when it
// is not the one already held, register its HTTP handler on serveMux under
// prefix. Updates and replaces refresh the live service in place and notify
// its divisions. A panic inside the loop is recovered and logged, which ends
// the watcher.
func (mg *Maingate) watchServiceCollection(parentctx context.Context, serveMux *http.ServeMux, prefix string) {
	defer func() {
		s := recover()
		if s != nil {
			logger.Error(s)
		}
	}()

	pipeline := changeStreamPipeline(
		[]string{"operationType", "fullDocument"},
		"insert", "update", "replace",
	)

	open := func() (*mongo.ChangeStream, error) {
		return mg.mongoClient.Watch(CollectionService, pipeline,
			options.ChangeStream().SetFullDocument(options.UpdateLookup))
	}

	handle := func(stream *mongo.ChangeStream) error {
		var data servicePipelineDocument
		if err := stream.Decode(&data); err != nil {
			return err
		}
		if data.Service == nil {
			logger.Error("watchServiceCollection event has no fullDocument :", data.OperationType)
			return nil
		}

		switch data.OperationType {
		case "insert":
			mg.onServiceInserted(data.Service, serveMux, prefix)
		case "replace", "update":
			mg.onServiceUpdated(data.Service)
		}
		return nil
	}

	runChangeStream("watchServiceCollection", time.Minute, open, handle)
}

// onServiceInserted prepares a newly inserted service description, publishes
// it as the current service and registers its handler unless the same service
// was already being held.
func (mg *Maingate) onServiceInserted(svc *serviceDescription, serveMux *http.ServeMux, prefix string) {
	// A new service was added.
	if err := svc.prepare(mg); err != nil {
		logger.Error("service cannot be prepared :", svc, err)
		return
	}

	// It may be the service this process was already holding temporarily; in
	// that case its handler is not registered again.
	already := mg.service().Id == svc.Id
	logger.Println("service is on the board! :", svc)
	atomic.StorePointer(&mg.serviceptr, unsafe.Pointer(svc))
	if !already {
		serveMux.Handle(common.MakeHttpHandlerPattern(prefix, svc.ServiceCode, "/"), mg.service())
	}
}

// onServiceUpdated copies the prepared state of an updated service description
// into the live service and then POSTs to "<division url>/maingate" for every
// division that is fully open (empty body) or has maintenance info (JSON
// body).
func (mg *Maingate) onServiceUpdated(updated *serviceDescription) {
	if err := updated.prepare(mg); err != nil {
		// Publishing a half-prepared description would replace the live
		// pointers with incomplete data, so the last good state is kept.
		logger.Error("service cannot be prepared :", updated, err)
		return
	}

	old := mg.service()
	atomic.StorePointer(&old.divisionsForUsersSerialized, updated.divisionsForUsersSerialized)
	atomic.StorePointer(&old.divisionsSerialized, updated.divisionsSerialized)
	atomic.StorePointer(&old.admins, updated.admins)
	atomic.StorePointer(&old.serviceSerialized, updated.serviceSerialized)
	atomic.StorePointer(&old.serviceSummarySerialized, updated.serviceSummarySerialized)
	atomic.StorePointer(&old.wl.emailptr, updated.wl.emailptr)
	// NOTE(review): this plain assignment is not synchronized with readers of
	// old.Divisions — confirm that is acceptable.
	old.Divisions = updated.Divisions

	if len(old.ServerApiTokens) == 0 {
		// Indexing an empty token list used to panic and stop the watcher.
		logger.Error("watchServiceCollection : service has no server api token, divisions are not notified")
		return
	}
	token := old.ServerApiTokens[0].Hex()
	client := &http.Client{Timeout: divisionNotifyTimeout}

	for _, div := range old.Divisions {
		var req *http.Request
		var err error

		switch {
		case div.State == DivisionState_FullOpen:
			req, err = http.NewRequest("POST", div.Url+"/maingate", nil)
		case div.Maintenance != nil:
			bt, merr := json.Marshal(div.Maintenance)
			if merr != nil {
				logger.Error("watchServiceCollection cannot marshal maintenance :", div.Url, merr)
				continue
			}
			req, err = http.NewRequest("POST", div.Url+"/maingate", bytes.NewBuffer(bt))
		default:
			continue
		}
		if err != nil {
			logger.Error("watchServiceCollection cannot build division request :", div.Url, err)
			continue
		}

		// MG-X-API-TOKEN
		req.Header.Add("MG-X-API-TOKEN", token)
		resp, err := client.Do(req)
		if err != nil {
			logger.Error("watchServiceCollection cannot notify division :", div.Url, err)
			continue
		}
		resp.Body.Close()
	}
}

// watchAuthCollection follows insert/update/delete events on CollectionAuth
// and mirrors them into ac: inserted and updated documents are added through
// AddRaw, deleted ones are removed by their document _id. A panic inside the
// loop is recovered and logged, which ends the watcher.
//
// NOTE(review): update events only carry fullDocument when the change stream
// is opened with the updateLookup option. No option is passed here, so verify
// that the Watch wrapper enables it by default; otherwise every update event
// is skipped by the nil guard below.
func watchAuthCollection(parentctx context.Context, ac *common.AuthCollection, mongoClient common.MongoClient) {
	defer func() {
		s := recover()
		if s != nil {
			logger.Error(s)
		}
	}()

	pipeline := changeStreamPipeline(
		[]string{"documentKey", "operationType", "fullDocument"},
		"delete", "insert", "update",
	)

	open := func() (*mongo.ChangeStream, error) {
		return mongoClient.Watch(CollectionAuth, pipeline)
	}

	handle := func(stream *mongo.ChangeStream) error {
		var data authPipelineDocument
		if err := stream.Decode(&data); err != nil {
			return err
		}

		switch data.OperationType {
		case "insert", "update":
			if data.Authinfo == nil {
				// Do not store a cell that wraps a nil Authinfo.
				logger.Error("watchAuthCollection event has no fullDocument :", data.OperationType, data.DocumentKey.Id.Hex())
				return nil
			}
			ac.AddRaw(&mongoAuthCell{src: data.Authinfo})
		case "delete":
			ac.RemoveByAccId(data.DocumentKey.Id)
		}
		return nil
	}

	runChangeStream("watchAuthCollection", time.Minute, open, handle)
}