watch err 처리 - modifyChangeStreams has not been run for this collection/database/cluster

This commit is contained in:
2024-07-25 12:47:31 +09:00
parent dd4928c822
commit 013c89e58d

View File

@ -154,10 +154,36 @@ func (mc *MongoClient) DropIndex(coll CollectionName, name string) error {
} }
func (mc *MongoClient) Watch(coll CollectionName, pipeline mongo.Pipeline, opts ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error) { func (mc *MongoClient) Watch(coll CollectionName, pipeline mongo.Pipeline, opts ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error) {
// mc.db.RunCommand()
if len(opts) == 0 { if len(opts) == 0 {
opts = []*options.ChangeStreamOptions{options.ChangeStream().SetFullDocument(options.UpdateLookup).SetMaxAwaitTime(0)} opts = []*options.ChangeStreamOptions{options.ChangeStream().SetFullDocument(options.UpdateLookup).SetMaxAwaitTime(0)}
} }
return mc.Collection(coll).Watch(mc.ctx, pipeline, opts...)
stream, err := mc.Collection(coll).Watch(mc.ctx, pipeline, opts...)
if err != nil {
if mongoErr, ok := err.(mongo.CommandError); ok {
logger.Println("MongoClient Watch return err code :", mongoErr, mongoErr.Code)
if mongoErr.Code == 40573 {
adminDb := mc.db.Client().Database("admin")
result := adminDb.RunCommand(mc.ctx, bson.D{
{Key: "modifyChangeStreams", Value: 1},
{Key: "database", Value: mc.db.Name()},
{Key: "collection", Value: coll},
{Key: "enable", Value: true},
})
if result.Err() != nil {
logger.Println("mc.db.RunCommand failed :", result.Err(), mc.db.Name(), coll)
} else {
return mc.Collection(coll).Watch(mc.ctx, pipeline, opts...)
}
}
}
logger.Fatal(err)
}
return stream, err
} }
func (mc *MongoClient) Collection(collname CollectionName) *mongo.Collection { func (mc *MongoClient) Collection(collname CollectionName) *mongo.Collection {