package session import ( "context" "time" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "repositories.action2quare.com/ayo/gocommon" "repositories.action2quare.com/ayo/gocommon/logger" ) const ( session_collection_name = gocommon.CollectionName("session") ) type provider_mongo struct { mongoClient gocommon.MongoClient } type sessionMongo struct { Id primitive.ObjectID `bson:"_id,omitempty"` Auth *Authorization `bson:"auth"` Key storagekey `bson:"key"` Ts primitive.DateTime `bson:"_ts"` } func newProviderWithMongo(ctx context.Context, mongoUrl string, ttl time.Duration) (Provider, error) { mc, err := gocommon.NewMongoClient(ctx, mongoUrl) if err != nil { return nil, err } if err = mc.MakeUniqueIndices(session_collection_name, map[string]bson.D{ "key": {{Key: "key", Value: 1}}, }); err != nil { return nil, err } if err := mc.MakeExpireIndex(session_collection_name, int32(ttl.Seconds())); err != nil { return nil, err } return &provider_mongo{ mongoClient: mc, }, nil } func (p *provider_mongo) New(input *Authorization) (string, error) { sk := make_storagekey(input.Account) _, _, err := p.mongoClient.Update(session_collection_name, bson.M{ "_id": input.Account, }, bson.M{ "$set": sessionMongo{ Auth: input, Key: sk, Ts: primitive.NewDateTimeFromTime(time.Now().UTC()), }, }, options.Update().SetUpsert(true)) return string(storagekey_to_publickey(sk)), err } func (p *provider_mongo) Delete(acc primitive.ObjectID) error { _, err := p.mongoClient.Delete(session_collection_name, bson.M{ "_id": acc, }) return err } func (p *provider_mongo) Query(pk string) (Authorization, error) { sk := publickey_to_storagekey(publickey(pk)) var auth Authorization err := p.mongoClient.FindOneAs(session_collection_name, bson.M{ "key": sk, }, &auth) return auth, err } func (p *provider_mongo) Touch(pk string) (bool, error) { sk := publickey_to_storagekey(publickey(pk)) worked, _, err := p.mongoClient.Update(session_collection_name, bson.M{ "key": sk, }, bson.M{ "$currentDate": bson.M{ "_ts": bson.M{"$type": "date"}, }, }, options.Update().SetUpsert(false)) if err != nil { logger.Println("provider Touch :", err) return false, err } return worked, nil } type consumer_mongo struct { consumer_common[*sessionMongo] ids map[primitive.ObjectID]storagekey mongoClient gocommon.MongoClient ttl time.Duration } type sessionPipelineDocument struct { OperationType string `bson:"operationType"` DocumentKey struct { Id primitive.ObjectID `bson:"_id"` } `bson:"documentKey"` Session *sessionMongo `bson:"fullDocument"` } func newConsumerWithMongo(ctx context.Context, mongoUrl string, ttl time.Duration) (Consumer, error) { mc, err := gocommon.NewMongoClient(ctx, mongoUrl) if err != nil { return nil, err } consumer := &consumer_mongo{ consumer_common: consumer_common[*sessionMongo]{ ttl: ttl, ctx: ctx, stages: [2]*cache_stage[*sessionMongo]{make_cache_stage[*sessionMongo](), make_cache_stage[*sessionMongo]()}, startTime: time.Now(), }, ids: make(map[primitive.ObjectID]storagekey), ttl: ttl, mongoClient: mc, } go func() { matchStage := bson.D{ { Key: "$match", Value: bson.D{ {Key: "operationType", Value: bson.D{ {Key: "$in", Value: bson.A{ "delete", "insert", "update", }}, }}, }, }} projectStage := bson.D{ { Key: "$project", Value: bson.D{ {Key: "documentKey", Value: 1}, {Key: "operationType", Value: 1}, {Key: "fullDocument", Value: 1}, }, }, } var stream *mongo.ChangeStream nextswitch := time.Now().Add(ttl) for { if stream == nil { stream, err = mc.Watch(session_collection_name, mongo.Pipeline{matchStage, projectStage}) if err != nil { logger.Error("watchAuthCollection watch failed :", err) time.Sleep(time.Minute) continue } } changed := stream.TryNext(ctx) if ctx.Err() != nil { logger.Error("watchAuthCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error()) break } if changed { var data sessionPipelineDocument if err := stream.Decode(&data); err == nil { ot := data.OperationType switch ot { case "insert": consumer.add(data.Session.Key, data.DocumentKey.Id, data.Session) case "update": if data.Session == nil { consumer.deleteById(data.DocumentKey.Id) } else { consumer.add(data.Session.Key, data.DocumentKey.Id, data.Session) } case "delete": consumer.deleteById(data.DocumentKey.Id) } } else { logger.Error("watchAuthCollection stream.Decode failed :", err) } } else if stream.Err() != nil || stream.ID() == 0 { select { case <-ctx.Done(): logger.Println("watchAuthCollection is done") stream.Close(ctx) return case <-time.After(time.Second): logger.Error("watchAuthCollection stream error :", stream.Err()) stream.Close(ctx) stream = nil } } else { time.Sleep(time.Second) } now := time.Now() for now.After(nextswitch) { consumer.changeStage() nextswitch = nextswitch.Add(ttl) } } }() return consumer, nil } func (c *consumer_mongo) query_internal(sk storagekey) (*sessionMongo, bool, error) { if _, deleted := c.stages[0].deleted[sk]; deleted { return nil, false, nil } if _, deleted := c.stages[1].deleted[sk]; deleted { return nil, false, nil } found, ok := c.stages[0].cache[sk] if !ok { found, ok = c.stages[1].cache[sk] } if ok { return found, false, nil } var si sessionMongo err := c.mongoClient.FindOneAs(session_collection_name, bson.M{ "key": sk, }, &si) if err != nil { logger.Println("consumer Query :", err) return nil, false, err } if len(si.Key) > 0 { siptr := &si c.add_internal(sk, siptr) return siptr, true, nil } return nil, false, nil } func (c *consumer_mongo) Query(pk string) (Authorization, error) { c.lock.Lock() defer c.lock.Unlock() sk := publickey_to_storagekey(publickey(pk)) si, _, err := c.query_internal(sk) if err != nil { return Authorization{}, err } if si == nil { return Authorization{}, nil } if time.Now().After(si.Ts.Time().Add(c.ttl)) { return Authorization{}, nil } return *si.Auth, nil } func (c *consumer_mongo) Touch(pk string) (Authorization, error) { c.lock.Lock() defer c.lock.Unlock() sk := publickey_to_storagekey(publickey(pk)) worked, _, err := c.mongoClient.Update(session_collection_name, bson.M{ "key": sk, }, bson.M{ "$currentDate": bson.M{ "_ts": bson.M{"$type": "date"}, }, }, options.Update().SetUpsert(false)) if err != nil { logger.Println("consumer Touch :", err) return Authorization{}, err } if !worked { // 이미 만료되서 사라짐 return Authorization{}, nil } si, added, err := c.query_internal(sk) if err != nil { return Authorization{}, err } if si == nil { return Authorization{}, nil } if !added { var doc sessionMongo err := c.mongoClient.FindOneAs(session_collection_name, bson.M{ "key": sk, }, &doc) if err != nil { logger.Println("consumer Query :", err) return Authorization{}, err } if len(si.Key) > 0 { c.add_internal(sk, &doc) c.ids[doc.Id] = sk return *doc.Auth, nil } } return *si.Auth, nil } func (c *consumer_mongo) add(sk storagekey, id primitive.ObjectID, si *sessionMongo) { c.lock.Lock() defer c.lock.Unlock() c.consumer_common.add_internal(sk, si) c.ids[id] = sk } func (c *consumer_mongo) deleteById(id primitive.ObjectID) { c.lock.Lock() defer c.lock.Unlock() if sk, ok := c.ids[id]; ok { c.consumer_common.delete_internal(sk) delete(c.ids, id) } }