From 6d63cce6d0f2eb14768b73eb7a4b324220801a66 Mon Sep 17 00:00:00 2001 From: mountain Date: Tue, 13 Jun 2023 20:10:08 +0900 Subject: [PATCH] =?UTF-8?q?watch=20go=20=EB=A3=A8=ED=8B=B4=20gracefully=20?= =?UTF-8?q?exit?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/watch.go | 60 +++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 46 insertions(+), 14 deletions(-) diff --git a/core/watch.go b/core/watch.go index bf225a6..bd80953 100644 --- a/core/watch.go +++ b/core/watch.go @@ -98,7 +98,7 @@ func (mg *Maingate) watchWhitelistCollection(parentctx context.Context) { changed := stream.TryNext(ctx) if ctx.Err() != nil { - logger.Error("watchServiceCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error()) + logger.Error("watchWhitelistCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error()) break } @@ -124,12 +124,20 @@ func (mg *Maingate) watchWhitelistCollection(parentctx context.Context) { } } } else { - logger.Error("watchServiceCollection stream.Decode failed :", err) + logger.Error("watchWhitelistCollection stream.Decode failed :", err) } } else if stream.Err() != nil || stream.ID() == 0 { - logger.Error("watchServiceCollection stream error :", stream.Err()) - stream.Close(ctx) - stream = nil + select { + case <-ctx.Done(): + logger.Println("watchWhitelistCollection is done") + stream.Close(ctx) + return + + case <-time.After(time.Second): + logger.Error("watchWhitelistCollection stream error :", stream.Err()) + stream.Close(ctx) + stream = nil + } } else { time.Sleep(time.Second) } @@ -187,9 +195,17 @@ func (mg *Maingate) watchFileCollection(parentctx context.Context, serveMux *htt if !changed { if stream.Err() != nil || stream.ID() == 0 { - logger.Error("watchServiceCollection stream error :", stream.Err()) - stream.Close(ctx) - stream = nil + select { + case <-ctx.Done(): + logger.Println("watchServiceCollection is done") + stream.Close(ctx) + return + + case <-time.After(time.Second): + logger.Error("watchServiceCollection stream error :", stream.Err()) + stream.Close(ctx) + stream = nil + } } else { time.Sleep(time.Second) } @@ -346,9 +362,17 @@ func (mg *Maingate) watchServiceCollection(parentctx context.Context, serveMux * logger.Error("watchServiceCollection stream.Decode failed :", err) } } else if stream.Err() != nil || stream.ID() == 0 { - logger.Error("watchServiceCollection stream error :", stream.Err()) - stream.Close(ctx) - stream = nil + select { + case <-ctx.Done(): + logger.Println("watchServiceCollection is done") + stream.Close(ctx) + return + + case <-time.After(time.Second): + logger.Error("watchServiceCollection stream error :", stream.Err()) + stream.Close(ctx) + stream = nil + } } else { time.Sleep(time.Second) } @@ -422,9 +446,17 @@ func watchAuthCollection(parentctx context.Context, ac *common.AuthCollection, m logger.Error("watchAuthCollection stream.Decode failed :", err) } } else if stream.Err() != nil || stream.ID() == 0 { - logger.Error("watchAuthCollection stream error :", stream.Err()) - stream.Close(ctx) - stream = nil + select { + case <-ctx.Done(): + logger.Println("watchAuthCollection is done") + stream.Close(ctx) + return + + case <-time.After(time.Second): + logger.Error("watchAuthCollection stream error :", stream.Err()) + stream.Close(ctx) + stream = nil + } } else { time.Sleep(time.Second) }