package core

import (
	"context"
	"encoding/gob"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"strings"
	"time"

	"github.com/gorilla/websocket"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.mongodb.org/mongo-driver/mongo/options"
	"repositories.action2quare.com/ayo/gocommon"
	"repositories.action2quare.com/ayo/gocommon/logger"
	"repositories.action2quare.com/ayo/gocommon/wshandler"
)

const (
	// monitoring_center_count is the number of monitoring goroutines (and
	// pub/sub channels) that account ids are sharded over. An account is
	// mapped to a center by the last byte of its ObjectID modulo this value.
	monitoring_center_count = 100
	state_offline           = "offline"
)

// friendDoc is one directed friend relation (From -> To) as stored in the
// friends collection. From and To are never serialized to JSON; the client
// only sees the document id, the friend's alias, the timestamp and the
// deleted flag.
type friendDoc struct {
	Id        primitive.ObjectID `bson:"_id,omitempty" json:"_id"`
	From      primitive.ObjectID `bson:"from" json:"-"`
	To        primitive.ObjectID `bson:"to" json:"-"`
	ToAlias   string             `bson:"talias" json:"to"`
	Timestamp int64              `bson:"ts" json:"ts"`
	Deleted   bool               `bson:"deleted,omitempty" json:"deleted,omitempty"`
}

// registerListener is the request sent to a monitoring center to start (or,
// when l.c is nil, stop) watching the state of account src.
type registerListener struct {
	src   primitive.ObjectID
	alias string
	l     *listener
}

// monitoringCenter is the handle to one monitoring goroutine: regChan
// delivers registration requests to it and publishState publishes an
// account's state on the center's pub/sub channel.
type monitoringCenter struct {
	regChan      chan registerListener
	publishState func(primitive.ObjectID, string)
}

// friends implements the friend-list and friend-state APIs.
type friends struct {
	mongoClient gocommon.MongoClient
	redison     *gocommon.RedisonHandler
	wsh         *wshandler.WebsocketHandler
	moncen      []monitoringCenter
	conns       *connections
}

// listener is an account (me) watching somebody's state over websocket c.
// A nil c inside a registerListener means "unregister me".
type listener struct {
	c  *websocket.Conn
	me primitive.ObjectID
}

// listenerMap holds, for one watched account, everybody listening to it,
// the pre-marshalled offline message and the last published state
// (nil/empty when the account is offline).
type listenerMap struct {
	listeners map[primitive.ObjectID]*listener
	offline   []byte
	lastState []byte
}

func init() {
	// []friendDoc travels inside interface-typed message bodies, so gob
	// needs the concrete type registered.
	gob.Register([]friendDoc{})
}

// per channel
// src(alias) - listener(objectid) : socket
//            - listener(objectid) : socket
//            - listener(objectid) : socket

// combineObjectID builds a deterministic 12-byte id from two ObjectIDs by
// concatenating bytes [2:4] and [8:12] of l followed by bytes [2:4] and
// [8:12] of r. The result depends on argument order, so (l, r) and (r, l)
// give different ids.
func combineObjectID(l primitive.ObjectID, r primitive.ObjectID) (out primitive.ObjectID) {
	copy(out[0:2], l[2:4])
	copy(out[2:6], l[8:12])
	copy(out[6:8], r[2:4])
	copy(out[8:12], r[8:12])
	return
}

// makeSrcMap creates the listener bookkeeping for a watched account whose
// alias is src. state is the account's last known state message (nil when
// unknown/offline). The offline notification is marshalled once up front so
// it can be sent to every listener without re-encoding.
func makeSrcMap(src string, state []byte) *listenerMap {
	// Marshalling this fixed-shape message is not expected to fail; the
	// error is deliberately ignored as in the original implementation.
	offline, _ := json.Marshal(wshandler.DownstreamMessage{
		Alias: src,
		Body:  bson.M{},
		Tag:   friend_state_tag,
	})

	return &listenerMap{
		listeners: make(map[primitive.ObjectID]*listener),
		offline:   offline,
		lastState: state,
	}
}

// handleRegisterListener applies one registration request to a monitoring
// center's listener table.
//
//   - reg.l.c == nil  : unregister reg.l.me from reg.src.
//   - already listed  : keep the existing entry.
//   - otherwise       : add reg.l as a listener of reg.src.
//
// When reg.src was already tracked and has a last state, that state is
// pushed to the registering connection right away. Entries that end up with
// neither listeners nor a last state are dropped from the table.
func handleRegisterListener(listeners map[primitive.ObjectID]*listenerMap, reg registerListener) {
	srcmap, exists := listeners[reg.src]
	if !exists {
		srcmap = makeSrcMap(reg.alias, nil)
		listeners[reg.src] = srcmap
	}

	switch oldl, registered := srcmap.listeners[reg.l.me]; {
	case reg.l.c == nil:
		// Unregister: stop monitoring by removing me (reg.l.me) from the
		// listener list.
		delete(srcmap.listeners, reg.l.me)
		exists = false
		logger.Println("regChan unregistered :", reg.src.Hex(), reg.l.me.Hex())
	case registered:
		// I am already registered as a listener. If the other party blocked
		// me, the existing listener is nil.
		exists = oldl != nil
	default:
		srcmap.listeners[reg.l.me] = reg.l
	}

	if exists && len(srcmap.lastState) > 0 {
		// Best effort: a broken connection is removed through the
		// unregister path when the client disconnects.
		_ = reg.l.c.WriteMessage(websocket.TextMessage, srcmap.lastState)
	}

	if len(srcmap.listeners) == 0 && len(srcmap.lastState) == 0 {
		delete(listeners, reg.src)
	}
}

// handleFriendStatePayload applies one pub/sub payload to a monitoring
// center's listener table. The payload is the 24-character hex ObjectID of
// the account followed by its state message; an empty state means the
// account went offline. Malformed payloads are logged and ignored instead
// of panicking the monitoring goroutine.
func handleFriendStatePayload(listeners map[primitive.ObjectID]*listenerMap, payload string) {
	const hexIDLen = 24

	if len(payload) < hexIDLen {
		logger.Println("friend state payload is too short :", len(payload))
		return
	}
	target, err := primitive.ObjectIDFromHex(payload[:hexIDLen])
	if err != nil {
		logger.Println("friend state payload has invalid id :", err)
		return
	}
	state := payload[hexIDLen:]

	srcmap, ok := listeners[target]
	if !ok {
		// Nobody listens yet; remember the state so that a later
		// registration receives it immediately.
		if len(state) > 0 {
			var dnstream wshandler.DownstreamMessage
			if err := json.Unmarshal([]byte(state), &dnstream); err == nil {
				listeners[target] = makeSrcMap(dnstream.Alias, []byte(state))
			}
		}
		return
	}
	if srcmap == nil {
		delete(listeners, target)
		return
	}

	out := srcmap.offline
	if len(state) == 0 {
		// Disconnected.
		srcmap.lastState = nil
		if len(srcmap.listeners) == 0 {
			delete(listeners, target)
		}
	} else {
		srcmap.lastState = []byte(state)
		out = srcmap.lastState
	}

	for _, l := range srcmap.listeners {
		if l == nil || l.c == nil {
			continue
		}
		// Best effort: a broken connection is removed through the
		// unregister path when the client disconnects.
		_ = l.c.WriteMessage(websocket.TextMessage, out)
	}
}

// makeFriends ensures the unique (from, to) index on the friends collection,
// starts monitoring_center_count monitoring goroutines (one pub/sub
// subscription each) and returns the friends handler wired to them.
//
// The monitoring goroutines run for the lifetime of the process: callers
// send on the unbuffered regChan, so the goroutines must keep receiving.
//
// NOTE(review): a listener's websocket connection can be registered with
// several monitoring centers, so more than one goroutine may call
// WriteMessage on the same connection concurrently. gorilla/websocket
// documents a single concurrent writer per connection — confirm whether
// writes need to be serialized.
func makeFriends(ctx context.Context, so *Social, conns *connections) (*friends, error) {
	if err := so.mongoClient.MakeUniqueIndices(friends_collection_name, map[string]bson.D{
		"fromto": {{Key: "from", Value: 1}, {Key: "to", Value: 1}},
	}); err != nil {
		return nil, err
	}

	moncen := make([]monitoringCenter, 0, monitoring_center_count)
	for i := 0; i < monitoring_center_count; i++ {
		subChannel := fmt.Sprintf("_soc_fr_monitor_ch_%d_%d", i, so.redison.Options().DB)
		regChan := make(chan registerListener)
		moncen = append(moncen, monitoringCenter{
			regChan: regChan,
			publishState: func(accid primitive.ObjectID, state string) {
				// An empty state yields just the hex id, which subscribers
				// interpret as "offline".
				if _, err := so.redison.Publish(ctx, subChannel, accid.Hex()+state).Result(); err != nil {
					logger.Println("publishState failed :", subChannel, err)
				}
			},
		})

		go func(subChannel string, regChan chan registerListener) {
			pubsub := so.redison.Subscribe(ctx, subChannel)
			msgs := pubsub.Channel()
			listeners := make(map[primitive.ObjectID]*listenerMap)

			for {
				select {
				case reg := <-regChan:
					// Register the accounts I am interested in.
					handleRegisterListener(listeners, reg)

				case msg, ok := <-msgs:
					if !ok {
						// The subscription was closed. Stop selecting on it
						// (a nil channel never becomes ready) but keep
						// draining regChan so senders do not block forever.
						logger.Println("friend monitor subscription closed :", subChannel)
						msgs = nil
						continue
					}
					handleFriendStatePayload(listeners, msg.Payload)
				}
			}
		}(subChannel, regChan)
	}

	return &friends{
		mongoClient: so.mongoClient,
		redison:     so.redison,
		wsh:         so.wsh,
		moncen:      moncen,
		conns:       conns,
	}, nil
}

// ClientDisconnected publishes the caller's offline state and removes the
// caller from every friend it was monitoring.
func (fs *friends) ClientDisconnected(msg string, callby *wshandler.Sender) {
	// Announce the logged-off state.
	meidx := callby.Accid[11] % monitoring_center_count
	fs.moncen[meidx].publishState(callby.Accid, "")

	fs.stopMonitoringFriends(callby.Accid)
}

var errAddFriendFailed = errors.New("addFriend failed")

// addFriend inserts the relation f (keyed by combineObjectID(f.From, f.To))
// if it does not exist yet and stores the new id in f.Id. It returns
// errAddFriendFailed when nothing was inserted. If f.From is currently
// monitoring its friends and is connected, it is also registered as a
// listener of f.To.
func (fs *friends) addFriend(f *friendDoc) error {
	_, newid, err := fs.mongoClient.Update(friends_collection_name, bson.M{
		"_id": combineObjectID(f.From, f.To),
	}, bson.M{
		"$setOnInsert": f,
	}, options.Update().SetUpsert(true))
	if err != nil {
		return err
	}

	// A nil (or unexpectedly typed) id means no document was inserted.
	id, ok := newid.(primitive.ObjectID)
	if !ok {
		return errAddFriendFailed
	}
	f.Id = id

	if !fs.conns.addFriend(f.From, f) {
		return nil
	}

	// Monitoring is in progress for f.From.
	if conn := fs.conns.conn(f.From); conn != nil {
		toidx := f.To[11] % monitoring_center_count
		fs.moncen[toidx].regChan <- registerListener{
			src:   f.To,
			alias: f.ToAlias,
			l: &listener{
				c:  conn,
				me: f.From,
			},
		}
	}

	return nil
}

// BroadcastMyState publishes the caller's state to everybody monitoring it.
// Arguments[0] must be a string holding one valid JSON value; it becomes the
// body of the friend-state message. Invalid requests are logged and dropped.
func (fs *friends) BroadcastMyState(ctx wshandler.ApiCallContext) {
	if len(ctx.Arguments) == 0 {
		logger.Println("BroadcastMyState is failed : missing state argument")
		return
	}
	stateobj, ok := ctx.Arguments[0].(string)
	if !ok {
		logger.Println("BroadcastMyState is failed : state argument is not a string")
		return
	}
	// The state is spliced verbatim into the JSON envelope below, so it has
	// to be exactly one well-formed JSON value; anything else would produce
	// a broken message or let the client inject envelope fields.
	if !json.Valid([]byte(stateobj)) {
		logger.Println("BroadcastMyState is failed : state argument is not valid json")
		return
	}

	bt, err := json.Marshal(wshandler.DownstreamMessage{
		Alias: ctx.CallBy.Alias,
		Body:  "${state}",
		Tag:   friend_state_tag,
	})
	if err != nil {
		logger.Println("BroadcastMyState is failed :", err)
		return
	}

	meidx := ctx.CallBy.Accid[11] % monitoring_center_count
	fs.moncen[meidx].publishState(ctx.CallBy.Accid, strings.Replace(string(bt), `"${state}"`, stateobj, 1))
}

// DeleteFriend marks the friend document whose hex id is Arguments[0] as
// deleted, notifies the caller, then marks the reverse relation as deleted
// and notifies the other party. Only documents owned by the caller
// (from == caller) may be deleted.
func (fs *friends) DeleteFriend(ctx wshandler.ApiCallContext) {
	if len(ctx.Arguments) == 0 {
		logger.Println("DeleteFriend is failed : missing friend id")
		return
	}
	hexid, ok := ctx.Arguments[0].(string)
	if !ok {
		logger.Println("DeleteFriend is failed : friend id is not a string")
		return
	}
	fid, err := primitive.ObjectIDFromHex(hexid)
	if err != nil {
		logger.Println("DeleteFriend is failed :", err)
		return
	}

	var fdoc friendDoc
	if err := fs.mongoClient.FindOneAs(friends_collection_name, bson.M{
		"_id": fid,
	}, &fdoc, options.FindOne().SetProjection(bson.M{
		"from": 1,
		"to":   1,
	})); err != nil {
		logger.Println("DeleteFriend is failed :", err)
		return
	}

	if fdoc.Id.IsZero() {
		return
	}

	// The id comes from the client; refuse to touch somebody else's
	// friend list.
	if fdoc.From != ctx.CallBy.Accid {
		logger.Println("DeleteFriend is failed : not the owner :", ctx.CallBy.Accid.Hex(), fid.Hex())
		return
	}

	now := time.Now().UTC().Unix()
	fdoc.Deleted = true
	fdoc.Timestamp = now

	// Delete on my side.
	if _, _, err := fs.mongoClient.Update(friends_collection_name, bson.M{
		"_id": fid,
	}, bson.M{
		"$set": bson.M{
			"deleted": true,
			"ts":      fdoc.Timestamp,
		},
	}, options.Update().SetUpsert(false)); err != nil {
		logger.Println("DeleteFriend is failed :", err)
		return
	}

	fs.conns.writeMessage(ctx.CallBy.Accid, &wshandler.DownstreamMessage{
		Body: []friendDoc{fdoc},
		Tag:  friends_tag,
	})

	// Delete on the other party's side.
	var yourdoc friendDoc
	if err := fs.mongoClient.FindOneAndUpdateAs(friends_collection_name, bson.M{
		"from": fdoc.To,
		"to":   fdoc.From,
	}, bson.M{
		"$set": bson.M{
			"deleted": true,
			"ts":      now,
		},
	}, &yourdoc, options.FindOneAndUpdate().SetReturnDocument(options.After).SetUpsert(false)); err == nil {
		logger.Println("delete friend your doc :", yourdoc)
		fs.wsh.SendUpstreamMessage(&wshandler.UpstreamMessage{
			Target: fdoc.To.Hex(),
			Body:   []friendDoc{yourdoc},
			Tag:    friends_tag,
		})
	}
}

// StartMonitoringFriends loads the caller's friend list, registers the
// caller as a listener of each friend's state and records the list in the
// connection table.
func (fs *friends) StartMonitoringFriends(ctx wshandler.ApiCallContext) {
	// Register myself with everybody on my friend list.
	var myfriends []*friendDoc
	if err := fs.mongoClient.FindAllAs(friends_collection_name, bson.M{
		"from": ctx.CallBy.Accid,
	}, &myfriends, options.Find().SetProjection(bson.M{"to": 1, "talias": 1})); err != nil {
		logger.Println("StartMonitoringFriends is failed :", err)
		return
	}

	me := &listener{
		c:  fs.conns.conn(ctx.CallBy.Accid),
		me: ctx.CallBy.Accid,
	}

	for _, f := range myfriends {
		toidx := f.To[11] % monitoring_center_count
		fs.moncen[toidx].regChan <- registerListener{
			src:   f.To,
			alias: f.ToAlias,
			l:     me,
		}
	}

	fs.conns.initFriends(ctx.CallBy.Accid, myfriends)
}

// stopMonitoringFriends clears accid's friend list from the connection
// table and unregisters accid from every friend it was listening to.
func (fs *friends) stopMonitoringFriends(accid primitive.ObjectID) {
	cleared := fs.conns.clearFriends(accid)
	if len(cleared) == 0 {
		return
	}

	// Remove myself from the other parties' monitoring; a listener with a
	// nil connection is the unregister request.
	nilListener := &listener{c: nil, me: accid}
	for _, f := range cleared {
		toidx := f.To[11] % monitoring_center_count
		fs.moncen[toidx].regChan <- registerListener{
			src:   f.To,
			alias: f.ToAlias,
			l:     nilListener,
		}
	}
}

// StopMonitoringFriends is the API entry point for stopMonitoringFriends.
func (fs *friends) StopMonitoringFriends(ctx wshandler.ApiCallContext) {
	fs.stopMonitoringFriends(ctx.CallBy.Accid)
}

// QueryFriends sends the caller every friend document of theirs whose
// timestamp is greater than Arguments[0] (a JSON number). Nothing is sent
// when there are no such documents.
func (fs *friends) QueryFriends(ctx wshandler.ApiCallContext) {
	if len(ctx.Arguments) == 0 {
		logger.Println("QueryFriends failed. missing timestamp argument")
		return
	}
	since, ok := ctx.Arguments[0].(float64)
	if !ok {
		logger.Println("QueryFriends failed. timestamp argument is not a number")
		return
	}
	queryfrom := int64(since)

	var myfriends []friendDoc
	if err := fs.mongoClient.FindAllAs(friends_collection_name, bson.M{
		"from": ctx.CallBy.Accid,
		"ts":   bson.M{"$gt": queryfrom},
	}, &myfriends); err != nil {
		logger.Println("QueryFriends failed. FindAllAs err :", err)
		return
	}

	if len(myfriends) > 0 {
		fs.conns.writeMessage(ctx.CallBy.Accid, &wshandler.DownstreamMessage{
			Body: myfriends,
			Tag:  friends_tag,
		})
	}
}

// Trim permanently removes the friend documents whose ids are listed in
// Arguments[1].
func (fs *friends) Trim(ctx wshandler.ApiCallContext) {
	if len(ctx.Arguments) < 2 {
		logger.Println("Trim failed : missing id list argument")
		return
	}
	raw, ok := ctx.Arguments[1].([]any)
	if !ok {
		logger.Println("Trim failed : id list argument is not an array")
		return
	}

	ids := stringsToObjs(raw)
	switch len(ids) {
	case 0:
		// Nothing to remove.
	case 1:
		fs.mongoClient.Delete(friends_collection_name, bson.M{"_id": ids[0]})
	default:
		fs.mongoClient.DeleteMany(friends_collection_name, bson.D{{Key: "_id", Value: bson.M{"$in": ids}}})
	}
}

// Block is the HTTP handler invoked when block.From blocks block.To. It
// marks the friend relation as deleted in both directions and notifies both
// accounts. It answers 400 when the body cannot be decoded and 500 when the
// first update fails; if the two were not friends nothing else happens.
func (fs *friends) Block(w http.ResponseWriter, r *http.Request) {
	// Delete the friend.
	var block blockDoc
	if err := gocommon.MakeDecoder(r).Decode(&block); err != nil {
		logger.Println("Block failed:", err)
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	id := combineObjectID(block.From, block.To)
	now := time.Now().UTC().Unix()

	// Delete on my side.
	updated, _, err := fs.mongoClient.Update(friends_collection_name, bson.M{
		"_id": id,
	}, bson.M{
		"$set": bson.M{
			"deleted": true,
			"ts":      now,
		},
	}, options.Update().SetUpsert(false))
	if err != nil {
		logger.Println("Block failed:", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	if !updated {
		// Apparently they were not friends; nothing more to do.
		return
	}

	fs.wsh.SendUpstreamMessage(&wshandler.UpstreamMessage{
		Target: block.From.Hex(),
		Body:   []friendDoc{{Id: id, Deleted: true, Timestamp: now}},
		Tag:    friends_tag,
	})

	// Remove me from the other party's list.
	id = combineObjectID(block.To, block.From)
	if _, _, err := fs.mongoClient.Update(friends_collection_name, bson.M{
		"_id": id,
	}, bson.M{
		"$set": bson.M{
			"deleted": true,
			"ts":      now,
		},
	}, options.Update().SetUpsert(false)); err != nil {
		// The notification below is still sent, as before; the failure is
		// only made visible.
		logger.Println("Block failed:", err)
	}

	fs.wsh.SendUpstreamMessage(&wshandler.UpstreamMessage{
		Target: block.To.Hex(),
		Body:   []friendDoc{{Id: id, Deleted: true, Timestamp: now}},
		Tag:    friends_tag,
	})
}