package core import ( "context" "sync/atomic" "time" "unsafe" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "repositories.action2quare.com/ayo/gocommon" "repositories.action2quare.com/ayo/gocommon/logger" ) type memberContraints[K comparable] interface { Key() K Expired() bool } type memberContainerPtr[K comparable, T memberContraints[K]] struct { ptr unsafe.Pointer } func (p *memberContainerPtr[K, T]) init(ms []T) { next := map[K]T{} for _, m := range ms { next[m.Key()] = m } atomic.StorePointer(&p.ptr, unsafe.Pointer(&next)) } func (p *memberContainerPtr[K, T]) add(m T) { ptr := atomic.LoadPointer(&p.ptr) src := (*map[K]T)(ptr) next := map[K]T{} for k, v := range *src { next[k] = v } next[m.Key()] = m atomic.StorePointer(&p.ptr, unsafe.Pointer(&next)) } func (p *memberContainerPtr[K, T]) get(key K) (T, bool) { ptr := atomic.LoadPointer(&p.ptr) src := (*map[K]T)(ptr) out, found := (*src)[key] return out, found } func (p *memberContainerPtr[K, T]) remove(key K) { ptr := atomic.LoadPointer(&p.ptr) src := (*map[K]T)(ptr) next := map[K]T{} for k, v := range *src { next[k] = v } delete(next, key) atomic.StorePointer(&p.ptr, unsafe.Pointer(&next)) } type memberPipelineDocument[K comparable, T memberContraints[K]] struct { OperationType string `bson:"operationType"` DocumentKey struct { Id primitive.ObjectID `bson:"_id"` } `bson:"documentKey"` Member T `bson:"fullDocument"` } func (p *memberContainerPtr[K, T]) all() []T { ptr := atomic.LoadPointer(&p.ptr) src := (*map[K]T)(ptr) out := make([]T, 0, len(*src)) for _, m := range *src { if m.Expired() { continue } out = append(out, m) } return out } func (p *memberContainerPtr[K, T]) watchCollection(parentctx context.Context, coll gocommon.CollectionName, mc gocommon.MongoClient) { defer func() { s := recover() if s != nil { logger.Error(s) } }() matchStage := bson.D{ { Key: "$match", Value: bson.D{ {Key: "operationType", Value: bson.D{ {Key: "$in", Value: bson.A{ "update", "insert", }}, }}, }, }} projectStage := bson.D{ { Key: "$project", Value: bson.D{ {Key: "documentKey", Value: 1}, {Key: "fullDocument", Value: 1}, }, }, } var stream *mongo.ChangeStream var err error var ctx context.Context for { if stream == nil { stream, err = mc.Watch(coll, mongo.Pipeline{matchStage, projectStage}) if err != nil { logger.Error("watchCollection watch failed :", err) time.Sleep(time.Minute) continue } ctx = context.TODO() } changed := stream.TryNext(ctx) if ctx.Err() != nil { logger.Error("watchCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error()) break } if changed { var data memberPipelineDocument[K, T] if err := stream.Decode(&data); err == nil { p.add(data.Member) } else { logger.Error("watchCollection stream.Decode failed :", err) } } else if stream.Err() != nil || stream.ID() == 0 { select { case <-ctx.Done(): logger.Println("watchCollection is done") stream.Close(ctx) return case <-time.After(time.Second): logger.Error("watchCollection stream error :", stream.Err()) stream.Close(ctx) stream = nil } } else { time.Sleep(time.Second) } } }