package gocommon import ( "context" "encoding/json" "errors" "os" "time" "repositories.action2quare.com/ayo/gocommon/logger" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/event" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/readpref" "go.mongodb.org/mongo-driver/x/mongo/driver/connstring" ) type MongoClient struct { db *mongo.Database c *mongo.Client ctx context.Context } type ConnectionInfo struct { Url string Database string } type CollectionName string func ParseObjectID(hexstr string) (out primitive.ObjectID) { out, _ = primitive.ObjectIDFromHex(hexstr) return } func NewMongoConnectionInfo(url string, dbname string) *ConnectionInfo { if len(dbname) == 0 { panic("dbname is empty") } if *devflag { hostname, _ := os.Hostname() dbname = hostname + "-" + dbname } return &ConnectionInfo{ Url: url, Database: dbname, } } func (ci *ConnectionInfo) SetURL(url string) *ConnectionInfo { ci.Url = url return ci } func (ci *ConnectionInfo) SetDatabase(dbname string) *ConnectionInfo { ci.Database = dbname return ci } var errNoDatabaseNameInMongoUri = errors.New("mongo uri has no database name") func NewMongoClient(ctx context.Context, url string) (MongoClient, error) { connstr, err := connstring.ParseAndValidate(url) if err != nil { return MongoClient{}, err } if len(connstr.Database) == 0 { return MongoClient{}, errNoDatabaseNameInMongoUri } return newMongoClient(ctx, NewMongoConnectionInfo(url, connstr.Database)) } func newMongoClient(ctx context.Context, ci *ConnectionInfo) (MongoClient, error) { if len(ci.Url) == 0 { return MongoClient{}, errors.New("mongo connection string is empty") } secondaryPref := readpref.SecondaryPreferred() //client, err := mongo.NewClient(options.Client().ApplyURI(ci.Url).SetReadPreference(secondaryPref)) client, err := mongo.NewClient(options.Client().ApplyURI(ci.Url).SetReadPreference(secondaryPref).SetServerMonitor(&event.ServerMonitor{ ServerOpening: func(evt *event.ServerOpeningEvent) { logger.Println("mongodb ServerOpening :", *evt) }, ServerClosed: func(evt *event.ServerClosedEvent) { logger.Println("mongodb ServerClosed :", *evt) }, TopologyOpening: func(evt *event.TopologyOpeningEvent) { logger.Println("mongodb TopologyOpening :", *evt) }, TopologyClosed: func(evt *event.TopologyClosedEvent) { logger.Println("mongodb TopologyClosed :", *evt) }, })) if err != nil { return MongoClient{}, err } err = client.Connect(ctx) if err != nil { return MongoClient{}, err } // go func() { // for { // if err := client.Ping(ctx, nil); err != nil { // logger.Error("mongo client ping err :", err) // } // select { // case <-time.After(10 * time.Second): // continue // case <-ctx.Done(): // return // } // } // }() mdb := client.Database(ci.Database, nil) return MongoClient{c: client, db: mdb, ctx: ctx}, nil } func (mc *MongoClient) Connected() bool { return mc.db != nil && mc.c != nil } func (mc *MongoClient) Close() { if mc.c != nil { mc.c.Disconnect(mc.ctx) } } func (mc *MongoClient) Drop() error { return mc.db.Drop(mc.ctx) } func (mc *MongoClient) DropIndex(coll CollectionName, name string) error { matchcoll := mc.Collection(coll) _, err := matchcoll.Indexes().DropOne(mc.ctx, name) if commanderr, ok := err.(mongo.CommandError); ok { if commanderr.Code == 27 { // 인덱스가 없는 것이므로 그냥 성공 return nil } } return err } func (mc *MongoClient) Watch(coll CollectionName, pipeline mongo.Pipeline, opts ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error) { // mc.db.RunCommand() if len(opts) == 0 { opts = []*options.ChangeStreamOptions{options.ChangeStream().SetFullDocument(options.UpdateLookup).SetMaxAwaitTime(0)} } stream, err := mc.Collection(coll).Watch(mc.ctx, pipeline, opts...) if err != nil { if mongoErr, ok := err.(mongo.CommandError); ok { logger.Println("MongoClient Watch return err code :", mongoErr, mongoErr.Code) if mongoErr.Code == 40573 { adminDb := mc.db.Client().Database("admin") result := adminDb.RunCommand(mc.ctx, bson.D{ {Key: "modifyChangeStreams", Value: 1}, {Key: "database", Value: mc.db.Name()}, {Key: "collection", Value: coll}, {Key: "enable", Value: true}, }) if result.Err() != nil { logger.Println("mc.db.RunCommand failed :", result.Err(), mc.db.Name(), coll) } else { return mc.Collection(coll).Watch(mc.ctx, pipeline, opts...) } } } logger.Fatal(err) } return stream, err } func (mc *MongoClient) Collection(collname CollectionName) *mongo.Collection { return mc.db.Collection(string(collname)) } func (mc *MongoClient) AllAs(coll CollectionName, output any, opts ...*options.FindOptions) error { cursor, err := mc.Collection(coll).Find(mc.ctx, bson.D{}, opts...) if err != nil { return err } defer cursor.Close(mc.ctx) err = cursor.All(mc.ctx, output) if err != nil { return err } return nil } func (mc *MongoClient) All(coll CollectionName, opts ...*options.FindOptions) ([]bson.M, error) { var all []bson.M err := mc.AllAs(coll, &all, opts...) return all, err } func (mc *MongoClient) FindOneAndDelete(coll CollectionName, filter bson.M, opts ...*options.FindOneAndDeleteOptions) (bson.M, error) { result := mc.Collection(coll).FindOneAndDelete(mc.ctx, filter, opts...) err := result.Err() if err != nil { if err == mongo.ErrNoDocuments { return nil, nil } return nil, err } tmp := make(map[string]interface{}) err = result.Decode(&tmp) if err != nil { return nil, err } return bson.M(tmp), nil } func (mc *MongoClient) Delete(coll CollectionName, filter bson.M, opts ...*options.DeleteOptions) (bool, error) { r, err := mc.Collection(coll).DeleteOne(mc.ctx, filter, opts...) if err != nil { return false, err } return r.DeletedCount > 0, nil } func (mc *MongoClient) UnsetField(coll CollectionName, filter bson.M, doc bson.M) error { _, err := mc.Collection(coll).UpdateOne(mc.ctx, filter, bson.M{ "$unset": doc, }) return err } func (mc *MongoClient) DeleteMany(coll CollectionName, filters bson.D, opts ...*options.DeleteOptions) (int, error) { if len(filters) == 0 { // 큰일난다 return 0, nil } result, err := mc.Collection(coll).DeleteMany(mc.ctx, filters, opts...) if err != nil { return 0, err } return int(result.DeletedCount), nil } type CommandInsertMany[T any] struct { MongoClient Collection CollectionName Documents []T } func (c *CommandInsertMany[T]) Exec(opts ...*options.InsertManyOptions) (int, error) { conv := make([]any, len(c.Documents)) for i, v := range c.Documents { conv[i] = v } return c.InsertMany(c.Collection, conv, opts...) } func (mc *MongoClient) InsertMany(coll CollectionName, documents []interface{}, opts ...*options.InsertManyOptions) (int, error) { result, err := mc.Collection(coll).InsertMany(mc.ctx, documents, opts...) if err != nil { return 0, err } return len(result.InsertedIDs), nil } func (mc *MongoClient) UpdateMany(coll CollectionName, filter bson.M, doc bson.M, opts ...*options.UpdateOptions) (count int, err error) { result, e := mc.Collection(coll).UpdateMany(mc.ctx, filter, doc, opts...) if e != nil { return 0, e } err = nil count = int(result.UpsertedCount + result.ModifiedCount) return } type Marshaler interface { MarshalBSON() ([]byte, error) } type JsonDefaultMashaller struct { doc *bson.M } func (m *JsonDefaultMashaller) MarshalBSON() ([]byte, error) { return json.Marshal(m.doc) } func (mc *MongoClient) Update(coll CollectionName, filter bson.M, doc interface{}, opts ...*options.UpdateOptions) (worked bool, newid interface{}, err error) { result, e := mc.Collection(coll).UpdateOne(mc.ctx, filter, doc, opts...) if e != nil { return false, "", e } err = nil worked = result.MatchedCount > 0 || result.UpsertedCount > 0 || result.ModifiedCount > 0 newid = result.UpsertedID return } func (mc *MongoClient) UpsertOne(coll CollectionName, filter bson.M, doc interface{}) (worked bool, newid interface{}, err error) { return mc.Update(coll, filter, bson.M{ "$set": doc, }, options.Update().SetUpsert(true)) // return mc.Update(coll, filter, &JsonDefaultMashaller{doc: &bson.M{ // "$set": doc, // }}, options.Update().SetUpsert(true)) } func (mc *MongoClient) FindOneAs(coll CollectionName, filter bson.M, out interface{}, opts ...*options.FindOneOptions) error { err := mc.Collection(coll).FindOne(mc.ctx, filter, opts...).Decode(out) if err == mongo.ErrNoDocuments { err = nil } return err } func (mc *MongoClient) FindOne(coll CollectionName, filter bson.M, opts ...*options.FindOneOptions) (doc bson.M, err error) { result := mc.Collection(coll).FindOne(mc.ctx, filter, opts...) tmp := make(map[string]interface{}) err = result.Decode(&tmp) if err == nil { doc = bson.M(tmp) } else if err == mongo.ErrNoDocuments { err = nil } return } func (mc *MongoClient) FindOneAndUpdateAs(coll CollectionName, filter bson.M, doc bson.M, out interface{}, opts ...*options.FindOneAndUpdateOptions) error { result := mc.Collection(coll).FindOneAndUpdate(mc.ctx, filter, doc, opts...) err := result.Decode(out) if err == nil { return nil } if err == mongo.ErrNoDocuments { return nil } return err } func (mc *MongoClient) FindOneAndUpdate(coll CollectionName, filter bson.M, doc bson.M, opts ...*options.FindOneAndUpdateOptions) (olddoc bson.M, err error) { result := mc.Collection(coll).FindOneAndUpdate(mc.ctx, filter, doc, opts...) tmp := make(map[string]interface{}) err = result.Decode(&tmp) if err == nil { olddoc = bson.M(tmp) } else if err == mongo.ErrNoDocuments { err = nil } return } func (mc *MongoClient) Exists(coll CollectionName, filter bson.M) (bool, error) { cnt, err := mc.Collection(coll).CountDocuments(mc.ctx, filter, options.Count().SetLimit(1)) if err != nil { return false, err } return cnt > 0, nil } func (mc *MongoClient) SearchText(coll CollectionName, text string, opts ...*options.FindOptions) ([]bson.M, error) { cursor, err := mc.Collection(coll).Find(mc.ctx, bson.M{"$text": bson.M{"$search": text}}, opts...) if err != nil { return nil, err } defer cursor.Close(mc.ctx) var output []bson.M err = cursor.All(mc.ctx, &output) if err != nil { return nil, err } return output, nil } func (mc *MongoClient) FindAll(coll CollectionName, filter bson.M, opts ...*options.FindOptions) ([]bson.M, error) { cursor, err := mc.Collection(coll).Find(mc.ctx, filter, opts...) if err != nil { return nil, err } defer cursor.Close(mc.ctx) var output []bson.M err = cursor.All(mc.ctx, &output) if err != nil { return nil, err } return output, nil } func (mc *MongoClient) FindAllAs(coll CollectionName, filter bson.M, output interface{}, opts ...*options.FindOptions) error { cursor, err := mc.Collection(coll).Find(mc.ctx, filter, opts...) if err != nil { return err } defer cursor.Close(mc.ctx) err = cursor.All(mc.ctx, output) if err != nil { return err } return nil } func (mc *MongoClient) MakeExpireIndex(coll CollectionName, expireSeconds int32) error { matchcoll := mc.Collection(coll) indices, err := matchcoll.Indexes().List(mc.ctx, options.ListIndexes().SetMaxTime(time.Second)) if err != nil { return err } allindices := make([]interface{}, 0) err = indices.All(mc.ctx, &allindices) if err != nil { return err } tsfound := false var tsname string var exp int32 IndexSearchLabel: for _, index := range allindices { d := index.(bson.D) key := d.Map()["key"].(bson.D) for _, kd := range key { if kd.Key == "_ts" { tsfound = true if v, ok := d.Map()["name"]; ok { tsname = v.(string) } if v, ok := d.Map()["expireAfterSeconds"]; ok { exp = v.(int32) } break IndexSearchLabel } } } if tsfound { if exp == expireSeconds { return nil } _, err = matchcoll.Indexes().DropOne(mc.ctx, tsname) if err != nil { return err } } mod := mongo.IndexModel{ Keys: primitive.M{"_ts": 1}, Options: options.Index().SetExpireAfterSeconds(expireSeconds), } _, err = matchcoll.Indexes().CreateOne(mc.ctx, mod) return err } func (mc *MongoClient) makeIndicesWithOption(coll CollectionName, indices map[string]bson.D, opts ...*options.IndexOptions) error { collection := mc.Collection(coll) cursor, err := collection.Indexes().List(mc.ctx, options.ListIndexes().SetMaxTime(time.Second)) if err != nil { return err } defer cursor.Close(mc.ctx) found := make(map[string]bool) for k := range indices { found[k] = false } for cursor.TryNext(mc.ctx) { rawval := cursor.Current name := rawval.Lookup("name").StringValue() if _, ok := indices[name]; ok { found[name] = true } } for name, exist := range found { if !exist { v := indices[name] var mod mongo.IndexModel if len(v) == 1 { mod = mongo.IndexModel{ Keys: primitive.M{v[0].Key: v[0].Value}, Options: options.MergeIndexOptions(append(opts, options.Index().SetName(name))...), } } else { mod = mongo.IndexModel{ Keys: indices[name], Options: options.MergeIndexOptions(append(opts, options.Index().SetName(name))...), } } _, err = collection.Indexes().CreateOne(mc.ctx, mod) if err != nil { return err } } } return nil } func (mc *MongoClient) MakeUniqueIndices(coll CollectionName, indices map[string]bson.D, opts ...*options.IndexOptions) error { return mc.makeIndicesWithOption(coll, indices, append(opts, options.Index().SetUnique(true))...) } func (mc *MongoClient) MakeIndices(coll CollectionName, indices map[string]bson.D, opts ...*options.IndexOptions) error { return mc.makeIndicesWithOption(coll, indices, opts...) }