From 013c89e58dab2e5844e414eebe067c0dc23c4ebc Mon Sep 17 00:00:00 2001 From: mountain Date: Thu, 25 Jul 2024 12:47:31 +0900 Subject: [PATCH] =?UTF-8?q?watch=20err=20=EC=B2=98=EB=A6=AC=20-=20modifyCh?= =?UTF-8?q?angeStreams=20has=20not=20been=20run=20for=20this=20collection/?= =?UTF-8?q?database/cluster?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mongo.go | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/mongo.go b/mongo.go index 2a016c9..31a0b15 100644 --- a/mongo.go +++ b/mongo.go @@ -154,10 +154,36 @@ func (mc *MongoClient) DropIndex(coll CollectionName, name string) error { } func (mc *MongoClient) Watch(coll CollectionName, pipeline mongo.Pipeline, opts ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error) { + // mc.db.RunCommand() if len(opts) == 0 { opts = []*options.ChangeStreamOptions{options.ChangeStream().SetFullDocument(options.UpdateLookup).SetMaxAwaitTime(0)} } - return mc.Collection(coll).Watch(mc.ctx, pipeline, opts...) + + stream, err := mc.Collection(coll).Watch(mc.ctx, pipeline, opts...) + if err != nil { + if mongoErr, ok := err.(mongo.CommandError); ok { + logger.Println("MongoClient Watch return err code :", mongoErr, mongoErr.Code) + if mongoErr.Code == 40573 { + adminDb := mc.db.Client().Database("admin") + result := adminDb.RunCommand(mc.ctx, bson.D{ + {Key: "modifyChangeStreams", Value: 1}, + {Key: "database", Value: mc.db.Name()}, + {Key: "collection", Value: coll}, + {Key: "enable", Value: true}, + }) + + if result.Err() != nil { + logger.Println("mc.db.RunCommand failed :", result.Err(), mc.db.Name(), coll) + } else { + return mc.Collection(coll).Watch(mc.ctx, pipeline, opts...) + } + } + } + + logger.Fatal(err) + } + + return stream, err } func (mc *MongoClient) Collection(collname CollectionName) *mongo.Collection {