323 lines
7.8 KiB
Go
323 lines
7.8 KiB
Go
package shared
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"flag"
|
|
"os"
|
|
"time"
|
|
|
|
"go.mongodb.org/mongo-driver/bson"
|
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
|
"go.mongodb.org/mongo-driver/mongo"
|
|
"go.mongodb.org/mongo-driver/mongo/options"
|
|
)
|
|
|
|
type MongoClient struct {
|
|
db *mongo.Database
|
|
c *mongo.Client
|
|
}
|
|
|
|
// ConnectionInfo carries the settings NewMongoClient needs to open a
// connection: the connection string and the name of the database to use.
type ConnectionInfo struct {
	Url      string // connection string handed to the driver
	Database string // name of the database to operate on
}
|
|
|
|
// Devflag is the boolean command-line flag "dev" (default false, no usage
// text). It is registered on the default flag set at package initialization;
// NewMongoConnectionInfo reads it to decide which database name to use.
var Devflag = flag.Bool("dev", false, "")
|
|
|
|
// CollectionName is the name of a MongoDB collection. Using a distinct type
// keeps collection names from being mixed up with arbitrary strings.
type CollectionName string

// Collections referenced by this package.
const (
	CollectionCoupon    CollectionName = "coupon"
	CollectionCouponUse CollectionName = "coupon_use"
	CollectionAccount   CollectionName = "account"
	CollectionAuth      CollectionName = "auth"
	CollectionMatch     CollectionName = "match"
)
|
|
|
|
// mongourl returns the MongoDB connection string: the value of the MONGO_URL
// environment variable when it is non-empty, otherwise the built-in default.
func mongourl() string {
	const fallback = "mongodb://redis-dev.actionsquare.corp:27017/?replicaSet=repl01"

	if fromEnv := os.Getenv("MONGO_URL"); fromEnv != "" {
		return fromEnv
	}
	return fallback
}
|
|
|
|
func NewMongoConnectionInfo() *ConnectionInfo {
|
|
if !flag.Parsed() {
|
|
flag.Parse()
|
|
}
|
|
dbname := "anvil"
|
|
if *Devflag {
|
|
dbname, _ = os.Hostname()
|
|
}
|
|
|
|
return &ConnectionInfo{
|
|
Url: mongourl(),
|
|
Database: dbname,
|
|
}
|
|
}
|
|
func (ci *ConnectionInfo) SetURL(url string) *ConnectionInfo {
|
|
ci.Url = url
|
|
return ci
|
|
}
|
|
|
|
func (ci *ConnectionInfo) SetDatabase(dbname string) *ConnectionInfo {
|
|
ci.Database = dbname
|
|
return ci
|
|
}
|
|
|
|
func NewMongoClient(ci *ConnectionInfo) (MongoClient, error) {
|
|
if len(ci.Url) == 0 {
|
|
return MongoClient{}, errors.New("mongo connection string is empty")
|
|
}
|
|
|
|
client, err := mongo.NewClient(options.Client().ApplyURI(ci.Url))
|
|
if err != nil {
|
|
return MongoClient{}, err
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
defer cancel()
|
|
|
|
err = client.Connect(ctx)
|
|
if err != nil {
|
|
return MongoClient{}, err
|
|
}
|
|
err = client.Ping(ctx, nil)
|
|
if err != nil {
|
|
return MongoClient{}, err
|
|
}
|
|
|
|
anvildb := client.Database(ci.Database, nil)
|
|
makeExpiredIndex := func(collname CollectionName, expireSeconds int32) error {
|
|
matchcoll := anvildb.Collection(string(collname))
|
|
indices, err := matchcoll.Indexes().List(ctx, options.ListIndexes().SetMaxTime(time.Second))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
allindices := make([]interface{}, 0)
|
|
err = indices.All(context.Background(), &allindices)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
tsfound := false
|
|
var tsname string
|
|
var exp int32
|
|
for _, index := range allindices {
|
|
d := index.(bson.D)
|
|
key := d.Map()["key"].(bson.D)
|
|
for _, kd := range key {
|
|
if kd.Key == "_ts" {
|
|
tsfound = true
|
|
}
|
|
}
|
|
|
|
if v, ok := d.Map()["name"]; ok {
|
|
tsname = v.(string)
|
|
}
|
|
if v, ok := d.Map()["expireAfterSeconds"]; ok {
|
|
exp = v.(int32)
|
|
}
|
|
}
|
|
|
|
if tsfound {
|
|
if exp == expireSeconds {
|
|
return nil
|
|
}
|
|
_, err = matchcoll.Indexes().DropOne(ctx, tsname)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
mod := mongo.IndexModel{
|
|
Keys: primitive.M{"_ts": 1},
|
|
Options: options.Index().SetExpireAfterSeconds(expireSeconds),
|
|
}
|
|
|
|
_, err = matchcoll.Indexes().CreateOne(ctx, mod)
|
|
return err
|
|
}
|
|
|
|
if err = makeExpiredIndex(CollectionMatch, 30); err != nil {
|
|
return MongoClient{}, err
|
|
}
|
|
if err = makeExpiredIndex(CollectionAuth, 300); err != nil {
|
|
return MongoClient{}, err
|
|
}
|
|
|
|
return MongoClient{c: client, db: anvildb}, nil
|
|
}
|
|
|
|
func (mc MongoClient) Close() {
|
|
if mc.c != nil {
|
|
mc.c.Disconnect(context.Background())
|
|
}
|
|
}
|
|
|
|
func (mc MongoClient) Watch(coll CollectionName, pipeline mongo.Pipeline) (*mongo.ChangeStream, error) {
|
|
return mc.Collection(coll).Watch(context.Background(), pipeline, options.ChangeStream().SetFullDocument(options.UpdateLookup).SetMaxAwaitTime(0))
|
|
}
|
|
|
|
func (mc MongoClient) Collection(collname CollectionName) *mongo.Collection {
|
|
return mc.db.Collection(string(collname))
|
|
}
|
|
|
|
func (mc MongoClient) All(coll CollectionName, opts ...*options.FindOptions) ([]bson.M, error) {
|
|
cursor, err := mc.Collection(coll).Find(context.Background(), bson.D{}, opts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var all []bson.M
|
|
err = cursor.All(context.Background(), &all)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return all, nil
|
|
}
|
|
|
|
func (mc MongoClient) FindOneAndDelete(coll CollectionName, filter bson.M) (bson.M, error) {
|
|
result := mc.Collection(coll).FindOneAndDelete(context.Background(), filter)
|
|
err := result.Err()
|
|
if err != nil {
|
|
if err == mongo.ErrNoDocuments {
|
|
return nil, nil
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
tmp := make(map[string]interface{})
|
|
err = result.Decode(&tmp)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return bson.M(tmp), nil
|
|
}
|
|
|
|
func (mc MongoClient) Delete(coll CollectionName, filter bson.M) (bool, error) {
|
|
r, err := mc.Collection(coll).DeleteOne(context.Background(), filter)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
return r.DeletedCount > 0, nil
|
|
}
|
|
|
|
func (mc MongoClient) UnsetField(coll CollectionName, filter bson.M, doc bson.M) error {
|
|
_, err := mc.Collection(coll).UpdateOne(context.Background(), filter, bson.M{
|
|
"$unset": doc,
|
|
})
|
|
return err
|
|
}
|
|
|
|
func (mc MongoClient) DeleteMany(coll CollectionName, filters bson.M, opts ...*options.DeleteOptions) (int, error) {
|
|
result, err := mc.Collection(coll).DeleteMany(context.Background(), filters, opts...)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
return int(result.DeletedCount), nil
|
|
}
|
|
|
|
func (mc MongoClient) InsertMany(coll CollectionName, documents []interface{}, opts ...*options.InsertManyOptions) (int, error) {
|
|
result, err := mc.Collection(coll).InsertMany(context.Background(), documents, opts...)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
return len(result.InsertedIDs), nil
|
|
}
|
|
|
|
func (mc MongoClient) UpdateMany(coll CollectionName, filter bson.M, doc bson.M, opts ...*options.UpdateOptions) (count int, err error) {
|
|
result, e := mc.Collection(coll).UpdateMany(context.Background(), filter, doc, opts...)
|
|
|
|
if e != nil {
|
|
return 0, e
|
|
}
|
|
|
|
err = nil
|
|
count = int(result.UpsertedCount + result.ModifiedCount)
|
|
return
|
|
}
|
|
|
|
func (mc MongoClient) Update(coll CollectionName, filter bson.M, doc bson.M, opts ...*options.UpdateOptions) (worked bool, newid interface{}, err error) {
|
|
result, e := mc.Collection(coll).UpdateOne(context.Background(), filter, doc, opts...)
|
|
|
|
if e != nil {
|
|
return false, "", e
|
|
}
|
|
|
|
err = nil
|
|
worked = result.UpsertedCount > 0 || result.ModifiedCount > 0
|
|
newid = result.UpsertedID
|
|
return
|
|
}
|
|
|
|
func (mc MongoClient) UpsertOne(coll CollectionName, filter bson.M, doc bson.M) (worked bool, newid interface{}, err error) {
|
|
return mc.Update(coll, filter, bson.M{
|
|
"$set": doc,
|
|
}, options.Update().SetUpsert(true))
|
|
}
|
|
|
|
func (mc MongoClient) FindOne(coll CollectionName, filter bson.M, opts ...*options.FindOneOptions) (doc bson.M, err error) {
|
|
result := mc.Collection(coll).FindOne(context.Background(), filter, opts...)
|
|
tmp := make(map[string]interface{})
|
|
err = result.Decode(&tmp)
|
|
if err == nil {
|
|
doc = bson.M(tmp)
|
|
} else if err == mongo.ErrNoDocuments {
|
|
err = nil
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (mc MongoClient) FindOneAndUpdate(coll CollectionName, filter bson.M, doc bson.M, opts ...*options.FindOneAndUpdateOptions) (olddoc bson.M, err error) {
|
|
result := mc.Collection(coll).FindOneAndUpdate(context.Background(), filter, doc, opts...)
|
|
tmp := make(map[string]interface{})
|
|
err = result.Decode(&tmp)
|
|
if err == nil {
|
|
olddoc = bson.M(tmp)
|
|
} else if err == mongo.ErrNoDocuments {
|
|
err = nil
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (mc MongoClient) Exists(coll CollectionName, filter bson.M) (bool, error) {
|
|
cnt, err := mc.Collection(coll).CountDocuments(context.Background(), filter, options.Count().SetLimit(1))
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return cnt > 0, nil
|
|
}
|
|
|
|
func (mc MongoClient) FindAll(coll CollectionName, filter bson.M, opts ...*options.FindOptions) ([]bson.M, error) {
|
|
cursor, err := mc.Collection(coll).Find(context.Background(), filter, opts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
output := make([]interface{}, 0)
|
|
err = cursor.All(context.Background(), &output)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
docs := make([]bson.M, 0, len(output))
|
|
for _, doc := range output {
|
|
one := make(bson.M)
|
|
for _, kv := range doc.(bson.D) {
|
|
one[kv.Key] = kv.Value
|
|
}
|
|
docs = append(docs, one)
|
|
}
|
|
return docs, nil
|
|
}
|