package session

import (
	"context"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
	"repositories.action2quare.com/ayo/gocommon"
	"repositories.action2quare.com/ayo/gocommon/logger"
)

// sessionMongo is the decoded form of one session document: the embedded
// Authorization fields (inlined), the lookup key, and the "_ts" timestamp
// that Query compares against the ttl.
type sessionMongo struct {
	*Authorization `bson:",inline"`
	Key            string             `bson:"key"`
	Ts             primitive.DateTime `bson:"_ts"`
}

// consumer_mongo is a session consumer backed by a MongoDB collection.
// ids maps a document _id to its session key so that delete change events,
// which only carry the _id, can be translated back to a cache key.
type consumer_mongo struct {
	consumer_common[*sessionMongo]
	ids         map[primitive.ObjectID]string
	mongoClient gocommon.MongoClient
	ttl         time.Duration
}

// sessionPipelineDocument is the projection of a change-stream event that
// the watcher decodes: operation type, document key and (optionally) the
// full document.
type sessionPipelineDocument struct {
	OperationType string `bson:"operationType"`
	DocumentKey   struct {
		Id primitive.ObjectID `bson:"_id"`
	} `bson:"documentKey"`
	// Session is nil when the event carries no fullDocument.
	Session *sessionMongo `bson:"fullDocument"`
}

// NewConsumerWithMongo connects to mongoUrl/dbname, builds a consumer whose
// cache stages rotate every ttl, and starts a background goroutine that
// follows the session collection's change stream until ctx is cancelled.
// It returns an error only when the Mongo client cannot be created.
//
// NOTE(review): the stage-rotation loop advances by ttl, so ttl is assumed
// to be positive - confirm with callers.
func NewConsumerWithMongo(ctx context.Context, mongoUrl string, dbname string, ttl time.Duration) (Consumer, error) {
	mc, err := gocommon.NewMongoClient(ctx, mongoUrl, dbname)
	if err != nil {
		return nil, err
	}

	consumer := &consumer_mongo{
		consumer_common: consumer_common[*sessionMongo]{
			ttl:       ttl,
			ctx:       ctx,
			stages:    [2]*cache_stage[*sessionMongo]{make_cache_stage[*sessionMongo](), make_cache_stage[*sessionMongo]()},
			startTime: time.Now(),
		},
		ids:         make(map[primitive.ObjectID]string),
		ttl:         ttl,
		mongoClient: mc,
	}

	go consumer.watchChanges(ctx)

	return consumer, nil
}

// watchChanges follows insert/update/delete events on the session collection
// and mirrors them into the cache. It also rotates the cache stages once per
// ttl. The loop ends (and the stream is closed) when ctx is cancelled.
func (c *consumer_mongo) watchChanges(ctx context.Context) {
	matchStage := bson.D{
		{
			Key: "$match", Value: bson.D{
				{Key: "operationType", Value: bson.D{
					{Key: "$in", Value: bson.A{
						"delete",
						"insert",
						"update",
					}},
				}},
			},
		},
	}
	projectStage := bson.D{
		{
			Key: "$project", Value: bson.D{
				{Key: "documentKey", Value: 1},
				{Key: "operationType", Value: 1},
				{Key: "fullDocument", Value: 1},
			},
		},
	}
	pipeline := mongo.Pipeline{matchStage, projectStage}

	var stream *mongo.ChangeStream
	nextswitch := time.Now().Add(c.ttl)
	for {
		if stream == nil {
			opened, err := c.mongoClient.Watch(session_collection_name, pipeline)
			if err != nil {
				logger.Error("watchAuthCollection watch failed :", err)
				// Back off before retrying, but stop immediately on cancellation
				// instead of sleeping through it and retrying forever.
				select {
				case <-ctx.Done():
					logger.Println("watchAuthCollection is done")
					return
				case <-time.After(time.Minute):
				}
				continue
			}
			stream = opened
		}

		changed := stream.TryNext(ctx)
		if ctx.Err() != nil {
			logger.Error("watchAuthCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error())
			stream.Close(ctx)
			return
		}

		switch {
		case changed:
			var data sessionPipelineDocument
			if err := stream.Decode(&data); err != nil {
				logger.Error("watchAuthCollection stream.Decode failed :", err)
			} else {
				c.applyChange(&data)
			}

		case stream.Err() != nil || stream.ID() == 0:
			// The stream is broken or exhausted: drop it and reopen on the
			// next iteration, unless we are shutting down.
			select {
			case <-ctx.Done():
				logger.Println("watchAuthCollection is done")
				stream.Close(ctx)
				return
			case <-time.After(time.Second):
				logger.Error("watchAuthCollection stream error :", stream.Err())
				stream.Close(ctx)
				stream = nil
			}

		default:
			// Nothing new; TryNext does not block, so pause before polling again.
			time.Sleep(time.Second)
		}

		// Catch up on every rotation that became due, including missed ones.
		for now := time.Now(); now.After(nextswitch); {
			c.changeStage()
			nextswitch = nextswitch.Add(c.ttl)
		}
	}
}

// applyChange mirrors one decoded change event into the cache.
func (c *consumer_mongo) applyChange(data *sessionPipelineDocument) {
	switch data.OperationType {
	case "insert", "update":
		// An event may arrive without fullDocument (e.g. an update when the
		// document could not be looked up); there is nothing to cache then.
		if data.Session == nil {
			logger.Println("watchAuthCollection change event without fullDocument :", data.OperationType, data.DocumentKey.Id.Hex())
			return
		}
		c.add(data.Session.Key, data.DocumentKey.Id, data.Session)

	case "delete":
		c.deleteById(data.DocumentKey.Id)
	}
}

// fetch_internal reads the session stored under key from the collection.
// When a document with a non-empty key is found it is put into the cache,
// its _id is recorded in ids, and it is returned. It returns (nil, nil) when
// the decoded document has an empty key. The caller must hold c.lock.
func (c *consumer_mongo) fetch_internal(key string) (*sessionMongo, error) {
	var doc struct {
		sessionMongo `bson:",inline"`
		Id           primitive.ObjectID `bson:"_id"`
	}

	err := c.mongoClient.FindOneAs(session_collection_name, bson.M{
		"key": key,
	}, &doc)
	if err != nil {
		return nil, err
	}

	if len(doc.Key) == 0 {
		return nil, nil
	}

	c.add_internal(key, &doc.sessionMongo)
	// Remember the _id so a later delete event can evict this entry.
	c.ids[doc.Id] = key
	return &doc.sessionMongo, nil
}

// query_internal looks key up in the cache stages and falls back to the
// database. The bool result is true only when the session was just loaded
// from the database by this call. A key marked deleted in either stage
// yields (nil, false, nil). The caller must hold c.lock.
func (c *consumer_mongo) query_internal(key string) (*sessionMongo, bool, error) {
	if _, deleted := c.stages[0].deleted[key]; deleted {
		return nil, false, nil
	}
	if _, deleted := c.stages[1].deleted[key]; deleted {
		return nil, false, nil
	}

	if found, ok := c.stages[0].cache[key]; ok {
		return found, false, nil
	}
	if found, ok := c.stages[1].cache[key]; ok {
		return found, false, nil
	}

	si, err := c.fetch_internal(key)
	if err != nil {
		logger.Println("consumer Query :", err)
		return nil, false, err
	}
	if si == nil {
		return nil, false, nil
	}
	return si, true, nil
}

// Query returns the Authorization stored under key, or (nil, nil) when the
// session is unknown, marked deleted, or its timestamp is older than ttl.
func (c *consumer_mongo) Query(key string) (*Authorization, error) {
	c.lock.Lock()
	defer c.lock.Unlock()

	si, _, err := c.query_internal(key)
	if err != nil {
		return nil, err
	}
	if si == nil {
		return nil, nil
	}

	if time.Now().After(si.Ts.Time().Add(c.ttl)) {
		return nil, nil
	}
	return si.Authorization, nil
}

// Touch sets the session's "_ts" to the current date in the database and
// returns its Authorization. It returns (nil, nil) when no document was
// updated or the session cannot be found afterwards.
func (c *consumer_mongo) Touch(key string) (*Authorization, error) {
	c.lock.Lock()
	defer c.lock.Unlock()

	worked, _, err := c.mongoClient.Update(session_collection_name, bson.M{
		"key": key,
	}, bson.M{
		"$currentDate": bson.M{
			"_ts": bson.M{"$type": "date"},
		},
	}, options.Update().SetUpsert(false))
	if err != nil {
		logger.Println("consumer Touch :", err)
		return nil, err
	}

	if !worked {
		// Already expired and removed.
		return nil, nil
	}

	si, added, err := c.query_internal(key)
	if err != nil {
		return nil, err
	}
	if si == nil {
		return nil, nil
	}
	if added {
		// Just loaded from the database, so it already has the new timestamp.
		return si.Authorization, nil
	}

	// The cached copy predates the update above; re-read the document so the
	// cache holds the refreshed timestamp.
	fresh, err := c.fetch_internal(key)
	if err != nil {
		logger.Println("consumer Touch :", err)
		return nil, err
	}
	if fresh == nil {
		// The document disappeared between the update and the re-read.
		return nil, nil
	}
	return fresh.Authorization, nil
}

// add caches si under key and records id -> key, taking the lock itself.
func (c *consumer_mongo) add(key string, id primitive.ObjectID, si *sessionMongo) {
	c.lock.Lock()
	defer c.lock.Unlock()

	c.consumer_common.add_internal(key, si)
	c.ids[id] = key
}

// deleteById removes the session whose document _id is id, if that id is
// known, and forgets the id -> key mapping.
func (c *consumer_mongo) deleteById(id primitive.ObjectID) {
	c.lock.Lock()
	defer c.lock.Unlock()

	if key, ok := c.ids[id]; ok {
		c.consumer_common.delete_internal(key)
		delete(c.ids, id)
	}
}