watch go 루틴 gracefully exit

This commit is contained in:
2023-06-13 20:10:08 +09:00
parent 13021c47ce
commit 6d63cce6d0

View File

@ -98,7 +98,7 @@ func (mg *Maingate) watchWhitelistCollection(parentctx context.Context) {
changed := stream.TryNext(ctx)
if ctx.Err() != nil {
logger.Error("watchServiceCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error())
logger.Error("watchWhitelistCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error())
break
}
@ -124,12 +124,20 @@ func (mg *Maingate) watchWhitelistCollection(parentctx context.Context) {
}
}
} else {
logger.Error("watchServiceCollection stream.Decode failed :", err)
logger.Error("watchWhitelistCollection stream.Decode failed :", err)
}
} else if stream.Err() != nil || stream.ID() == 0 {
logger.Error("watchServiceCollection stream error :", stream.Err())
stream.Close(ctx)
stream = nil
select {
case <-ctx.Done():
logger.Println("watchWhitelistCollection is done")
stream.Close(ctx)
return
case <-time.After(time.Second):
logger.Error("watchWhitelistCollection stream error :", stream.Err())
stream.Close(ctx)
stream = nil
}
} else {
time.Sleep(time.Second)
}
@ -187,9 +195,17 @@ func (mg *Maingate) watchFileCollection(parentctx context.Context, serveMux *htt
if !changed {
if stream.Err() != nil || stream.ID() == 0 {
logger.Error("watchServiceCollection stream error :", stream.Err())
stream.Close(ctx)
stream = nil
select {
case <-ctx.Done():
logger.Println("watchServiceCollection is done")
stream.Close(ctx)
return
case <-time.After(time.Second):
logger.Error("watchServiceCollection stream error :", stream.Err())
stream.Close(ctx)
stream = nil
}
} else {
time.Sleep(time.Second)
}
@ -346,9 +362,17 @@ func (mg *Maingate) watchServiceCollection(parentctx context.Context, serveMux *
logger.Error("watchServiceCollection stream.Decode failed :", err)
}
} else if stream.Err() != nil || stream.ID() == 0 {
logger.Error("watchServiceCollection stream error :", stream.Err())
stream.Close(ctx)
stream = nil
select {
case <-ctx.Done():
logger.Println("watchServiceCollection is done")
stream.Close(ctx)
return
case <-time.After(time.Second):
logger.Error("watchServiceCollection stream error :", stream.Err())
stream.Close(ctx)
stream = nil
}
} else {
time.Sleep(time.Second)
}
@ -422,9 +446,17 @@ func watchAuthCollection(parentctx context.Context, ac *common.AuthCollection, m
logger.Error("watchAuthCollection stream.Decode failed :", err)
}
} else if stream.Err() != nil || stream.ID() == 0 {
logger.Error("watchAuthCollection stream error :", stream.Err())
stream.Close(ctx)
stream = nil
select {
case <-ctx.Done():
logger.Println("watchAuthCollection is done")
stream.Close(ctx)
return
case <-time.After(time.Second):
logger.Error("watchAuthCollection stream error :", stream.Err())
stream.Close(ctx)
stream = nil
}
} else {
time.Sleep(time.Second)
}