Files
maingate/core/watch.go

417 lines
10 KiB
Go

package core
import (
"context"
"encoding/hex"
"net/http"
"os"
"path"
"sync/atomic"
"time"
common "repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/logger"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
type authPipelineDocument struct {
OperationType string `bson:"operationType"`
DocumentKey struct {
Id primitive.ObjectID `bson:"_id"`
} `bson:"documentKey"`
Authinfo *common.Authinfo `bson:"fullDocument"`
}
type servicePipelineDocument struct {
OperationType string `bson:"operationType"`
DocumentKey struct {
Id primitive.ObjectID `bson:"_id"`
} `bson:"documentKey"`
Service *serviceDescription `bson:"fullDocument"`
}
type filePipelineDocument struct {
OperationType string `bson:"operationType"`
DocumentKey struct {
Id primitive.ObjectID `bson:"_id"`
} `bson:"documentKey"`
File *fileDocumentDesc `bson:"fullDocument"`
}
type whilelistPipelineDocument struct {
OperationType string `bson:"operationType"`
DocumentKey struct {
Id primitive.ObjectID `bson:"_id"`
} `bson:"documentKey"`
Member *whitelistmember `bson:"fullDocument"`
}
func (mg *Maingate) watchWhitelistCollection(parentctx context.Context) {
defer func() {
s := recover()
if s != nil {
logger.Error(s)
}
}()
matchStage := bson.D{
{
Key: "$match", Value: bson.D{
{Key: "operationType", Value: bson.D{
{Key: "$in", Value: bson.A{
"update",
"insert",
}},
}},
},
}}
projectStage := bson.D{
{
Key: "$project", Value: bson.D{
{Key: "documentKey", Value: 1},
{Key: "operationType", Value: 1},
{Key: "fullDocument", Value: 1},
},
},
}
var stream *mongo.ChangeStream
var err error
var ctx context.Context
for {
if stream == nil {
stream, err = mg.mongoClient.Watch(CollectionWhitelist, mongo.Pipeline{matchStage, projectStage})
if err != nil {
logger.Error("watchWhitelistCollection watch failed :", err)
time.Sleep(time.Minute)
continue
}
ctx = context.TODO()
}
changed := stream.TryNext(ctx)
if ctx.Err() != nil {
logger.Error("watchServiceCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error())
break
}
if changed {
var data whilelistPipelineDocument
if err := stream.Decode(&data); err == nil {
ot := data.OperationType
switch ot {
case "insert":
// 새 화이트리스트 멤버
if svc := mg.services.get(data.Member.Service); svc != nil {
svc.wl.add(data.Member)
}
case "update":
if svc := mg.services.get(data.Member.Service); svc != nil {
if data.Member.Expired != 0 {
logger.Println("whitelist member is removed :", *data.Member)
svc.wl.remove(data.Member.Email)
} else {
logger.Println("whitelist member is updated :", *data.Member)
svc.wl.add(data.Member)
}
}
}
} else {
logger.Error("watchServiceCollection stream.Decode failed :", err)
}
} else if stream.Err() != nil || stream.ID() == 0 {
logger.Error("watchServiceCollection stream error :", stream.Err())
stream.Close(ctx)
stream = nil
} else {
time.Sleep(time.Second)
}
}
}
func (mg *Maingate) watchFileCollection(parentctx context.Context, serveMux *http.ServeMux, prefix string) {
defer func() {
s := recover()
if s != nil {
logger.Error(s)
}
}()
matchStage := bson.D{
{
Key: "$match", Value: bson.D{
{Key: "operationType", Value: bson.D{
{Key: "$in", Value: bson.A{
"delete",
"insert",
}},
}},
},
}}
projectStage := bson.D{
{
Key: "$project", Value: bson.D{
{Key: "operationType", Value: 1},
{Key: "documentKey", Value: 1},
{Key: "fullDocument", Value: 1},
},
},
}
var stream *mongo.ChangeStream
var err error
var ctx context.Context
for {
if stream == nil {
stream, err = mg.mongoClient.Watch(CollectionFile, mongo.Pipeline{matchStage, projectStage}, options.ChangeStream().SetFullDocument(options.UpdateLookup))
if err != nil {
logger.Error("watchFileCollection watch failed :", err)
time.Sleep(time.Second)
continue
}
ctx = context.TODO()
}
changed := stream.TryNext(ctx)
if ctx.Err() != nil {
logger.Error("watchFileCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error())
break
}
if !changed {
if stream.Err() != nil || stream.ID() == 0 {
logger.Error("watchServiceCollection stream error :", stream.Err())
stream.Close(ctx)
stream = nil
} else {
time.Sleep(time.Second)
}
continue
}
var data filePipelineDocument
if err := stream.Decode(&data); err == nil {
switch data.OperationType {
case "insert":
data.File.save()
case "delete":
subfolder := hex.EncodeToString(data.DocumentKey.Id[:4])
rf := hex.EncodeToString(data.DocumentKey.Id[4:8])
subpath := path.Join("static", subfolder, rf)
os.RemoveAll(subpath)
}
}
}
}
func (mg *Maingate) watchServiceCollection(parentctx context.Context, serveMux *http.ServeMux, prefix string) {
defer func() {
s := recover()
if s != nil {
logger.Error(s)
}
}()
matchStage := bson.D{
{
Key: "$match", Value: bson.D{
{Key: "operationType", Value: bson.D{
{Key: "$in", Value: bson.A{
"delete",
"insert",
"update",
"replace",
}},
}},
},
}}
projectStage := bson.D{
{
Key: "$project", Value: bson.D{
{Key: "operationType", Value: 1},
{Key: "fullDocument", Value: 1},
},
},
}
var stream *mongo.ChangeStream
var err error
var ctx context.Context
for {
if stream == nil {
stream, err = mg.mongoClient.Watch(CollectionService, mongo.Pipeline{matchStage, projectStage}, options.ChangeStream().SetFullDocument(options.UpdateLookup))
if err != nil {
logger.Error("watchServiceCollection watch failed :", err)
time.Sleep(time.Minute)
continue
}
ctx = context.TODO()
}
changed := stream.TryNext(ctx)
if ctx.Err() != nil {
logger.Error("watchServiceCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error())
break
}
if changed {
var data servicePipelineDocument
if err := stream.Decode(&data); err == nil {
ot := data.OperationType
switch ot {
case "insert":
// 새 서비스 추가됨
if err := data.Service.prepare(mg); err != nil {
logger.Error("service cannot be prepared :", data.Service, err)
} else {
logger.Println("service is on the board! :", data.Service)
mg.services.add(data.Service)
serveMux.Handle(common.MakeHttpHandlerPattern(prefix, data.Service.ServiceCode, "/"), data.Service)
}
case "replace":
fallthrough
case "update":
data.Service.prepare(mg)
if old := mg.services.get(data.Service.ServiceName); old != nil {
atomic.SwapPointer(&old.divisionsSerialized, data.Service.divisionsSerialized)
atomic.SwapPointer(&old.apiUsers, data.Service.apiUsers)
atomic.SwapPointer(&old.serviceSerialized, data.Service.serviceSerialized)
atomic.SwapPointer(&old.serviceSummarySerialized, data.Service.serviceSummarySerialized)
for _, token := range old.ServerApiTokens {
mg.apiTokenToService.remove(token.Hex())
}
for _, token := range data.Service.ServerApiTokens {
mg.apiTokenToService.add(token.Hex(), data.Service.ServiceCode)
}
if data.Service.UseWhitelist {
atomic.StoreInt32(&old.wl.working, 1)
} else {
atomic.StoreInt32(&old.wl.working, 0)
}
old.Closed = data.Service.Closed
if old.Closed {
atomic.StoreInt32(&old.closed, 1)
} else {
atomic.StoreInt32(&old.closed, 0)
}
atomic.SwapPointer(&old.wl.emailptr, data.Service.wl.emailptr)
old.Divisions = data.Service.Divisions
} else if !data.Service.Closed {
if err := data.Service.prepare(mg); err != nil {
logger.Error("service cannot be prepared :", data.Service, err)
} else {
logger.Println("service is on the board! :", data.Service)
mg.services.add(data.Service)
serveMux.Handle(common.MakeHttpHandlerPattern(prefix, data.Service.ServiceCode, "/"), data.Service)
}
}
case "delete":
if deleted := mg.services.remove(data.DocumentKey.Id); deleted != nil {
logger.Println("service is closed :", data.Service)
atomic.AddInt32(&deleted.closed, 1)
}
}
} else {
logger.Error("watchServiceCollection stream.Decode failed :", err)
}
} else if stream.Err() != nil || stream.ID() == 0 {
logger.Error("watchServiceCollection stream error :", stream.Err())
stream.Close(ctx)
stream = nil
} else {
time.Sleep(time.Second)
}
}
}
func watchAuthCollection(parentctx context.Context, ac *common.AuthCollection, mongoClient common.MongoClient) {
defer func() {
s := recover()
if s != nil {
logger.Error(s)
}
}()
matchStage := bson.D{
{
Key: "$match", Value: bson.D{
{Key: "operationType", Value: bson.D{
{Key: "$in", Value: bson.A{
"delete",
"insert",
"update",
}},
}},
},
}}
projectStage := bson.D{
{
Key: "$project", Value: bson.D{
{Key: "documentKey", Value: 1},
{Key: "operationType", Value: 1},
{Key: "fullDocument", Value: 1},
},
},
}
var stream *mongo.ChangeStream
var err error
var ctx context.Context
for {
if stream == nil {
stream, err = mongoClient.Watch(CollectionAuth, mongo.Pipeline{matchStage, projectStage})
if err != nil {
logger.Error("watchAuthCollection watch failed :", err)
time.Sleep(time.Minute)
continue
}
ctx = context.TODO()
}
changed := stream.TryNext(ctx)
if ctx.Err() != nil {
logger.Error("watchAuthCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error())
break
}
if changed {
var data authPipelineDocument
if err := stream.Decode(&data); err == nil {
ot := data.OperationType
switch ot {
case "insert":
ac.AddRaw(&mongoAuthCell{src: data.Authinfo})
case "update":
ac.AddRaw(&mongoAuthCell{src: data.Authinfo})
case "delete":
ac.RemoveByAccId(data.DocumentKey.Id)
}
} else {
logger.Error("watchAuthCollection stream.Decode failed :", err)
}
} else if stream.Err() != nil || stream.ID() == 0 {
logger.Error("watchAuthCollection stream error :", stream.Err())
stream.Close(ctx)
stream = nil
} else {
time.Sleep(time.Second)
}
}
}