Files
gocommon/mongo.go

543 lines
14 KiB
Go
Raw Permalink Normal View History

package gocommon
2023-05-24 15:10:15 +09:00
import (
"context"
"encoding/json"
"errors"
"os"
"time"
"repositories.action2quare.com/ayo/gocommon/logger"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/event"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
2023-09-01 10:42:15 +09:00
"go.mongodb.org/mongo-driver/x/mongo/driver/connstring"
2023-05-24 15:10:15 +09:00
)
type MongoClient struct {
2024-06-25 10:13:50 +09:00
db *mongo.Database
c *mongo.Client
ctx context.Context
2023-05-24 15:10:15 +09:00
}
type ConnectionInfo struct {
Url string
Database string
}
type CollectionName string
func ParseObjectID(hexstr string) (out primitive.ObjectID) {
out, _ = primitive.ObjectIDFromHex(hexstr)
return
}
func NewMongoConnectionInfo(url string, dbname string) *ConnectionInfo {
if len(dbname) == 0 {
panic("dbname is empty")
}
if *devflag {
2023-05-24 15:10:15 +09:00
hostname, _ := os.Hostname()
dbname = hostname + "-" + dbname
}
return &ConnectionInfo{
Url: url,
Database: dbname,
}
}
func (ci *ConnectionInfo) SetURL(url string) *ConnectionInfo {
ci.Url = url
return ci
}
func (ci *ConnectionInfo) SetDatabase(dbname string) *ConnectionInfo {
ci.Database = dbname
return ci
}
2023-09-01 10:42:15 +09:00
var errNoDatabaseNameInMongoUri = errors.New("mongo uri has no database name")
func NewMongoClient(ctx context.Context, url string) (MongoClient, error) {
connstr, err := connstring.ParseAndValidate(url)
if err != nil {
return MongoClient{}, err
}
if len(connstr.Database) == 0 {
return MongoClient{}, errNoDatabaseNameInMongoUri
}
return newMongoClient(ctx, NewMongoConnectionInfo(url, connstr.Database))
2023-05-24 15:10:15 +09:00
}
func newMongoClient(ctx context.Context, ci *ConnectionInfo) (MongoClient, error) {
if len(ci.Url) == 0 {
return MongoClient{}, errors.New("mongo connection string is empty")
}
secondaryPref := readpref.SecondaryPreferred()
//client, err := mongo.NewClient(options.Client().ApplyURI(ci.Url).SetReadPreference(secondaryPref))
client, err := mongo.NewClient(options.Client().ApplyURI(ci.Url).SetReadPreference(secondaryPref).SetServerMonitor(&event.ServerMonitor{
ServerOpening: func(evt *event.ServerOpeningEvent) {
logger.Println("mongodb ServerOpening :", *evt)
},
ServerClosed: func(evt *event.ServerClosedEvent) {
logger.Println("mongodb ServerClosed :", *evt)
},
TopologyOpening: func(evt *event.TopologyOpeningEvent) {
logger.Println("mongodb TopologyOpening :", *evt)
},
TopologyClosed: func(evt *event.TopologyClosedEvent) {
logger.Println("mongodb TopologyClosed :", *evt)
},
}))
if err != nil {
return MongoClient{}, err
}
err = client.Connect(ctx)
if err != nil {
return MongoClient{}, err
}
2023-11-22 18:23:41 +09:00
// go func() {
// for {
// if err := client.Ping(ctx, nil); err != nil {
// logger.Error("mongo client ping err :", err)
// }
// select {
// case <-time.After(10 * time.Second):
// continue
// case <-ctx.Done():
// return
// }
// }
// }()
2023-05-24 15:10:15 +09:00
mdb := client.Database(ci.Database, nil)
2024-06-25 10:29:11 +09:00
return MongoClient{c: client, db: mdb, ctx: ctx}, nil
2023-05-24 15:10:15 +09:00
}
func (mc *MongoClient) Connected() bool {
2023-05-24 15:10:15 +09:00
return mc.db != nil && mc.c != nil
}
func (mc *MongoClient) Close() {
2023-05-24 15:10:15 +09:00
if mc.c != nil {
2024-06-25 10:13:50 +09:00
mc.c.Disconnect(mc.ctx)
2023-05-24 15:10:15 +09:00
}
}
func (mc *MongoClient) Drop() error {
2024-06-25 10:13:50 +09:00
return mc.db.Drop(mc.ctx)
2024-06-25 10:11:02 +09:00
}
func (mc *MongoClient) DropIndex(coll CollectionName, name string) error {
2023-08-23 17:40:14 +09:00
matchcoll := mc.Collection(coll)
2024-06-25 10:13:50 +09:00
_, err := matchcoll.Indexes().DropOne(mc.ctx, name)
2023-08-23 17:40:14 +09:00
if commanderr, ok := err.(mongo.CommandError); ok {
if commanderr.Code == 27 {
// 인덱스가 없는 것이므로 그냥 성공
return nil
}
}
return err
}
func (mc *MongoClient) Watch(coll CollectionName, pipeline mongo.Pipeline, opts ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error) {
// mc.db.RunCommand()
2023-05-24 15:10:15 +09:00
if len(opts) == 0 {
opts = []*options.ChangeStreamOptions{options.ChangeStream().SetFullDocument(options.UpdateLookup).SetMaxAwaitTime(0)}
}
stream, err := mc.Collection(coll).Watch(mc.ctx, pipeline, opts...)
if err != nil {
if mongoErr, ok := err.(mongo.CommandError); ok {
logger.Println("MongoClient Watch return err code :", mongoErr, mongoErr.Code)
if mongoErr.Code == 40573 {
adminDb := mc.db.Client().Database("admin")
result := adminDb.RunCommand(mc.ctx, bson.D{
{Key: "modifyChangeStreams", Value: 1},
{Key: "database", Value: mc.db.Name()},
{Key: "collection", Value: coll},
{Key: "enable", Value: true},
})
if result.Err() != nil {
logger.Println("mc.db.RunCommand failed :", result.Err(), mc.db.Name(), coll)
} else {
return mc.Collection(coll).Watch(mc.ctx, pipeline, opts...)
}
}
}
logger.Fatal(err)
}
return stream, err
2023-05-24 15:10:15 +09:00
}
func (mc *MongoClient) Collection(collname CollectionName) *mongo.Collection {
2023-05-24 15:10:15 +09:00
return mc.db.Collection(string(collname))
}
func (mc *MongoClient) AllAs(coll CollectionName, output any, opts ...*options.FindOptions) error {
2024-06-25 10:13:50 +09:00
cursor, err := mc.Collection(coll).Find(mc.ctx, bson.D{}, opts...)
2023-05-24 15:10:15 +09:00
if err != nil {
2023-06-20 09:59:11 +09:00
return err
2023-05-24 15:10:15 +09:00
}
2024-06-25 10:13:50 +09:00
defer cursor.Close(mc.ctx)
2023-05-24 15:10:15 +09:00
2024-06-25 10:13:50 +09:00
err = cursor.All(mc.ctx, output)
2023-05-24 15:10:15 +09:00
if err != nil {
2023-06-20 09:59:11 +09:00
return err
2023-05-24 15:10:15 +09:00
}
2023-06-20 09:59:11 +09:00
return nil
}
func (mc *MongoClient) All(coll CollectionName, opts ...*options.FindOptions) ([]bson.M, error) {
2023-06-20 09:59:11 +09:00
var all []bson.M
err := mc.AllAs(coll, &all, opts...)
return all, err
2023-05-24 15:10:15 +09:00
}
func (mc *MongoClient) FindOneAndDelete(coll CollectionName, filter bson.M, opts ...*options.FindOneAndDeleteOptions) (bson.M, error) {
2024-06-25 10:13:50 +09:00
result := mc.Collection(coll).FindOneAndDelete(mc.ctx, filter, opts...)
2023-05-24 15:10:15 +09:00
err := result.Err()
if err != nil {
if err == mongo.ErrNoDocuments {
return nil, nil
}
return nil, err
}
tmp := make(map[string]interface{})
err = result.Decode(&tmp)
if err != nil {
return nil, err
}
return bson.M(tmp), nil
}
func (mc *MongoClient) Delete(coll CollectionName, filter bson.M, opts ...*options.DeleteOptions) (bool, error) {
2024-06-25 10:13:50 +09:00
r, err := mc.Collection(coll).DeleteOne(mc.ctx, filter, opts...)
2023-05-24 15:10:15 +09:00
if err != nil {
return false, err
}
return r.DeletedCount > 0, nil
}
func (mc *MongoClient) UnsetField(coll CollectionName, filter bson.M, doc bson.M) error {
2024-06-25 10:13:50 +09:00
_, err := mc.Collection(coll).UpdateOne(mc.ctx, filter, bson.M{
2023-05-24 15:10:15 +09:00
"$unset": doc,
})
return err
}
func (mc *MongoClient) DeleteMany(coll CollectionName, filters bson.D, opts ...*options.DeleteOptions) (int, error) {
2023-05-24 15:10:15 +09:00
if len(filters) == 0 {
// 큰일난다
return 0, nil
}
2024-06-25 10:13:50 +09:00
result, err := mc.Collection(coll).DeleteMany(mc.ctx, filters, opts...)
2023-05-24 15:10:15 +09:00
if err != nil {
return 0, err
}
return int(result.DeletedCount), nil
}
type CommandInsertMany[T any] struct {
MongoClient
Collection CollectionName
Documents []T
}
func (c *CommandInsertMany[T]) Exec(opts ...*options.InsertManyOptions) (int, error) {
conv := make([]any, len(c.Documents))
for i, v := range c.Documents {
conv[i] = v
}
return c.InsertMany(c.Collection, conv, opts...)
}
func (mc *MongoClient) InsertMany(coll CollectionName, documents []interface{}, opts ...*options.InsertManyOptions) (int, error) {
2024-06-25 10:13:50 +09:00
result, err := mc.Collection(coll).InsertMany(mc.ctx, documents, opts...)
2023-05-24 15:10:15 +09:00
if err != nil {
return 0, err
}
return len(result.InsertedIDs), nil
}
func (mc *MongoClient) UpdateMany(coll CollectionName, filter bson.M, doc bson.M, opts ...*options.UpdateOptions) (count int, err error) {
2024-06-25 10:13:50 +09:00
result, e := mc.Collection(coll).UpdateMany(mc.ctx, filter, doc, opts...)
2023-05-24 15:10:15 +09:00
if e != nil {
return 0, e
}
err = nil
count = int(result.UpsertedCount + result.ModifiedCount)
return
}
type Marshaler interface {
MarshalBSON() ([]byte, error)
}
type JsonDefaultMashaller struct {
doc *bson.M
}
func (m *JsonDefaultMashaller) MarshalBSON() ([]byte, error) {
return json.Marshal(m.doc)
}
func (mc *MongoClient) Update(coll CollectionName, filter bson.M, doc interface{}, opts ...*options.UpdateOptions) (worked bool, newid interface{}, err error) {
2024-06-25 10:13:50 +09:00
result, e := mc.Collection(coll).UpdateOne(mc.ctx, filter, doc, opts...)
2023-05-24 15:10:15 +09:00
if e != nil {
return false, "", e
}
err = nil
worked = result.MatchedCount > 0 || result.UpsertedCount > 0 || result.ModifiedCount > 0
newid = result.UpsertedID
return
}
func (mc *MongoClient) UpsertOne(coll CollectionName, filter bson.M, doc interface{}) (worked bool, newid interface{}, err error) {
2023-05-24 15:10:15 +09:00
return mc.Update(coll, filter, bson.M{
"$set": doc,
}, options.Update().SetUpsert(true))
// return mc.Update(coll, filter, &JsonDefaultMashaller{doc: &bson.M{
// "$set": doc,
// }}, options.Update().SetUpsert(true))
}
func (mc *MongoClient) FindOneAs(coll CollectionName, filter bson.M, out interface{}, opts ...*options.FindOneOptions) error {
2024-06-25 10:13:50 +09:00
err := mc.Collection(coll).FindOne(mc.ctx, filter, opts...).Decode(out)
2023-05-24 15:10:15 +09:00
if err == mongo.ErrNoDocuments {
err = nil
}
return err
}
func (mc *MongoClient) FindOne(coll CollectionName, filter bson.M, opts ...*options.FindOneOptions) (doc bson.M, err error) {
2024-06-25 10:13:50 +09:00
result := mc.Collection(coll).FindOne(mc.ctx, filter, opts...)
2023-05-24 15:10:15 +09:00
tmp := make(map[string]interface{})
err = result.Decode(&tmp)
if err == nil {
doc = bson.M(tmp)
} else if err == mongo.ErrNoDocuments {
err = nil
}
return
}
func (mc *MongoClient) FindOneAndUpdateAs(coll CollectionName, filter bson.M, doc bson.M, out interface{}, opts ...*options.FindOneAndUpdateOptions) error {
2024-06-25 10:13:50 +09:00
result := mc.Collection(coll).FindOneAndUpdate(mc.ctx, filter, doc, opts...)
2023-05-24 15:10:15 +09:00
err := result.Decode(out)
if err == nil {
return nil
}
if err == mongo.ErrNoDocuments {
return nil
}
return err
}
func (mc *MongoClient) FindOneAndUpdate(coll CollectionName, filter bson.M, doc bson.M, opts ...*options.FindOneAndUpdateOptions) (olddoc bson.M, err error) {
2024-06-25 10:13:50 +09:00
result := mc.Collection(coll).FindOneAndUpdate(mc.ctx, filter, doc, opts...)
2023-05-24 15:10:15 +09:00
tmp := make(map[string]interface{})
err = result.Decode(&tmp)
if err == nil {
olddoc = bson.M(tmp)
} else if err == mongo.ErrNoDocuments {
err = nil
}
return
}
func (mc *MongoClient) Exists(coll CollectionName, filter bson.M) (bool, error) {
2024-06-25 10:13:50 +09:00
cnt, err := mc.Collection(coll).CountDocuments(mc.ctx, filter, options.Count().SetLimit(1))
2023-05-24 15:10:15 +09:00
if err != nil {
return false, err
}
return cnt > 0, nil
}
func (mc *MongoClient) SearchText(coll CollectionName, text string, opts ...*options.FindOptions) ([]bson.M, error) {
2024-06-25 10:13:50 +09:00
cursor, err := mc.Collection(coll).Find(mc.ctx, bson.M{"$text": bson.M{"$search": text}}, opts...)
2023-05-24 15:10:15 +09:00
if err != nil {
return nil, err
}
2024-06-25 10:13:50 +09:00
defer cursor.Close(mc.ctx)
2023-05-24 15:10:15 +09:00
var output []bson.M
2024-06-25 10:13:50 +09:00
err = cursor.All(mc.ctx, &output)
2023-05-24 15:10:15 +09:00
if err != nil {
return nil, err
}
return output, nil
}
func (mc *MongoClient) FindAll(coll CollectionName, filter bson.M, opts ...*options.FindOptions) ([]bson.M, error) {
2024-06-25 10:13:50 +09:00
cursor, err := mc.Collection(coll).Find(mc.ctx, filter, opts...)
2023-05-24 15:10:15 +09:00
if err != nil {
return nil, err
}
2024-06-25 10:13:50 +09:00
defer cursor.Close(mc.ctx)
2023-05-24 15:10:15 +09:00
var output []bson.M
2024-06-25 10:13:50 +09:00
err = cursor.All(mc.ctx, &output)
2023-05-24 15:10:15 +09:00
if err != nil {
return nil, err
}
return output, nil
}
func (mc *MongoClient) FindAllAs(coll CollectionName, filter bson.M, output interface{}, opts ...*options.FindOptions) error {
2024-06-25 10:13:50 +09:00
cursor, err := mc.Collection(coll).Find(mc.ctx, filter, opts...)
2023-05-24 15:10:15 +09:00
if err != nil {
return err
}
2024-06-25 10:13:50 +09:00
defer cursor.Close(mc.ctx)
2023-05-24 15:10:15 +09:00
2024-06-25 10:13:50 +09:00
err = cursor.All(mc.ctx, output)
2023-05-24 15:10:15 +09:00
if err != nil {
return err
}
return nil
}
func (mc *MongoClient) MakeExpireIndex(coll CollectionName, expireSeconds int32) error {
2023-05-24 15:10:15 +09:00
matchcoll := mc.Collection(coll)
2024-06-25 10:13:50 +09:00
indices, err := matchcoll.Indexes().List(mc.ctx, options.ListIndexes().SetMaxTime(time.Second))
2023-05-24 15:10:15 +09:00
if err != nil {
return err
}
allindices := make([]interface{}, 0)
2024-06-25 10:13:50 +09:00
err = indices.All(mc.ctx, &allindices)
2023-05-24 15:10:15 +09:00
if err != nil {
return err
}
tsfound := false
var tsname string
var exp int32
IndexSearchLabel:
for _, index := range allindices {
d := index.(bson.D)
key := d.Map()["key"].(bson.D)
for _, kd := range key {
if kd.Key == "_ts" {
tsfound = true
if v, ok := d.Map()["name"]; ok {
tsname = v.(string)
}
if v, ok := d.Map()["expireAfterSeconds"]; ok {
exp = v.(int32)
}
break IndexSearchLabel
}
}
}
if tsfound {
if exp == expireSeconds {
return nil
}
2024-06-25 10:13:50 +09:00
_, err = matchcoll.Indexes().DropOne(mc.ctx, tsname)
2023-05-24 15:10:15 +09:00
if err != nil {
return err
}
}
mod := mongo.IndexModel{
Keys: primitive.M{"_ts": 1},
Options: options.Index().SetExpireAfterSeconds(expireSeconds),
}
2024-06-25 10:13:50 +09:00
_, err = matchcoll.Indexes().CreateOne(mc.ctx, mod)
2023-05-24 15:10:15 +09:00
return err
}
func (mc *MongoClient) makeIndicesWithOption(coll CollectionName, indices map[string]bson.D, opts ...*options.IndexOptions) error {
2023-05-24 15:10:15 +09:00
collection := mc.Collection(coll)
2024-06-25 10:13:50 +09:00
cursor, err := collection.Indexes().List(mc.ctx, options.ListIndexes().SetMaxTime(time.Second))
2023-05-24 15:10:15 +09:00
if err != nil {
return err
}
2024-06-25 10:13:50 +09:00
defer cursor.Close(mc.ctx)
2023-05-24 15:10:15 +09:00
found := make(map[string]bool)
for k := range indices {
found[k] = false
}
2024-06-25 10:13:50 +09:00
for cursor.TryNext(mc.ctx) {
2023-05-24 15:10:15 +09:00
rawval := cursor.Current
name := rawval.Lookup("name").StringValue()
if _, ok := indices[name]; ok {
found[name] = true
}
}
for name, exist := range found {
if !exist {
v := indices[name]
var mod mongo.IndexModel
if len(v) == 1 {
mod = mongo.IndexModel{
Keys: primitive.M{v[0].Key: v[0].Value},
2023-11-26 15:46:34 +09:00
Options: options.MergeIndexOptions(append(opts, options.Index().SetName(name))...),
2023-05-24 15:10:15 +09:00
}
} else {
mod = mongo.IndexModel{
Keys: indices[name],
2023-11-26 15:46:34 +09:00
Options: options.MergeIndexOptions(append(opts, options.Index().SetName(name))...),
2023-05-24 15:10:15 +09:00
}
}
2024-06-25 10:13:50 +09:00
_, err = collection.Indexes().CreateOne(mc.ctx, mod)
2023-05-24 15:10:15 +09:00
if err != nil {
return err
}
}
}
return nil
}
func (mc *MongoClient) MakeUniqueIndices(coll CollectionName, indices map[string]bson.D, opts ...*options.IndexOptions) error {
2023-11-26 15:46:34 +09:00
return mc.makeIndicesWithOption(coll, indices, append(opts, options.Index().SetUnique(true))...)
2023-05-24 15:10:15 +09:00
}
func (mc *MongoClient) MakeIndices(coll CollectionName, indices map[string]bson.D, opts ...*options.IndexOptions) error {
2023-11-26 15:46:34 +09:00
return mc.makeIndicesWithOption(coll, indices, opts...)
2023-05-24 15:10:15 +09:00
}