Files
tavern/core/group_mongo.go
2023-05-24 16:10:00 +09:00

1051 lines
26 KiB
Go

package core
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/url"
"strings"
"time"
common "repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/wshandler"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/bsonrw"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// groupMongo is the MongoDB-backed group store. It embeds the group
// configuration and carries the database client together with the names of
// the three collections its methods operate on.
type groupMongo struct {
*groupConfig
// mongoClient is the database handle every method goes through.
mongoClient common.MongoClient
// hints maps an index name to the field names that make up that index.
// Create walks it to build its upsert filter and to pick an index hint.
hints map[string][]string
// collectionName is the collection that stores group documents.
collectionName common.CollectionName
// memberCollectionName is the collection that stores member documents.
memberCollectionName common.CollectionName
// inviteCollectionName is the collection that stores invitation documents.
inviteCollectionName common.CollectionName
}
// Create inserts a new group document through an upsert keyed by the
// best-matching configured index.
//
// For every entry in gm.hints a candidate filter is built from the request
// values r: "_id" is parsed as an ObjectID hex string, "after" becomes a
// "luts" $gt condition, and any other field is matched as a plain string.
// Collecting fields for an index stops at the first field missing from r.
// The candidate with the most fields wins and its index name becomes the
// hint; a filter consisting only of "_id" uses the "_id_" hint.
//
// Every field used in the filter is removed from doc (the caller's map is
// modified), "members" is initialised to an empty array, and the remainder
// is written with $setOnInsert.
//
// It returns the ObjectID of the inserted group. Errors: no usable filter
// could be built from r, the "_id" value is not valid ObjectID hex, the
// database call failed, or nothing was inserted because the filter matched an
// existing document ("name is duplicated").
func (gm *groupMongo) Create(r url.Values, doc bson.M) (primitive.ObjectID, error) {
	var (
		filter bson.M
		hint   string
	)
	for name, fields := range gm.hints {
		candidate := bson.M{}
		for _, f := range fields {
			fv := r.Get(f)
			if len(fv) == 0 {
				break
			}
			switch f {
			case "_id":
				// A malformed id must not silently degrade to the all-zero ObjectID.
				oid, err := primitive.ObjectIDFromHex(fv)
				if err != nil {
					return primitive.NilObjectID, fmt.Errorf("CreateGroup failed. invalid _id %q : %w", fv, err)
				}
				candidate["_id"] = oid
			case "after":
				candidate["luts"] = bson.M{
					"$gt": common.DotStringToTimestamp(fv),
				}
			default:
				candidate[f] = fv
			}
		}
		if len(filter) < len(candidate) {
			filter, hint = candidate, name
		}
	}
	if len(filter) == 0 {
		return primitive.NilObjectID, fmt.Errorf("CreateGroup failed. filter is missing : %v", r)
	}
	if _, ok := filter["_id"]; ok && len(filter) == 1 {
		hint = "_id_"
	}

	// Writing to a nil map panics; treat a nil doc as an empty document.
	if doc == nil {
		doc = bson.M{}
	}
	// Fields already present in the filter are removed from the
	// $setOnInsert payload.
	for f := range filter {
		delete(doc, f)
	}
	doc["members"] = []primitive.ObjectID{}

	_, inserted, err := gm.mongoClient.Update(gm.collectionName, filter, bson.M{
		"$setOnInsert": doc,
		"$currentDate": bson.M{
			"luts": bson.M{"$type": "timestamp"},
		},
	}, options.Update().SetUpsert(true).SetHint(hint))
	if err != nil {
		return primitive.NilObjectID, err
	}
	if inserted == nil {
		return primitive.NilObjectID, errors.New("name is duplicated")
	}
	oid, ok := inserted.(primitive.ObjectID)
	if !ok {
		return primitive.NilObjectID, fmt.Errorf("CreateGroup failed. unexpected inserted id type %T", inserted)
	}
	return oid, nil
}
// errAlreadyMemberOrDeletedMember is returned when a candidate or join
// request is rejected by the database update.
var errAlreadyMemberOrDeletedMember = errors.New("JoinGroup failed. already member or deleting member")

// Candidate registers memberID as a join candidate of groupID.
//
// doc is stamped with an "expiring" Unix time (now + gm.CandidateExpire
// seconds) and "_candidate": true, then upserted into the member collection
// under the (_gid, _mid) pair with $setOnInsert. The caller's doc is modified.
//
// It returns the database error if the update failed, and
// errAlreadyMemberOrDeletedMember if the update reported no success.
func (gm *groupMongo) Candidate(groupID primitive.ObjectID, memberID primitive.ObjectID, doc bson.M) error {
	lifetime := time.Second * time.Duration(gm.CandidateExpire)
	doc["expiring"] = time.Now().UTC().Add(lifetime).Unix()
	doc["_candidate"] = true

	selector := bson.M{
		"_gid": groupID,
		"_mid": memberID,
	}
	change := bson.M{
		"$setOnInsert": doc,
		"$currentDate": bson.M{
			"luts": bson.M{"$type": "timestamp"},
		},
	}

	ok, _, err := gm.mongoClient.Update(gm.memberCollectionName, selector, change, options.Update().SetUpsert(true))
	switch {
	case err != nil:
		logger.Error("JoinGroup failed. update candidate member collection err :", groupID, memberID, err)
		return err
	case !ok:
		// Do not send duplicate requests: a rejected candidate that asks
		// again keeps being rejected until its entry expires.
		logger.Println("JoinGroup failed. already member or deleting member :", groupID, memberID)
		return errAlreadyMemberOrDeletedMember
	default:
		return nil
	}
}
// Join adds a member to a group.
//
// If ticketID is the nil ObjectID a fresh one is generated and doc is tagged
// with "_gid"/"_mid". The ticket id is first pushed onto the group's "members"
// array (guarded by a "not full" condition when gm.MaxMember > 0), then the
// member document is upserted under that ticket id. If the second step fails
// the push is rolled back with a $pull.
//
// The member document's _id, not the member id, is what is stored in the
// group's "members" array, because only the _id is known when a member
// document is deleted; clients must be able to relate that _id to the member.
//
// The caller's doc is modified. Returns the ticket id on success. Returns
// errAlreadyMemberOrDeletedMember when the push did not apply (the group
// became full or the member is already in it), or a non-nil error when the
// member document could not be written.
func (gm *groupMongo) Join(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, doc bson.M) (primitive.ObjectID, error) {
	// Writing to a nil map panics; treat a nil doc as an empty document.
	if doc == nil {
		doc = bson.M{}
	}
	if ticketID == primitive.NilObjectID {
		ticketID = primitive.NewObjectID()
		doc["_gid"] = groupID
		doc["_mid"] = memberID
	}

	filter := bson.M{"_id": groupID}
	if gm.MaxMember > 0 {
		// "Group is full" guard: the last allowed slot must still be empty.
		filter[fmt.Sprintf("members.%d", gm.MaxMember-1)] = bson.M{"$exists": false}
	}
	success, _, err := gm.mongoClient.Update(gm.collectionName, filter, bson.M{
		"$push": bson.M{
			"members": ticketID,
		},
		"$currentDate": bson.M{
			"luts": bson.M{"$type": "timestamp"},
		},
	}, options.Update().SetUpsert(false).SetHint("_idmembers"))
	if err != nil {
		logger.Error("JoinGroup failed :", err)
		return primitive.NilObjectID, err
	}
	if !success {
		// The group just became full, or this member is already in it.
		logger.Println("JoinGroup failed. push member failed :", groupID, memberID)
		return primitive.NilObjectID, errAlreadyMemberOrDeletedMember
	}

	// nil values mark the candidate/invitation bookkeeping fields before the
	// document is handed to splitDocument.
	doc["_ts"] = nil
	doc["expiring"] = nil
	doc["_candidate"] = nil
	findoc := splitDocument(doc)
	findoc["$currentDate"] = bson.M{
		"luts": bson.M{"$type": "timestamp"},
	}
	success, _, err = gm.mongoClient.Update(gm.memberCollectionName, bson.M{
		"_id": ticketID,
	}, findoc, options.Update().SetUpsert(true))
	if err != nil || !success {
		logger.Error("JoinGroup failed. update member collection err :", err)
		// Roll back the push so the group does not keep a dangling ticket.
		if _, _, rerr := gm.mongoClient.Update(gm.collectionName, bson.M{
			"_id": groupID,
		}, bson.M{
			"$pull": bson.M{
				"members": ticketID,
			},
			"$currentDate": bson.M{
				"luts": bson.M{"$type": "timestamp"},
			},
		}, options.Update().SetUpsert(false)); rerr != nil {
			logger.Error("JoinGroup failed. rollback of members push err :", rerr)
		}
		if err == nil {
			// Never report failure as (NilObjectID, nil): callers would
			// read that as a successful join with a nil ticket.
			err = errors.New("JoinGroup failed. member document was not updated")
		}
		return primitive.NilObjectID, err
	}
	return ticketID, nil
}
// FindTicketID looks up the member document for (groupID, memberID) using the
// "gidmid" index hint and returns its _id. It returns the nil ObjectID, after
// logging, when the lookup fails or no document is found.
func (gm *groupMongo) FindTicketID(groupID primitive.ObjectID, memberID primitive.ObjectID) primitive.ObjectID {
	selector := bson.M{
		"_gid": groupID,
		"_mid": memberID,
	}
	opts := options.FindOne().SetHint("gidmid").SetProjection(bson.M{"_id": 1})

	found, err := gm.mongoClient.FindOne(gm.memberCollectionName, selector, opts)
	switch {
	case err != nil:
		logger.Error("FindTicketID failed :", err)
		return primitive.NilObjectID
	case found == nil:
		logger.Error("FindTicketID failed. tid not found :", groupID, memberID)
		return primitive.NilObjectID
	}
	return found["_id"].(primitive.ObjectID)
}
// errAlradyInvited is returned when an invitation upsert reports no success.
var errAlradyInvited = errors.New("already invited user")

// Invite creates an invitation from group groupID to member memberID.
//
// Steps, in order (they are separate database calls, not a transaction):
//  1. When invitees count as members and gm.MaxMember > 0, check that the
//     group still has a free slot; otherwise return "failed:full".
//  2. Upsert inviterDoc into the invite collection under a new ticket id.
//  3. Upsert inviteeDoc into the member collection under the same id.
//  4. When invitees count as members, push the ticket id onto the group's
//     "members" array. If the group filled up in the meantime the member
//     document is deleted, the invite's "name" is blanked, and
//     "failed:full" is returned.
//
// Both documents are stamped with "_gid", "_mid" and an "expiring" Unix time
// (now + gm.InviteExpire seconds); the caller's maps are modified.
//
// Returns "success" or "failed:full" with a nil error, or an empty string
// with a database error / errAlradyInvited.
func (gm *groupMongo) Invite(groupID primitive.ObjectID, memberID primitive.ObjectID, inviterDoc bson.M, inviteeDoc bson.M) (string, error) {
	if gm.InviteeIsMember && gm.MaxMember > 0 {
		vacant, err := gm.mongoClient.FindOne(gm.collectionName, bson.M{
			"_id": groupID,
			fmt.Sprintf("members.%d", gm.MaxMember-1): bson.M{"$exists": false},
		}, options.FindOne().SetProjection(bson.M{"_id": 1}))
		if err != nil {
			return "", err
		}
		if vacant == nil {
			// No free slot.
			return "failed:full", nil
		}
	}

	expireAt := time.Now().UTC().Add(time.Second * time.Duration(gm.InviteExpire)).Unix()
	tid := primitive.NewObjectID()

	// Send the invitation to the invitee.
	inviterDoc["_gid"] = groupID
	inviterDoc["_mid"] = memberID
	inviterDoc["expiring"] = expireAt
	success, _, err := gm.mongoClient.Update(gm.inviteCollectionName, bson.M{
		"_id": tid,
	}, bson.M{
		"$setOnInsert": inviterDoc,
		"$currentDate": bson.M{
			"luts": bson.M{"$type": "timestamp"},
			"_ts":  bson.M{"$type": "date"},
		},
	}, options.Update().SetUpsert(true))
	if err != nil {
		return "", err
	}
	if !success {
		return "", errAlradyInvited
	}

	inviteeDoc["expiring"] = expireAt
	inviteeDoc["_gid"] = groupID
	inviteeDoc["_mid"] = memberID
	success, _, err = gm.mongoClient.Update(gm.memberCollectionName, bson.M{
		"_id": tid, // Same _id as the invite document, which makes lookups easy.
	}, bson.M{
		"$setOnInsert": inviteeDoc,
		"$currentDate": bson.M{
			"luts": bson.M{"$type": "timestamp"},
			"_ts":  bson.M{"$type": "date"},
		},
	}, options.Update().SetHint("gidmid").SetUpsert(true))
	if err != nil {
		return "", err
	}
	if !success {
		return "", errAlradyInvited
	}

	if !gm.InviteeIsMember {
		return "success", nil
	}

	// Also add the invitee as a member.
	pushfilter := bson.M{"_id": groupID}
	if gm.MaxMember > 0 {
		pushfilter[fmt.Sprintf("members.%d", gm.MaxMember-1)] = bson.M{"$exists": false}
	}
	success, _, err = gm.mongoClient.Update(gm.collectionName, pushfilter, bson.M{
		"$push": bson.M{
			"members": tid,
		},
		"$currentDate": bson.M{
			"luts": bson.M{"$type": "timestamp"},
		},
	}, options.Update().SetUpsert(false))
	if err != nil {
		return "", err
	}
	if !success {
		// The group is already full. The Delete/Update below are best-effort;
		// it is fine if they fail.
		gm.mongoClient.Delete(gm.memberCollectionName, bson.M{"_id": tid})
		gm.mongoClient.Update(gm.inviteCollectionName, bson.M{
			"_id": tid,
		}, bson.M{
			"$set": bson.M{"name": ""},
			"$currentDate": bson.M{
				"luts": bson.M{"$type": "timestamp"},
			},
		}, options.Update().SetUpsert(false))
		return "failed:full", nil
	}
	return "success", nil
}
// UpdateGroupMember updates one member document, or marks it for deletion.
//
// With a nil doc the member is flagged "_delete": true and both "luts" and
// "_ts" are refreshed. Otherwise doc is converted by splitDocument and only
// "luts" is refreshed. The target is addressed by ticketID when it is set,
// and by the (_gid, _mid) pair with the "gidmid" hint when it is zero.
// Upsert is disabled in both cases. The database error, if any, is returned.
func (gm *groupMongo) UpdateGroupMember(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, doc bson.M) (err error) {
	var change bson.M
	switch doc {
	case nil:
		change = bson.M{
			"$set": bson.M{
				"_delete": true,
			},
			"$currentDate": bson.M{
				"luts": bson.M{"$type": "timestamp"},
				"_ts":  bson.M{"$type": "date"},
			},
		}
	default:
		change = splitDocument(doc)
		change["$currentDate"] = bson.M{
			"luts": bson.M{"$type": "timestamp"},
		}
	}

	var (
		selector bson.M
		opts     *options.UpdateOptions
	)
	if ticketID.IsZero() {
		selector = bson.M{
			"_gid": groupID,
			"_mid": memberID,
		}
		opts = options.Update().SetHint("gidmid").SetUpsert(false)
	} else {
		selector = bson.M{
			"_id": ticketID,
		}
		opts = options.Update().SetUpsert(false)
	}

	_, _, err = gm.mongoClient.Update(gm.memberCollectionName, selector, change, opts)
	return err
}
// CancelInvitation withdraws the invitation identified by ticketID.
//
// When invitees count as members the ticket is first pulled from the group's
// "members" array; if that update reports no success the function stops and
// returns nil. The member document is then deleted, and the invite document
// is kept but its "expiring" field is set to -1.
func (gm *groupMongo) CancelInvitation(groupID primitive.ObjectID, ticketID primitive.ObjectID) error {
	if gm.InviteeIsMember {
		pull := bson.M{
			"$pull": bson.M{
				"members": ticketID,
			},
			"$currentDate": bson.M{
				"luts": bson.M{"$type": "timestamp"},
			},
		}
		pulled, _, err := gm.mongoClient.Update(gm.collectionName, bson.M{"_id": groupID}, pull, options.Update().SetUpsert(false))
		if err != nil {
			return err
		}
		if !pulled {
			return nil
		}
	}

	// Remove the entry from the member collection.
	if _, err := gm.mongoClient.Delete(gm.memberCollectionName, bson.M{"_id": ticketID}); err != nil {
		return err
	}

	// The invite itself must not be deleted. It is left in place until it
	// expires; the client looks at "expiring" and drops it on its own.
	invalidate := bson.M{
		"$set": bson.M{"expiring": -1},
		"$currentDate": bson.M{
			"luts": bson.M{"$type": "timestamp"},
		},
	}
	_, _, err := gm.mongoClient.Update(gm.inviteCollectionName, bson.M{"_id": ticketID}, invalidate, options.Update().SetUpsert(false))
	return err
}
// errInvitationExpired is returned when the invited member document no longer
// exists at the time the invitation is accepted.
var errInvitationExpired = errors.New("invitation is expired")

// AcceptInvitation turns the invited member document ticketID into a regular
// member: member is applied with $set, "expiring" and "_ts" are removed, and
// "luts" is refreshed. The invite document is then deleted (best-effort) and
// the "_gid" stored in the member document is returned.
//
// groupID and mid are not used by this implementation.
func (gm *groupMongo) AcceptInvitation(groupID primitive.ObjectID, mid primitive.ObjectID, ticketID primitive.ObjectID, member bson.M) (primitive.ObjectID, error) {
	change := bson.M{
		"$set": member,
		"$unset": bson.M{
			"expiring": 1,
			"_ts":      1,
		},
		"$currentDate": bson.M{
			"luts": bson.M{"$type": "timestamp"},
		},
	}
	opts := options.FindOneAndUpdate().SetProjection(bson.M{"_gid": 1})

	gdoc, err := gm.mongoClient.FindOneAndUpdate(gm.memberCollectionName, bson.M{"_id": ticketID}, change, opts)
	switch {
	case err != nil:
		return primitive.NilObjectID, err
	case gdoc == nil:
		// The invitation has expired.
		return primitive.NilObjectID, errInvitationExpired
	}

	// At this point the invite document may be deleted.
	gm.mongoClient.Delete(gm.inviteCollectionName, bson.M{"_id": ticketID})

	return gdoc["_gid"].(primitive.ObjectID), nil
}
// errNotInvited is returned when the invitation being denied cannot be
// resolved to a group membership.
var errNotInvited = errors.New("invitation is not mine")

// DenyInvitation rejects the invitation identified by ticketID.
//
// The owning group is read from the invite document, the ticket is pulled
// from that group's "members" array, the member document is flagged
// "_delete": true (same marker UpdateGroupMember writes for a nil doc), and
// the invite document is deleted. The last two steps are best-effort.
//
// groupID and mid are not used; the group id is taken from the invite
// document. Returns errNotInvited when the invite does not exist, carries no
// ObjectID "_gid", or the pull reported no success.
func (gm *groupMongo) DenyInvitation(groupID primitive.ObjectID, mid primitive.ObjectID, ticketID primitive.ObjectID) error {
	// The projection must be a document; a bare string cannot be encoded.
	invdoc, err := gm.mongoClient.FindOne(gm.inviteCollectionName, bson.M{
		"_id": ticketID,
	}, options.FindOne().SetProjection(bson.M{"_gid": 1}))
	if err != nil {
		return err
	}
	// Indexing a nil map yields nil, so this also covers "invite not found".
	gid, ok := invdoc["_gid"].(primitive.ObjectID)
	if !ok {
		return errNotInvited
	}

	success, _, err := gm.mongoClient.Update(gm.collectionName, bson.M{
		"_id": gid,
	}, bson.M{
		"$pull": bson.M{
			"members": ticketID,
		},
		"$currentDate": bson.M{
			"luts": bson.M{"$type": "timestamp"},
		},
	})
	if err != nil {
		return err
	}
	if !success {
		return errNotInvited
	}

	// Best-effort: mark the member document for deletion. An update document
	// needs operators, so the flag goes under $set.
	if _, _, uerr := gm.mongoClient.Update(gm.memberCollectionName, bson.M{
		"_id": ticketID,
	}, bson.M{
		"$set": bson.M{
			"_delete": true,
		},
		"$currentDate": bson.M{
			"luts": bson.M{"$type": "timestamp"},
			"_ts":  bson.M{"$type": "date"},
		},
	}, options.Update().SetUpsert(false)); uerr != nil {
		logger.Error("DenyInvitation. mark member deleted err :", uerr)
	}

	// At this point the invite document may be deleted.
	gm.mongoClient.Delete(gm.inviteCollectionName, bson.M{
		"_id": ticketID,
	})
	return nil
}
// QueryInvitations returns up to 20 invite documents addressed to memberID,
// using the "mid" index hint. When after is non-zero only documents whose
// "luts" is greater than after are returned.
func (gm *groupMongo) QueryInvitations(memberID primitive.ObjectID, after primitive.Timestamp) ([]bson.M, error) {
	selector := bson.M{"_mid": memberID}
	if !after.IsZero() {
		selector["luts"] = bson.M{"$gt": after}
	}
	opts := options.Find().SetLimit(20).SetHint("mid")
	return gm.mongoClient.FindAll(gm.inviteCollectionName, selector, opts)
}
// Exist reports whether a group document with _id groupID that also satisfies
// filter exists. A nil filter means "match on _id only". A non-nil filter is
// modified in place: its "_id" entry is set to groupID.
func (gm *groupMongo) Exist(groupID primitive.ObjectID, filter bson.M) (bool, error) {
	if filter == nil {
		filter = bson.M{}
	}
	filter["_id"] = groupID

	onlyID := options.FindOne().SetProjection(bson.M{"_id": 1})
	found, err := gm.mongoClient.FindOne(gm.collectionName, filter, onlyID)
	if err != nil {
		return false, err
	}
	return found != nil, nil
}
// FindAll returns the group documents matching filter, fetched with a batch
// size of 10.
//
// projection is a comma-separated list of field names to include; surrounding
// whitespace is ignored and empty entries are skipped. An empty projection
// returns whole documents. When after is non-zero, a "luts" $gt condition is
// added to filter (the caller's map is modified; a nil filter is replaced by
// a fresh one).
func (gm *groupMongo) FindAll(filter bson.M, projection string, after primitive.Timestamp) ([]bson.M, error) {
	opt := options.Find().SetBatchSize(10)
	if len(projection) > 0 {
		projM := bson.M{}
		for _, proj := range strings.Split(projection, ",") {
			if name := strings.TrimSpace(proj); len(name) > 0 {
				projM[name] = 1
			}
		}
		// The projection has to be attached to the options to take effect.
		if len(projM) > 0 {
			opt.SetProjection(projM)
		}
	}
	if !after.IsZero() {
		// Writing to a nil map panics.
		if filter == nil {
			filter = bson.M{}
		}
		filter["luts"] = bson.M{"$gt": after}
	}
	return gm.mongoClient.FindAll(gm.collectionName, filter, opt)
}
// FindOne returns the group document with _id groupID.
//
// projection is a comma-separated field list. An entry prefixed with '-' is
// excluded, an entry prefixed with '+' or without prefix is included.
// Whitespace around entries is ignored and empty entries (for example from a
// trailing comma) are skipped. An empty projection returns the whole
// document.
func (gm *groupMongo) FindOne(groupID primitive.ObjectID, projection string) (bson.M, error) {
	op := options.FindOne()
	if len(projection) > 0 {
		proj := bson.M{}
		for _, p := range strings.Split(projection, ",") {
			// Trim first so the prefix check never indexes an empty string.
			p = strings.TrimSpace(p)
			if len(p) == 0 {
				continue
			}
			include := 1
			switch p[0] {
			case '-':
				include = 0
				p = strings.TrimSpace(p[1:])
			case '+':
				p = strings.TrimSpace(p[1:])
			}
			if len(p) == 0 {
				continue
			}
			proj[p] = include
		}
		if len(proj) > 0 {
			op = op.SetProjection(proj)
		}
	}
	return gm.mongoClient.FindOne(gm.collectionName, bson.M{"_id": groupID}, op)
}
// QueryMembers returns the member documents of groupID keyed by the hex form
// of each member's "_mid".
//
// projection is a comma-separated field list. An entry prefixed with '-' is
// excluded, an entry prefixed with '+' or without prefix is included.
// Whitespace around entries is ignored and empty entries are skipped.
//
// When after is zero, member documents of the group whose "expiring" time is
// in the past are deleted first (best-effort) and all members are returned.
// Otherwise only documents with "luts" greater than after are returned.
//
// Documents that carry no ObjectID "_mid" (for example because the projection
// removed it) cannot be keyed and are left out of the result.
//
// reqID is not used by this implementation.
func (gm *groupMongo) QueryMembers(groupID primitive.ObjectID, reqID primitive.ObjectID, projection string, after primitive.Timestamp) (map[string]bson.M, error) {
	op := options.Find()
	if len(projection) > 0 {
		proj := bson.M{}
		for _, p := range strings.Split(projection, ",") {
			// Trim first so the prefix check never indexes an empty string.
			p = strings.TrimSpace(p)
			if len(p) == 0 {
				continue
			}
			include := 1
			switch p[0] {
			case '-':
				include = 0
				p = strings.TrimSpace(p[1:])
			case '+':
				p = strings.TrimSpace(p[1:])
			}
			if len(p) == 0 {
				continue
			}
			proj[p] = include
		}
		if len(proj) > 0 {
			op = op.SetProjection(proj)
		}
	}

	filter := bson.M{"_gid": groupID}
	if after.IsZero() {
		// Full query: purge expired entries first. Failure is tolerated.
		gm.mongoClient.Delete(gm.memberCollectionName, bson.M{
			"_gid":     groupID,
			"expiring": bson.M{"$lt": time.Now().UTC().Unix()},
		})
	} else {
		filter["luts"] = bson.M{"$gt": after}
	}

	all, err := gm.mongoClient.FindAll(gm.memberCollectionName, filter, op)
	if err != nil {
		return nil, err
	}

	output := make(map[string]bson.M, len(all))
	for _, m := range all {
		mid, ok := m["_mid"].(primitive.ObjectID)
		if !ok {
			continue
		}
		output[mid.Hex()] = m
	}
	return output, nil
}
// QueryMember returns a single member document of groupID, addressed by
// ticketID when it is set and by memberID otherwise. The "gidmid" index hint
// is always used.
//
// projection is a comma-separated list of fields to include; "_delete" is
// always included alongside them. With an empty projection the whole document
// is returned except "_ts", "_gid" and "_mid".
func (gm *groupMongo) QueryMember(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, projection string) (bson.M, error) {
	selector := bson.M{"_gid": groupID}
	if ticketID.IsZero() {
		selector["_mid"] = memberID
	} else {
		selector["_id"] = ticketID
	}

	// Default: hide the bookkeeping fields.
	fields := bson.M{
		"_ts":  0,
		"_gid": 0,
		"_mid": 0,
	}
	if len(projection) > 0 {
		fields = bson.M{
			"_delete": 1,
		}
		for _, name := range strings.Split(projection, ",") {
			fields[name] = 1
		}
	}

	opts := options.FindOne().SetHint("gidmid")
	opts.SetProjection(fields)
	return gm.mongoClient.FindOne(gm.memberCollectionName, selector, opts)
}
// Leave removes a member from a group.
//
// When ticketID is zero it is resolved from the member collection via
// (_gid, _mid); if no such document (or no "_id" in it) is found the call is
// a no-op and returns nil. The ticket is then pulled from the group's
// "members" array, and the member document is flagged "_delete": true with a
// refreshed "_ts". Only a failure of the lookup or of the pull is returned;
// the final flagging is best-effort.
func (gm *groupMongo) Leave(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID) error {
	if ticketID.IsZero() {
		// Lookup and removal are deliberately separate steps: the pull is
		// the important part, and if it fails the _id must still exist so
		// the operation can be retried.
		selector := bson.M{
			"_gid": groupID,
			"_mid": memberID,
		}
		opts := options.FindOne().SetProjection(bson.M{"_id": 1}).SetHint("gidmid")
		target, err := gm.mongoClient.FindOne(gm.memberCollectionName, selector, opts)
		if err != nil {
			return err
		}
		if target == nil {
			// Probably expired already, or something else is off.
			return nil
		}
		rawID, present := target["_id"]
		if !present {
			return nil
		}
		ticketID = rawID.(primitive.ObjectID)
	}

	pull := bson.M{
		"$pull": bson.M{
			"members": ticketID,
		},
		"$currentDate": bson.M{
			"luts": bson.M{"$type": "timestamp"},
		},
	}
	if _, _, err := gm.mongoClient.Update(gm.collectionName, bson.M{"_id": groupID}, pull); err != nil {
		return err
	}

	// Flagging the member document may fail; that is accepted.
	markDeleted := bson.M{
		"$set": bson.M{
			"_delete": true,
		},
		"$currentDate": bson.M{
			"_ts": bson.M{"$type": "date"},
		},
	}
	gm.mongoClient.Update(gm.memberCollectionName, bson.M{"_id": ticketID}, markDeleted)
	return nil
}
// UpdateMemberDocument applies doc with $set to the member document addressed
// by (groupID, memberID) and refreshes its "luts". Upsert is disabled and the
// "gidmid" index hint is used. The database error, if any, is returned.
func (gm *groupMongo) UpdateMemberDocument(groupID primitive.ObjectID, memberID primitive.ObjectID, doc bson.M) error {
	selector := bson.M{
		"_gid": groupID,
		"_mid": memberID,
	}
	change := bson.M{
		"$set": doc,
		"$currentDate": bson.M{
			"luts": bson.M{"$type": "timestamp"},
		},
	}
	opts := options.Update().SetUpsert(false).SetHint("gidmid")

	_, _, err := gm.mongoClient.Update(gm.memberCollectionName, selector, change, opts)
	return err
}
// Dismiss deletes the group document groupID and then, best-effort, the
// member documents whose "_gid" is groupID. Only a failure of the group
// deletion is returned.
func (gm *groupMongo) Dismiss(groupID primitive.ObjectID) error {
	if _, err := gm.mongoClient.Delete(gm.collectionName, bson.M{"_id": groupID}); err != nil {
		return err
	}
	// The result of the member cleanup is intentionally not checked.
	gm.mongoClient.Delete(gm.memberCollectionName, bson.M{"_gid": groupID})
	return nil
}
// errUpdateGroupDocumentFailed is returned when the group update reports no
// success.
var errUpdateGroupDocumentFailed = errors.New("update group document failed")

// UpdateGroupDocument merges a BSON-encoded patch into a group document.
//
// The current document is loaded, body is decoded as a BSON document on top
// of it, and the merged result is written back with $set (upsert disabled).
//
// NOTE(review): this is a read-modify-write without any guard, and the
// write-back neither refreshes "luts" nor excludes fields loaded from the
// stored document — confirm this is acceptable for concurrent updates.
func (gm *groupMongo) UpdateGroupDocument(groupID primitive.ObjectID, body []byte) error {
	groupDoc, err := gm.mongoClient.FindOne(gm.collectionName, bson.M{"_id": groupID}, nil)
	if err != nil {
		return err
	}

	// Decode the patch over the loaded document.
	dec, err := bson.NewDecoder(bsonrw.NewBSONDocumentReader(body))
	if err != nil {
		return err
	}
	if err = dec.Decode(&groupDoc); err != nil {
		return err
	}

	ok, _, err := gm.mongoClient.Update(gm.collectionName, bson.M{"_id": groupID}, bson.M{"$set": groupDoc}, options.Update().SetUpsert(false))
	switch {
	case err != nil:
		return err
	case !ok:
		return errUpdateGroupDocumentFailed
	}
	return nil
}
// DropPausedMember is a no-op in the MongoDB-backed store: it ignores both
// arguments and always returns nil.
func (gm *groupMongo) DropPausedMember(gid primitive.ObjectID, mid primitive.ObjectID) error {
return nil
}
// PauseMember is a no-op in the MongoDB-backed store: it ignores all
// arguments and always returns nil.
func (gm *groupMongo) PauseMember(gid primitive.ObjectID, mid primitive.ObjectID, conn *wshandler.Richconn) error {
return nil
}
// mongowatcher describes one change-stream subscription: which collection to
// watch, how to filter events, and what to call for each decoded event.
type mongowatcher struct {
// collection is the collection the change stream is opened on.
collection common.CollectionName
// pipeline is the aggregation pipeline passed to Watch.
pipeline mongo.Pipeline
// op is the full-document mode set on the change-stream options.
op options.FullDocument
// onChanged is called with the region and the decoded event.
onChanged func(string, *groupPipelineDocument)
}
// callOnChanged invokes the watcher's onChanged callback for one event. A
// panic raised by the callback is recovered and logged so that it does not
// propagate to the caller.
func (w *mongowatcher) callOnChanged(region string, c *groupPipelineDocument) {
	defer func() {
		if cause := recover(); cause != nil {
			logger.Error(cause)
		}
	}()

	w.onChanged(region, c)
}
// monitorfunc runs the watcher's change-stream loop until parentctx is
// cancelled.
//
// It opens a change stream on w.collection with w.pipeline and w.op, polls it
// with TryNext, decodes each event into a groupPipelineDocument and hands it
// to callOnChanged. If opening the stream fails it waits one minute (or until
// cancellation) and retries; if the stream reports an error or its cursor id
// drops to 0 the stream is closed and reopened on the next iteration.
//
// A panic inside the loop is recovered and logged, which ends the loop. The
// stream, if open, is closed on exit.
func (w *mongowatcher) monitorfunc(parentctx context.Context, region string, mongoClient common.MongoClient) {
	defer func() {
		if r := recover(); r != nil {
			logger.Error(r)
		}
		logger.Println("watcher.,monitorfunc finished")
	}()

	// Tolerate a nil context from callers rather than panicking on Err().
	if parentctx == nil {
		parentctx = context.Background()
	}

	var groupstream *mongo.ChangeStream
	defer func() {
		if groupstream != nil {
			// parentctx may already be cancelled; close with a live context.
			groupstream.Close(context.Background())
		}
	}()

	for {
		if parentctx.Err() != nil {
			return
		}

		if groupstream == nil {
			stream, err := mongoClient.Watch(w.collection, w.pipeline, options.ChangeStream().SetFullDocument(w.op))
			if err != nil {
				logger.Error(err)
				// Back off before retrying, but stay responsive to cancellation.
				select {
				case <-parentctx.Done():
					return
				case <-time.After(time.Minute):
				}
				continue
			}
			if stream == nil {
				continue
			}
			groupstream = stream
		}

		if groupstream.TryNext(parentctx) {
			var data groupPipelineDocument
			if err := groupstream.Decode(&data); err != nil {
				logger.Error("tavern monitorfunc Decode error :", err)
			} else {
				w.callOnChanged(region, &data)
			}
			continue
		}

		if err := groupstream.Err(); err != nil || groupstream.ID() == 0 {
			if parentctx.Err() != nil {
				// Cancellation, not a stream failure; the deferred close
				// takes care of the stream.
				return
			}
			logger.Error(err)
			groupstream.Close(context.Background())
			groupstream = nil
		}
	}
}
// start launches monitorfunc on a new goroutine and returns immediately.
// The receiver is a value, so the goroutine works on its own copy of the
// watcher.
func (w mongowatcher) start(ctx context.Context, region string, mongoClient common.MongoClient) {
go w.monitorfunc(ctx, region, mongoClient)
}
// preparePersistent sets up the MongoDB side of a group type and returns the
// store that uses it.
//
// It creates the indices for the group, invite and member collections
// (collection names are cfg.Name, cfg.Name+"-invites", cfg.Name+"-members"),
// starts four change-stream watchers that forward database changes to
// websocket clients through wsh, and returns a *groupMongo wired to dbconn.
//
// Index definitions in cfg.UniqueIndex, cfg.SearchIndex and cfg.MemberIndex
// are comma-separated field lists; the index name is the same string with the
// commas removed. Any index-creation error that is checked aborts the setup.
func (cfg *groupConfig) preparePersistent(ctx context.Context, region string, dbconn common.MongoClient, wsh *wshandler.WebsocketHandler) (group, error) {
uniqueindices := map[string]bson.D{}
// hints collects index name -> field list for both unique and search indices.
hints := map[string][]string{}
for _, ui := range cfg.UniqueIndex {
indexname := strings.ReplaceAll(ui, ",", "")
keys := strings.Split(ui, ",")
keydef := bson.D{}
for _, k := range keys {
keydef = append(keydef, bson.E{Key: strings.TrimSpace(k), Value: 1})
}
uniqueindices[indexname] = keydef
// NOTE(review): keys are stored untrimmed here while the index definition
// above uses trimmed names — confirm configs never contain spaces.
hints[indexname] = keys
}
collectionName := common.CollectionName(cfg.Name)
memberCollectionName := common.CollectionName(cfg.Name + "-members")
inviteCollectionName := common.CollectionName(cfg.Name + "-invites")
err := dbconn.MakeUniqueIndices(collectionName, uniqueindices)
if err != nil {
return nil, err
}
// Non-unique search indices on the group collection.
indices := map[string]bson.D{}
for _, ui := range cfg.SearchIndex {
indexname := strings.ReplaceAll(ui, ",", "")
keys := strings.Split(ui, ",")
keydef := bson.D{}
for _, k := range keys {
keydef = append(keydef, bson.E{Key: strings.TrimSpace(k), Value: 1})
}
indices[indexname] = keydef
hints[indexname] = keys
}
// Make sure an index named "_idmembers" over (_id, members) exists.
if _, ok := indices["_idmembers"]; !ok {
indices["_idmembers"] = bson.D{
{Key: "_id", Value: 1},
{Key: "members", Value: 1},
}
}
// One text index spanning all configured text-search fields, named after
// the first field.
if len(cfg.TextSearchFields) > 0 {
var tsi bson.D
for _, stf := range cfg.TextSearchFields {
tsi = append(tsi, bson.E{Key: stf, Value: "text"})
}
indices[cfg.TextSearchFields[0]+"_text"] = tsi
}
err = dbconn.MakeIndices(collectionName, indices)
if err != nil {
return nil, err
}
// Invite collection: unique "gidmid" index over (_gid, _mid, _ts desc).
// NOTE(review): the error returned by this call is not checked, unlike every
// other index creation in this function — confirm that is intentional.
dbconn.MakeUniqueIndices(inviteCollectionName, map[string]bson.D{
"gidmid": {bson.E{Key: "_gid", Value: 1}, bson.E{Key: "_mid", Value: 1}, bson.E{Key: "_ts", Value: -1}},
})
// The "mid" index is unique only when cfg.InviteeExlusive is set.
if cfg.InviteeExlusive {
err = dbconn.MakeUniqueIndices(inviteCollectionName, map[string]bson.D{
"mid": {bson.E{Key: "_mid", Value: 1}, bson.E{Key: "_ts", Value: -1}},
})
} else {
err = dbconn.MakeIndices(inviteCollectionName, map[string]bson.D{
"mid": {bson.E{Key: "_mid", Value: 1}, bson.E{Key: "_ts", Value: -1}},
})
}
if err != nil {
return nil, err
}
if cfg.InviteExpire > 0 {
err = dbconn.MakeExpireIndex(inviteCollectionName, cfg.InviteExpire)
if err != nil {
return nil, err
}
}
// Member collection: unique "gidmid" index over (_gid, _mid).
err = dbconn.MakeUniqueIndices(memberCollectionName, map[string]primitive.D{
"gidmid": {{Key: "_gid", Value: 1}, {Key: "_mid", Value: 1}},
})
if err != nil {
return nil, err
}
// Additional member indices from configuration, created one at a time.
for _, mi := range cfg.MemberIndex {
indexname := strings.ReplaceAll(mi, ",", "")
keys := strings.Split(mi, ",")
keydef := bson.D{}
for _, k := range keys {
keydef = append(keydef, bson.E{Key: strings.TrimSpace(k), Value: 1})
}
err = dbconn.MakeIndices(memberCollectionName, map[string]bson.D{
indexname: keydef,
})
if err != nil {
return nil, err
}
}
// The member collection's expire index is driven by cfg.InviteExpire as well.
if cfg.InviteExpire > 0 {
err = dbconn.MakeExpireIndex(memberCollectionName, cfg.InviteExpire)
if err != nil {
return nil, err
}
}
// Watcher 1: "update" events on the group collection. The updated fields,
// plus the group's _id and a "_hint" set to cfg.Name, are broadcast under the
// group id.
groupwatcher := mongowatcher{
collection: collectionName,
op: options.Default,
pipeline: mongo.Pipeline{
bson.D{
{
Key: "$match", Value: bson.D{
{Key: "operationType", Value: bson.D{
{Key: "$in", Value: bson.A{"update"}},
}},
},
},
},
},
onChanged: func(r string, data *groupPipelineDocument) {
updates := data.UpdateDescription.UpdatedFields
gid := data.DocumentKey.Id
updates["_id"] = gid
updates["_hint"] = cfg.Name
wsh.Broadcast(r, gid, updates)
},
}
groupwatcher.start(ctx, region, dbconn)
// Watcher 2: "insert" events on the member collection. The full document,
// stripped of _gid/_mid/_ts, is broadcast under the group id.
m1 := &mongowatcher{
collection: memberCollectionName,
op: options.Default,
pipeline: mongo.Pipeline{
bson.D{
{
Key: "$match", Value: bson.D{
{Key: "operationType", Value: bson.D{
{Key: "$in", Value: bson.A{"insert"}},
}},
},
},
},
},
onChanged: func(r string, data *groupPipelineDocument) {
gid := data.FullDocument["_gid"].(primitive.ObjectID)
delete(data.FullDocument, "_gid")
delete(data.FullDocument, "_mid")
delete(data.FullDocument, "_ts")
data.FullDocument["_hint"] = cfg.Name
data.FullDocument["_fullDocument"] = true
if _, candidate := data.FullDocument["_candidate"]; candidate {
// Candidate documents are broadcast under the group id with the top bit
// of its first byte set.
gid[0] |= 0x80
delete(data.FullDocument, "_candidate")
}
wsh.Broadcast(r, gid, data.FullDocument)
},
}
m1.start(ctx, region, dbconn)
// Watcher 3: "update" events on the member collection, with the looked-up
// document projected down to _id/_gid/_mid/_candidate plus the update
// description.
m2 := &mongowatcher{
collection: memberCollectionName,
op: options.UpdateLookup,
pipeline: mongo.Pipeline{
bson.D{
{
Key: "$match", Value: bson.D{
{Key: "operationType", Value: bson.D{
{Key: "$in", Value: bson.A{"update"}},
}},
},
},
},
bson.D{
{
Key: "$project", Value: bson.M{
"fullDocument._id": 1,
"fullDocument._gid": 1,
"fullDocument._mid": 1,
"fullDocument._candidate": 1,
"updateDescription": 1,
},
},
},
},
onChanged: func(r string, data *groupPipelineDocument) {
gid := data.FullDocument["_gid"].(primitive.ObjectID)
updates := data.UpdateDescription.UpdatedFields
updates["_id"] = data.FullDocument["_id"]
updates["_hint"] = cfg.Name
// Removed fields are reported to clients as explicit nil values. The loop
// variable shadows the region parameter r only inside this loop.
for _, r := range data.UpdateDescription.RemovedFileds {
updates[r.(string)] = nil
}
if _, candidate := data.FullDocument["_candidate"]; candidate {
// _candidate documents are broadcast on the candidate channel.
gid[0] |= 0x80
delete(data.FullDocument, "_candidate")
}
// A member flagged with _delete has its connection closed.
if _, ok := updates["_delete"]; ok {
mid := data.FullDocument["_mid"].(primitive.ObjectID)
if conn := wsh.Conn(r, mid); conn != nil {
conn.Close()
}
}
if v, ok := updates["_candidate"]; ok && v == nil {
// The member is no longer a candidate: drop the connection so the client
// reconnects.
mid := data.FullDocument["_mid"].(primitive.ObjectID)
if conn := wsh.Conn(r, mid); conn != nil {
conn.Close()
}
}
wsh.Broadcast(r, gid, updates)
},
}
m2.start(ctx, region, dbconn)
// Watcher 4: "insert" and "update" events on the invite collection. The
// document is JSON-encoded and written directly to the connection of the
// member named by "_mid", if that member is connected.
i1 := &mongowatcher{
collection: inviteCollectionName,
op: options.UpdateLookup,
pipeline: mongo.Pipeline{
bson.D{
{
Key: "$match", Value: bson.D{
{Key: "operationType", Value: bson.D{
{Key: "$in", Value: bson.A{"insert", "update"}},
}},
},
},
},
},
onChanged: func(r string, data *groupPipelineDocument) {
alias := data.FullDocument["_mid"].(primitive.ObjectID)
conn := wsh.Conn(r, alias)
if conn != nil {
delete(data.FullDocument, "_ts")
delete(data.FullDocument, "_mid")
data.FullDocument["_fullDocument"] = true
data.FullDocument["_hint"] = inviteCollectionName
// The marshal error is discarded here.
bt, _ := json.Marshal(data.FullDocument)
conn.WriteBytes(bt)
}
},
}
i1.start(ctx, region, dbconn)
return &groupMongo{
groupConfig: cfg,
mongoClient: dbconn,
hints: hints,
collectionName: collectionName,
memberCollectionName: memberCollectionName,
inviteCollectionName: inviteCollectionName,
}, nil
}