From df56a542a1b873514511e33b66d24916fafe44b5 Mon Sep 17 00:00:00 2001 From: mountain Date: Thu, 14 Nov 2024 10:48:30 +0900 Subject: [PATCH] =?UTF-8?q?=EB=A1=9C=EA=B7=B8=20=EC=97=85=EB=A1=9C?= =?UTF-8?q?=EB=93=9C=20=EB=A1=9C=EC=A7=81=20=EA=B0=9C=EC=84=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/operation.go | 312 ++++++++++++-------------------------------- 1 file changed, 87 insertions(+), 225 deletions(-) diff --git a/client/operation.go b/client/operation.go index bd56b43..df9fcaf 100644 --- a/client/operation.go +++ b/client/operation.go @@ -8,7 +8,6 @@ import ( "errors" "fmt" "io" - "math" "os" "os/exec" "path" @@ -22,7 +21,6 @@ import ( "github.com/Knetic/govaluate" "repositories.action2quare.com/ayo/gocommon/logger" - "repositories.action2quare.com/ayo/gocommon/metric" "repositories.action2quare.com/ayo/houston/shared" "repositories.action2quare.com/ayo/houston/shared/protos" ) @@ -269,31 +267,75 @@ func (hc *houstonClient) launch(meta *procmeta) error { return err } - stdReader := func(jobName string, r io.ReadCloser, index int, logfilePath string) { - defer func() { - logger.Println("stdReader is terminated :", meta.name) - if meta.isState(protos.ProcessState_Running) { - hc.operationChan <- &protos.OperationQueryResponse{ - Operation: string(shared.Exception), - Args: map[string]string{ - "id": fmt.Sprintf("%d", meta.id), - }, + logUploader := func(localctx context.Context, logfilePath string, logChan chan []byte) { + var logFile *os.File + var logFilePath string + + ext := path.Ext(logfilePath) + head := logfilePath[:len(logfilePath)-len(ext)] + if len(head) > 0 && !strings.HasSuffix(head, "/") { + head += "." + } + + writeLog := func(log []byte) { + if logFile == nil { + logFilePath = head + time.Now().UTC().Format("2006-01-02.150405") + ext + logFile, _ = os.OpenFile(logFilePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666) + } + for written := 0; written < len(log); { + n, err := logFile.Write(log[written:]) + if err != nil { + logger.Println("write log file failed :", logfilePath, err) + break + } else { + written += n } } + } + + defer func() { + if logFile != nil { + logFile.Close() + logFile = nil + hc.uploadToAppendLog(logFilePath, meta.name, meta.version) + } }() defer func() { - overflow := index / 64 - offset := index % 64 - key := fmt.Sprintf("%s-%d", meta.args[0], overflow) - runningFlags := hc.siblingProcIndex[key] - mask := uint64(1 << offset) - runningFlags = runningFlags ^ mask - hc.siblingProcIndex[key] = runningFlags + for { + select { + case log := <-logChan: + writeLog(log) + + default: + // logChan에 있는 모든 로그 소비 + return + } + } }() - metricExporter := metric.NewPrometheusExport(hc.config.MetricNamespace) + for { + heartbeat := time.After(time.Minute) + select { + case <-localctx.Done(): + return + case <-heartbeat: + heartbeat = time.After(time.Minute) + // 지금까지의 로그를 저장해서 업로드 + if logFile != nil { + logFile.Close() + logFile = nil + hc.uploadToAppendLog(logFilePath, meta.name, meta.version) + } + + case log := <-logChan: + writeLog(log) + } + } + } + + stdReader := func(r io.ReadCloser, logfilePath string) { defer func() { reco := recover() if reco != nil { @@ -302,234 +344,54 @@ func (hc *houstonClient) launch(meta *procmeta) error { r.Close() }() - total := 0 - var logFile *os.File - var logFilePath string - var logFileTimestamp time.Time + localctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + logChan := make(chan []byte, 1) + go logUploader(localctx, logfilePath, logChan) - ext := path.Ext(logfilePath) - head := logfilePath[:len(logfilePath)-len(ext)] - if len(head) > 0 && !strings.HasSuffix(head, "/") { - head += "." - } reader := bufio.NewReader(r) - readingMetric := false - - var metricBuffer []byte - - wipeLogFile := func() { - if logFile != nil { - logFile.Close() - logFile = nil - if total > 0 { - hc.uploadToAppendLog(logFilePath, meta.name, meta.version) - } - } - total = 0 - } - - defer func() { - wipeLogFile() - metricExporter.Shutdown() - }() - for { - if time.Since(logFileTimestamp) > time.Minute { - wipeLogFile() - } - - if logFile == nil { - logFileTimestamp = time.Now() - logFilePath = head + logFileTimestamp.UTC().Format("2006-01-02.150405") + ext - logFile, _ = os.OpenFile(logFilePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666) - } - buff, err := reader.ReadBytes('\n') if err != nil { logger.Println("ReadBytes at stdReader return err :", err, meta.name) break } - if readingMetric { - metricBuffer = append(metricBuffer, buff...) - } else if buff[0] == metric.METRIC_HEAD_INLINE { - readingMetric = true - metricBuffer = append(metricBuffer, buff[1:]...) - } - - if readingMetric { - if metricBuffer[len(metricBuffer)-2] == metric.METRIC_TAIL_INLINE { - readingMetric = false - - metricBuffer = metricBuffer[:len(metricBuffer)-2] - if metricBuffer[0] == '{' { - var desc metric.MetricDescription - if err := json.Unmarshal(metricBuffer, &desc); err != nil { - logger.Println("unmarshal metric failed :", err, string(metricBuffer)) - continue - } - - if desc.ConstLabels == nil { - desc.ConstLabels = make(map[string]string) - } - - for k, v := range hc.config.ConstLabels { - desc.ConstLabels[k] = v - } - - desc.ConstLabels["job"] = jobName - - metricExporter.RegisterMetric(&desc) - } else { - key, val := metric.ReadMetricValue(metricBuffer) - metricExporter.UpdateMetric(key, val) - } - - metricBuffer = metricBuffer[:0] - } - } else if logFile != nil && len(buff) > 0 { - for written := 0; written < len(buff); { - n, err := logFile.Write(buff[written:]) - if err != nil { - logger.Println("write log file failed :", logfilePath, err) - break - } else { - written += n - } - } - total += len(buff) + if len(buff) > 0 { + logChan <- buff } } } - errReader := func(r io.ReadCloser, logfilePath string) { - defer func() { - reco := recover() - if reco != nil { - logger.Println(reco) - } - }() - defer r.Close() - - total := 0 - var logFile *os.File - var logFilePath string - var logFileTimestamp time.Time - - ext := path.Ext(logfilePath) - head := logfilePath[:len(logfilePath)-len(ext)] - if len(head) > 0 && !strings.HasSuffix(head, "/") { - head += "." - } - reader := bufio.NewReader(r) - - wipeLogFile := func() { - if logFile != nil { - logFile.Close() - logFile = nil - if total > 0 { - hc.uploadToAppendLog(logFilePath, meta.name, meta.version) - } - } - total = 0 - } - defer wipeLogFile() - - for { - if time.Since(logFileTimestamp) > time.Minute { - wipeLogFile() - } - - if logFile == nil { - logFileTimestamp = time.Now() - logFilePath = head + logFileTimestamp.UTC().Format("2006-01-02.150405") + ext - logFile, _ = os.OpenFile(logFilePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666) - } - - buff, errRead := reader.ReadBytes('\n') - if errRead != nil { - logger.Println("ReadBytes at stdReader return err :", err, meta.name) - break - } - - if logFile != nil && len(buff) > 0 { - for written := 0; written < len(buff); { - n, err := logFile.Write(buff[written:]) - if err != nil { - logger.Println("write log file failed :", logfilePath, err) - break - } else { - written += n - } - } - total += len(buff) - } - } - } - - index := 0 - for overflow := 0; ; overflow++ { - key := fmt.Sprintf("%s-%d", meta.args[0], overflow) - runningFlags := hc.siblingProcIndex[key] - if runningFlags == math.MaxUint64 { - index += 64 - } else { - for si := 0; si < 64; si++ { - mask := uint64(1 << si) - if runningFlags&mask == 0 { - index += si - runningFlags |= mask - break - } - } - hc.siblingProcIndex[key] = runningFlags - break - } - } - - // 자체 환경 변수 - customEnv := map[string]string{ - "HOUSTON_SIBLIING_INDEX": fmt.Sprintf("%d", index), - "HOUSTON_PROC_TIMESTAMP": time.Now().UTC().Format("2006-01-02T15-04-05"), - } - - // 프로세스 환경 변수에 반영 - meta.cmd.Env = os.Environ() - for k, v := range customEnv { - meta.cmd.Env = append(meta.cmd.Env, fmt.Sprintf("%s=%s", k, v)) - } - - // argument 표현식 계산 - meta.cmd.Args, err = evaluateArgs(meta.cmd.Args, parseEnv(meta.cmd.Env)) - if err != nil { - logger.Println("evaluateArgs failed :", err) - return err - } - - // 로그파일에 환경변수 적용 - evalfile := os.Expand(meta.logfile, func(n string) string { - v := os.Getenv(n) - if len(v) == 0 { - return customEnv[n] - } - return v - }) - - if len(evalfile) > 0 { - evalfile = path.Join(logfolder, evalfile) + var evalfile string + if len(meta.logfile) > 0 { + evalfile = path.Join(logfolder, meta.logfile) } else { evalfile = logfolder + "/" } - go stdReader(meta.name, stdout, index, evalfile+".log") - go errReader(stderr, evalfile+".err") + go func() { + stdReader(stdout, evalfile+".log") + logger.Println("stdReader is terminated :", meta.name) + + if meta.isState(protos.ProcessState_Running) { + // state는 running인데 종료됐으면 exception처리 + hc.operationChan <- &protos.OperationQueryResponse{ + Operation: string(shared.Exception), + Args: map[string]string{ + "id": fmt.Sprintf("%d", meta.id), + }, + } + } + }() + + go stdReader(stderr, evalfile+".err") logger.Println("startChildProcess :", meta.cmd.Args) err = meta.cmd.Start() if err == nil { - logger.Println("process index, pid =", index, meta.cmd.Process.Pid) - set_affinity(meta.cmd.Process.Pid, index) meta.setState(protos.ProcessState_Running) }