초기 커밋. 친구 초대 및 관리 완료
This commit is contained in:
491
core/friend.go
Normal file
491
core/friend.go
Normal file
@ -0,0 +1,491 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/gob"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"repositories.action2quare.com/ayo/gocommon"
|
||||
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||
"repositories.action2quare.com/ayo/gocommon/wshandler"
|
||||
)
|
||||
|
||||
const (
|
||||
friends_collection_name = gocommon.CollectionName("friends")
|
||||
monitoring_center_count = 100
|
||||
state_online = "online"
|
||||
state_offline = "offline"
|
||||
)
|
||||
|
||||
var friend_state_tag = []string{"social.FriendState"}
|
||||
|
||||
type friendDoc struct {
|
||||
Id primitive.ObjectID `bson:"_id,omitempty" json:"_id"`
|
||||
From primitive.ObjectID `bson:"from" json:"-"`
|
||||
To primitive.ObjectID `bson:"to" json:"-"`
|
||||
ToAlias string `bson:"talias" json:"to"`
|
||||
Timestamp int64 `bson:"ts" json:"ts"`
|
||||
Deleted bool `bson:"deleted,omitempty" json:"deleted,omitempty"`
|
||||
}
|
||||
|
||||
type registerListener struct {
|
||||
src primitive.ObjectID
|
||||
alias string
|
||||
l *listener
|
||||
}
|
||||
|
||||
type monitoringCenter struct {
|
||||
regChan chan registerListener
|
||||
publishState func(string, string, string)
|
||||
}
|
||||
|
||||
type connWithFriends struct {
|
||||
c *websocket.Conn
|
||||
friends []*friendDoc
|
||||
initialized bool
|
||||
}
|
||||
|
||||
type connections struct {
|
||||
connLock sync.Mutex
|
||||
conns map[primitive.ObjectID]*connWithFriends
|
||||
}
|
||||
|
||||
func (cs *connections) new(accid primitive.ObjectID, conn *websocket.Conn) {
|
||||
cs.connLock.Lock()
|
||||
defer cs.connLock.Unlock()
|
||||
cs.conns[accid] = &connWithFriends{
|
||||
c: conn,
|
||||
initialized: false,
|
||||
}
|
||||
}
|
||||
|
||||
func (cs *connections) delete(accid primitive.ObjectID) {
|
||||
cs.connLock.Lock()
|
||||
defer cs.connLock.Unlock()
|
||||
|
||||
delete(cs.conns, accid)
|
||||
}
|
||||
|
||||
func (cs *connections) conn(accid primitive.ObjectID) *websocket.Conn {
|
||||
cs.connLock.Lock()
|
||||
defer cs.connLock.Unlock()
|
||||
if cf, ok := cs.conns[accid]; ok {
|
||||
return cf.c
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cs *connections) addFriend(accid primitive.ObjectID, fdoc *friendDoc) bool {
|
||||
cs.connLock.Lock()
|
||||
defer cs.connLock.Unlock()
|
||||
if cf, ok := cs.conns[accid]; ok {
|
||||
if cf.initialized {
|
||||
cf.friends = append(cf.friends, fdoc)
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (cs *connections) initFriends(accid primitive.ObjectID, fdocs []*friendDoc) {
|
||||
cs.connLock.Lock()
|
||||
defer cs.connLock.Unlock()
|
||||
if cf, ok := cs.conns[accid]; ok {
|
||||
cf.friends = fdocs
|
||||
cf.initialized = true
|
||||
}
|
||||
}
|
||||
|
||||
func (cs *connections) clearFriends(accid primitive.ObjectID) (out []*friendDoc) {
|
||||
cs.connLock.Lock()
|
||||
defer cs.connLock.Unlock()
|
||||
if cf, ok := cs.conns[accid]; ok {
|
||||
out = cf.friends
|
||||
cf.friends = nil
|
||||
cf.initialized = false
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
type friends struct {
|
||||
mongoClient gocommon.MongoClient
|
||||
redison *gocommon.RedisonHandler
|
||||
wsh *wshandler.WebsocketHandler
|
||||
moncen []monitoringCenter
|
||||
conns connections
|
||||
}
|
||||
|
||||
type listener struct {
|
||||
c *websocket.Conn
|
||||
me primitive.ObjectID
|
||||
}
|
||||
|
||||
type listenerMap struct {
|
||||
listeners map[primitive.ObjectID]*listener
|
||||
connected bool
|
||||
online []byte
|
||||
offline []byte
|
||||
}
|
||||
|
||||
func init() {
|
||||
gob.Register([]friendDoc{})
|
||||
}
|
||||
|
||||
// per channel
|
||||
// src(alias) - listener(objectid) : socket
|
||||
// - listener(objectid) : socket
|
||||
// - listener(objectid) : socket
|
||||
|
||||
func makeSrcMap(src string, connected bool) *listenerMap {
|
||||
online, _ := json.Marshal(wshandler.DownstreamMessage{
|
||||
Body: bson.M{
|
||||
"from": src,
|
||||
"state": state_online,
|
||||
},
|
||||
Tag: friend_state_tag,
|
||||
})
|
||||
|
||||
offline, _ := json.Marshal(wshandler.DownstreamMessage{
|
||||
Body: bson.M{
|
||||
"from": src,
|
||||
"state": state_offline,
|
||||
},
|
||||
Tag: friend_state_tag,
|
||||
})
|
||||
|
||||
return &listenerMap{
|
||||
listeners: make(map[primitive.ObjectID]*listener),
|
||||
connected: connected,
|
||||
online: online,
|
||||
offline: offline,
|
||||
}
|
||||
}
|
||||
|
||||
func makeFriends(ctx context.Context, so *Social) (*friends, error) {
|
||||
if err := so.mongoClient.MakeUniqueIndices(friends_collection_name, map[string]bson.D{
|
||||
"fromto": {{Key: "from", Value: 1}, {Key: "to", Value: 1}},
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var moncen []monitoringCenter
|
||||
for i := 0; i < monitoring_center_count; i++ {
|
||||
subChannel := fmt.Sprintf("_soc_fr_monitor_ch_%d_%d", i, so.redison.Options().DB)
|
||||
regChan := make(chan registerListener)
|
||||
moncen = append(moncen, monitoringCenter{
|
||||
regChan: regChan,
|
||||
publishState: func(src, alias, state string) {
|
||||
so.redison.Publish(ctx, subChannel, src+alias+":"+state).Result()
|
||||
},
|
||||
})
|
||||
|
||||
go func(subChannel string, regChan chan registerListener) {
|
||||
pubsub := so.redison.Subscribe(ctx, subChannel)
|
||||
listeners := make(map[primitive.ObjectID]*listenerMap)
|
||||
for {
|
||||
select {
|
||||
case reg := <-regChan:
|
||||
// 내가 관심있는 애들 등록
|
||||
srcmap, online := listeners[reg.src]
|
||||
if !online {
|
||||
srcmap = makeSrcMap(reg.alias, false)
|
||||
listeners[reg.src] = srcmap
|
||||
}
|
||||
|
||||
if reg.l.c == nil {
|
||||
// 등록 해제. 모니터링 종료
|
||||
// listener목록에서 나(reg.l.me)를 제거
|
||||
delete(srcmap.listeners, reg.l.me)
|
||||
online = false
|
||||
logger.Println("regChan unregistered :", reg.src.Hex(), reg.l.me.Hex())
|
||||
} else if oldl, ok := srcmap.listeners[reg.l.me]; ok {
|
||||
// 내가 이미 리스너로 등록되어 있다.
|
||||
// 상대방이 나를 차단했을 경우에는 기존 리스너가 nil임
|
||||
online = oldl != nil
|
||||
logger.Println("regChan registered :", reg.src.Hex(), reg.l.me.Hex(), "old", online)
|
||||
} else {
|
||||
logger.Println("regChan registered :", reg.src.Hex(), reg.l.me.Hex())
|
||||
srcmap.listeners[reg.l.me] = reg.l
|
||||
}
|
||||
|
||||
if online && srcmap != nil {
|
||||
logger.Println("regChan send online :", reg.l.me.Hex(), string(srcmap.online))
|
||||
reg.l.c.WriteMessage(websocket.TextMessage, srcmap.online)
|
||||
}
|
||||
|
||||
if len(srcmap.listeners) == 0 && !srcmap.connected {
|
||||
delete(listeners, reg.src)
|
||||
}
|
||||
|
||||
case msg := <-pubsub.Channel():
|
||||
target, _ := primitive.ObjectIDFromHex(msg.Payload[:24])
|
||||
aliasstate := strings.SplitN(msg.Payload[24:], ":", 2)
|
||||
var sent []byte
|
||||
if srcmap, ok := listeners[target]; ok {
|
||||
if aliasstate[1] == state_online {
|
||||
sent = srcmap.online
|
||||
srcmap.connected = true
|
||||
} else if aliasstate[1] == state_offline {
|
||||
sent = srcmap.offline
|
||||
srcmap.connected = false
|
||||
if len(srcmap.listeners) == 0 {
|
||||
delete(listeners, target)
|
||||
}
|
||||
}
|
||||
|
||||
if len(sent) > 0 {
|
||||
for _, l := range srcmap.listeners {
|
||||
logger.Println("state fire :", l.me, string(sent))
|
||||
l.c.WriteMessage(websocket.TextMessage, sent)
|
||||
}
|
||||
}
|
||||
} else if aliasstate[1] == state_online {
|
||||
listeners[target] = makeSrcMap(aliasstate[0], true)
|
||||
}
|
||||
}
|
||||
}
|
||||
}(subChannel, regChan)
|
||||
}
|
||||
|
||||
return &friends{
|
||||
mongoClient: so.mongoClient,
|
||||
redison: so.redison,
|
||||
wsh: so.wsh,
|
||||
moncen: moncen,
|
||||
conns: connections{
|
||||
conns: make(map[primitive.ObjectID]*connWithFriends),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (fs *friends) ClientConnected(conn *websocket.Conn, callby *wshandler.Sender) {
|
||||
fs.conns.new(callby.Accid, conn)
|
||||
|
||||
// 내 로그인 상태를 알림
|
||||
meidx := callby.Accid[11] % monitoring_center_count
|
||||
fs.moncen[meidx].publishState(callby.Accid.Hex(), callby.Alias, state_online)
|
||||
}
|
||||
|
||||
func (fs *friends) ClientDisconnected(conn *websocket.Conn, callby *wshandler.Sender) {
|
||||
// 로그 오프 상태를 알림
|
||||
meidx := callby.Accid[11] % monitoring_center_count
|
||||
fs.moncen[meidx].publishState(callby.Accid.Hex(), callby.Alias, state_offline)
|
||||
|
||||
fs.stopMonitoringFriends(callby.Accid)
|
||||
|
||||
fs.conns.delete(callby.Accid)
|
||||
}
|
||||
|
||||
func (fs *friends) writeMessage(acc primitive.ObjectID, src any) {
|
||||
c := fs.conns.conn(acc)
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if bt, err := json.Marshal(src); err == nil {
|
||||
c.WriteMessage(websocket.TextMessage, bt)
|
||||
}
|
||||
}
|
||||
|
||||
var errAddFriendFailed = errors.New("addFriend failed")
|
||||
|
||||
func (fs *friends) addFriend(f *friendDoc) error {
|
||||
_, newid, err := fs.mongoClient.Update(friends_collection_name, bson.M{
|
||||
"_id": primitive.NewObjectID(),
|
||||
}, bson.M{
|
||||
"$setOnInsert": f,
|
||||
}, options.Update().SetUpsert(true))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if newid == nil {
|
||||
return errAddFriendFailed
|
||||
}
|
||||
|
||||
f.Id = newid.(primitive.ObjectID)
|
||||
if fs.conns.addFriend(f.From, f) {
|
||||
// 모니터링 중
|
||||
conn := fs.conns.conn(f.From)
|
||||
if conn != nil {
|
||||
toidx := f.To[11] % monitoring_center_count
|
||||
fs.moncen[toidx].regChan <- registerListener{
|
||||
src: f.To,
|
||||
alias: f.ToAlias,
|
||||
l: &listener{
|
||||
c: conn,
|
||||
me: f.From,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fs *friends) Block(ctx wshandler.ApiCallContext) {
|
||||
// BlockByMe 에 추가하고 상대의 BlockByYou를 설정한다.
|
||||
|
||||
// var bi struct {
|
||||
// From primitive.ObjectID
|
||||
// To primitive.ObjectID
|
||||
// }
|
||||
// if err := gocommon.MakeDecoder(r).Decode(&bi); err != nil {
|
||||
// logger.Println("friends.Block failed :", err)
|
||||
// w.WriteHeader(http.StatusBadRequest)
|
||||
// return
|
||||
// }
|
||||
// logger.Println("friends.Block :", bi)
|
||||
}
|
||||
|
||||
func (fs *friends) DeleteFriend(ctx wshandler.ApiCallContext) {
|
||||
fid, _ := primitive.ObjectIDFromHex(ctx.Arguments[0].(string))
|
||||
|
||||
var fdoc friendDoc
|
||||
if err := fs.mongoClient.FindOneAs(friends_collection_name, bson.M{
|
||||
"_id": fid,
|
||||
}, &fdoc, options.FindOne().SetProjection(bson.M{
|
||||
"from": 1,
|
||||
"to": 1,
|
||||
})); err != nil {
|
||||
logger.Println("DeleteFriend is failed :", err)
|
||||
return
|
||||
}
|
||||
|
||||
if fdoc.Id.IsZero() {
|
||||
return
|
||||
}
|
||||
|
||||
now := time.Now().UTC().Unix()
|
||||
fdoc.Deleted = true
|
||||
fdoc.Timestamp = now
|
||||
|
||||
// 나한테 삭제
|
||||
fs.mongoClient.Update(friends_collection_name, bson.M{
|
||||
"_id": fid,
|
||||
}, bson.M{
|
||||
"$set": bson.M{
|
||||
"deleted": true,
|
||||
"ts": fdoc.Timestamp,
|
||||
},
|
||||
}, options.Update().SetUpsert(false))
|
||||
fs.writeMessage(ctx.CallBy.Accid, &wshandler.DownstreamMessage{
|
||||
Body: []friendDoc{fdoc},
|
||||
Tag: friends_tag,
|
||||
})
|
||||
|
||||
// 상대방에게 삭제
|
||||
var yourdoc friendDoc
|
||||
if err := fs.mongoClient.FindOneAndUpdateAs(friends_collection_name, bson.M{
|
||||
"from": fdoc.To,
|
||||
"to": fdoc.From,
|
||||
}, bson.M{
|
||||
"$set": bson.M{
|
||||
"deleted": true,
|
||||
"ts": now,
|
||||
},
|
||||
}, &yourdoc, options.FindOneAndUpdate().SetReturnDocument(options.After).SetUpsert(false)); err == nil {
|
||||
fs.wsh.SendUpstreamMessage(&wshandler.UpstreamMessage{
|
||||
Target: fdoc.To.Hex(),
|
||||
Body: []friendDoc{yourdoc},
|
||||
Tag: friends_tag,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (fs *friends) StartMonitoringFriends(ctx wshandler.ApiCallContext) {
|
||||
// 내 친구 목록에 나를 등록
|
||||
var friends []*friendDoc
|
||||
if err := fs.mongoClient.FindAllAs(friends_collection_name, bson.M{
|
||||
"from": ctx.CallBy.Accid,
|
||||
}, &friends, options.Find().SetProjection(bson.M{"to": 1, "talias": 1})); err != nil {
|
||||
logger.Println("StartMonitoringFriends is failed :", err)
|
||||
return
|
||||
}
|
||||
|
||||
me := &listener{
|
||||
c: fs.conns.conn(ctx.CallBy.Accid),
|
||||
me: ctx.CallBy.Accid,
|
||||
}
|
||||
|
||||
for _, f := range friends {
|
||||
toidx := f.To[11] % monitoring_center_count
|
||||
fs.moncen[toidx].regChan <- registerListener{
|
||||
src: f.To,
|
||||
alias: f.ToAlias,
|
||||
l: me,
|
||||
}
|
||||
}
|
||||
|
||||
fs.conns.initFriends(ctx.CallBy.Accid, friends)
|
||||
}
|
||||
|
||||
func (fs *friends) stopMonitoringFriends(accid primitive.ObjectID) {
|
||||
friends := fs.conns.clearFriends(accid)
|
||||
|
||||
if len(friends) > 0 {
|
||||
// 나를 상대방 모니터링에서 뺀다
|
||||
nilListener := &listener{c: nil, me: accid}
|
||||
for _, f := range friends {
|
||||
toidx := f.To[11] % monitoring_center_count
|
||||
fs.moncen[toidx].regChan <- registerListener{
|
||||
src: f.To,
|
||||
alias: f.ToAlias,
|
||||
l: nilListener,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (fs *friends) StopMonitoringFriends(ctx wshandler.ApiCallContext) {
|
||||
fs.stopMonitoringFriends(ctx.CallBy.Accid)
|
||||
}
|
||||
|
||||
func (fs *friends) QueryFriends(ctx wshandler.ApiCallContext) {
|
||||
queryfrom := int64(ctx.Arguments[0].(float64))
|
||||
|
||||
var myfriends []friendDoc
|
||||
err := fs.mongoClient.FindAllAs(friends_collection_name, bson.M{
|
||||
"from": ctx.CallBy.Accid,
|
||||
"ts": bson.M{"$gt": queryfrom},
|
||||
}, &myfriends)
|
||||
if err != nil {
|
||||
logger.Println("QueryReceivedInvitations failed. FindAllAs err :", err)
|
||||
}
|
||||
|
||||
if len(myfriends) > 0 {
|
||||
fs.writeMessage(ctx.CallBy.Accid, &wshandler.DownstreamMessage{
|
||||
Alias: ctx.CallBy.Alias,
|
||||
Body: myfriends,
|
||||
Tag: friends_tag,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (fs *friends) Trim(ctx wshandler.ApiCallContext) {
|
||||
stringsTobjs := func(in []any) (out []primitive.ObjectID) {
|
||||
for _, i := range in {
|
||||
p, _ := primitive.ObjectIDFromHex(i.(string))
|
||||
out = append(out, p)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
ids := stringsTobjs(ctx.Arguments[2].([]any))
|
||||
if len(ids) > 0 {
|
||||
if len(ids) == 1 {
|
||||
fs.mongoClient.Delete(friends_collection_name, bson.M{"_id": ids[0]})
|
||||
} else {
|
||||
fs.mongoClient.DeleteMany(friends_collection_name, bson.D{{Key: "_id", Value: bson.M{"$in": ids}}})
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user