From a844bed056cb808043946a3c168e953a61e8ad33 Mon Sep 17 00:00:00 2001 From: mountain Date: Mon, 11 Nov 2024 20:50:51 +0900 Subject: [PATCH] =?UTF-8?q?houston=20=EC=97=85=EB=8D=B0=EC=9D=B4=ED=8A=B8?= =?UTF-8?q?=EB=A5=BC=20=EC=8A=A4=ED=81=AC=EB=A6=BD=ED=8A=B8=EB=A1=9C=20?= =?UTF-8?q?=ED=95=98=EC=9E=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/client.go | 85 ++++++++++++++++-- client/client_test.go | 35 ++++++++ client/deploy.go | 27 ++++-- client/operation.go | 204 +++++++++--------------------------------- houston.sh | 51 ++++++++++- replacer/main.go | 113 ----------------------- 6 files changed, 222 insertions(+), 293 deletions(-) create mode 100644 client/client_test.go delete mode 100644 replacer/main.go diff --git a/client/client.go b/client/client.go index 373eaa7..e90dfc6 100644 --- a/client/client.go +++ b/client/client.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "io/fs" + "net/http" "os" "os/exec" "os/signal" @@ -22,6 +23,7 @@ import ( "time" "unsafe" + "github.com/djherbis/times" "repositories.action2quare.com/ayo/gocommon" "repositories.action2quare.com/ayo/gocommon/flagx" "repositories.action2quare.com/ayo/gocommon/logger" @@ -116,6 +118,12 @@ func (pm *procmeta) setState(s protos.ProcessState) { atomic.StoreInt32(&pm.state, int32(s)) } +type uploadRequest struct { + logFile string + name string + version string +} + type houstonClient struct { childProcs []*procmeta extraMetrics unsafe.Pointer // map[string]float32 @@ -125,6 +133,7 @@ type houstonClient struct { operationChan chan *protos.OperationQueryResponse exitChan chan *exec.Cmd clientChan chan *grpc.ClientConn + uploadChan chan uploadRequest timestamp string wg sync.WaitGroup config clientConfig @@ -317,6 +326,7 @@ func NewClient(standalone bool) (HoustonClient, error) { timestamp: exefi.ModTime().String(), version: string(ver), standalone: standalone, + uploadChan: make(chan uploadRequest, 100), siblingProcIndex: make(map[string]uint64), } @@ -494,15 +504,6 @@ func NewClient(standalone bool) (HoustonClient, error) { logger.Println(err) } - case shared.Upload: - var ur shared.UploadRequest - unmarshal(&ur, resp.Args) - logger.Println("args :", ur) - - if err := hc.uploadFiles(&ur); err != nil { - logger.Println(err) - } - case shared.Exception: idstr := resp.Args["id"] id64, _ := strconv.ParseInt(idstr, 10, 0) @@ -567,6 +568,59 @@ func NewClient(standalone bool) (HoustonClient, error) { return hc, nil } +func uploadSafe(url, filePath, name, version string) error { + defer func() { + r := recover() + if r != nil { + logger.Error(r) + } + }() + + t, err := times.Stat(filePath) + if err != nil { + return err + } + + file, err := os.Open(filePath) + if err != nil { + return err + } + + if file == nil { + return errors.New("upload file is missing :" + filePath) + } + + defer file.Close() + + // hc.config.HttpAddress+"/upload", + httpreq, err := http.NewRequest("POST", url, file) + if err != nil { + return err + } + + hn, _ := os.Hostname() + // createTime := file. + httpreq.Header.Set("Houston-Service-Name", name) + httpreq.Header.Set("Houston-Service-Version", version) + httpreq.Header.Set("Houston-Service-Filename", t.BirthTime().UTC().Format(time.DateOnly)+"."+hn+path.Ext(filePath)) + httpreq.Header.Set("Content-Type", "application/zip") + resp, err := http.DefaultClient.Do(httpreq) + if err != nil { + return err + } + resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("upload file failed. response code : %s, %d", filePath, resp.StatusCode) + } + + if err := os.Remove(filePath); err != nil { + return err + } + + return nil +} + func (hc *houstonClient) Start() { // receive from stream defer func() { @@ -583,6 +637,19 @@ func (hc *houstonClient) Start() { proc.cmd.Wait() proc.cmd.Process.Release() } + + close(hc.uploadChan) + }() + + go func() { + // upload 고루틴 + url := hc.config.HttpAddress + "/upload" + for req := range hc.uploadChan { + err := uploadSafe(url, req.logFile, req.name, req.version) + if err != nil { + logger.Println("uploadSafe return err :", err) + } + } }() interrupt := make(chan os.Signal, 1) diff --git a/client/client_test.go b/client/client_test.go new file mode 100644 index 0000000..e7f316b --- /dev/null +++ b/client/client_test.go @@ -0,0 +1,35 @@ +package client + +import ( + "fmt" + "sync" + "testing" + "time" +) + +func Test_houstonClient_Start(t *testing.T) { + tc := make(chan int, 1000) + var wg sync.WaitGroup + wg.Add(1) + + go func() { + // receive + defer wg.Done() + + for v := range tc { + fmt.Println(v) + time.Sleep(100 * time.Millisecond) + } + }() + + go func() { + // send + for i := 0; i < 100; i++ { + tc <- i + } + close(tc) + fmt.Println("channel close called") + }() + + wg.Wait() +} diff --git a/client/deploy.go b/client/deploy.go index 94820f4..445a48d 100644 --- a/client/deploy.go +++ b/client/deploy.go @@ -323,18 +323,33 @@ func (hc *houstonClient) prepareUpdateSelf(req *shared.DeployRequest) (srcdir st return filepath.ToSlash(tempdir), replacer, err } -func (hc *houstonClient) deploy(req *shared.DeployRequest, cb func(*protos.DeployingProgress)) error { +func (hc *houstonClient) deploy(req *shared.DeployRequest, cb func(*protos.DeployingProgress)) (err error) { logger.Println("start deploying") - root, err := hc.prepareDeploy(req.Name, req.Version) - if err != nil { - return err + + defer func() { + if req.Name == "houston" && err == nil { + // houston.update 다운로드가 완료되었으므로 종료 + // 종료되고나면 스크립트가 알아서 재 실행 + hc.Shutdown() + } + }() + var root string + if req.Name == "houston" { + // houston은 버전없이 houston.update폴더로 다운로드 + root = "./houston.update" + } else { + root, err = hc.prepareDeploy(req.Name, req.Version) + if err != nil { + return err + } } // verpath에 배포 시작 h := md5.New() h.Write([]byte(strings.Trim(req.Url, "/"))) at := hex.EncodeToString(h.Sum(nil)) - fname, err := download(root, hc.makeDownloadUrl(req.Url), at, func(written int64, total int64) { + var fname string + fname, err = download(root, hc.makeDownloadUrl(req.Url), at, func(written int64, total int64) { prog := protos.DeployingProgress{ State: "download", Progress: written, @@ -360,7 +375,7 @@ func (hc *houstonClient) deploy(req *shared.DeployRequest, cb func(*protos.Deplo err = untar(fname) } - if err == nil && len(req.Config) > 0 { + if err == nil && len(req.Config) > 0 && req.Name != "houston" { // config.json도 다운로드 h := md5.New() h.Write([]byte(strings.Trim(req.Config, "/"))) diff --git a/client/operation.go b/client/operation.go index 8bbc448..bd56b43 100644 --- a/client/operation.go +++ b/client/operation.go @@ -9,7 +9,6 @@ import ( "fmt" "io" "math" - "net/http" "os" "os/exec" "path" @@ -21,8 +20,6 @@ import ( "syscall" "time" - "github.com/djherbis/times" - "github.com/Knetic/govaluate" "repositories.action2quare.com/ayo/gocommon/logger" "repositories.action2quare.com/ayo/gocommon/metric" @@ -46,83 +43,12 @@ func lastExecutionArgs(verpath string) []string { return out } -var errUploadZipLogFailed = errors.New("not ok") - -func (hc *houstonClient) uploadLogFile(logFile string, name string, version string) error { - file, err := os.Open(logFile) - if err != nil { - return err +func (hc *houstonClient) uploadToAppendLog(logFile string, name string, version string) { + hc.uploadChan <- uploadRequest{ + logFile: logFile, + name: name, + version: version, } - - if file == nil { - return errors.New("uploadLogFile failed : " + logFile) - } - - defer file.Close() - - req, err := http.NewRequest("POST", hc.config.HttpAddress+"/upload", file) - if err != nil { - logger.Println(err) - } - - hn, _ := os.Hostname() - // createTime := file. - req.Header.Set("Houston-Service-Name", name) - req.Header.Set("Houston-Service-Version", version) - req.Header.Set("Houston-Service-Filename", path.Base(logFile)+"."+hn+path.Ext(logFile)) - req.Header.Set("Content-Type", "application/zip") - resp, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return errUploadZipLogFailed - } - - return nil -} - -func (hc *houstonClient) uploadToAppendLog(logFile string, name string, version string) error { - t, err := times.Stat(logFile) - if err != nil { - return err - } - - file, err := os.Open(logFile) - if err != nil { - return err - } - - if file == nil { - return errors.New("uploadRuploadLogFileawLogFile failed : " + logFile) - } - - defer file.Close() - - req, err := http.NewRequest("POST", hc.config.HttpAddress+"/upload", file) - if err != nil { - logger.Println(err) - } - - hn, _ := os.Hostname() - // createTime := file. - req.Header.Set("Houston-Service-Name", name) - req.Header.Set("Houston-Service-Version", version) - req.Header.Set("Houston-Service-Filename", t.BirthTime().UTC().Format(time.DateOnly)+"."+hn+path.Ext(logFile)) - req.Header.Set("Content-Type", "application/zip") - resp, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return errUploadZipLogFailed - } - - return nil } func findMatchFiles(storageRoot, name, version, filter string) (string, []string) { @@ -377,8 +303,9 @@ func (hc *houstonClient) launch(meta *procmeta) error { }() total := 0 - var targetFile *os.File - var currentFilePath string + var logFile *os.File + var logFilePath string + var logFileTimestamp time.Time ext := path.Ext(logfilePath) head := logfilePath[:len(logfilePath)-len(ext)] @@ -391,12 +318,14 @@ func (hc *houstonClient) launch(meta *procmeta) error { var metricBuffer []byte wipeLogFile := func() { - total = 0 - if targetFile != nil { - targetFile.Close() - targetFile = nil - go hc.uploadToAppendLog(currentFilePath, meta.name, meta.version) + if logFile != nil { + logFile.Close() + logFile = nil + if total > 0 { + hc.uploadToAppendLog(logFilePath, meta.name, meta.version) + } } + total = 0 } defer func() { @@ -404,17 +333,15 @@ func (hc *houstonClient) launch(meta *procmeta) error { metricExporter.Shutdown() }() - currentTime := time.Now().UTC() for { - now := time.Now().UTC() - if now.YearDay() != currentTime.YearDay() { + if time.Since(logFileTimestamp) > time.Minute { wipeLogFile() } - if targetFile == nil { - currentTime = time.Now().UTC() - currentFilePath = head + currentTime.Format("2006-01-02.150405") + ext - targetFile, _ = os.OpenFile(currentFilePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666) + if logFile == nil { + logFileTimestamp = time.Now() + logFilePath = head + logFileTimestamp.UTC().Format("2006-01-02.150405") + ext + logFile, _ = os.OpenFile(logFilePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666) } buff, err := reader.ReadBytes('\n') @@ -460,9 +387,9 @@ func (hc *houstonClient) launch(meta *procmeta) error { metricBuffer = metricBuffer[:0] } - } else if targetFile != nil && len(buff) > 0 { + } else if logFile != nil && len(buff) > 0 { for written := 0; written < len(buff); { - n, err := targetFile.Write(buff[written:]) + n, err := logFile.Write(buff[written:]) if err != nil { logger.Println("write log file failed :", logfilePath, err) break @@ -471,10 +398,6 @@ func (hc *houstonClient) launch(meta *procmeta) error { } } total += len(buff) - - if total > 1024*1024 { - wipeLogFile() - } } } } @@ -489,40 +412,38 @@ func (hc *houstonClient) launch(meta *procmeta) error { defer r.Close() total := 0 + var logFile *os.File + var logFilePath string + var logFileTimestamp time.Time - var targetFile *os.File - var currentFilePath string ext := path.Ext(logfilePath) head := logfilePath[:len(logfilePath)-len(ext)] - if len(head) > 0 { + if len(head) > 0 && !strings.HasSuffix(head, "/") { head += "." } reader := bufio.NewReader(r) wipeLogFile := func() { - total = 0 - if targetFile != nil { - targetFile.Close() - targetFile = nil - go hc.uploadToAppendLog(currentFilePath, meta.name, meta.version) + if logFile != nil { + logFile.Close() + logFile = nil + if total > 0 { + hc.uploadToAppendLog(logFilePath, meta.name, meta.version) + } } + total = 0 } + defer wipeLogFile() - defer func() { - wipeLogFile() - }() - - currentTime := time.Now().UTC() for { - now := time.Now().UTC() - if now.YearDay() != currentTime.YearDay() { + if time.Since(logFileTimestamp) > time.Minute { wipeLogFile() } - if targetFile == nil { - currentTime = time.Now().UTC() - currentFilePath = head + currentTime.Format("2006-01-02.150405") + ext - targetFile, _ = os.OpenFile(currentFilePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666) + if logFile == nil { + logFileTimestamp = time.Now() + logFilePath = head + logFileTimestamp.UTC().Format("2006-01-02.150405") + ext + logFile, _ = os.OpenFile(logFilePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666) } buff, errRead := reader.ReadBytes('\n') @@ -531,9 +452,9 @@ func (hc *houstonClient) launch(meta *procmeta) error { break } - if targetFile != nil && len(buff) > 0 { + if logFile != nil && len(buff) > 0 { for written := 0; written < len(buff); { - n, err := targetFile.Write(buff[written:]) + n, err := logFile.Write(buff[written:]) if err != nil { logger.Println("write log file failed :", logfilePath, err) break @@ -542,10 +463,6 @@ func (hc *houstonClient) launch(meta *procmeta) error { } } total += len(buff) - - if total > 1024*1024 { - wipeLogFile() - } } } } @@ -717,42 +634,3 @@ func (hc *houstonClient) restartChildProcess(req *shared.RestartProcessRequest, return nil } - -func (hc *houstonClient) uploadProcFiles(child *procmeta, filter string, deleteAfterUpload bool) { - logger.Println("uploadFiles found :", child.version, child.name) - _, matches := findMatchFiles(hc.config.StorageRoot, child.name, child.version, filter) - go func() { - for _, filename := range matches { - if err := hc.uploadLogFile(filename, child.name, child.version); err != nil { - break - } - if deleteAfterUpload { - os.Remove(filename) - } - } - }() -} - -func (hc *houstonClient) uploadFiles(req *shared.UploadRequest) error { - logger.Println("uploadFiles req :", *req) - for _, child := range hc.childProcs { - if child.version == req.Version && child.name == req.Name { - hc.uploadProcFiles(child, req.Filter, false) - return nil - } - } - - // 실행 중이 아닌 폴더에서도 대상을 찾는다 - // 전체 파일을 대상으로 - _, matches := findMatchFiles(hc.config.StorageRoot, req.Name, req.Version, req.Filter) - go func() { - for _, filename := range matches { - if err := hc.uploadLogFile(filename, req.Name, req.Version); err != nil { - break - } - os.Remove(filename) - } - }() - - return nil -} diff --git a/houston.sh b/houston.sh index 09fc9ef..bf34ba4 100644 --- a/houston.sh +++ b/houston.sh @@ -1,4 +1,51 @@ -#!/bin/sh +#!/bin/bash -nohup /home/opdev/houston -client -logfile > /dev/null & +HOUSTON_PATH="./houston" +UPDATE_DIR="houston.update" +while true; do + # houston 실행 + if [ -f "$HOUSTON_PATH" ]; then + "$HOUSTON_PATH" + EXIT_CODE=$? + else + echo "houston 실행 파일이 없습니다." + EXIT_CODE=1 + fi + + # houston.update 폴더가 존재하는지 확인 + if [ -d "$UPDATE_DIR" ]; then + # houston 파일이 존재하는 경우에만 시간 비교 + if [ -f "$HOUSTON_PATH" ]; then + HOUSTON_TIME=$(stat -c %Y "$HOUSTON_PATH") + UPDATE_TIME=$(stat -c %Y "$UPDATE_DIR") + + if [ $UPDATE_TIME -gt $HOUSTON_TIME ]; then + echo "새로운 업데이트 폴더 발견. 업데이트를 진행합니다." + # houston.update 폴더 내의 모든 파일을 현재 폴더로 복사 + cp -R "$UPDATE_DIR"/* . + # 실행 권한 부여 (필요한 경우) + chmod +x "$HOUSTON_PATH" + # 업데이트 폴더 삭제 + rm -rf "$UPDATE_DIR" + echo "업데이트 완료 및 업데이트 폴더 삭제. houston을 다시 시작합니다." + else + echo "업데이트 폴더가 최신이 아닙니다. 업데이트를 건너뜁니다." + break + fi + else + echo "houston 파일이 없습니다. houston.update 폴더의 내용을 복사합니다." + cp -R "$UPDATE_DIR"/* . + chmod +x "$HOUSTON_PATH" + # 업데이트 폴더 삭제 + rm -rf "$UPDATE_DIR" + echo "houston을 시작합니다." + fi + else + echo "업데이트 폴더가 없습니다. 스크립트를 종료합니다." + break + fi +done + +echo "houston 실행 및 업데이트 스크립트 종료" +exit $EXIT_CODE \ No newline at end of file diff --git a/replacer/main.go b/replacer/main.go deleted file mode 100644 index f3cb41c..0000000 --- a/replacer/main.go +++ /dev/null @@ -1,113 +0,0 @@ -package main - -import ( - "encoding/json" - "errors" - "io" - "log" - "os" - "os/exec" - "path" - "time" -) - -func copy(src, dst string, stdlog *log.Logger) error { - fi, err := os.Stat(src) - if err != nil { - return err - } - if fi.IsDir() { - entries, _ := os.ReadDir(src) - for _, ent := range entries { - if err := copy(path.Join(src, ent.Name()), path.Join(dst, ent.Name()), stdlog); err != nil { - return err - } - } - return nil - } - - inmode := fi.Mode() - - in, err := os.Open(src) - if err != nil { - return err - } - defer in.Close() - - out, err := os.Create(dst) - if err != nil { - return err - } - defer out.Close() - - copied, err := io.Copy(out, in) - if err != nil { - return err - } - if copied < fi.Size() { - return errors.New("copy not completed") - } - if err := out.Sync(); err != nil { - return err - } - - if err := out.Chmod(inmode); err != nil { - return err - } - - stdlog.Println("file copied :", src, dst) - return nil -} - -func main() { - logfile, _ := os.OpenFile("replacer.log", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666) - defer logfile.Close() - stdlog := log.New(logfile, "", log.LstdFlags) - - args := os.Args - // args[1] : 나를 시작한 pid. pid가 종료될 때 까지 기다림 - // args[2] : target 폴더 - // args[3:] : 다시 시작할 때 넘겨줄 arguments(프로세스 이름 포함) - stdlog.Println(args) - - for { - stdlog.Println("wait for terminating of", args[3]) - cmd := exec.Command("ps", "-p", args[1]) - if err := cmd.Run(); err != nil { - break - } - time.Sleep(time.Second) - } - - stdlog.Println("target is terminated") - - // replacer 제거. 내가 돌고 있으므로 복사는 안된다. - // 내가 실행되기 전에 이미 복사가 되서 나는 최신 버전임 - os.Remove(path.Join(args[2], os.Args[0])) - if err := copy(args[2], "", stdlog); err != nil { - stdlog.Fatal(err) - } - - nextArgs := args[4:] - if bt, _ := os.ReadFile("@args"); len(bt) > 0 { - var tempArgs []string - if json.Unmarshal(bt, &tempArgs) == nil { - nextArgs = tempArgs - } - } - os.Remove("@args") - - err := os.RemoveAll(args[2]) - if err != nil { - stdlog.Println("os.RemoveAll failed :", args[2], err) - } - - err = os.Chmod(args[3], 0775) - if err != nil { - stdlog.Println("os.Chmod failed :", err) - } - - stdlog.Println("exec.Command :", args) - cmd := exec.Command(args[3], nextArgs...) - cmd.Start() -}