407 lines
9.5 KiB
Go
407 lines
9.5 KiB
Go
package core
|
|
|
|
import (
|
|
"context"
|
|
"encoding/hex"
|
|
"net/http"
|
|
"os"
|
|
"path"
|
|
"sync/atomic"
|
|
"time"
|
|
"unsafe"
|
|
|
|
common "repositories.action2quare.com/ayo/gocommon"
|
|
"repositories.action2quare.com/ayo/gocommon/logger"
|
|
|
|
"go.mongodb.org/mongo-driver/bson"
|
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
|
"go.mongodb.org/mongo-driver/mongo"
|
|
"go.mongodb.org/mongo-driver/mongo/options"
|
|
)
|
|
|
|
// authPipelineDocument is the decode target for the change-stream events read
// by watchAuthCollection. Only the fields kept by that watcher's $project stage
// are declared.
type authPipelineDocument struct {
	// OperationType is the event's "operationType" value; the watcher acts on
	// "insert", "update" and "delete".
	OperationType string `bson:"operationType"`

	// DocumentKey holds the _id of the changed document. It is what the
	// "delete" branch uses to identify the entry to remove.
	DocumentKey struct {
		Id primitive.ObjectID `bson:"_id"`
	} `bson:"documentKey"`

	// Authinfo is decoded from the event's "fullDocument" field.
	// NOTE(review): presumably nil when the event carries no fullDocument
	// (e.g. delete events) — confirm against the driver's decoding rules.
	Authinfo *common.Authinfo `bson:"fullDocument"`
}
|
|
|
|
// servicePipelineDocument is the decode target for the change-stream events
// read by watchServiceCollection.
type servicePipelineDocument struct {
	// OperationType is the event's "operationType" value; the watcher acts on
	// "insert", "update" and "replace".
	OperationType string `bson:"operationType"`

	// DocumentKey maps the event's "documentKey" field.
	// NOTE(review): the $project stage in watchServiceCollection does not keep
	// "documentKey", so this presumably stays at its zero value there.
	DocumentKey struct {
		Id primitive.ObjectID `bson:"_id"`
	} `bson:"documentKey"`

	// Service is decoded from the event's "fullDocument" field.
	Service *serviceDescription `bson:"fullDocument"`
}
|
|
|
|
// filePipelineDocument is the decode target for the change-stream events read
// by watchFileCollection.
type filePipelineDocument struct {
	// OperationType is the event's "operationType" value; the watcher acts on
	// "insert" and "delete".
	OperationType string `bson:"operationType"`

	// DocumentKey holds the _id of the changed document. The "delete" branch
	// derives the on-disk folder names from its first eight bytes.
	DocumentKey struct {
		Id primitive.ObjectID `bson:"_id"`
	} `bson:"documentKey"`

	// File is decoded from the event's "fullDocument" field.
	// NOTE(review): presumably nil for delete events, which carry no
	// fullDocument — confirm.
	File *FileDocumentDesc `bson:"fullDocument"`
}
|
|
|
|
// whilelistPipelineDocument is the decode target for the change-stream events
// read by watchWhitelistCollection.
//
// NOTE(review): the type name misspells "whitelist"; renaming it requires
// updating every user of the type in the same change.
type whilelistPipelineDocument struct {
	// OperationType is the event's "operationType" value; the watcher acts on
	// "insert" and "update".
	OperationType string `bson:"operationType"`

	// DocumentKey holds the _id of the changed document.
	DocumentKey struct {
		Id primitive.ObjectID `bson:"_id"`
	} `bson:"documentKey"`

	// Member is decoded from the event's "fullDocument" field.
	Member *whitelistmember `bson:"fullDocument"`
}
|
|
|
|
func (mg *Maingate) watchWhitelistCollection(parentctx context.Context) {
|
|
defer func() {
|
|
s := recover()
|
|
if s != nil {
|
|
logger.Error(s)
|
|
}
|
|
}()
|
|
|
|
matchStage := bson.D{
|
|
{
|
|
Key: "$match", Value: bson.D{
|
|
{Key: "operationType", Value: bson.D{
|
|
{Key: "$in", Value: bson.A{
|
|
"update",
|
|
"insert",
|
|
}},
|
|
}},
|
|
},
|
|
}}
|
|
projectStage := bson.D{
|
|
{
|
|
Key: "$project", Value: bson.D{
|
|
{Key: "documentKey", Value: 1},
|
|
{Key: "operationType", Value: 1},
|
|
{Key: "fullDocument", Value: 1},
|
|
},
|
|
},
|
|
}
|
|
|
|
var stream *mongo.ChangeStream
|
|
var err error
|
|
var ctx context.Context
|
|
|
|
for {
|
|
if stream == nil {
|
|
stream, err = mg.mongoClient.Watch(CollectionWhitelist, mongo.Pipeline{matchStage, projectStage})
|
|
if err != nil {
|
|
logger.Error("watchWhitelistCollection watch failed :", err)
|
|
time.Sleep(time.Minute)
|
|
continue
|
|
}
|
|
ctx = context.TODO()
|
|
}
|
|
|
|
changed := stream.TryNext(ctx)
|
|
if ctx.Err() != nil {
|
|
logger.Error("watchWhitelistCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error())
|
|
break
|
|
}
|
|
|
|
if changed {
|
|
var data whilelistPipelineDocument
|
|
if err := stream.Decode(&data); err == nil {
|
|
ot := data.OperationType
|
|
switch ot {
|
|
case "insert":
|
|
// 새 화이트리스트 멤버
|
|
mg.service().wl.add(data.Member)
|
|
case "update":
|
|
if data.Member.Expired != 0 {
|
|
logger.Println("whitelist member is removed :", *data.Member)
|
|
mg.service().wl.remove(data.Member.Email)
|
|
} else {
|
|
logger.Println("whitelist member is updated :", *data.Member)
|
|
mg.service().wl.add(data.Member)
|
|
}
|
|
}
|
|
} else {
|
|
logger.Error("watchWhitelistCollection stream.Decode failed :", err)
|
|
}
|
|
} else if stream.Err() != nil || stream.ID() == 0 {
|
|
select {
|
|
case <-ctx.Done():
|
|
logger.Println("watchWhitelistCollection is done")
|
|
stream.Close(ctx)
|
|
return
|
|
|
|
case <-time.After(time.Second):
|
|
logger.Error("watchWhitelistCollection stream error :", stream.Err())
|
|
stream.Close(ctx)
|
|
stream = nil
|
|
}
|
|
} else {
|
|
time.Sleep(time.Second)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (mg *Maingate) watchFileCollection(parentctx context.Context, serveMux *http.ServeMux, prefix string) {
|
|
defer func() {
|
|
s := recover()
|
|
if s != nil {
|
|
logger.Error(s)
|
|
}
|
|
}()
|
|
|
|
matchStage := bson.D{
|
|
{
|
|
Key: "$match", Value: bson.D{
|
|
{Key: "operationType", Value: bson.D{
|
|
{Key: "$in", Value: bson.A{
|
|
"delete",
|
|
"insert",
|
|
}},
|
|
}},
|
|
},
|
|
}}
|
|
projectStage := bson.D{
|
|
{
|
|
Key: "$project", Value: bson.D{
|
|
{Key: "operationType", Value: 1},
|
|
{Key: "documentKey", Value: 1},
|
|
{Key: "fullDocument", Value: 1},
|
|
},
|
|
},
|
|
}
|
|
var stream *mongo.ChangeStream
|
|
var err error
|
|
var ctx context.Context
|
|
|
|
for {
|
|
if stream == nil {
|
|
stream, err = mg.mongoClient.Watch(CollectionFile, mongo.Pipeline{matchStage, projectStage}, options.ChangeStream().SetFullDocument(options.UpdateLookup))
|
|
if err != nil {
|
|
logger.Error("watchFileCollection watch failed :", err)
|
|
time.Sleep(time.Second)
|
|
continue
|
|
}
|
|
ctx = context.TODO()
|
|
}
|
|
|
|
changed := stream.TryNext(ctx)
|
|
if ctx.Err() != nil {
|
|
logger.Error("watchFileCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error())
|
|
break
|
|
}
|
|
|
|
if !changed {
|
|
if stream.Err() != nil || stream.ID() == 0 {
|
|
select {
|
|
case <-ctx.Done():
|
|
logger.Println("watchServiceCollection is done")
|
|
stream.Close(ctx)
|
|
return
|
|
|
|
case <-time.After(time.Second):
|
|
logger.Error("watchServiceCollection stream error :", stream.Err())
|
|
stream.Close(ctx)
|
|
stream = nil
|
|
}
|
|
} else {
|
|
time.Sleep(time.Second)
|
|
}
|
|
continue
|
|
}
|
|
|
|
var data filePipelineDocument
|
|
if err := stream.Decode(&data); err == nil {
|
|
switch data.OperationType {
|
|
case "insert":
|
|
data.File.Save()
|
|
|
|
case "delete":
|
|
subfolder := hex.EncodeToString(data.DocumentKey.Id[:4])
|
|
rf := hex.EncodeToString(data.DocumentKey.Id[4:8])
|
|
|
|
subpath := path.Join("static", subfolder, rf)
|
|
os.RemoveAll(subpath)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (mg *Maingate) watchServiceCollection(parentctx context.Context, serveMux *http.ServeMux, prefix string) {
|
|
defer func() {
|
|
s := recover()
|
|
if s != nil {
|
|
logger.Error(s)
|
|
}
|
|
}()
|
|
|
|
matchStage := bson.D{
|
|
{
|
|
Key: "$match", Value: bson.D{
|
|
{Key: "operationType", Value: bson.D{
|
|
{Key: "$in", Value: bson.A{
|
|
"insert",
|
|
"update",
|
|
"replace",
|
|
}},
|
|
}},
|
|
},
|
|
}}
|
|
projectStage := bson.D{
|
|
{
|
|
Key: "$project", Value: bson.D{
|
|
{Key: "operationType", Value: 1},
|
|
{Key: "fullDocument", Value: 1},
|
|
},
|
|
},
|
|
}
|
|
|
|
var stream *mongo.ChangeStream
|
|
var err error
|
|
var ctx context.Context
|
|
|
|
for {
|
|
if stream == nil {
|
|
stream, err = mg.mongoClient.Watch(CollectionService, mongo.Pipeline{matchStage, projectStage}, options.ChangeStream().SetFullDocument(options.UpdateLookup))
|
|
if err != nil {
|
|
logger.Error("watchServiceCollection watch failed :", err)
|
|
time.Sleep(time.Minute)
|
|
continue
|
|
}
|
|
ctx = context.TODO()
|
|
}
|
|
|
|
changed := stream.TryNext(ctx)
|
|
if ctx.Err() != nil {
|
|
logger.Error("watchServiceCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error())
|
|
break
|
|
}
|
|
|
|
if changed {
|
|
var data servicePipelineDocument
|
|
if err := stream.Decode(&data); err == nil {
|
|
ot := data.OperationType
|
|
switch ot {
|
|
case "insert":
|
|
// 새 서비스 추가됨
|
|
if err := data.Service.prepare(mg); err != nil {
|
|
logger.Error("service cannot be prepared :", data.Service, err)
|
|
} else {
|
|
// 내가 임시로 가지고 있던 서비스일 수 있다.
|
|
already := mg.service().Id == data.Service.Id
|
|
logger.Println("service is on the board! :", data.Service)
|
|
atomic.StorePointer(&mg.serviceptr, unsafe.Pointer(data.Service))
|
|
if !already {
|
|
serveMux.Handle(common.MakeHttpHandlerPattern(prefix, data.Service.ServiceCode, "/"), mg.service())
|
|
}
|
|
}
|
|
|
|
case "replace":
|
|
fallthrough
|
|
|
|
case "update":
|
|
data.Service.prepare(mg)
|
|
atomic.StorePointer(&mg.serviceptr, unsafe.Pointer(data.Service))
|
|
}
|
|
} else {
|
|
logger.Error("watchServiceCollection stream.Decode failed :", err)
|
|
}
|
|
} else if stream.Err() != nil || stream.ID() == 0 {
|
|
select {
|
|
case <-ctx.Done():
|
|
logger.Println("watchServiceCollection is done")
|
|
stream.Close(ctx)
|
|
return
|
|
|
|
case <-time.After(time.Second):
|
|
logger.Error("watchServiceCollection stream error :", stream.Err())
|
|
stream.Close(ctx)
|
|
stream = nil
|
|
}
|
|
} else {
|
|
time.Sleep(time.Second)
|
|
}
|
|
}
|
|
}
|
|
|
|
func watchAuthCollection(parentctx context.Context, ac *common.AuthCollection, mongoClient common.MongoClient) {
|
|
defer func() {
|
|
s := recover()
|
|
if s != nil {
|
|
logger.Error(s)
|
|
}
|
|
}()
|
|
|
|
matchStage := bson.D{
|
|
{
|
|
Key: "$match", Value: bson.D{
|
|
{Key: "operationType", Value: bson.D{
|
|
{Key: "$in", Value: bson.A{
|
|
"delete",
|
|
"insert",
|
|
"update",
|
|
}},
|
|
}},
|
|
},
|
|
}}
|
|
projectStage := bson.D{
|
|
{
|
|
Key: "$project", Value: bson.D{
|
|
{Key: "documentKey", Value: 1},
|
|
{Key: "operationType", Value: 1},
|
|
{Key: "fullDocument", Value: 1},
|
|
},
|
|
},
|
|
}
|
|
|
|
var stream *mongo.ChangeStream
|
|
var err error
|
|
var ctx context.Context
|
|
|
|
for {
|
|
if stream == nil {
|
|
stream, err = mongoClient.Watch(CollectionAuth, mongo.Pipeline{matchStage, projectStage})
|
|
if err != nil {
|
|
logger.Error("watchAuthCollection watch failed :", err)
|
|
time.Sleep(time.Minute)
|
|
continue
|
|
}
|
|
ctx = context.TODO()
|
|
}
|
|
|
|
changed := stream.TryNext(ctx)
|
|
if ctx.Err() != nil {
|
|
logger.Error("watchAuthCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error())
|
|
break
|
|
}
|
|
|
|
if changed {
|
|
var data authPipelineDocument
|
|
if err := stream.Decode(&data); err == nil {
|
|
ot := data.OperationType
|
|
switch ot {
|
|
case "insert":
|
|
ac.AddRaw(&mongoAuthCell{src: data.Authinfo})
|
|
case "update":
|
|
ac.AddRaw(&mongoAuthCell{src: data.Authinfo})
|
|
case "delete":
|
|
ac.RemoveByAccId(data.DocumentKey.Id)
|
|
}
|
|
} else {
|
|
logger.Error("watchAuthCollection stream.Decode failed :", err)
|
|
}
|
|
} else if stream.Err() != nil || stream.ID() == 0 {
|
|
select {
|
|
case <-ctx.Done():
|
|
logger.Println("watchAuthCollection is done")
|
|
stream.Close(ctx)
|
|
return
|
|
|
|
case <-time.After(time.Second):
|
|
logger.Error("watchAuthCollection stream error :", stream.Err())
|
|
stream.Close(ctx)
|
|
stream = nil
|
|
}
|
|
} else {
|
|
time.Sleep(time.Second)
|
|
}
|
|
}
|
|
}
|