diff --git a/session/common.go b/session/common.go new file mode 100644 index 0000000..bd8f959 --- /dev/null +++ b/session/common.go @@ -0,0 +1,5 @@ +package session + +const ( + communication_channel_name_prefix = "_sess_comm_chan_name" +) diff --git a/session/consumer.go b/session/consumer.go new file mode 100644 index 0000000..fc7bed9 --- /dev/null +++ b/session/consumer.go @@ -0,0 +1,223 @@ +package session + +import ( + "context" + "sync" + "time" + + "github.com/go-redis/redis/v8" + "go.mongodb.org/mongo-driver/bson" + "repositories.action2quare.com/ayo/gocommon" + "repositories.action2quare.com/ayo/gocommon/logger" +) + +type innerSession[T any] struct { + inner *T + expireAt time.Time +} + +type cache_stage[T any] struct { + cache map[string]innerSession[T] + deleted map[string]bool +} + +func make_cache_stage[T any]() *cache_stage[T] { + return &cache_stage[T]{ + cache: make(map[string]innerSession[T]), + deleted: make(map[string]bool), + } +} + +type Consumer[T any] struct { + lock sync.Mutex + redisClient *redis.Client + ttl time.Duration + ctx context.Context + stages [2]*cache_stage[T] + startTime time.Time +} + +func NewConsumer[T any](ctx context.Context, redisUrl string, ttl time.Duration) (*Consumer[T], error) { + redisClient, err := gocommon.NewRedisClient(redisUrl) + if err != nil { + return nil, err + } + + consumer := &Consumer[T]{ + redisClient: redisClient, + ttl: ttl, + ctx: ctx, + stages: [2]*cache_stage[T]{make_cache_stage[T](), make_cache_stage[T]()}, + startTime: time.Now(), + } + + updateChannel := communication_channel_name_prefix + "_u" + deleteChannel := communication_channel_name_prefix + "_d" + sub := redisClient.Subscribe(ctx, updateChannel, deleteChannel) + + go func() { + stageswitch := time.Now().Add(ttl) + tickTimer := time.After(ttl) + + for { + select { + case <-ctx.Done(): + return + + case <-tickTimer: + consumer.changeStage() + stageswitch = stageswitch.Add(ttl) + tempttl := time.Until(stageswitch) + tickTimer = time.After(tempttl) + + case msg := <-sub.Channel(): + if msg == nil { + return + } + + if len(msg.Payload) == 0 { + continue + } + + switch msg.Channel { + case updateChannel: + key := msg.Payload + raw, err := redisClient.Get(ctx, key).Result() + if err != nil { + logger.Println(err) + } else if len(raw) > 0 { + var si T + if bson.Unmarshal([]byte(raw), &si) == nil { + consumer.add(key, &si) + } + } + case deleteChannel: + key := msg.Payload + consumer.delete(key) + } + } + } + }() + + return consumer, nil +} + +func (c *Consumer[T]) add_internal(key string, si *T, ttl time.Duration) { + inner := innerSession[T]{ + inner: si, + expireAt: time.Now().Add(ttl), + } + + c.stages[0].cache[key] = inner + delete(c.stages[0].deleted, key) + c.stages[1].cache[key] = inner + delete(c.stages[1].deleted, key) + + logger.Printf("add : %v, %v\n", *c.stages[0], *c.stages[1]) +} + +func (c *Consumer[T]) add(key string, si *T) { + c.lock.Lock() + defer c.lock.Unlock() + + c.add_internal(key, si, c.ttl) +} + +func (c *Consumer[T]) delete(key string) { + c.lock.Lock() + defer c.lock.Unlock() + + delete(c.stages[0].cache, key) + c.stages[0].deleted[key] = true + delete(c.stages[1].cache, key) + c.stages[1].deleted[key] = true + + logger.Printf("delete : %v, %v\n", *c.stages[0], *c.stages[1]) +} + +func (c *Consumer[T]) changeStage() { + c.lock.Lock() + defer c.lock.Unlock() + + logger.Printf("changeStage : %v, %v\n", *c.stages[0], *c.stages[1]) + + c.stages[1] = c.stages[0] + c.stages[0] = make_cache_stage[T]() + + logger.Printf("---> : %v, %v\n", *c.stages[0], *c.stages[1]) +} + +func (c *Consumer[T]) Query(key string) *T { + c.lock.Lock() + defer c.lock.Unlock() + + if _, deleted := c.stages[0].deleted[key]; deleted { + return nil + } + + if _, deleted := c.stages[1].deleted[key]; deleted { + return nil + } + + found, ok := c.stages[0].cache[key] + if !ok { + found, ok = c.stages[1].cache[key] + } + + if ok { + if found.expireAt.After(time.Now()) { + return found.inner + } + } + + payload, err := c.redisClient.Get(c.ctx, key).Result() + if err == redis.Nil { + return nil + } else if err != nil { + logger.Println("consumer Query :", err) + return nil + } + + if len(payload) > 0 { + var si T + if bson.Unmarshal([]byte(payload), &si) == nil { + ttl, err := c.redisClient.TTL(c.ctx, key).Result() + if err != nil { + logger.Println("consumer Query :", err) + return nil + } + + c.add_internal(key, &si, ttl) + return &si + } + } + return nil +} + +func (c *Consumer[T]) Touch(key string) bool { + c.lock.Lock() + defer c.lock.Unlock() + + ok, err := c.redisClient.Expire(c.ctx, key, c.ttl).Result() + if err == redis.Nil { + return false + } else if err != nil { + logger.Println("consumer Touch :", err) + return false + } + + if ok { + newexpire := time.Now().Add(c.ttl) + found, ok := c.stages[0].cache[key] + if ok { + found.expireAt = newexpire + } + + found, ok = c.stages[1].cache[key] + if ok { + found.expireAt = newexpire + } + } + + return ok +} diff --git a/session/provider.go b/session/provider.go new file mode 100644 index 0000000..84f0899 --- /dev/null +++ b/session/provider.go @@ -0,0 +1,61 @@ +package session + +import ( + "context" + "time" + + "github.com/go-redis/redis/v8" + "go.mongodb.org/mongo-driver/bson" + "repositories.action2quare.com/ayo/gocommon" +) + +type Provider[T any] struct { + redisClient *redis.Client + updateChannel string + deleteChannel string + ttl time.Duration + ctx context.Context +} + +func NewProvider[T any](ctx context.Context, redisUrl string, ttl time.Duration) (*Provider[T], error) { + redisClient, err := gocommon.NewRedisClient(redisUrl) + if err != nil { + return nil, err + } + + return &Provider[T]{ + redisClient: redisClient, + updateChannel: communication_channel_name_prefix + "_u", + deleteChannel: communication_channel_name_prefix + "_d", + ttl: ttl, + ctx: ctx, + }, nil +} + +func (p *Provider[T]) Update(key string, input *T) error { + bt, err := bson.Marshal(input) + if err != nil { + return err + } + + _, err = p.redisClient.SetEX(p.ctx, key, bt, p.ttl).Result() + if err != nil { + return err + } + + _, err = p.redisClient.Publish(p.ctx, p.updateChannel, key).Result() + return err +} + +func (p *Provider[T]) Delete(key string) error { + cnt, err := p.redisClient.Del(p.ctx, key).Result() + if err != nil { + return err + } + + if cnt > 0 { + _, err = p.redisClient.Publish(p.ctx, p.deleteChannel, key).Result() + } + + return err +} diff --git a/session/session_test.go b/session/session_test.go new file mode 100644 index 0000000..0f2c3fd --- /dev/null +++ b/session/session_test.go @@ -0,0 +1,71 @@ +// package main ... +package session + +import ( + "context" + "testing" + "time" + + "go.mongodb.org/mongo-driver/bson/primitive" + "repositories.action2quare.com/ayo/gocommon/logger" +) + +type sessioninfo struct { + Platform string + Uid string +} + +func TestExpTable(t *testing.T) { + pv, err := NewProvider[sessioninfo](context.Background(), "redis://192.168.8.94:6380/1", 10*time.Second) + if err != nil { + t.Error(err) + } + + cs, err := NewConsumer[sessioninfo](context.Background(), "redis://192.168.8.94:6380/1", 10*time.Second) + if err != nil { + t.Error(err) + } + + newid1 := primitive.NewObjectID() + newid2 := primitive.NewObjectID() + go func() { + for { + logger.Println("query :", cs.Query(newid1.Hex())) + logger.Println("query :", cs.Query(newid2.Hex())) + time.Sleep(time.Second) + } + }() + + time.Sleep(2 * time.Second) + pv.Update(newid1.Hex(), &sessioninfo{ + Platform: "editor", + Uid: "uid-1", + }) + + time.Sleep(2 * time.Second) + pv.Update(newid2.Hex(), &sessioninfo{ + Platform: "editor", + Uid: "uid-2", + }) + + cs.Touch(newid1.Hex()) + time.Sleep(2 * time.Second) + cs.Touch(newid2.Hex()) + time.Sleep(2 * time.Second) + + time.Sleep(2 * time.Second) + pv.Delete(newid1.Hex()) + + cs.Touch(newid1.Hex()) + time.Sleep(2 * time.Second) + cs.Touch(newid2.Hex()) + time.Sleep(2 * time.Second) + + cs2, err := NewConsumer[sessioninfo](context.Background(), "redis://192.168.8.94:6380/1", 10*time.Second) + if err != nil { + t.Error(err) + } + + logger.Println("queryf :", cs2.Query(newid2.Hex())) + time.Sleep(20 * time.Second) +}