2023-08-22 10:16:09 +09:00
|
|
|
package core
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"sync/atomic"
|
|
|
|
|
"time"
|
|
|
|
|
"unsafe"
|
|
|
|
|
|
|
|
|
|
"go.mongodb.org/mongo-driver/bson"
|
|
|
|
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
|
|
|
|
"go.mongodb.org/mongo-driver/mongo"
|
|
|
|
|
"repositories.action2quare.com/ayo/gocommon"
|
|
|
|
|
"repositories.action2quare.com/ayo/gocommon/logger"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// memberContraints is the type constraint for values stored in a
// memberContainerPtr. K is the type of the key a member is indexed by.
type memberContraints[K comparable] interface {
	// Expired reports whether the member should no longer be handed out;
	// members for which it returns true are skipped when listing the container.
	Expired() bool

	// Key returns the value under which the member is stored in the
	// container's map.
	Key() K
}
|
|
|
|
|
|
|
|
|
|
type memberContainerPtr[K comparable, T memberContraints[K]] struct {
|
|
|
|
|
ptr unsafe.Pointer
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *memberContainerPtr[K, T]) init(ms []T) {
|
|
|
|
|
next := map[K]T{}
|
|
|
|
|
for _, m := range ms {
|
|
|
|
|
next[m.Key()] = m
|
|
|
|
|
}
|
|
|
|
|
atomic.StorePointer(&p.ptr, unsafe.Pointer(&next))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *memberContainerPtr[K, T]) add(m T) {
|
|
|
|
|
ptr := atomic.LoadPointer(&p.ptr)
|
|
|
|
|
src := (*map[K]T)(ptr)
|
|
|
|
|
|
|
|
|
|
next := map[K]T{}
|
|
|
|
|
for k, v := range *src {
|
|
|
|
|
next[k] = v
|
|
|
|
|
}
|
|
|
|
|
next[m.Key()] = m
|
|
|
|
|
atomic.StorePointer(&p.ptr, unsafe.Pointer(&next))
|
|
|
|
|
}
|
|
|
|
|
|
2023-09-25 12:29:26 +09:00
|
|
|
func (p *memberContainerPtr[K, T]) get(key K) (T, bool) {
|
|
|
|
|
ptr := atomic.LoadPointer(&p.ptr)
|
|
|
|
|
src := (*map[K]T)(ptr)
|
|
|
|
|
|
|
|
|
|
out, found := (*src)[key]
|
|
|
|
|
return out, found
|
|
|
|
|
}
|
|
|
|
|
|
2023-08-22 10:16:09 +09:00
|
|
|
func (p *memberContainerPtr[K, T]) remove(key K) {
|
|
|
|
|
ptr := atomic.LoadPointer(&p.ptr)
|
|
|
|
|
src := (*map[K]T)(ptr)
|
|
|
|
|
|
|
|
|
|
next := map[K]T{}
|
|
|
|
|
for k, v := range *src {
|
|
|
|
|
next[k] = v
|
|
|
|
|
}
|
|
|
|
|
delete(next, key)
|
|
|
|
|
atomic.StorePointer(&p.ptr, unsafe.Pointer(&next))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type memberPipelineDocument[K comparable, T memberContraints[K]] struct {
|
|
|
|
|
OperationType string `bson:"operationType"`
|
|
|
|
|
DocumentKey struct {
|
|
|
|
|
Id primitive.ObjectID `bson:"_id"`
|
|
|
|
|
} `bson:"documentKey"`
|
|
|
|
|
Member T `bson:"fullDocument"`
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *memberContainerPtr[K, T]) all() []T {
|
|
|
|
|
ptr := atomic.LoadPointer(&p.ptr)
|
|
|
|
|
src := (*map[K]T)(ptr)
|
|
|
|
|
|
|
|
|
|
out := make([]T, 0, len(*src))
|
|
|
|
|
for _, m := range *src {
|
|
|
|
|
if m.Expired() {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
out = append(out, m)
|
|
|
|
|
}
|
|
|
|
|
return out
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *memberContainerPtr[K, T]) watchCollection(parentctx context.Context, coll gocommon.CollectionName, mc gocommon.MongoClient) {
|
|
|
|
|
defer func() {
|
|
|
|
|
s := recover()
|
|
|
|
|
if s != nil {
|
|
|
|
|
logger.Error(s)
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
matchStage := bson.D{
|
|
|
|
|
{
|
|
|
|
|
Key: "$match", Value: bson.D{
|
|
|
|
|
{Key: "operationType", Value: bson.D{
|
|
|
|
|
{Key: "$in", Value: bson.A{
|
|
|
|
|
"update",
|
|
|
|
|
"insert",
|
|
|
|
|
}},
|
|
|
|
|
}},
|
|
|
|
|
},
|
|
|
|
|
}}
|
|
|
|
|
projectStage := bson.D{
|
|
|
|
|
{
|
|
|
|
|
Key: "$project", Value: bson.D{
|
|
|
|
|
{Key: "documentKey", Value: 1},
|
|
|
|
|
{Key: "fullDocument", Value: 1},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var stream *mongo.ChangeStream
|
|
|
|
|
var err error
|
|
|
|
|
var ctx context.Context
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
if stream == nil {
|
|
|
|
|
stream, err = mc.Watch(coll, mongo.Pipeline{matchStage, projectStage})
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("watchCollection watch failed :", err)
|
|
|
|
|
time.Sleep(time.Minute)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
ctx = context.TODO()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
changed := stream.TryNext(ctx)
|
|
|
|
|
if ctx.Err() != nil {
|
|
|
|
|
logger.Error("watchCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error())
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if changed {
|
|
|
|
|
var data memberPipelineDocument[K, T]
|
|
|
|
|
if err := stream.Decode(&data); err == nil {
|
|
|
|
|
p.add(data.Member)
|
|
|
|
|
} else {
|
|
|
|
|
logger.Error("watchCollection stream.Decode failed :", err)
|
|
|
|
|
}
|
|
|
|
|
} else if stream.Err() != nil || stream.ID() == 0 {
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
logger.Println("watchCollection is done")
|
|
|
|
|
stream.Close(ctx)
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
case <-time.After(time.Second):
|
|
|
|
|
logger.Error("watchCollection stream error :", stream.Err())
|
|
|
|
|
stream.Close(ctx)
|
|
|
|
|
stream = nil
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
time.Sleep(time.Second)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|