package client

import (
	"archive/zip"
	"bufio"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"math"
	"net/http"
	"os"
	"os/exec"
	"path"
	"path/filepath"
	"sort"
	"strings"
	"syscall"
	"time"

	"repositories.action2quare.com/ayo/gocommon/logger"
	"repositories.action2quare.com/ayo/gocommon/metric"
	"repositories.action2quare.com/ayo/houston/shared"
	"repositories.action2quare.com/ayo/houston/shared/protos"
)

// lastExecutionArgs returns the argument list stored in the "@args" file of
// verpath, falling back to the "@args" file of the parent folder when the
// version folder has none. It returns nil when neither file can be opened or
// when the stored JSON cannot be decoded.
func lastExecutionArgs(verpath string) []string {
	argf, err := os.Open(path.Join(verpath, "@args"))
	if os.IsNotExist(err) {
		argf, err = os.Open(path.Clean(path.Join(verpath, "..", "@args")))
	}
	if err != nil {
		if !os.IsNotExist(err) {
			logger.Println("open @args failed :", err)
		}
		return nil
	}
	defer argf.Close()

	var out []string
	if err := json.NewDecoder(argf).Decode(&out); err != nil {
		// Never hand back a half-decoded argument list.
		logger.Println("decode @args failed :", err)
		return nil
	}
	return out
}

// errUploadZipLogFailed is returned when the upload endpoint answers with a
// status other than 200 OK.
var errUploadZipLogFailed = errors.New("not ok")

// uploadZipLogFile POSTs the file at zipFile to "<HttpAddress>/upload",
// describing it through the Houston-Service-* headers. It returns the
// underlying error when the file cannot be opened or the request cannot be
// built or sent, and errUploadZipLogFailed on a non-200 response.
func (hc *houstonClient) uploadZipLogFile(zipFile string, name string, version string) error {
	zf, err := os.Open(zipFile)
	if err != nil {
		return err
	}
	defer zf.Close()

	req, err := http.NewRequest("POST", hc.config.HttpAddress+"/upload", zf)
	if err != nil {
		// req is nil here; continuing would dereference it below.
		return err
	}
	req.Header.Set("Houston-Service-Name", name)
	req.Header.Set("Houston-Service-Version", version)
	req.Header.Set("Houston-Service-Filename", path.Base(filepath.ToSlash(zipFile)))
	req.Header.Set("Content-Type", "application/zip")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return errUploadZipLogFailed
	}
	return nil
}

// zipLogFiles archives every file matching req.Filter below
// <storageRoot>/<req.Name>/<req.Version> into a zip file in os.TempDir(),
// named after the first match.
//
// It returns the zip file name and the slash-normalised list of matches.
// Entries that turned out to be symlinks are not archived and are blanked
// ("") in the returned list. When nothing matches, it returns ("", nil, nil).
// On any failure the partially written archive is removed.
func zipLogFiles(storageRoot string, req *shared.UploadRequest) (string, []string, error) {
	root := path.Join(storageRoot, req.Name, req.Version)
	matches, err := filepath.Glob(path.Join(root, req.Filter))
	if err != nil {
		return "", nil, err
	}
	if len(matches) == 0 {
		return "", nil, nil
	}
	for i, file := range matches {
		matches[i] = filepath.ToSlash(file)
	}

	// Entry names inside the archive are relative to the filter's folder.
	root = path.Join(root, path.Dir(req.Filter))
	zipFileName := path.Join(os.TempDir(), path.Base(matches[0])) + ".zip"

	// O_EXCL below requires that a stale archive of the same name is gone.
	os.Remove(zipFileName)
	f, err := os.OpenFile(zipFileName, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600)
	if err != nil {
		return "", nil, err
	}

	w := zip.NewWriter(f)
	err = writeZipEntries(w, root, matches)
	// Close writes the zip central directory; a failure here means the
	// archive is unusable, so it must not be ignored.
	if cerr := w.Close(); err == nil {
		err = cerr
	}
	if cerr := f.Close(); err == nil {
		err = cerr
	}
	if err != nil {
		os.Remove(zipFileName)
		return "", nil, err
	}
	return zipFileName, matches, nil
}

// writeZipEntries adds each path in files to w. The root folder itself is
// skipped; symlinks are skipped and their slot in files is set to "".
func writeZipEntries(w *zip.Writer, root string, files []string) error {
	for i, file := range files {
		if file == root {
			continue
		}
		if fi, err := os.Lstat(file); err == nil && fi.Mode()&os.ModeSymlink != 0 {
			files[i] = ""
			continue
		}
		if err := addFileToZip(w, root, file); err != nil {
			return err
		}
	}
	return nil
}

// addFileToZip copies one file into w under its path relative to root. The
// source file is closed before returning, so descriptors do not pile up while
// a long list is archived.
func addFileToZip(w *zip.Writer, root string, file string) error {
	relative := path.Base(file)
	if prefix := root + "/"; strings.HasPrefix(file, prefix) && len(file) > len(prefix) {
		relative = file[len(prefix):]
	}

	src, err := os.Open(file)
	if err != nil {
		return err
	}
	defer src.Close()

	fw, err := w.Create(relative)
	if err != nil {
		return err
	}
	_, err = io.Copy(fw, src)
	return err
}

// prepareProcessLaunch builds the procmeta for a process start request.
//
// It returns nil when req.Args is empty or when
// <storageRoot>/<req.Name>/<req.Version> is not an existing directory.
// Otherwise it marks the executable named by req.Args[0] as 0777, expands
// environment variables in every argument, and creates (but does not start)
// an exec.Cmd whose working directory is the version folder.
func prepareProcessLaunch(storageRoot string, req *shared.StartProcessRequest) *procmeta {
	if len(req.Args) == 0 {
		return nil
	}

	verpath := path.Join(storageRoot, req.Name, req.Version)
	if fi, err := os.Stat(verpath); err != nil || !fi.IsDir() {
		return nil
	}

	exefile := "./" + path.Clean(strings.TrimPrefix(req.Args[0], "/"))
	// Best effort: a failure surfaces later when the command is started.
	if err := os.Chmod(path.Join(verpath, exefile), 0777); err != nil {
		logger.Println("chmod failed :", err)
	}

	exef, _ := os.Executable()

	expanded := make([]string, len(req.Args))
	for i, arg := range req.Args {
		expanded[i] = os.ExpandEnv(arg)
	}

	exename := path.Join(path.Dir(exef), verpath, exefile)

	logger.Println("exefile :", exefile)
	logger.Println("verpath :", verpath)
	logger.Println("exef :", exef)
	logger.Println("path.Dir :", path.Dir(exef))
	logger.Println("exename :", exename)

	cmd := exec.Command(os.ExpandEnv(exename), expanded[1:]...)
	cmd.Dir = verpath
	stdin, err := cmd.StdinPipe()
	if err != nil {
		logger.Println("StdinPipe failed :", err)
	}

	seq++
	return &procmeta{
		id:      seq,
		cmd:     cmd,
		name:    req.Name,
		args:    req.Args,
		version: req.Version,
		verpath: verpath,
		state:   int32(protos.ProcessState_Stopped),
		stdin:   stdin,
	}
}

// makeLogFilePrefix returns "<verpath>/logs/<exe>_<timestamp>" for sibling
// index 0 and "<verpath>/logs/<exe>_<index>_<timestamp>" otherwise, where
// <exe> is the base name of meta.args[0] without its extension and
// <timestamp> is the current UTC time formatted as 2006-01-02T15-04-05.
func makeLogFilePrefix(meta *procmeta, index int) string {
	now := time.Now().UTC()

	nameonly := path.Base(filepath.ToSlash(meta.args[0]))
	// Derive the extension from the base name itself so the trim can never
	// run past the start of the string.
	nameonly = strings.TrimSuffix(nameonly, path.Ext(nameonly))

	ts := now.Format("2006-01-02T15-04-05")
	if index == 0 {
		return path.Join(meta.verpath, "logs", fmt.Sprintf("%s_%s", nameonly, ts))
	}
	return path.Join(meta.verpath, "logs", fmt.Sprintf("%s_%d_%s", nameonly, index, ts))
}

// acquireSiblingIndex reserves and returns the lowest free sibling index for
// the executable exe. Indices are tracked as bits of the uint64 values stored
// in hc.siblingProcIndex under the keys "<exe>-0", "<exe>-1", ...; each key
// covers 64 indices.
//
// NOTE(review): hc.siblingProcIndex is also written by releaseSiblingIndex
// from the stdout reader goroutine; no lock is visible in this file — confirm
// that access is serialised elsewhere.
func (hc *houstonClient) acquireSiblingIndex(exe string) int {
	index := 0
	for overflow := 0; ; overflow++ {
		key := fmt.Sprintf("%s-%d", exe, overflow)
		flags := hc.siblingProcIndex[key]
		if flags == math.MaxUint64 {
			// All 64 slots of this word are taken; try the next word.
			index += 64
			continue
		}
		for bit := 0; bit < 64; bit++ {
			mask := uint64(1) << bit
			if flags&mask == 0 {
				hc.siblingProcIndex[key] = flags | mask
				return index + bit
			}
		}
	}
}

// releaseSiblingIndex frees an index handed out by acquireSiblingIndex.
func (hc *houstonClient) releaseSiblingIndex(exe string, index int) {
	key := fmt.Sprintf("%s-%d", exe, index/64)
	mask := uint64(1) << (index % 64)
	// Clear (rather than toggle) so a double release cannot re-reserve the slot.
	hc.siblingProcIndex[key] = hc.siblingProcIndex[key] &^ mask
}

// launch starts meta.cmd and a goroutine that consumes its stdout.
//
// Each stdout line is either part of an inline metric (delimited by
// metric.METRIC_HEAD_INLINE and metric.METRIC_TAIL_INLINE) that is forwarded
// to a Prometheus exporter, or plain output that is appended to a log file
// under <verpath>/logs. Log files are rotated once they exceed 5 MiB and a
// symlink named after the process is re-pointed at the current file.
//
// The sibling index reserved for the process is exported to it through the
// HOUSTON_SIBLIING_INDEX environment variable and released when the reader
// goroutine ends. On success the process state becomes Running.
func (hc *houstonClient) launch(meta *procmeta) error {
	stdout, err := meta.cmd.StdoutPipe()
	if err != nil {
		return err
	}

	if err = os.MkdirAll(path.Join(meta.verpath, "logs"), 0775); err != nil {
		return err
	}

	stdReader := func(jobName string, r io.ReadCloser, index int) {
		// Registered first so it runs last and catches panics raised by the
		// body as well as by the other deferred calls.
		defer func() {
			if reco := recover(); reco != nil {
				logger.Println(reco)
			}
		}()
		defer func() {
			hc.releaseSiblingIndex(meta.args[0], index)
		}()
		defer r.Close()

		const maxLogFileSize = 5 * 1024 * 1024

		reader := bufio.NewReader(r)
		thisFileSize := 0
		logFileIndex := 0

		logFileNamePrefix := makeLogFilePrefix(meta, index)
		logFileName := fmt.Sprintf("%s_%d.log", logFileNamePrefix, logFileIndex)
		targetFile, err := os.Create(logFileName)
		if err != nil {
			logger.Println("failed to create log file :", logFileName)
			return
		}

		exef, _ := os.Executable()
		linkName := meta.name + ".log"
		if index != 0 {
			linkName = fmt.Sprintf("%s_%d.log", meta.name, index)
		}
		linkPath := path.Join(path.Dir(exef), path.Dir(logFileName), linkName)

		// relink points the symlink at the current log file. Best effort:
		// the link is a convenience and logging works without it.
		relink := func() {
			os.Remove(linkPath)
			os.Symlink(path.Base(filepath.ToSlash(targetFile.Name())), linkPath)
		}
		relink()

		defer func() {
			if targetFile != nil {
				targetFile.Close()
			}
		}()

		readingMetric := false
		var metricBuffer []byte

		defer func() {
			logger.Println("stdReader is terminated :", meta.name)
			// The stream ended while the process is still marked Running:
			// report it so the operation loop can react.
			if meta.isState(protos.ProcessState_Running) {
				hc.operationChan <- &protos.OperationQueryResponse{
					Operation: string(shared.Exception),
					Args: map[string]string{
						"id": fmt.Sprintf("%d", meta.id),
					},
				}
			}
		}()

		metricExporter := metric.NewPrometheusExport(hc.config.MetricNamespace)
		defer metricExporter.Shutdown()

		// writeLog appends data to the current log file, resuming after a
		// short write instead of re-sending bytes already written.
		writeLog := func(data []byte) {
			for written := 0; written < len(data); {
				n, err := targetFile.Write(data[written:])
				if err != nil {
					logger.Println("write log file failed :", logFileName, err)
					break
				}
				written += n
				thisFileSize += n
			}
		}

		for {
			buff, err := reader.ReadBytes('\n')
			if err != nil {
				logger.Println("ReadBytes at stdReader return err :", err, meta.name)
				// Keep a trailing line that was not newline-terminated,
				// unless it belongs to a metric.
				if len(buff) > 0 && !readingMetric && buff[0] != metric.METRIC_HEAD_INLINE {
					writeLog(buff)
				}
				break
			}

			if readingMetric {
				metricBuffer = append(metricBuffer, buff...)
			} else if buff[0] == metric.METRIC_HEAD_INLINE {
				readingMetric = true
				metricBuffer = append(metricBuffer, buff[1:]...)
			}

			if readingMetric {
				// A metric ends with the tail marker immediately before the
				// newline. Shorter buffers cannot contain it yet.
				if n := len(metricBuffer); n >= 2 && metricBuffer[n-2] == metric.METRIC_TAIL_INLINE {
					readingMetric = false
					payload := metricBuffer[:n-2]

					switch {
					case len(payload) == 0:
						// Empty metric: nothing to register or update.
					case payload[0] == '{':
						var desc metric.MetricDescription
						if err := json.Unmarshal(payload, &desc); err != nil {
							logger.Println("unmarshal metric failed :", err, string(payload))
						} else {
							if desc.ConstLabels == nil {
								desc.ConstLabels = make(map[string]string)
							}
							for k, v := range hc.config.ConstLabels {
								desc.ConstLabels[k] = v
							}
							desc.ConstLabels["job"] = jobName
							metricExporter.RegisterMetric(&desc)
						}
					default:
						key, val := metric.ReadMetricValue(payload)
						metricExporter.UpdateMetric(key, val)
					}

					// Always start the next metric from an empty buffer, also
					// after a failed unmarshal.
					metricBuffer = metricBuffer[:0]
				}
				continue
			}

			writeLog(buff)

			if thisFileSize > maxLogFileSize {
				logFileIndex++
				logFileName = fmt.Sprintf("%s_%d.log", logFileNamePrefix, logFileIndex)
				nextTargetFile, err := os.Create(logFileName)
				if err != nil {
					// Keep writing to the current file and retry on the next line.
					logger.Println("failed to create log file :", logFileName)
				} else {
					targetFile.Close()
					targetFile = nextTargetFile
					relink()
					thisFileSize = 0
				}
			}
		}
	}

	index := hc.acquireSiblingIndex(meta.args[0])
	go stdReader(meta.name, stdout, index)

	logger.Println("startChildProcess :", meta.cmd.Args)

	// NOTE(review): when cmd.Env is nil this makes the child's environment
	// consist of this single variable instead of inheriting the parent's —
	// confirm that is intended.
	meta.cmd.Env = append(meta.cmd.Env, fmt.Sprintf("HOUSTON_SIBLIING_INDEX=%d", index))

	if err = meta.cmd.Start(); err != nil {
		return err
	}

	logger.Println("process index, pid =", index, meta.cmd.Process.Pid)
	set_affinity(meta.cmd.Process.Pid, index)
	meta.setState(protos.ProcessState_Running)
	return nil
}

// errPrepareprocessLaunchFailed is returned by startChildProcess when
// prepareProcessLaunch could not build a procmeta for the request.
var errPrepareprocessLaunchFailed = errors.New("prepareProcessLaunch failed")

// writeArgsFile stores args as JSON in the file at name. It is best effort:
// failures are logged and otherwise ignored.
func writeArgsFile(name string, args []string) {
	argfile, err := os.Create(name)
	if err != nil {
		logger.Println("create @args failed :", err)
		return
	}
	if err := json.NewEncoder(argfile).Encode(args); err != nil {
		logger.Println("encode @args failed :", err)
	}
	if err := argfile.Close(); err != nil {
		logger.Println("close @args failed :", err)
	}
}

// startChildProcess prepares and launches the process described by req.
//
// After a successful launch it records the arguments on the matching deploy
// entry, writes them to the "@args" files of the program folder and of the
// version folder, registers the process in hc.childProcs and asks op to
// refresh. It returns errPrepareprocessLaunchFailed when the request cannot
// be prepared, or the error from launch.
func (hc *houstonClient) startChildProcess(req *shared.StartProcessRequest, op protos.OperationClient) error {
	meta := prepareProcessLaunch(hc.config.StorageRoot, req)
	if meta == nil {
		return errPrepareprocessLaunchFailed
	}

	if err := hc.launch(meta); err != nil {
		return err
	}

	// Persist the args once launch has succeeded, in this version folder and
	// in its parent folder.
	vers := hc.deploys[req.Name]
	for _, ver := range vers {
		if ver.Version == req.Version {
			ver.Args = meta.args
		}
	}

	writeArgsFile(path.Join(hc.config.StorageRoot, req.Name, "@args"), req.Args)
	writeArgsFile(path.Join(hc.config.StorageRoot, req.Name, req.Version, "@args"), req.Args)

	hc.childProcs = append(hc.childProcs, meta)

	op.Refresh(context.Background(), hc.makeOperationQueryRequest())

	return nil
}

// stopChildProcess signals every running child process selected by req.
//
// A non-zero req.Pid selects the process with that pid; otherwise processes
// are selected by req.Name and, when req.Version is not empty, by version.
// Selected processes are moved to the Stopping state and sent SIGTERM (Kill
// if that fails); once a process has exited, an Exception operation carrying
// its id is pushed to hc.operationChan. It always returns nil.
func (hc *houstonClient) stopChildProcess(req *shared.StopProcessRequest, op protos.OperationClient) error {
	killer := func(proc *procmeta) {
		proc.setState(protos.ProcessState_Stopping)
		if err := proc.cmd.Process.Signal(syscall.SIGTERM); err != nil {
			proc.cmd.Process.Signal(os.Kill)
		}

		go func() {
			proc.cmd.Wait()
			hc.operationChan <- &protos.OperationQueryResponse{
				Operation: string(shared.Exception),
				Args: map[string]string{
					"id": fmt.Sprintf("%d", proc.id),
				},
			}
		}()
	}

	for _, proc := range hc.childProcs {
		if !proc.isState(protos.ProcessState_Running) {
			continue
		}

		var selected bool
		if req.Pid != 0 {
			// Stop only the process with the requested pid.
			selected = req.Pid == int32(proc.cmd.Process.Pid)
		} else if proc.name == req.Name {
			// No version: stop every version of the program.
			// With a version: stop only that version.
			selected = len(req.Version) == 0 || req.Version == proc.version
		}

		if selected {
			killer(proc)
		}
	}

	op.Refresh(context.Background(), hc.makeOperationQueryRequest())

	return nil
}

// restartChildProcess restarts the child process whose pid equals req.Pid.
//
// When req.Config is set, that config is downloaded into the process's
// version folder first, and a download failure aborts the restart with that
// error. The process is then moved to the Restart state, op is refreshed and
// the command is handed to hc.exitChan. An unknown pid is not an error.
func (hc *houstonClient) restartChildProcess(req *shared.RestartProcessRequest, op protos.OperationClient) error {
	for _, proc := range hc.childProcs {
		if proc.cmd.Process.Pid != int(req.Pid) {
			continue
		}

		if len(req.Config) > 0 {
			// Try to download the config before touching the process.
			root := proc.verpath
			if _, err := download(root, hc.makeDownloadUrl(req.Config), "", nil); err != nil {
				return err
			}
		}

		proc.setState(protos.ProcessState_Restart)
		op.Refresh(context.Background(), hc.makeOperationQueryRequest())
		hc.exitChan <- proc.cmd
		break
	}

	return nil
}

// zipAndUploadLogs zips the files selected by req, uploads the archive and,
// when the upload succeeded, deletes every archived source file except the
// one that sorts last. Failures are logged. The temporary archive is removed
// once the upload attempt is over.
func (hc *houstonClient) zipAndUploadLogs(req *shared.UploadRequest) {
	zipFile, srcFiles, err := zipLogFiles(hc.config.StorageRoot, req)
	if err != nil {
		logger.Println("zipLogFiles failed :", err)
		return
	}
	if len(zipFile) == 0 || len(srcFiles) == 0 {
		return
	}
	// The archive only exists to be uploaded; do not leave it in the temp dir.
	defer os.Remove(zipFile)

	if err := hc.uploadZipLogFile(zipFile, req.Name, req.Version); err != nil {
		logger.Println("uploadZipLogFile failed :", err)
		return
	}

	// Delete everything except the last one. Blank entries are files that
	// were skipped while zipping.
	sort.Strings(srcFiles)
	for _, src := range srcFiles[:len(srcFiles)-1] {
		if len(src) > 0 {
			os.Remove(src)
		}
	}
}

// uploadFiles zips and uploads the files selected by req.
//
// When a child process with the same name and version is registered, the work
// runs in a background goroutine; otherwise it runs synchronously, so folders
// of programs that are not running are handled as well. It always returns
// nil; failures are only logged.
func (hc *houstonClient) uploadFiles(req *shared.UploadRequest) error {
	logger.Println("uploadFiles req :", *req)

	for _, child := range hc.childProcs {
		if child.version == req.Version && child.name == req.Name {
			logger.Println("uploadFiles found :", child.version, child.name)
			go hc.zipAndUploadLogs(req)
			return nil
		}
	}

	// Also look in folders whose program is not running; every matching file
	// is a candidate.
	hc.zipAndUploadLogs(req)

	return nil
}