houston 업데이트를 스크립트로 하자

This commit is contained in:
2024-11-11 20:50:51 +09:00
parent 2ddbae07b2
commit a844bed056
6 changed files with 222 additions and 293 deletions

View File

@ -7,6 +7,7 @@ import (
"fmt"
"io"
"io/fs"
"net/http"
"os"
"os/exec"
"os/signal"
@ -22,6 +23,7 @@ import (
"time"
"unsafe"
"github.com/djherbis/times"
"repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/flagx"
"repositories.action2quare.com/ayo/gocommon/logger"
@ -116,6 +118,12 @@ func (pm *procmeta) setState(s protos.ProcessState) {
atomic.StoreInt32(&pm.state, int32(s))
}
type uploadRequest struct {
logFile string
name string
version string
}
type houstonClient struct {
childProcs []*procmeta
extraMetrics unsafe.Pointer // map[string]float32
@ -125,6 +133,7 @@ type houstonClient struct {
operationChan chan *protos.OperationQueryResponse
exitChan chan *exec.Cmd
clientChan chan *grpc.ClientConn
uploadChan chan uploadRequest
timestamp string
wg sync.WaitGroup
config clientConfig
@ -317,6 +326,7 @@ func NewClient(standalone bool) (HoustonClient, error) {
timestamp: exefi.ModTime().String(),
version: string(ver),
standalone: standalone,
uploadChan: make(chan uploadRequest, 100),
siblingProcIndex: make(map[string]uint64),
}
@ -494,15 +504,6 @@ func NewClient(standalone bool) (HoustonClient, error) {
logger.Println(err)
}
case shared.Upload:
var ur shared.UploadRequest
unmarshal(&ur, resp.Args)
logger.Println("args :", ur)
if err := hc.uploadFiles(&ur); err != nil {
logger.Println(err)
}
case shared.Exception:
idstr := resp.Args["id"]
id64, _ := strconv.ParseInt(idstr, 10, 0)
@ -567,6 +568,59 @@ func NewClient(standalone bool) (HoustonClient, error) {
return hc, nil
}
func uploadSafe(url, filePath, name, version string) error {
defer func() {
r := recover()
if r != nil {
logger.Error(r)
}
}()
t, err := times.Stat(filePath)
if err != nil {
return err
}
file, err := os.Open(filePath)
if err != nil {
return err
}
if file == nil {
return errors.New("upload file is missing :" + filePath)
}
defer file.Close()
// hc.config.HttpAddress+"/upload",
httpreq, err := http.NewRequest("POST", url, file)
if err != nil {
return err
}
hn, _ := os.Hostname()
// createTime := file.
httpreq.Header.Set("Houston-Service-Name", name)
httpreq.Header.Set("Houston-Service-Version", version)
httpreq.Header.Set("Houston-Service-Filename", t.BirthTime().UTC().Format(time.DateOnly)+"."+hn+path.Ext(filePath))
httpreq.Header.Set("Content-Type", "application/zip")
resp, err := http.DefaultClient.Do(httpreq)
if err != nil {
return err
}
resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("upload file failed. response code : %s, %d", filePath, resp.StatusCode)
}
if err := os.Remove(filePath); err != nil {
return err
}
return nil
}
func (hc *houstonClient) Start() {
// receive from stream
defer func() {
@ -583,6 +637,19 @@ func (hc *houstonClient) Start() {
proc.cmd.Wait()
proc.cmd.Process.Release()
}
close(hc.uploadChan)
}()
go func() {
// upload 고루틴
url := hc.config.HttpAddress + "/upload"
for req := range hc.uploadChan {
err := uploadSafe(url, req.logFile, req.name, req.version)
if err != nil {
logger.Println("uploadSafe return err :", err)
}
}
}()
interrupt := make(chan os.Signal, 1)

35
client/client_test.go Normal file
View File

@ -0,0 +1,35 @@
package client
import (
"fmt"
"sync"
"testing"
"time"
)
func Test_houstonClient_Start(t *testing.T) {
tc := make(chan int, 1000)
var wg sync.WaitGroup
wg.Add(1)
go func() {
// receive
defer wg.Done()
for v := range tc {
fmt.Println(v)
time.Sleep(100 * time.Millisecond)
}
}()
go func() {
// send
for i := 0; i < 100; i++ {
tc <- i
}
close(tc)
fmt.Println("channel close called")
}()
wg.Wait()
}

View File

@ -323,18 +323,33 @@ func (hc *houstonClient) prepareUpdateSelf(req *shared.DeployRequest) (srcdir st
return filepath.ToSlash(tempdir), replacer, err
}
func (hc *houstonClient) deploy(req *shared.DeployRequest, cb func(*protos.DeployingProgress)) error {
func (hc *houstonClient) deploy(req *shared.DeployRequest, cb func(*protos.DeployingProgress)) (err error) {
logger.Println("start deploying")
root, err := hc.prepareDeploy(req.Name, req.Version)
if err != nil {
return err
defer func() {
if req.Name == "houston" && err == nil {
// houston.update 다운로드가 완료되었으므로 종료
// 종료되고나면 스크립트가 알아서 재 실행
hc.Shutdown()
}
}()
var root string
if req.Name == "houston" {
// houston은 버전없이 houston.update폴더로 다운로드
root = "./houston.update"
} else {
root, err = hc.prepareDeploy(req.Name, req.Version)
if err != nil {
return err
}
}
// verpath에 배포 시작
h := md5.New()
h.Write([]byte(strings.Trim(req.Url, "/")))
at := hex.EncodeToString(h.Sum(nil))
fname, err := download(root, hc.makeDownloadUrl(req.Url), at, func(written int64, total int64) {
var fname string
fname, err = download(root, hc.makeDownloadUrl(req.Url), at, func(written int64, total int64) {
prog := protos.DeployingProgress{
State: "download",
Progress: written,
@ -360,7 +375,7 @@ func (hc *houstonClient) deploy(req *shared.DeployRequest, cb func(*protos.Deplo
err = untar(fname)
}
if err == nil && len(req.Config) > 0 {
if err == nil && len(req.Config) > 0 && req.Name != "houston" {
// config.json도 다운로드
h := md5.New()
h.Write([]byte(strings.Trim(req.Config, "/")))

View File

@ -9,7 +9,6 @@ import (
"fmt"
"io"
"math"
"net/http"
"os"
"os/exec"
"path"
@ -21,8 +20,6 @@ import (
"syscall"
"time"
"github.com/djherbis/times"
"github.com/Knetic/govaluate"
"repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/metric"
@ -46,83 +43,12 @@ func lastExecutionArgs(verpath string) []string {
return out
}
var errUploadZipLogFailed = errors.New("not ok")
func (hc *houstonClient) uploadLogFile(logFile string, name string, version string) error {
file, err := os.Open(logFile)
if err != nil {
return err
func (hc *houstonClient) uploadToAppendLog(logFile string, name string, version string) {
hc.uploadChan <- uploadRequest{
logFile: logFile,
name: name,
version: version,
}
if file == nil {
return errors.New("uploadLogFile failed : " + logFile)
}
defer file.Close()
req, err := http.NewRequest("POST", hc.config.HttpAddress+"/upload", file)
if err != nil {
logger.Println(err)
}
hn, _ := os.Hostname()
// createTime := file.
req.Header.Set("Houston-Service-Name", name)
req.Header.Set("Houston-Service-Version", version)
req.Header.Set("Houston-Service-Filename", path.Base(logFile)+"."+hn+path.Ext(logFile))
req.Header.Set("Content-Type", "application/zip")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return errUploadZipLogFailed
}
return nil
}
func (hc *houstonClient) uploadToAppendLog(logFile string, name string, version string) error {
t, err := times.Stat(logFile)
if err != nil {
return err
}
file, err := os.Open(logFile)
if err != nil {
return err
}
if file == nil {
return errors.New("uploadRuploadLogFileawLogFile failed : " + logFile)
}
defer file.Close()
req, err := http.NewRequest("POST", hc.config.HttpAddress+"/upload", file)
if err != nil {
logger.Println(err)
}
hn, _ := os.Hostname()
// createTime := file.
req.Header.Set("Houston-Service-Name", name)
req.Header.Set("Houston-Service-Version", version)
req.Header.Set("Houston-Service-Filename", t.BirthTime().UTC().Format(time.DateOnly)+"."+hn+path.Ext(logFile))
req.Header.Set("Content-Type", "application/zip")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return errUploadZipLogFailed
}
return nil
}
func findMatchFiles(storageRoot, name, version, filter string) (string, []string) {
@ -377,8 +303,9 @@ func (hc *houstonClient) launch(meta *procmeta) error {
}()
total := 0
var targetFile *os.File
var currentFilePath string
var logFile *os.File
var logFilePath string
var logFileTimestamp time.Time
ext := path.Ext(logfilePath)
head := logfilePath[:len(logfilePath)-len(ext)]
@ -391,12 +318,14 @@ func (hc *houstonClient) launch(meta *procmeta) error {
var metricBuffer []byte
wipeLogFile := func() {
total = 0
if targetFile != nil {
targetFile.Close()
targetFile = nil
go hc.uploadToAppendLog(currentFilePath, meta.name, meta.version)
if logFile != nil {
logFile.Close()
logFile = nil
if total > 0 {
hc.uploadToAppendLog(logFilePath, meta.name, meta.version)
}
}
total = 0
}
defer func() {
@ -404,17 +333,15 @@ func (hc *houstonClient) launch(meta *procmeta) error {
metricExporter.Shutdown()
}()
currentTime := time.Now().UTC()
for {
now := time.Now().UTC()
if now.YearDay() != currentTime.YearDay() {
if time.Since(logFileTimestamp) > time.Minute {
wipeLogFile()
}
if targetFile == nil {
currentTime = time.Now().UTC()
currentFilePath = head + currentTime.Format("2006-01-02.150405") + ext
targetFile, _ = os.OpenFile(currentFilePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
if logFile == nil {
logFileTimestamp = time.Now()
logFilePath = head + logFileTimestamp.UTC().Format("2006-01-02.150405") + ext
logFile, _ = os.OpenFile(logFilePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
}
buff, err := reader.ReadBytes('\n')
@ -460,9 +387,9 @@ func (hc *houstonClient) launch(meta *procmeta) error {
metricBuffer = metricBuffer[:0]
}
} else if targetFile != nil && len(buff) > 0 {
} else if logFile != nil && len(buff) > 0 {
for written := 0; written < len(buff); {
n, err := targetFile.Write(buff[written:])
n, err := logFile.Write(buff[written:])
if err != nil {
logger.Println("write log file failed :", logfilePath, err)
break
@ -471,10 +398,6 @@ func (hc *houstonClient) launch(meta *procmeta) error {
}
}
total += len(buff)
if total > 1024*1024 {
wipeLogFile()
}
}
}
}
@ -489,40 +412,38 @@ func (hc *houstonClient) launch(meta *procmeta) error {
defer r.Close()
total := 0
var logFile *os.File
var logFilePath string
var logFileTimestamp time.Time
var targetFile *os.File
var currentFilePath string
ext := path.Ext(logfilePath)
head := logfilePath[:len(logfilePath)-len(ext)]
if len(head) > 0 {
if len(head) > 0 && !strings.HasSuffix(head, "/") {
head += "."
}
reader := bufio.NewReader(r)
wipeLogFile := func() {
total = 0
if targetFile != nil {
targetFile.Close()
targetFile = nil
go hc.uploadToAppendLog(currentFilePath, meta.name, meta.version)
if logFile != nil {
logFile.Close()
logFile = nil
if total > 0 {
hc.uploadToAppendLog(logFilePath, meta.name, meta.version)
}
}
total = 0
}
defer wipeLogFile()
defer func() {
wipeLogFile()
}()
currentTime := time.Now().UTC()
for {
now := time.Now().UTC()
if now.YearDay() != currentTime.YearDay() {
if time.Since(logFileTimestamp) > time.Minute {
wipeLogFile()
}
if targetFile == nil {
currentTime = time.Now().UTC()
currentFilePath = head + currentTime.Format("2006-01-02.150405") + ext
targetFile, _ = os.OpenFile(currentFilePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
if logFile == nil {
logFileTimestamp = time.Now()
logFilePath = head + logFileTimestamp.UTC().Format("2006-01-02.150405") + ext
logFile, _ = os.OpenFile(logFilePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
}
buff, errRead := reader.ReadBytes('\n')
@ -531,9 +452,9 @@ func (hc *houstonClient) launch(meta *procmeta) error {
break
}
if targetFile != nil && len(buff) > 0 {
if logFile != nil && len(buff) > 0 {
for written := 0; written < len(buff); {
n, err := targetFile.Write(buff[written:])
n, err := logFile.Write(buff[written:])
if err != nil {
logger.Println("write log file failed :", logfilePath, err)
break
@ -542,10 +463,6 @@ func (hc *houstonClient) launch(meta *procmeta) error {
}
}
total += len(buff)
if total > 1024*1024 {
wipeLogFile()
}
}
}
}
@ -717,42 +634,3 @@ func (hc *houstonClient) restartChildProcess(req *shared.RestartProcessRequest,
return nil
}
func (hc *houstonClient) uploadProcFiles(child *procmeta, filter string, deleteAfterUpload bool) {
logger.Println("uploadFiles found :", child.version, child.name)
_, matches := findMatchFiles(hc.config.StorageRoot, child.name, child.version, filter)
go func() {
for _, filename := range matches {
if err := hc.uploadLogFile(filename, child.name, child.version); err != nil {
break
}
if deleteAfterUpload {
os.Remove(filename)
}
}
}()
}
func (hc *houstonClient) uploadFiles(req *shared.UploadRequest) error {
logger.Println("uploadFiles req :", *req)
for _, child := range hc.childProcs {
if child.version == req.Version && child.name == req.Name {
hc.uploadProcFiles(child, req.Filter, false)
return nil
}
}
// 실행 중이 아닌 폴더에서도 대상을 찾는다
// 전체 파일을 대상으로
_, matches := findMatchFiles(hc.config.StorageRoot, req.Name, req.Version, req.Filter)
go func() {
for _, filename := range matches {
if err := hc.uploadLogFile(filename, req.Name, req.Version); err != nil {
break
}
os.Remove(filename)
}
}()
return nil
}