Files
social/core/friend.go

453 lines
12 KiB
Go

package core
import (
"context"
"encoding/gob"
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"time"
"github.com/gorilla/websocket"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo/options"
"repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/wshandler"
)
const (
monitoring_center_count = 100
state_online = "online"
state_offline = "offline"
)
type friendDoc struct {
Id primitive.ObjectID `bson:"_id,omitempty" json:"_id"`
From primitive.ObjectID `bson:"from" json:"-"`
To primitive.ObjectID `bson:"to" json:"-"`
ToAlias string `bson:"talias" json:"to"`
Timestamp int64 `bson:"ts" json:"ts"`
Deleted bool `bson:"deleted,omitempty" json:"deleted,omitempty"`
}
type registerListener struct {
src primitive.ObjectID
alias string
l *listener
}
type monitoringCenter struct {
regChan chan registerListener
publishState func(string, string, string)
}
type friends struct {
mongoClient gocommon.MongoClient
redison *gocommon.RedisonHandler
wsh *wshandler.WebsocketHandler
moncen []monitoringCenter
conns *connections
}
type listener struct {
c *websocket.Conn
me primitive.ObjectID
}
type listenerMap struct {
listeners map[primitive.ObjectID]*listener
connected bool
online []byte
offline []byte
}
func init() {
gob.Register([]friendDoc{})
}
// per channel
// src(alias) - listener(objectid) : socket
// - listener(objectid) : socket
// - listener(objectid) : socket
func combineObjectID(l primitive.ObjectID, r primitive.ObjectID) (out primitive.ObjectID) {
copy(out[0:2], l[2:4])
copy(out[2:6], l[8:12])
copy(out[6:8], r[2:4])
copy(out[8:12], r[8:12])
return
}
func makeSrcMap(src string, connected bool) *listenerMap {
online, _ := json.Marshal(wshandler.DownstreamMessage{
Body: bson.M{
"from": src,
"state": state_online,
},
Tag: friend_state_tag,
})
offline, _ := json.Marshal(wshandler.DownstreamMessage{
Body: bson.M{
"from": src,
"state": state_offline,
},
Tag: friend_state_tag,
})
return &listenerMap{
listeners: make(map[primitive.ObjectID]*listener),
connected: connected,
online: online,
offline: offline,
}
}
func makeFriends(ctx context.Context, so *Social, conns *connections) (*friends, error) {
if err := so.mongoClient.MakeUniqueIndices(friends_collection_name, map[string]bson.D{
"fromto": {{Key: "from", Value: 1}, {Key: "to", Value: 1}},
}); err != nil {
return nil, err
}
var moncen []monitoringCenter
for i := 0; i < monitoring_center_count; i++ {
subChannel := fmt.Sprintf("_soc_fr_monitor_ch_%d_%d", i, so.redison.Options().DB)
regChan := make(chan registerListener)
moncen = append(moncen, monitoringCenter{
regChan: regChan,
publishState: func(src, alias, state string) {
so.redison.Publish(ctx, subChannel, src+alias+":"+state).Result()
},
})
go func(subChannel string, regChan chan registerListener) {
pubsub := so.redison.Subscribe(ctx, subChannel)
listeners := make(map[primitive.ObjectID]*listenerMap)
for {
select {
case reg := <-regChan:
// 내가 관심있는 애들 등록
srcmap, online := listeners[reg.src]
if !online {
srcmap = makeSrcMap(reg.alias, false)
listeners[reg.src] = srcmap
}
if reg.l.c == nil {
// 등록 해제. 모니터링 종료
// listener목록에서 나(reg.l.me)를 제거
delete(srcmap.listeners, reg.l.me)
online = false
logger.Println("regChan unregistered :", reg.src.Hex(), reg.l.me.Hex())
} else if oldl, ok := srcmap.listeners[reg.l.me]; ok {
// 내가 이미 리스너로 등록되어 있다.
// 상대방이 나를 차단했을 경우에는 기존 리스너가 nil임
online = oldl != nil
logger.Println("regChan registered :", reg.src.Hex(), reg.l.me.Hex(), "old", online)
} else {
logger.Println("regChan registered :", reg.src.Hex(), reg.l.me.Hex())
srcmap.listeners[reg.l.me] = reg.l
}
if online && srcmap != nil {
logger.Println("regChan send online :", reg.l.me.Hex(), string(srcmap.online))
reg.l.c.WriteMessage(websocket.TextMessage, srcmap.online)
}
if len(srcmap.listeners) == 0 && !srcmap.connected {
delete(listeners, reg.src)
}
case msg := <-pubsub.Channel():
target, _ := primitive.ObjectIDFromHex(msg.Payload[:24])
aliasstate := strings.SplitN(msg.Payload[24:], ":", 2)
var sent []byte
if srcmap, ok := listeners[target]; ok {
if aliasstate[1] == state_online {
sent = srcmap.online
srcmap.connected = true
} else if aliasstate[1] == state_offline {
sent = srcmap.offline
srcmap.connected = false
if len(srcmap.listeners) == 0 {
delete(listeners, target)
}
}
if len(sent) > 0 {
for _, l := range srcmap.listeners {
logger.Println("state fire :", l.me, string(sent))
l.c.WriteMessage(websocket.TextMessage, sent)
}
}
} else if aliasstate[1] == state_online {
listeners[target] = makeSrcMap(aliasstate[0], true)
}
}
}
}(subChannel, regChan)
}
return &friends{
mongoClient: so.mongoClient,
redison: so.redison,
wsh: so.wsh,
moncen: moncen,
conns: conns,
}, nil
}
func (fs *friends) ClientConnected(conn *websocket.Conn, callby *wshandler.Sender) {
// 내 로그인 상태를 알림
meidx := callby.Accid[11] % monitoring_center_count
fs.moncen[meidx].publishState(callby.Accid.Hex(), callby.Alias, state_online)
}
func (fs *friends) ClientDisconnected(conn *websocket.Conn, callby *wshandler.Sender) {
// 로그 오프 상태를 알림
meidx := callby.Accid[11] % monitoring_center_count
fs.moncen[meidx].publishState(callby.Accid.Hex(), callby.Alias, state_offline)
fs.stopMonitoringFriends(callby.Accid)
}
var errAddFriendFailed = errors.New("addFriend failed")
func (fs *friends) addFriend(f *friendDoc) error {
_, newid, err := fs.mongoClient.Update(friends_collection_name, bson.M{
"_id": combineObjectID(f.From, f.To),
}, bson.M{
"$setOnInsert": f,
}, options.Update().SetUpsert(true))
if err != nil {
return err
}
if newid == nil {
return errAddFriendFailed
}
f.Id = newid.(primitive.ObjectID)
if fs.conns.addFriend(f.From, f) {
// 모니터링 중
conn := fs.conns.conn(f.From)
if conn != nil {
toidx := f.To[11] % monitoring_center_count
fs.moncen[toidx].regChan <- registerListener{
src: f.To,
alias: f.ToAlias,
l: &listener{
c: conn,
me: f.From,
},
}
}
}
return nil
}
func (fs *friends) DeleteFriend(ctx wshandler.ApiCallContext) {
fid, _ := primitive.ObjectIDFromHex(ctx.Arguments[0].(string))
var fdoc friendDoc
if err := fs.mongoClient.FindOneAs(friends_collection_name, bson.M{
"_id": fid,
}, &fdoc, options.FindOne().SetProjection(bson.M{
"from": 1,
"to": 1,
})); err != nil {
logger.Println("DeleteFriend is failed :", err)
return
}
if fdoc.Id.IsZero() {
return
}
now := time.Now().UTC().Unix()
fdoc.Deleted = true
fdoc.Timestamp = now
// 나한테 삭제
fs.mongoClient.Update(friends_collection_name, bson.M{
"_id": fid,
}, bson.M{
"$set": bson.M{
"deleted": true,
"ts": fdoc.Timestamp,
},
}, options.Update().SetUpsert(false))
fs.conns.writeMessage(ctx.CallBy.Accid, &wshandler.DownstreamMessage{
Body: []friendDoc{fdoc},
Tag: friends_tag,
})
// 상대방에게 삭제
var yourdoc friendDoc
if err := fs.mongoClient.FindOneAndUpdateAs(friends_collection_name, bson.M{
"from": fdoc.To,
"to": fdoc.From,
}, bson.M{
"$set": bson.M{
"deleted": true,
"ts": now,
},
}, &yourdoc, options.FindOneAndUpdate().SetReturnDocument(options.After).SetUpsert(false)); err == nil {
fs.wsh.SendUpstreamMessage(&wshandler.UpstreamMessage{
Target: fdoc.To.Hex(),
Body: []friendDoc{yourdoc},
Tag: friends_tag,
})
}
}
func (fs *friends) StartMonitoringFriends(ctx wshandler.ApiCallContext) {
// 내 친구 목록에 나를 등록
var friends []*friendDoc
if err := fs.mongoClient.FindAllAs(friends_collection_name, bson.M{
"from": ctx.CallBy.Accid,
}, &friends, options.Find().SetProjection(bson.M{"to": 1, "talias": 1})); err != nil {
logger.Println("StartMonitoringFriends is failed :", err)
return
}
me := &listener{
c: fs.conns.conn(ctx.CallBy.Accid),
me: ctx.CallBy.Accid,
}
for _, f := range friends {
toidx := f.To[11] % monitoring_center_count
fs.moncen[toidx].regChan <- registerListener{
src: f.To,
alias: f.ToAlias,
l: me,
}
}
fs.conns.initFriends(ctx.CallBy.Accid, friends)
}
func (fs *friends) stopMonitoringFriends(accid primitive.ObjectID) {
friends := fs.conns.clearFriends(accid)
if len(friends) > 0 {
// 나를 상대방 모니터링에서 뺀다
nilListener := &listener{c: nil, me: accid}
for _, f := range friends {
toidx := f.To[11] % monitoring_center_count
fs.moncen[toidx].regChan <- registerListener{
src: f.To,
alias: f.ToAlias,
l: nilListener,
}
}
}
}
func (fs *friends) StopMonitoringFriends(ctx wshandler.ApiCallContext) {
fs.stopMonitoringFriends(ctx.CallBy.Accid)
}
func (fs *friends) QueryFriends(ctx wshandler.ApiCallContext) {
queryfrom := int64(ctx.Arguments[0].(float64))
var myfriends []friendDoc
err := fs.mongoClient.FindAllAs(friends_collection_name, bson.M{
"from": ctx.CallBy.Accid,
"ts": bson.M{"$gt": queryfrom},
}, &myfriends)
if err != nil {
logger.Println("QueryReceivedInvitations failed. FindAllAs err :", err)
}
if len(myfriends) > 0 {
fs.conns.writeMessage(ctx.CallBy.Accid, &wshandler.DownstreamMessage{
Alias: ctx.CallBy.Alias,
Body: myfriends,
Tag: friends_tag,
})
}
}
func (fs *friends) Trim(ctx wshandler.ApiCallContext) {
stringsTobjs := func(in []any) (out []primitive.ObjectID) {
for _, i := range in {
p, _ := primitive.ObjectIDFromHex(i.(string))
out = append(out, p)
}
return
}
ids := stringsTobjs(ctx.Arguments[2].([]any))
if len(ids) > 0 {
if len(ids) == 1 {
fs.mongoClient.Delete(friends_collection_name, bson.M{"_id": ids[0]})
} else {
fs.mongoClient.DeleteMany(friends_collection_name, bson.D{{Key: "_id", Value: bson.M{"$in": ids}}})
}
}
}
func (fs *friends) Block(w http.ResponseWriter, r *http.Request) {
// 친구를 삭제
var block blockDoc
if err := gocommon.MakeDecoder(r).Decode(&block); err != nil {
logger.Println("Block failed:", err)
w.WriteHeader(http.StatusBadRequest)
return
}
id := combineObjectID(block.From, block.To)
now := time.Now().UTC().Unix()
// 나한테 삭제
updated, _, err := fs.mongoClient.Update(friends_collection_name, bson.M{
"_id": id,
}, bson.M{
"$set": bson.M{
"deleted": true,
"ts": now,
},
}, options.Update().SetUpsert(false))
if err != nil {
logger.Println("Block failed:", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
if !updated {
// 친구가 아닌 모양. 그냥 넘어가면 끝
return
}
fs.wsh.SendUpstreamMessage(&wshandler.UpstreamMessage{
Target: block.From.Hex(),
Body: []friendDoc{{Id: id, Deleted: true, Timestamp: now}},
Tag: friends_tag,
})
// 상대방한테서 나를 제거
id = combineObjectID(block.To, block.From)
fs.mongoClient.Update(friends_collection_name, bson.M{
"_id": id,
}, bson.M{
"$set": bson.M{
"deleted": true,
"ts": now,
},
}, options.Update().SetUpsert(false))
fs.wsh.SendUpstreamMessage(&wshandler.UpstreamMessage{
Target: block.To.Hex(),
Body: []friendDoc{{Id: id, Deleted: true, Timestamp: now}},
Tag: friends_tag,
})
}