세션 consumer에 Revoke 추가

This commit is contained in:
2024-04-23 15:57:25 +09:00
parent fba613f83c
commit 0a65c6009e
2 changed files with 27 additions and 18 deletions

View File

@ -50,8 +50,8 @@ func (p *provider_redis) New(input *Authorization) (string, error) {
return "", err return "", err
} }
p.redisClient.Del(p.ctx, sks...)
for _, sk := range sks { for _, sk := range sks {
p.redisClient.HSet(p.ctx, sk, "inv", "true")
p.redisClient.Publish(p.ctx, p.deleteChannel, sk).Result() p.redisClient.Publish(p.ctx, p.deleteChannel, sk).Result()
} }
@ -323,7 +323,17 @@ func (c *consumer_redis) Revoke(pk string) {
sk := publickey_to_storagekey(publickey(pk)) sk := publickey_to_storagekey(publickey(pk))
c.redisClient.Del(c.ctx, string(sk)) c.redisClient.Del(c.ctx, string(sk))
c.redisClient.Publish(c.ctx, c.deleteChannel, string(sk)).Result()
c.lock.Lock()
defer c.lock.Unlock()
if sr, ok := c.stages[0].cache[sk]; ok {
c.stages[0].deleted[sk] = sr
}
if sr, ok := c.stages[1].cache[sk]; ok {
c.stages[1].deleted[sk] = sr
}
} }
func (c *consumer_redis) IsRevoked(accid primitive.ObjectID) bool { func (c *consumer_redis) IsRevoked(accid primitive.ObjectID) bool {

View File

@ -5,7 +5,6 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"math/rand"
"net/http" "net/http"
"reflect" "reflect"
"strings" "strings"
@ -28,8 +27,8 @@ type peerCtorChannelValue struct {
} }
type peerDtorChannelValue struct { type peerDtorChannelValue struct {
accid primitive.ObjectID accid primitive.ObjectID
closed bool sk string
} }
type websocketPeerHandler[T PeerInterface] struct { type websocketPeerHandler[T PeerInterface] struct {
@ -178,8 +177,7 @@ func (ws *websocketPeerHandler[T]) RegisterHandlers(serveMux *http.ServeMux, pre
func (ws *websocketPeerHandler[T]) onSessionInvalidated(accid primitive.ObjectID) { func (ws *websocketPeerHandler[T]) onSessionInvalidated(accid primitive.ObjectID) {
ws.peerDtorChannel <- peerDtorChannelValue{ ws.peerDtorChannel <- peerDtorChannelValue{
accid: accid, accid: accid,
closed: false,
} }
} }
@ -192,18 +190,21 @@ func (ws *websocketPeerHandler[T]) sessionMonitoring() {
case estVal := <-ws.peerCtorChannel: case estVal := <-ws.peerCtorChannel:
all[estVal.accid] = estVal.conn all[estVal.accid] = estVal.conn
case disVal := <-ws.peerDtorChannel: case disVal := <-ws.peerDtorChannel:
if disVal.closed { if c := all[disVal.accid]; c != nil {
delete(all, disVal.accid)
} else if c := all[disVal.accid]; c != nil {
c.WriteControl(websocket.CloseMessage, unauthdata, time.Time{}) c.WriteControl(websocket.CloseMessage, unauthdata, time.Time{})
delete(all, disVal.accid) delete(all, disVal.accid)
} }
if len(disVal.sk) > 0 {
ws.sessionConsumer.Revoke(disVal.sk)
delete(all, disVal.accid)
}
} }
} }
} }
func (ws *websocketPeerHandler[T]) upgrade_core(conn *websocket.Conn, accid primitive.ObjectID, nonce uint32) { func (ws *websocketPeerHandler[T]) upgrade_core(conn *websocket.Conn, accid primitive.ObjectID, sk string) {
go func(c *websocket.Conn, accid primitive.ObjectID) { go func(c *websocket.Conn, accid primitive.ObjectID, sk string) {
peer := ws.createPeer(accid) peer := ws.createPeer(accid)
var closeReason string var closeReason string
@ -211,7 +212,7 @@ func (ws *websocketPeerHandler[T]) upgrade_core(conn *websocket.Conn, accid prim
ws.peerCtorChannel <- peerCtorChannelValue{accid: accid, conn: conn} ws.peerCtorChannel <- peerCtorChannelValue{accid: accid, conn: conn}
defer func() { defer func() {
ws.peerDtorChannel <- peerDtorChannelValue{accid: accid, closed: true} ws.peerDtorChannel <- peerDtorChannelValue{accid: accid, sk: sk}
peer.ClientDisconnected(closeReason) peer.ClientDisconnected(closeReason)
}() }()
@ -285,7 +286,7 @@ func (ws *websocketPeerHandler[T]) upgrade_core(conn *websocket.Conn, accid prim
} }
} }
} }
}(conn, accid) }(conn, accid, sk)
} }
func (ws *websocketPeerHandler[T]) upgrade_noauth(w http.ResponseWriter, r *http.Request) { func (ws *websocketPeerHandler[T]) upgrade_noauth(w http.ResponseWriter, r *http.Request) {
@ -345,8 +346,7 @@ func (ws *websocketPeerHandler[T]) upgrade_noauth(w http.ResponseWriter, r *http
// alias = accid.Hex() // alias = accid.Hex()
// } // }
nonce := rand.New(rand.NewSource(time.Now().UnixNano())).Uint32() ws.upgrade_core(conn, accid, sk)
ws.upgrade_core(conn, accid, nonce)
} }
func (ws *websocketPeerHandler[T]) upgrade(w http.ResponseWriter, r *http.Request) { func (ws *websocketPeerHandler[T]) upgrade(w http.ResponseWriter, r *http.Request) {
@ -387,6 +387,5 @@ func (ws *websocketPeerHandler[T]) upgrade(w http.ResponseWriter, r *http.Reques
// } else { // } else {
// alias = authinfo.Account.Hex() // alias = authinfo.Account.Hex()
// } // }
nonce := rand.New(rand.NewSource(time.Now().UnixNano())).Uint32() ws.upgrade_core(conn, authinfo.Account, sk)
ws.upgrade_core(conn, authinfo.Account, nonce)
} }