package core import ( "context" "encoding/json" "errors" "fmt" "net/url" "strings" "time" common "repositories.action2quare.com/ayo/gocommon" "repositories.action2quare.com/ayo/gocommon/logger" "repositories.action2quare.com/ayo/gocommon/wshandler" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/bsonrw" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) type groupMongo struct { *groupConfig mongoClient common.MongoClient hints map[string][]string collectionName common.CollectionName memberCollectionName common.CollectionName inviteCollectionName common.CollectionName } func (gm *groupMongo) Create(r url.Values, doc bson.M) (primitive.ObjectID, error) { var filter bson.M var hint string for h, fields := range gm.hints { candidate := bson.M{} for _, f := range fields { if fv := r.Get(f); len(fv) == 0 { break } else if f == "_id" { candidate["_id"], _ = primitive.ObjectIDFromHex(fv) } else if f == "after" { candidate["luts"] = bson.M{ "$gt": common.DotStringToTimestamp(fv), } } else { candidate[f] = fv } } if len(filter) < len(candidate) { filter = candidate hint = h } } if len(filter) == 1 { if _, ok := filter["_id"]; ok { hint = "_id_" } } if len(filter) == 0 { return primitive.NilObjectID, fmt.Errorf("CreateGroup failed. filter is missing : %v", r) } for f := range filter { delete(doc, f) } doc["members"] = []primitive.ObjectID{} _, inserted, err := gm.mongoClient.Update(gm.collectionName, filter, bson.M{ "$setOnInsert": doc, "$currentDate": bson.M{ "luts": bson.M{"$type": "timestamp"}, }, }, options.Update().SetUpsert(true).SetHint(hint)) if err != nil { return primitive.NilObjectID, err } if inserted == nil { return primitive.NilObjectID, errors.New("name is duplicated") } return inserted.(primitive.ObjectID), nil } var errAlreadyMemberOrDeletedMember = errors.New("JoinGroup failed. already member or deleting member") func (gm *groupMongo) Candidate(groupID primitive.ObjectID, memberID primitive.ObjectID, doc bson.M) error { expireAt := time.Now().UTC().Add(time.Second * time.Duration(gm.CandidateExpire)).Unix() doc["expiring"] = expireAt doc["_candidate"] = true success, _, err := gm.mongoClient.Update(gm.memberCollectionName, bson.M{ "_gid": groupID, "_mid": memberID, }, bson.M{ "$setOnInsert": doc, "$currentDate": bson.M{ "luts": bson.M{"$type": "timestamp"}, }, }, options.Update().SetUpsert(true)) if err != nil { logger.Error("JoinGroup failed. update candidate member collection err :", groupID, memberID, err) return err } if !success { // 중복해서 보내지 말아라 // 거절된 candidate가 또 요청을 보내더라도 expire될 때까지 계속 거절된다. logger.Println("JoinGroup failed. already member or deleting member :", groupID, memberID) return errAlreadyMemberOrDeletedMember } return nil } func (gm *groupMongo) Join(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, doc bson.M) (primitive.ObjectID, error) { if ticketID == primitive.NilObjectID { ticketID = primitive.NewObjectID() doc["_gid"] = groupID doc["_mid"] = memberID } // member collection에 추가. 추가된 _id를 group Member에 push한다. // 이렇게 하는 이유는 member document가 delete될때 _id만 알 수 있기 때문. // 클라이언트는 _id와 member id와의 관계를 알 수 있어야 한다. filter := bson.M{"_id": groupID} if gm.MaxMember > 0 { // 풀방 플래그 filter[fmt.Sprintf("members.%d", gm.MaxMember-1)] = bson.M{"$exists": false} } success, _, err := gm.mongoClient.Update(gm.collectionName, filter, bson.M{ "$push": bson.M{ "members": ticketID, }, "$currentDate": bson.M{ "luts": bson.M{"$type": "timestamp"}, }, }, options.Update().SetUpsert(false).SetHint("_idmembers")) if err != nil { logger.Error("JoinGroup failed :", err) return primitive.NilObjectID, err } if !success { // 갑자기 풀방이 되었거나 이미 멤버다 logger.Println("JoinGroup failed. push member failed :", groupID, memberID) return primitive.NilObjectID, errAlreadyMemberOrDeletedMember } doc["_ts"] = nil doc["expiring"] = nil doc["_candidate"] = nil findoc := splitDocument(doc) findoc["$currentDate"] = bson.M{ "luts": bson.M{"$type": "timestamp"}, } success, _, err = gm.mongoClient.Update(gm.memberCollectionName, bson.M{ "_id": ticketID, }, findoc, options.Update().SetUpsert(true)) if err != nil || !success { gm.mongoClient.Update(gm.collectionName, bson.M{ "_id": groupID, }, bson.M{ "$pull": bson.M{ "members": ticketID, }, "$currentDate": bson.M{ "luts": bson.M{"$type": "timestamp"}, }, }, options.Update().SetUpsert(false)) logger.Error("JoinGroup failed. update member collection err :", err) return primitive.NilObjectID, err } return ticketID, nil } func (gm *groupMongo) FindTicketID(groupID primitive.ObjectID, memberID primitive.ObjectID) primitive.ObjectID { tid, err := gm.mongoClient.FindOne(gm.memberCollectionName, bson.M{ "_gid": groupID, "_mid": memberID, }, options.FindOne().SetHint("gidmid").SetProjection(bson.M{"_id": 1})) if err != nil { logger.Error("FindTicketID failed :", err) return primitive.NilObjectID } if tid == nil { logger.Error("FindTicketID failed. tid not found :", groupID, memberID) return primitive.NilObjectID } return tid["_id"].(primitive.ObjectID) } var errAlradyInvited = errors.New("already invited user") func (gm *groupMongo) Invite(groupID primitive.ObjectID, memberID primitive.ObjectID, inviterDoc bson.M, inviteeDoc bson.M) (string, error) { if gm.InviteeIsMember && gm.MaxMember > 0 { vacant, err := gm.mongoClient.FindOne(gm.collectionName, bson.M{ "_id": groupID, fmt.Sprintf("members.%d", gm.MaxMember-1): bson.M{"$exists": false}, }, options.FindOne().SetProjection(bson.M{"_id": 1})) if err != nil { return "", err } if vacant == nil { // 빈 자리가 없다 return "failed:full", nil } } expireAt := time.Now().UTC().Add(time.Second * time.Duration(gm.InviteExpire)).Unix() var success bool var err error tid := primitive.NewObjectID() // invitee에게 초대장 보내기 inviterDoc["_gid"] = groupID inviterDoc["_mid"] = memberID inviterDoc["expiring"] = expireAt success, _, err = gm.mongoClient.Update(gm.inviteCollectionName, bson.M{ "_id": tid, }, bson.M{ "$setOnInsert": inviterDoc, "$currentDate": bson.M{ "luts": bson.M{"$type": "timestamp"}, "_ts": bson.M{"$type": "date"}, }, }, options.Update().SetUpsert(true)) if err != nil { return "", err } if !success { return "", errAlradyInvited } inviteeDoc["expiring"] = expireAt inviteeDoc["_gid"] = groupID inviteeDoc["_mid"] = memberID success, _, err = gm.mongoClient.Update(gm.memberCollectionName, bson.M{ "_id": tid, // inviteCollectionName에 추가된 _id와 동일하게 맞춘다. 검색에 용이 }, bson.M{ "$setOnInsert": inviteeDoc, "$currentDate": bson.M{ "luts": bson.M{"$type": "timestamp"}, "_ts": bson.M{"$type": "date"}, }, }, options.Update().SetHint("gidmid").SetUpsert(true)) if err != nil { return "", err } if !success { return "", errAlradyInvited } if gm.InviteeIsMember { // 멤버로도 추가 pushfilter := bson.M{"_id": groupID} if gm.MaxMember > 0 { pushfilter[fmt.Sprintf("members.%d", gm.MaxMember-1)] = bson.M{"$exists": false} } success, _, err = gm.mongoClient.Update(gm.collectionName, pushfilter, bson.M{ "$push": bson.M{ "members": tid, }, "$currentDate": bson.M{ "luts": bson.M{"$type": "timestamp"}, }, }, options.Update().SetUpsert(false)) if err != nil { return "", err } if !success { return "", errAlradyInvited } if !success { // 이미 풀방.아래 Delete/Update는 실패해도 괜찮다. gm.mongoClient.Delete(gm.memberCollectionName, bson.M{"_id": tid}) gm.mongoClient.Update(gm.inviteCollectionName, bson.M{ "_id": tid, }, bson.M{ "$set": bson.M{"name": ""}, "$currentDate": bson.M{ "luts": bson.M{"$type": "timestamp"}, }, }, options.Update().SetUpsert(false)) return "failed:full", nil } } return "success", nil } func (gm *groupMongo) UpdateGroupMember(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, doc bson.M) (err error) { var findoc bson.M if doc == nil { findoc = bson.M{ "$set": bson.M{ "_delete": true, }, "$currentDate": bson.M{ "luts": bson.M{"$type": "timestamp"}, "_ts": bson.M{"$type": "date"}, }, } } else { findoc = splitDocument(doc) findoc["$currentDate"] = bson.M{ "luts": bson.M{"$type": "timestamp"}, } } if ticketID.IsZero() { _, _, err = gm.mongoClient.Update(gm.memberCollectionName, bson.M{ "_gid": groupID, "_mid": memberID, }, findoc, options.Update().SetHint("gidmid").SetUpsert(false)) } else { _, _, err = gm.mongoClient.Update(gm.memberCollectionName, bson.M{ "_id": ticketID, }, findoc, options.Update().SetUpsert(false)) } return } func (gm *groupMongo) CancelInvitation(groupID primitive.ObjectID, ticketID primitive.ObjectID) error { if gm.InviteeIsMember { pulled, _, err := gm.mongoClient.Update(gm.collectionName, bson.M{ "_id": groupID, }, bson.M{ "$pull": bson.M{ "members": ticketID, }, "$currentDate": bson.M{ "luts": bson.M{"$type": "timestamp"}, }, }, options.Update().SetUpsert(false)) if err != nil { return err } if !pulled { return nil } } // member collection 삭제 _, err := gm.mongoClient.Delete(gm.memberCollectionName, bson.M{ "_id": ticketID, }) if err != nil { return err } // 초대를 삭제하면 안된다. // expiring될 때까지 냅두고, 클라이언트가 expiring을 보고 알아서 지우게 한다. _, _, err = gm.mongoClient.Update(gm.inviteCollectionName, bson.M{ "_id": ticketID, }, bson.M{ "$set": bson.M{"expiring": -1}, "$currentDate": bson.M{ "luts": bson.M{"$type": "timestamp"}, }, }, options.Update().SetUpsert(false)) return err } var errInvitationExpired = errors.New("invitation is expired") func (gm *groupMongo) AcceptInvitation(groupID primitive.ObjectID, mid primitive.ObjectID, ticketID primitive.ObjectID, member bson.M) (primitive.ObjectID, error) { gdoc, err := gm.mongoClient.FindOneAndUpdate(gm.memberCollectionName, bson.M{ "_id": ticketID, }, bson.M{ "$set": member, "$unset": bson.M{ "expiring": 1, "_ts": 1, }, "$currentDate": bson.M{ "luts": bson.M{"$type": "timestamp"}, }, }, options.FindOneAndUpdate().SetProjection(bson.M{"_gid": 1})) if err != nil { return primitive.NilObjectID, err } if gdoc == nil { // 만료되었다. return primitive.NilObjectID, errInvitationExpired } // 여기서는 삭제해도 된다. gm.mongoClient.Delete(gm.inviteCollectionName, bson.M{ "_id": ticketID, }) gidbytes := gdoc["_gid"].(primitive.ObjectID) return gidbytes, nil } var errNotInvited = errors.New("invitation is not mine") func (gm *groupMongo) DenyInvitation(groupID primitive.ObjectID, mid primitive.ObjectID, ticketID primitive.ObjectID) error { // 여기서는 삭제해도 된다. invdoc, err := gm.mongoClient.FindOne(gm.inviteCollectionName, bson.M{ "_id": ticketID, }, options.FindOne().SetProjection("_gid")) if err != nil { return err } gid := invdoc["_gid"].(primitive.ObjectID) success, _, err := gm.mongoClient.Update(gm.collectionName, bson.M{ "_id": gid, }, bson.M{ "$pull": bson.M{ "members": ticketID, }, "$currentDate": bson.M{ "luts": bson.M{"$type": "timestamp"}, }, }) if err != nil { return err } if !success { return errNotInvited } gm.mongoClient.Update(gm.memberCollectionName, bson.M{ "_id": ticketID, }, bson.M{ "_delete": true, }, options.Update().SetUpsert(false)) gm.mongoClient.Delete(gm.inviteCollectionName, bson.M{ "_id": ticketID, }) return nil } func (gm *groupMongo) QueryInvitations(memberID primitive.ObjectID, after primitive.Timestamp) ([]bson.M, error) { filter := bson.M{"_mid": memberID} if !after.IsZero() { filter["luts"] = bson.M{"$gt": after} } return gm.mongoClient.FindAll(gm.inviteCollectionName, filter, options.Find().SetLimit(20).SetHint("mid")) } func (gm *groupMongo) Exist(groupID primitive.ObjectID, filter bson.M) (bool, error) { if filter == nil { filter = bson.M{"_id": groupID} } else { filter["_id"] = groupID } found, err := gm.mongoClient.FindOne(gm.collectionName, filter, options.FindOne().SetProjection(bson.M{"_id": 1})) if err != nil { return false, err } if found == nil { return false, nil } return true, nil } func (gm *groupMongo) FindAll(filter bson.M, projection string, after primitive.Timestamp) ([]bson.M, error) { opt := options.Find().SetBatchSize(10) if len(projection) > 0 { projM := bson.M{} for _, proj := range strings.Split(projection, ",") { projM[proj] = 1 } } if !after.IsZero() { filter["luts"] = bson.M{"$gt": after} } return gm.mongoClient.FindAll(gm.collectionName, filter, opt) } func (gm *groupMongo) FindOne(groupID primitive.ObjectID, projection string) (bson.M, error) { op := options.FindOne() if len(projection) > 0 { proj := bson.M{} for _, p := range strings.Split(projection, ",") { if p[0] == '-' { proj[strings.TrimSpace(p[1:])] = 0 } else if p[0] == '+' { proj[strings.TrimSpace(p[1:])] = 1 } else { proj[strings.TrimSpace(p)] = 1 } } op = op.SetProjection(proj) } return gm.mongoClient.FindOne(gm.collectionName, bson.M{"_id": groupID}, op) } func (gm *groupMongo) QueryMembers(groupID primitive.ObjectID, reqID primitive.ObjectID, projection string, after primitive.Timestamp) (map[string]bson.M, error) { op := options.Find() if len(projection) > 0 { proj := bson.M{} for _, p := range strings.Split(projection, ",") { if p[0] == '-' { proj[strings.TrimSpace(p[1:])] = 0 } else if p[0] == '+' { proj[strings.TrimSpace(p[1:])] = 1 } else { proj[strings.TrimSpace(p)] = 1 } } op = op.SetProjection(proj) } filter := bson.M{"_gid": groupID} if after.IsZero() { gm.mongoClient.Delete(gm.memberCollectionName, bson.M{ "_gid": groupID, "expiring": bson.M{"$lt": time.Now().UTC().Unix()}, }) } else { filter["luts"] = bson.M{"$gt": after} } all, err := gm.mongoClient.FindAll(gm.memberCollectionName, filter, op) if err != nil { return nil, err } output := make(map[string]bson.M) for _, m := range all { output[m["_mid"].(primitive.ObjectID).Hex()] = m } return output, nil } func (gm *groupMongo) QueryMember(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, projection string) (bson.M, error) { filter := bson.M{"_gid": groupID} if !ticketID.IsZero() { filter["_id"] = ticketID } else { filter["_mid"] = memberID } op := options.FindOne().SetHint("gidmid") var projdoc bson.M if len(projection) > 0 { projdoc = bson.M{ "_delete": 1, } for _, proj := range strings.Split(projection, ",") { projdoc[proj] = 1 } } else { projdoc = bson.M{ "_ts": 0, "_gid": 0, "_mid": 0, } } op.SetProjection(projdoc) return gm.mongoClient.FindOne(gm.memberCollectionName, filter, op) } func (gm *groupMongo) Leave(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID) error { if ticketID.IsZero() { poptarget, err := gm.mongoClient.FindOne(gm.memberCollectionName, bson.M{ "_gid": groupID, "_mid": memberID, }, options.FindOne().SetProjection(bson.M{"_id": 1}).SetHint("gidmid")) if err != nil { return err } // Find와 Delete를 나눠야 한다. // pull 하는 것이 더 중요하기 때문. // pull에 실패하더라도 _id가 남아있어야 다시 시도가 가능하다 if poptarget == nil { // 왠지 만료되었거나 문제가 잇다 return nil } if _, ok := poptarget["_id"]; !ok { return nil } ticketID = poptarget["_id"].(primitive.ObjectID) } _, _, err := gm.mongoClient.Update(gm.collectionName, bson.M{ "_id": groupID, }, bson.M{ "$pull": bson.M{ "members": ticketID, }, "$currentDate": bson.M{ "luts": bson.M{"$type": "timestamp"}, }, }) if err != nil { return err } // Delete는 실패해도 넘어간다. gm.mongoClient.Update(gm.memberCollectionName, bson.M{ "_id": ticketID, }, bson.M{ "$set": bson.M{ "_delete": true, }, "$currentDate": bson.M{ "_ts": bson.M{"$type": "date"}, }, }) return nil } func (gm *groupMongo) UpdateMemberDocument(groupID primitive.ObjectID, memberID primitive.ObjectID, doc bson.M) error { _, _, err := gm.mongoClient.Update(gm.memberCollectionName, bson.M{ "_gid": groupID, "_mid": memberID, }, bson.M{ "$set": doc, "$currentDate": bson.M{ "luts": bson.M{"$type": "timestamp"}, }, }, options.Update().SetUpsert(false).SetHint("gidmid")) return err } func (gm *groupMongo) Dismiss(groupID primitive.ObjectID) error { _, err := gm.mongoClient.Delete(gm.collectionName, bson.M{ "_id": groupID, }) if err != nil { return err } gm.mongoClient.Delete(gm.memberCollectionName, bson.M{ "_gid": groupID, }) return nil } var errUpdateGroupDocumentFailed = errors.New("update group document failed") func (gm *groupMongo) UpdateGroupDocument(groupID primitive.ObjectID, body []byte) error { groupDoc, err := gm.mongoClient.FindOne(gm.collectionName, bson.M{ "_id": groupID, }, nil) if err != nil { return err } decoder, err := bson.NewDecoder(bsonrw.NewBSONDocumentReader(body)) if err != nil { return err } if err := decoder.Decode(&groupDoc); err != nil { return err } success, _, err := gm.mongoClient.Update(gm.collectionName, bson.M{ "_id": groupID, }, bson.M{ "$set": groupDoc, }, options.Update().SetUpsert(false)) if err != nil { return err } if !success { return errUpdateGroupDocumentFailed } return nil } func (gm *groupMongo) DropPausedMember(gid primitive.ObjectID, mid primitive.ObjectID) error { return nil } func (gm *groupMongo) PauseMember(gid primitive.ObjectID, mid primitive.ObjectID, conn *wshandler.Richconn) error { return nil } type mongowatcher struct { collection common.CollectionName pipeline mongo.Pipeline op options.FullDocument onChanged func(string, *groupPipelineDocument) } func (w *mongowatcher) callOnChanged(region string, c *groupPipelineDocument) { defer func() { r := recover() if r != nil { logger.Error(r) } }() w.onChanged(region, c) } func (w *mongowatcher) monitorfunc(parentctx context.Context, region string, mongoClient common.MongoClient) { defer func() { r := recover() if r != nil { logger.Error(r) } logger.Println("watcher.,monitorfunc finished") }() var groupstream *mongo.ChangeStream var err error var ctx context.Context defer func() { if groupstream != nil { groupstream.Close(ctx) } }() for { err = nil if groupstream == nil { groupstream, err = mongoClient.Watch(w.collection, w.pipeline, options.ChangeStream().SetFullDocument(w.op)) if err != nil { logger.Error(err) time.Sleep(time.Minute) } ctx = context.TODO() } if groupstream != nil { changed := groupstream.TryNext(ctx) if ctx.Err() != nil { logger.Error("tavern monitorfunc TryNext error") logger.Error(ctx.Err()) groupstream.Close(ctx) groupstream = nil continue } if changed { var data groupPipelineDocument if err := groupstream.Decode(&data); err == nil { w.callOnChanged(region, &data) } } else if groupstream.Err() != nil || groupstream.ID() == 0 { logger.Error(groupstream.Err()) groupstream.Close(ctx) groupstream = nil } } } } func (w mongowatcher) start(ctx context.Context, region string, mongoClient common.MongoClient) { go w.monitorfunc(ctx, region, mongoClient) } func (cfg *groupConfig) preparePersistent(ctx context.Context, region string, dbconn common.MongoClient, wsh *wshandler.WebsocketHandler) (group, error) { uniqueindices := map[string]bson.D{} hints := map[string][]string{} for _, ui := range cfg.UniqueIndex { indexname := strings.ReplaceAll(ui, ",", "") keys := strings.Split(ui, ",") keydef := bson.D{} for _, k := range keys { keydef = append(keydef, bson.E{Key: strings.TrimSpace(k), Value: 1}) } uniqueindices[indexname] = keydef hints[indexname] = keys } collectionName := common.CollectionName(cfg.Name) memberCollectionName := common.CollectionName(cfg.Name + "-members") inviteCollectionName := common.CollectionName(cfg.Name + "-invites") err := dbconn.MakeUniqueIndices(collectionName, uniqueindices) if err != nil { return nil, err } indices := map[string]bson.D{} for _, ui := range cfg.SearchIndex { indexname := strings.ReplaceAll(ui, ",", "") keys := strings.Split(ui, ",") keydef := bson.D{} for _, k := range keys { keydef = append(keydef, bson.E{Key: strings.TrimSpace(k), Value: 1}) } indices[indexname] = keydef hints[indexname] = keys } if _, ok := indices["_idmembers"]; !ok { indices["_idmembers"] = bson.D{ {Key: "_id", Value: 1}, {Key: "members", Value: 1}, } } if len(cfg.TextSearchFields) > 0 { var tsi bson.D for _, stf := range cfg.TextSearchFields { tsi = append(tsi, bson.E{Key: stf, Value: "text"}) } indices[cfg.TextSearchFields[0]+"_text"] = tsi } err = dbconn.MakeIndices(collectionName, indices) if err != nil { return nil, err } dbconn.MakeUniqueIndices(inviteCollectionName, map[string]bson.D{ "gidmid": {bson.E{Key: "_gid", Value: 1}, bson.E{Key: "_mid", Value: 1}, bson.E{Key: "_ts", Value: -1}}, }) if cfg.InviteeExlusive { err = dbconn.MakeUniqueIndices(inviteCollectionName, map[string]bson.D{ "mid": {bson.E{Key: "_mid", Value: 1}, bson.E{Key: "_ts", Value: -1}}, }) } else { err = dbconn.MakeIndices(inviteCollectionName, map[string]bson.D{ "mid": {bson.E{Key: "_mid", Value: 1}, bson.E{Key: "_ts", Value: -1}}, }) } if err != nil { return nil, err } if cfg.InviteExpire > 0 { err = dbconn.MakeExpireIndex(inviteCollectionName, cfg.InviteExpire) if err != nil { return nil, err } } err = dbconn.MakeUniqueIndices(memberCollectionName, map[string]primitive.D{ "gidmid": {{Key: "_gid", Value: 1}, {Key: "_mid", Value: 1}}, }) if err != nil { return nil, err } for _, mi := range cfg.MemberIndex { indexname := strings.ReplaceAll(mi, ",", "") keys := strings.Split(mi, ",") keydef := bson.D{} for _, k := range keys { keydef = append(keydef, bson.E{Key: strings.TrimSpace(k), Value: 1}) } err = dbconn.MakeIndices(memberCollectionName, map[string]bson.D{ indexname: keydef, }) if err != nil { return nil, err } } if cfg.InviteExpire > 0 { err = dbconn.MakeExpireIndex(memberCollectionName, cfg.InviteExpire) if err != nil { return nil, err } } groupwatcher := mongowatcher{ collection: collectionName, op: options.Default, pipeline: mongo.Pipeline{ bson.D{ { Key: "$match", Value: bson.D{ {Key: "operationType", Value: bson.D{ {Key: "$in", Value: bson.A{"update"}}, }}, }, }, }, }, onChanged: func(r string, data *groupPipelineDocument) { updates := data.UpdateDescription.UpdatedFields gid := data.DocumentKey.Id updates["_id"] = gid updates["_hint"] = cfg.Name wsh.Broadcast(r, gid, updates) }, } groupwatcher.start(ctx, region, dbconn) m1 := &mongowatcher{ collection: memberCollectionName, op: options.Default, pipeline: mongo.Pipeline{ bson.D{ { Key: "$match", Value: bson.D{ {Key: "operationType", Value: bson.D{ {Key: "$in", Value: bson.A{"insert"}}, }}, }, }, }, }, onChanged: func(r string, data *groupPipelineDocument) { gid := data.FullDocument["_gid"].(primitive.ObjectID) delete(data.FullDocument, "_gid") delete(data.FullDocument, "_mid") delete(data.FullDocument, "_ts") data.FullDocument["_hint"] = cfg.Name data.FullDocument["_fullDocument"] = true if _, candidate := data.FullDocument["_candidate"]; candidate { gid[0] |= 0x80 delete(data.FullDocument, "_candidate") } wsh.Broadcast(r, gid, data.FullDocument) }, } m1.start(ctx, region, dbconn) m2 := &mongowatcher{ collection: memberCollectionName, op: options.UpdateLookup, pipeline: mongo.Pipeline{ bson.D{ { Key: "$match", Value: bson.D{ {Key: "operationType", Value: bson.D{ {Key: "$in", Value: bson.A{"update"}}, }}, }, }, }, bson.D{ { Key: "$project", Value: bson.M{ "fullDocument._id": 1, "fullDocument._gid": 1, "fullDocument._mid": 1, "fullDocument._candidate": 1, "updateDescription": 1, }, }, }, }, onChanged: func(r string, data *groupPipelineDocument) { gid := data.FullDocument["_gid"].(primitive.ObjectID) updates := data.UpdateDescription.UpdatedFields updates["_id"] = data.FullDocument["_id"] updates["_hint"] = cfg.Name for _, r := range data.UpdateDescription.RemovedFileds { updates[r.(string)] = nil } if _, candidate := data.FullDocument["_candidate"]; candidate { // _candidate는 candidate 채널로 broadcast gid[0] |= 0x80 delete(data.FullDocument, "_candidate") } if _, ok := updates["_delete"]; ok { mid := data.FullDocument["_mid"].(primitive.ObjectID) if conn := wsh.Conn(r, mid); conn != nil { conn.Close() } } if v, ok := updates["_candidate"]; ok && v == nil { // candidate에서 벗어났네? 접속을 끊고 재접속 유도 mid := data.FullDocument["_mid"].(primitive.ObjectID) if conn := wsh.Conn(r, mid); conn != nil { conn.Close() } } wsh.Broadcast(r, gid, updates) }, } m2.start(ctx, region, dbconn) i1 := &mongowatcher{ collection: inviteCollectionName, op: options.UpdateLookup, pipeline: mongo.Pipeline{ bson.D{ { Key: "$match", Value: bson.D{ {Key: "operationType", Value: bson.D{ {Key: "$in", Value: bson.A{"insert", "update"}}, }}, }, }, }, }, onChanged: func(r string, data *groupPipelineDocument) { alias := data.FullDocument["_mid"].(primitive.ObjectID) conn := wsh.Conn(r, alias) if conn != nil { delete(data.FullDocument, "_ts") delete(data.FullDocument, "_mid") data.FullDocument["_fullDocument"] = true data.FullDocument["_hint"] = inviteCollectionName bt, _ := json.Marshal(data.FullDocument) conn.WriteBytes(bt) } }, } i1.start(ctx, region, dbconn) return &groupMongo{ groupConfig: cfg, mongoClient: dbconn, hints: hints, collectionName: collectionName, memberCollectionName: memberCollectionName, inviteCollectionName: inviteCollectionName, }, nil }