Files
maingate/core/watch.go

309 lines
7.1 KiB
Go
Raw Permalink Normal View History

2023-05-24 12:16:03 +09:00
package core
import (
"context"
"encoding/hex"
2023-05-24 12:16:03 +09:00
"net/http"
"os"
"path"
2023-05-24 12:16:03 +09:00
"sync/atomic"
"time"
2023-06-20 11:07:53 +09:00
"unsafe"
2023-05-24 12:16:03 +09:00
"repositories.action2quare.com/ayo/gocommon"
2023-05-24 15:31:01 +09:00
"repositories.action2quare.com/ayo/gocommon/logger"
2023-05-24 12:16:03 +09:00
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
type authPipelineDocument struct {
OperationType string `bson:"operationType"`
DocumentKey struct {
Id primitive.ObjectID `bson:"_id"`
} `bson:"documentKey"`
Authinfo *gocommon.Authinfo `bson:"fullDocument"`
2023-05-24 12:16:03 +09:00
}
// servicePipelineDocument is the decode target for the change-stream events
// consumed by watchServiceCollection.
type servicePipelineDocument struct {
	// OperationType holds the event's "operationType" value.
	OperationType string `bson:"operationType"`

	// DocumentKey carries the _id of the document the event refers to.
	DocumentKey struct {
		Id primitive.ObjectID `bson:"_id"`
	} `bson:"documentKey"`

	// Service is decoded from the event's "fullDocument" field and stays nil
	// when the event does not carry one.
	Service *serviceDescription `bson:"fullDocument"`
}
type filePipelineDocument struct {
OperationType string `bson:"operationType"`
DocumentKey struct {
Id primitive.ObjectID `bson:"_id"`
} `bson:"documentKey"`
2023-06-27 19:03:53 +09:00
File *FileDocumentDesc `bson:"fullDocument"`
}
func (mg *Maingate) watchFileCollection(parentctx context.Context, serveMux *http.ServeMux, prefix string) {
defer func() {
s := recover()
if s != nil {
logger.Error(s)
}
}()
matchStage := bson.D{
{
Key: "$match", Value: bson.D{
{Key: "operationType", Value: bson.D{
{Key: "$in", Value: bson.A{
"delete",
"insert",
}},
}},
},
}}
projectStage := bson.D{
{
Key: "$project", Value: bson.D{
{Key: "operationType", Value: 1},
{Key: "documentKey", Value: 1},
{Key: "fullDocument", Value: 1},
},
},
}
var stream *mongo.ChangeStream
var err error
var ctx context.Context
for {
if stream == nil {
stream, err = mg.mongoClient.Watch(CollectionFile, mongo.Pipeline{matchStage, projectStage}, options.ChangeStream().SetFullDocument(options.UpdateLookup))
if err != nil {
logger.Error("watchFileCollection watch failed :", err)
time.Sleep(time.Second)
continue
}
ctx = context.TODO()
}
changed := stream.TryNext(ctx)
if ctx.Err() != nil {
logger.Error("watchFileCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error())
break
}
if !changed {
if stream.Err() != nil || stream.ID() == 0 {
2023-06-13 20:10:08 +09:00
select {
case <-ctx.Done():
logger.Println("watchServiceCollection is done")
stream.Close(ctx)
return
case <-time.After(time.Second):
logger.Error("watchServiceCollection stream error :", stream.Err())
stream.Close(ctx)
stream = nil
}
} else {
time.Sleep(time.Second)
}
continue
}
var data filePipelineDocument
if err := stream.Decode(&data); err == nil {
switch data.OperationType {
case "insert":
2023-06-27 19:13:10 +09:00
data.File.Save()
case "delete":
subfolder := hex.EncodeToString(data.DocumentKey.Id[:4])
rf := hex.EncodeToString(data.DocumentKey.Id[4:8])
subpath := path.Join("static", subfolder, rf)
os.RemoveAll(subpath)
}
}
}
}
2023-05-24 12:16:03 +09:00
func (mg *Maingate) watchServiceCollection(parentctx context.Context, serveMux *http.ServeMux, prefix string) {
defer func() {
s := recover()
if s != nil {
logger.Error(s)
}
}()
matchStage := bson.D{
{
Key: "$match", Value: bson.D{
{Key: "operationType", Value: bson.D{
{Key: "$in", Value: bson.A{
"insert",
"update",
"replace",
}},
}},
},
}}
projectStage := bson.D{
{
Key: "$project", Value: bson.D{
{Key: "operationType", Value: 1},
{Key: "fullDocument", Value: 1},
},
},
}
var stream *mongo.ChangeStream
var err error
var ctx context.Context
for {
if stream == nil {
stream, err = mg.mongoClient.Watch(CollectionService, mongo.Pipeline{matchStage, projectStage}, options.ChangeStream().SetFullDocument(options.UpdateLookup))
if err != nil {
logger.Error("watchServiceCollection watch failed :", err)
time.Sleep(time.Minute)
continue
}
ctx = context.TODO()
}
changed := stream.TryNext(ctx)
if ctx.Err() != nil {
logger.Error("watchServiceCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error())
break
}
if changed {
var data servicePipelineDocument
if err := stream.Decode(&data); err == nil {
ot := data.OperationType
switch ot {
case "insert":
// 새 서비스 추가됨
if err := data.Service.prepare(mg); err != nil {
logger.Error("service cannot be prepared :", data.Service, err)
} else {
// 내가 임시로 가지고 있던 서비스일 수 있다.
2023-08-16 21:44:19 +09:00
if mg.service().Id == data.Service.Id {
logger.Println("service is on the board! :", data.Service)
atomic.StorePointer(&mg.serviceptr, unsafe.Pointer(data.Service))
}
2023-05-24 12:16:03 +09:00
}
case "replace":
fallthrough
case "update":
data.Service.prepare(mg)
2023-06-28 15:17:11 +09:00
atomic.StorePointer(&mg.serviceptr, unsafe.Pointer(data.Service))
2023-05-24 12:16:03 +09:00
}
} else {
logger.Error("watchServiceCollection stream.Decode failed :", err)
}
} else if stream.Err() != nil || stream.ID() == 0 {
2023-06-13 20:10:08 +09:00
select {
case <-ctx.Done():
logger.Println("watchServiceCollection is done")
stream.Close(ctx)
return
case <-time.After(time.Second):
logger.Error("watchServiceCollection stream error :", stream.Err())
stream.Close(ctx)
stream = nil
}
2023-05-24 12:16:03 +09:00
} else {
time.Sleep(time.Second)
}
}
}
func watchAuthCollection(parentctx context.Context, ac *gocommon.AuthCollection, mongoClient gocommon.MongoClient) {
2023-05-24 12:16:03 +09:00
defer func() {
s := recover()
if s != nil {
logger.Error(s)
}
}()
matchStage := bson.D{
{
Key: "$match", Value: bson.D{
{Key: "operationType", Value: bson.D{
{Key: "$in", Value: bson.A{
"delete",
"insert",
"update",
}},
}},
},
}}
projectStage := bson.D{
{
Key: "$project", Value: bson.D{
{Key: "documentKey", Value: 1},
{Key: "operationType", Value: 1},
{Key: "fullDocument", Value: 1},
},
},
}
var stream *mongo.ChangeStream
var err error
var ctx context.Context
for {
if stream == nil {
stream, err = mongoClient.Watch(CollectionAuth, mongo.Pipeline{matchStage, projectStage})
if err != nil {
logger.Error("watchAuthCollection watch failed :", err)
time.Sleep(time.Minute)
continue
}
ctx = context.TODO()
}
changed := stream.TryNext(ctx)
if ctx.Err() != nil {
logger.Error("watchAuthCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error())
break
}
if changed {
var data authPipelineDocument
if err := stream.Decode(&data); err == nil {
ot := data.OperationType
switch ot {
case "insert":
ac.AddRaw(&mongoAuthCell{src: data.Authinfo})
case "update":
ac.AddRaw(&mongoAuthCell{src: data.Authinfo})
case "delete":
ac.RemoveByAccId(data.DocumentKey.Id)
}
} else {
logger.Error("watchAuthCollection stream.Decode failed :", err)
}
} else if stream.Err() != nil || stream.ID() == 0 {
2023-06-13 20:10:08 +09:00
select {
case <-ctx.Done():
logger.Println("watchAuthCollection is done")
stream.Close(ctx)
return
case <-time.After(time.Second):
logger.Error("watchAuthCollection stream error :", stream.Err())
stream.Close(ctx)
stream = nil
}
2023-05-24 12:16:03 +09:00
} else {
time.Sleep(time.Second)
}
}
}