package client

import (
	"archive/zip"
	"bufio"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"
	"os/exec"
	"path"
	"path/filepath"
	"regexp"
	"slices"
	"strconv"
	"strings"
	"syscall"
	"time"

	"github.com/Knetic/govaluate"
	"repositories.action2quare.com/ayo/gocommon/logger"
	"repositories.action2quare.com/ayo/gocommon/metric"
	"repositories.action2quare.com/ayo/houston/shared"
	"repositories.action2quare.com/ayo/houston/shared/protos"
)

// argExpressionPattern matches `$(( expression ))` placeholders inside a
// process argument; the first capture group is the expression text.
// It is compiled once here instead of on every evaluateArgs call.
var argExpressionPattern = regexp.MustCompile(`\$\(\((.*?)\)\)`)

// lastExecutionArgs reads the JSON-encoded argument list stored in the
// "@args" file of verpath. When that file does not exist it falls back to
// the "@args" file of the parent directory.
//
// It returns nil when neither file can be opened or when the content cannot
// be decoded as a JSON array of strings.
func lastExecutionArgs(verpath string) []string {
	argf, err := os.Open(path.Join(verpath, "@args"))
	if os.IsNotExist(err) {
		argf, err = os.Open(path.Clean(path.Join(verpath, "..", "@args")))
	}
	if err != nil {
		return nil
	}
	defer argf.Close()

	var out []string
	if err := json.NewDecoder(argf).Decode(&out); err != nil {
		// A partially decoded argument list must not be used for a launch.
		return nil
	}
	return out
}

// uploadToAppendLog queues logFile for upload by sending an uploadRequest
// (log file path, program name and version) on hc.uploadChan. The send
// blocks until the channel accepts the request.
func (hc *houstonClient) uploadToAppendLog(logFile string, name string, version string) {
	hc.uploadChan <- uploadRequest{
		logFile: logFile,
		name:    name,
		version: version,
	}
}

// findMatchFiles globs filter relative to <storageRoot>/<name>/<version>.
//
// It returns the directory the matches live in (the version directory joined
// with the directory part of filter) and the matched paths, slash-separated
// and sorted. The directory itself is excluded from the matches. When the
// pattern is invalid or nothing matches it returns ("", nil).
func findMatchFiles(storageRoot, name, version, filter string) (string, []string) {
	root := path.Join(storageRoot, name, version)
	matches, err := filepath.Glob(path.Join(root, filter))
	if err != nil || len(matches) == 0 {
		return "", nil
	}

	root = path.Join(root, path.Dir(filter))
	out := make([]string, 0, len(matches))
	for _, file := range matches {
		file = filepath.ToSlash(file)
		if file == root {
			continue
		}
		out = append(out, file)
	}
	slices.Sort(out)
	return root, out
}

// addFileToZip copies the content of srcPath into a new archive entry named
// entryName. The source file is closed before returning.
func addFileToZip(w *zip.Writer, entryName, srcPath string) error {
	fw, err := w.Create(entryName)
	if err != nil {
		return err
	}
	src, err := os.Open(srcPath)
	if err != nil {
		return err
	}
	defer src.Close()

	_, err = io.Copy(fw, src)
	return err
}

// zipCompressFiles writes every file of matches into a new zip archive
// created in the OS temp directory and returns the archive path. Entry names
// are the file paths relative to root.
//
// Symbolic links are not archived; their slot in matches is overwritten with
// "" (the caller's slice is modified). On any failure the temporary archive
// is removed and the error is returned.
func zipCompressFiles(root string, matches []string) (string, error) {
	f, err := os.CreateTemp(os.TempDir(), "*.zip")
	if err != nil {
		return "", err
	}
	w := zip.NewWriter(f)

	// discard releases the half-written archive so it is not leaked on disk.
	discard := func(cause error) (string, error) {
		w.Close()
		f.Close()
		os.Remove(f.Name())
		return "", cause
	}

	for i, file := range matches {
		if fi, err := os.Lstat(file); err == nil && fi.Mode()&os.ModeSymlink != 0 {
			matches[i] = ""
			continue
		}

		relative := strings.TrimPrefix(strings.TrimPrefix(file, root), "/")
		// Each file is handled in a helper so it is closed right after the
		// copy instead of staying open until this function returns.
		if err := addFileToZip(w, relative, file); err != nil {
			return discard(err)
		}
	}

	// Closing the writer emits the central directory; a failure here means
	// the archive is unusable, so it must be reported.
	if err := w.Close(); err != nil {
		f.Close()
		os.Remove(f.Name())
		return "", err
	}
	if err := f.Close(); err != nil {
		os.Remove(f.Name())
		return "", err
	}
	return f.Name(), nil
}

// prepareProcessLaunch builds the procmeta (including the not-yet-started
// exec.Cmd) for req.
//
// When req.Version is "latest" the most recently modified sub-directory of
// <storageRoot>/<req.Name> is used as the version. req.Args[0] is the
// executable path relative to the version directory; the remaining
// arguments are passed to the process after environment-variable expansion.
//
// It returns an error when req.Args is empty, when the version directory
// cannot be inspected or is not a directory ("not found"), or when the
// command cannot be prepared.
func prepareProcessLaunch(storageRoot string, req *shared.StartProcessRequest) (*procmeta, error) {
	if len(req.Args) == 0 {
		return nil, errors.New("args is empty")
	}

	foundVersion := req.Version
	if req.Version == "latest" {
		entries, err := os.ReadDir(path.Join(storageRoot, req.Name))
		if err != nil {
			return nil, err
		}

		var latestTimestamp time.Time
		var latestVersion string
		for _, entry := range entries {
			if !entry.IsDir() {
				continue
			}
			fi, err := entry.Info()
			if err != nil {
				return nil, err
			}
			if modTime := fi.ModTime(); latestTimestamp.Before(modTime) {
				latestTimestamp = modTime
				latestVersion = fi.Name()
			}
		}

		if len(latestVersion) > 0 {
			foundVersion = latestVersion
		}
	}

	verpath := path.Join(storageRoot, req.Name, foundVersion)
	fi, err := os.Stat(verpath)
	if err != nil {
		return nil, err
	}
	if !fi.IsDir() {
		return nil, errors.New("not found")
	}

	exefile := "./" + path.Clean(strings.TrimPrefix(req.Args[0], "/"))
	// Best effort: make sure the target is executable. A failure is only
	// logged because starting the process will report the real problem.
	if err := os.Chmod(path.Join(verpath, exefile), 0777); err != nil {
		logger.Println("chmod failed :", path.Join(verpath, exefile), err)
	}

	exef, err := os.Executable()
	if err != nil {
		return nil, fmt.Errorf("resolving own executable path: %w", err)
	}

	expanded := make([]string, len(req.Args))
	for i, arg := range req.Args {
		expanded[i] = os.ExpandEnv(arg)
	}

	// The executable is addressed relative to the directory of the running
	// binary because cmd.Dir is changed to verpath below.
	exename := path.Join(path.Dir(strings.ReplaceAll(exef, "\\", "/")), verpath, exefile)

	logger.Println("exefile :", exefile)
	logger.Println("verpath :", verpath)
	logger.Println("exef :", exef)
	logger.Println("path.Dir :", path.Dir(exef))
	logger.Println("exename :", exename)

	cmd := exec.Command(os.ExpandEnv(exename), expanded[1:]...)
	cmd.Dir = verpath
	stdin, err := cmd.StdinPipe()
	if err != nil {
		return nil, fmt.Errorf("creating stdin pipe: %w", err)
	}

	seq++
	return &procmeta{
		id:         seq,
		cmd:        cmd,
		name:       req.Name,
		args:       req.Args,
		version:    foundVersion,
		recover:    req.AutoRestart,
		verpath:    verpath,
		state:      int32(protos.ProcessState_Stopped),
		stdin:      stdin,
		logfile:    req.OutputLogFile,
		keepLatest: req.Version == "latest",
	}, nil
}

// evaluateArgs replaces every `$(( expression ))` placeholder found in args
// with the result of evaluating the expression against params.
//
// args is modified in place and also returned. When an expression cannot be
// parsed or evaluated, (nil, err) is returned; elements processed before the
// failure have already been rewritten.
func evaluateArgs(args []string, params map[string]any) ([]string, error) {
	for i, input := range args {
		matches := argExpressionPattern.FindAllStringSubmatch(input, -1)
		if len(matches) == 0 {
			continue
		}

		for _, match := range matches {
			if len(match) < 2 {
				continue
			}

			expression := strings.TrimSpace(match[1])
			expr, err := govaluate.NewEvaluableExpression(expression)
			if err != nil {
				return nil, err
			}

			result, err := expr.Evaluate(params)
			if err != nil {
				return nil, err
			}

			// Replace the original expression with its result.
			input = strings.ReplaceAll(input, match[0], fmt.Sprintf("%v", result))
		}
		args[i] = input
	}
	return args, nil
}

// parseEnv converts "KEY=VALUE" entries into a map usable as expression
// parameters. A value is stored as int64 when it parses as a base-10
// integer, as float64 when it parses as a floating-point number, and as the
// raw string otherwise. An entry without "=" is stored with an empty string
// value.
func parseEnv(input []string) map[string]any {
	output := make(map[string]any, len(input))
	for _, envkv := range input {
		key, value, _ := strings.Cut(envkv, "=")

		if parsed, err := strconv.ParseInt(value, 10, 0); err == nil {
			output[key] = parsed
			continue
		}
		// Parse with 64-bit precision so that e.g. "0.1" is not rounded to
		// its float32 approximation before being used in expressions.
		if parsed, err := strconv.ParseFloat(value, 64); err == nil {
			output[key] = parsed
			continue
		}
		output[key] = value
	}
	return output
}

// launch starts the process described by meta.
//
// Before starting, it attaches to the process' stdout and stderr and spawns
// one reader goroutine per stream. Each reader forwards lines to a log
// writer that stores them under <verpath>/logs in time-stamped files and
// hands every finished file to hc.uploadToAppendLog. Lines on stdout that
// are framed by metric.METRIC_HEAD_INLINE / metric.METRIC_TAIL_INLINE are
// consumed as metric registrations or updates instead of being logged.
//
// When the stdout reader ends while meta is still in the Running state, an
// Exception operation carrying the process id is sent on hc.operationChan.
//
// On success the state of meta is set to Running.
func (hc *houstonClient) launch(meta *procmeta) error {
	stdout, err := meta.cmd.StdoutPipe()
	if err != nil {
		return err
	}
	stderr, err := meta.cmd.StderrPipe()
	if err != nil {
		return err
	}

	logfolder := path.Join(meta.verpath, "logs")
	if err := os.MkdirAll(logfolder, 0775); err != nil {
		return err
	}

	// logUploader writes everything received on logChan into a file named
	// <head><UTC timestamp><ext>, derived from pathTemplate. Once a minute,
	// and when localctx is cancelled, the current file is closed and queued
	// for upload; the next log line opens a new file.
	logUploader := func(localctx context.Context, pathTemplate string, logChan chan []byte) {
		var logFile *os.File
		var currentPath string

		ext := path.Ext(pathTemplate)
		head := pathTemplate[:len(pathTemplate)-len(ext)]
		if len(head) > 0 && !strings.HasSuffix(head, "/") {
			head += "."
		}

		writeLog := func(log []byte) {
			if logFile == nil {
				currentPath = head + time.Now().UTC().Format("2006-01-02.150405") + ext
				f, err := os.OpenFile(currentPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
				if err != nil {
					logger.Println("open log file failed :", currentPath, err)
					return
				}
				logFile = f
			}

			for written := 0; written < len(log); {
				n, err := logFile.Write(log[written:])
				if err != nil {
					logger.Println("write log file failed :", currentPath, err)
					break
				}
				written += n
			}
		}

		// flush closes the current file, if any, and queues it for upload.
		flush := func() {
			if logFile == nil {
				return
			}
			logFile.Close()
			logFile = nil
			hc.uploadToAppendLog(currentPath, meta.name, meta.version)
		}

		// Deferred calls run in reverse order: first drain logChan, then
		// flush the file that received the drained lines.
		defer flush()
		defer func() {
			for {
				select {
				case log := <-logChan:
					writeLog(log)
				default:
					// Every log pending in logChan has been consumed.
					return
				}
			}
		}()

		// The ticker is created once, outside the loop. Re-arming a timer on
		// every iteration would postpone the upload for as long as the
		// process keeps logging.
		heartbeat := time.NewTicker(time.Minute)
		defer heartbeat.Stop()

		for {
			select {
			case <-localctx.Done():
				return

			case <-heartbeat.C:
				// Store the logs collected so far and upload them.
				flush()

			case log := <-logChan:
				writeLog(log)
			}
		}
	}

	// stdReader reads r line by line until it fails or reaches EOF. Every
	// non-empty line accepted by verify is forwarded to a logUploader that
	// lives as long as this reader. A panic raised while reading (including
	// inside verify) is logged and ends the reader.
	stdReader := func(r io.ReadCloser, logfilePath string, verify func(buff []byte) bool) {
		defer func() {
			if reco := recover(); reco != nil {
				logger.Println(reco)
			}
			r.Close()
		}()

		localctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		logChan := make(chan []byte, 1)
		go logUploader(localctx, logfilePath, logChan)

		reader := bufio.NewReader(r)
		for {
			buff, err := reader.ReadBytes('\n')
			// ReadBytes may return data together with the error (a last line
			// without trailing newline); handle it before leaving.
			if len(buff) > 0 && verify(buff) {
				logChan <- buff
			}
			if err != nil {
				logger.Println("ReadBytes at stdReader return err :", err, meta.name)
				break
			}
		}
	}

	var evalfile string
	if len(meta.logfile) > 0 {
		evalfile = path.Join(logfolder, meta.logfile)
	} else {
		evalfile = logfolder + "/"
	}

	go func() {
		metricExporter := metric.NewPrometheusExport(hc.config.MetricNamespace)
		defer metricExporter.Shutdown()

		var metricBuffer []byte
		readingMetric := false

		stdReader(stdout, evalfile+".log", func(buff []byte) bool {
			if readingMetric {
				metricBuffer = append(metricBuffer, buff...)
			} else if buff[0] == metric.METRIC_HEAD_INLINE {
				readingMetric = true
				metricBuffer = append(metricBuffer, buff[1:]...)
			}

			if !readingMetric {
				// Ordinary output: let the caller log it.
				return true
			}

			// A metric is complete when the byte before the final one (the
			// line terminator) is the tail marker. Too-short buffers simply
			// mean the metric is not complete yet.
			n := len(metricBuffer)
			if n < 2 || metricBuffer[n-2] != metric.METRIC_TAIL_INLINE {
				return false
			}

			readingMetric = false
			payload := metricBuffer[:n-2]
			// Always start the next metric from an empty buffer, even when
			// this payload turns out to be invalid. payload stays valid
			// because nothing is appended before this callback returns.
			metricBuffer = metricBuffer[:0]

			switch {
			case len(payload) == 0:
				// Head immediately followed by tail: nothing to process.

			case payload[0] == '{':
				var desc metric.MetricDescription
				if err := json.Unmarshal(payload, &desc); err != nil {
					logger.Println("unmarshal metric failed :", err, string(payload))
					return false
				}

				if desc.ConstLabels == nil {
					desc.ConstLabels = make(map[string]string)
				}
				for k, v := range hc.config.ConstLabels {
					desc.ConstLabels[k] = v
				}
				desc.ConstLabels["job"] = meta.name

				metricExporter.RegisterMetric(&desc)

			default:
				key, val := metric.ReadMetricValue(payload)
				metricExporter.UpdateMetric(key, val)
			}

			return false
		})

		logger.Println("stdReader is terminated :", meta.name)
		if meta.isState(protos.ProcessState_Running) {
			// The state is still Running although the output stream ended:
			// report it as an exception.
			hc.operationChan <- &protos.OperationQueryResponse{
				Operation: string(shared.Exception),
				Args: map[string]string{
					"id": fmt.Sprintf("%d", meta.id),
				},
			}
		}
	}()

	go stdReader(stderr, evalfile+".err", func([]byte) bool { return true })

	logger.Println("startChildProcess :", meta.cmd.Args)

	if err := meta.cmd.Start(); err != nil {
		return err
	}
	meta.setState(protos.ProcessState_Running)
	return nil
}

// saveLaunchArgs writes args as JSON into file, replacing previous content.
// It is best effort: failures are logged and otherwise ignored.
func saveLaunchArgs(file string, args any) {
	argfile, err := os.Create(file)
	if err != nil {
		logger.Println("create args file failed :", file, err)
		return
	}
	defer argfile.Close()

	if err := json.NewEncoder(argfile).Encode(args); err != nil {
		logger.Println("write args file failed :", file, err)
	}
}

// startChildProcess prepares and launches the process described by req.
//
// After a successful launch the arguments are recorded on the deploy entries
// of req.Name whose Version equals req.Version, and saved to the "@args"
// files of both the program folder and the version folder. Finally the
// process is appended to hc.childProcs.
func (hc *houstonClient) startChildProcess(req *shared.StartProcessRequest) error {
	meta, err := prepareProcessLaunch(hc.config.StorageRoot, req)
	if err != nil {
		return err
	}

	if err := hc.launch(meta); err != nil {
		return err
	}

	// The launch succeeded, so store the args in this and the parent folder.
	// NOTE(review): both the comparison and the version folder below use
	// req.Version, which is the literal "latest" for latest launches rather
	// than the resolved meta.version - confirm that this is intended.
	vers := hc.deploys[req.Name]
	for _, ver := range vers {
		if ver.Version == req.Version {
			ver.Args = meta.args
		}
	}

	saveLaunchArgs(path.Join(hc.config.StorageRoot, req.Name, "@args"), req.Args)
	saveLaunchArgs(path.Join(hc.config.StorageRoot, req.Name, req.Version, "@args"), req.Args)

	hc.childProcs = append(hc.childProcs, meta)
	return nil
}

// stopChildProcess stops running child processes selected by req.
//
// When req.Pid is non-zero only the process with that pid is stopped.
// Otherwise every running process named req.Name is stopped, restricted to
// req.Version when it is not empty. A selected process is moved to the
// Stopping state and sent SIGTERM (falling back to a kill when the signal
// cannot be delivered); once it has exited, an Exception operation with its
// id is sent on hc.operationChan. The operation state is refreshed through
// op before returning. The returned error is always nil.
func (hc *houstonClient) stopChildProcess(req *shared.StopProcessRequest, op protos.OperationClient) error {
	killer := func(proc *procmeta) {
		proc.setState(protos.ProcessState_Stopping)
		if err := proc.cmd.Process.Signal(syscall.SIGTERM); err != nil {
			if err := proc.cmd.Process.Signal(os.Kill); err != nil {
				logger.Println("kill process failed :", proc.name, err)
			}
		}

		go func() {
			proc.cmd.Wait()
			hc.operationChan <- &protos.OperationQueryResponse{
				Operation: string(shared.Exception),
				Args: map[string]string{
					"id": fmt.Sprintf("%d", proc.id),
				},
			}
		}()
	}

	for _, proc := range hc.childProcs {
		if !proc.isState(protos.ProcessState_Running) {
			continue
		}

		var selected bool
		if req.Pid != 0 {
			// Stop only the process with the requested pid.
			selected = req.Pid == int32(proc.cmd.Process.Pid)
		} else {
			// Stop the whole program, or only the requested version of it.
			selected = proc.name == req.Name &&
				(len(req.Version) == 0 || req.Version == proc.version)
		}

		if selected {
			killer(proc)
		}
	}

	op.Refresh(context.Background(), hc.makeOperationQueryRequest())

	return nil
}

// restartChildProcess restarts the child process whose pid equals req.Pid.
//
// When req.Config is set, that config is downloaded into the process'
// version folder first; a download failure aborts the restart and is
// returned. The process is then moved to the Restart state, the operation
// state is refreshed through op and the process' command is sent on
// hc.exitChan. Nothing happens when no child process has the requested pid.
func (hc *houstonClient) restartChildProcess(req *shared.RestartProcessRequest, op protos.OperationClient) error {
	for _, proc := range hc.childProcs {
		// A command that was never started has no Process to compare with.
		if proc.cmd.Process == nil || proc.cmd.Process.Pid != int(req.Pid) {
			continue
		}

		if len(req.Config) > 0 {
			// Try to download config.json first.
			root := proc.verpath
			if _, err := download(root, hc.makeDownloadUrl(req.Config), "", nil); err != nil {
				return err
			}
		}

		proc.setState(protos.ProcessState_Restart)
		op.Refresh(context.Background(), hc.makeOperationQueryRequest())
		hc.exitChan <- proc.cmd
		break
	}

	return nil
}