package core import ( "context" "encoding/hex" "os" "path" "sync/atomic" "time" "unsafe" "repositories.action2quare.com/ayo/gocommon" "repositories.action2quare.com/ayo/gocommon/logger" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) type servicePipelineDocument struct { OperationType string `bson:"operationType"` DocumentKey struct { Id primitive.ObjectID `bson:"_id"` } `bson:"documentKey"` Service *serviceDescription `bson:"fullDocument"` } type filePipelineDocument struct { OperationType string `bson:"operationType"` DocumentKey struct { Id primitive.ObjectID `bson:"_id"` } `bson:"documentKey"` File *FileDocumentDesc `bson:"fullDocument"` } func (mg *Maingate) watchFileCollection(parentctx context.Context, serveMux gocommon.ServerMuxInterface, prefix string) { defer func() { s := recover() if s != nil { logger.Error(s) } }() matchStage := bson.D{ { Key: "$match", Value: bson.D{ {Key: "operationType", Value: bson.D{ {Key: "$in", Value: bson.A{ "delete", "insert", }}, }}, }, }} projectStage := bson.D{ { Key: "$project", Value: bson.D{ {Key: "operationType", Value: 1}, {Key: "documentKey", Value: 1}, {Key: "fullDocument", Value: 1}, }, }, } var stream *mongo.ChangeStream var err error var ctx context.Context for { if stream == nil { stream, err = mg.mongoClient.Watch(CollectionFile, mongo.Pipeline{matchStage, projectStage}, options.ChangeStream().SetFullDocument(options.UpdateLookup)) if err != nil { logger.Error("watchFileCollection watch failed :", err) time.Sleep(time.Second) continue } ctx = context.TODO() } changed := stream.TryNext(ctx) if ctx.Err() != nil { logger.Error("watchFileCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error()) break } if !changed { if stream.Err() != nil || stream.ID() == 0 { select { case <-ctx.Done(): logger.Println("watchServiceCollection is done") stream.Close(ctx) return case <-time.After(time.Second): logger.Error("watchServiceCollection stream error :", stream.Err()) stream.Close(ctx) stream = nil } } else { time.Sleep(time.Second) } continue } var data filePipelineDocument if err := stream.Decode(&data); err == nil { switch data.OperationType { case "insert": data.File.Save() case "delete": subfolder := hex.EncodeToString(data.DocumentKey.Id[:4]) rf := hex.EncodeToString(data.DocumentKey.Id[4:8]) subpath := path.Join("static", subfolder, rf) os.RemoveAll(subpath) } } } } func (mg *Maingate) watchServiceCollection(parentctx context.Context, serveMux gocommon.ServerMuxInterface, prefix string) { defer func() { s := recover() if s != nil { logger.Error(s) } }() matchStage := bson.D{ { Key: "$match", Value: bson.D{ {Key: "operationType", Value: bson.D{ {Key: "$in", Value: bson.A{ "insert", "update", "replace", }}, }}, }, }} projectStage := bson.D{ { Key: "$project", Value: bson.D{ {Key: "operationType", Value: 1}, {Key: "fullDocument", Value: 1}, }, }, } var stream *mongo.ChangeStream var err error var ctx context.Context for { if stream == nil { stream, err = mg.mongoClient.Watch(CollectionService, mongo.Pipeline{matchStage, projectStage}, options.ChangeStream().SetFullDocument(options.UpdateLookup)) if err != nil { logger.Error("watchServiceCollection watch failed :", err) time.Sleep(time.Minute) continue } ctx = context.TODO() } changed := stream.TryNext(ctx) if ctx.Err() != nil { logger.Error("watchServiceCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error()) break } if changed { var data servicePipelineDocument if err := stream.Decode(&data); err == nil { ot := data.OperationType switch ot { case "insert": // 새 서비스 추가됨 if err := data.Service.prepare(mg); err != nil { logger.Error("service cannot be prepared :", data.Service, err) } else { // 내가 임시로 가지고 있던 서비스일 수 있다. if mg.service().Id == data.Service.Id { logger.Println("service is on the board! :", data.Service) atomic.StorePointer(&mg.serviceptr, unsafe.Pointer(data.Service)) } } case "replace": fallthrough case "update": data.Service.prepare(mg) atomic.StorePointer(&mg.serviceptr, unsafe.Pointer(data.Service)) } } else { logger.Error("watchServiceCollection stream.Decode failed :", err) } } else if stream.Err() != nil || stream.ID() == 0 { select { case <-ctx.Done(): logger.Println("watchServiceCollection is done") stream.Close(ctx) return case <-time.After(time.Second): logger.Error("watchServiceCollection stream error :", stream.Err()) stream.Close(ctx) stream = nil } } else { time.Sleep(time.Second) } } }