로그 업로드 로직 개선

This commit is contained in:
2024-11-14 10:48:30 +09:00
parent 46dd289c28
commit df56a542a1

View File

@ -8,7 +8,6 @@ import (
"errors"
"fmt"
"io"
"math"
"os"
"os/exec"
"path"
@ -22,7 +21,6 @@ import (
"github.com/Knetic/govaluate"
"repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/metric"
"repositories.action2quare.com/ayo/houston/shared"
"repositories.action2quare.com/ayo/houston/shared/protos"
)
@ -269,31 +267,75 @@ func (hc *houstonClient) launch(meta *procmeta) error {
return err
}
stdReader := func(jobName string, r io.ReadCloser, index int, logfilePath string) {
defer func() {
logger.Println("stdReader is terminated :", meta.name)
if meta.isState(protos.ProcessState_Running) {
hc.operationChan <- &protos.OperationQueryResponse{
Operation: string(shared.Exception),
Args: map[string]string{
"id": fmt.Sprintf("%d", meta.id),
},
logUploader := func(localctx context.Context, logfilePath string, logChan chan []byte) {
var logFile *os.File
var logFilePath string
ext := path.Ext(logfilePath)
head := logfilePath[:len(logfilePath)-len(ext)]
if len(head) > 0 && !strings.HasSuffix(head, "/") {
head += "."
}
writeLog := func(log []byte) {
if logFile == nil {
logFilePath = head + time.Now().UTC().Format("2006-01-02.150405") + ext
logFile, _ = os.OpenFile(logFilePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
}
for written := 0; written < len(log); {
n, err := logFile.Write(log[written:])
if err != nil {
logger.Println("write log file failed :", logfilePath, err)
break
} else {
written += n
}
}
}
defer func() {
if logFile != nil {
logFile.Close()
logFile = nil
hc.uploadToAppendLog(logFilePath, meta.name, meta.version)
}
}()
defer func() {
overflow := index / 64
offset := index % 64
key := fmt.Sprintf("%s-%d", meta.args[0], overflow)
runningFlags := hc.siblingProcIndex[key]
mask := uint64(1 << offset)
runningFlags = runningFlags ^ mask
hc.siblingProcIndex[key] = runningFlags
for {
select {
case log := <-logChan:
writeLog(log)
default:
// logChan에 있는 모든 로그 소비
return
}
}
}()
metricExporter := metric.NewPrometheusExport(hc.config.MetricNamespace)
for {
heartbeat := time.After(time.Minute)
select {
case <-localctx.Done():
return
case <-heartbeat:
heartbeat = time.After(time.Minute)
// 지금까지의 로그를 저장해서 업로드
if logFile != nil {
logFile.Close()
logFile = nil
hc.uploadToAppendLog(logFilePath, meta.name, meta.version)
}
case log := <-logChan:
writeLog(log)
}
}
}
stdReader := func(r io.ReadCloser, logfilePath string) {
defer func() {
reco := recover()
if reco != nil {
@ -302,234 +344,54 @@ func (hc *houstonClient) launch(meta *procmeta) error {
r.Close()
}()
total := 0
var logFile *os.File
var logFilePath string
var logFileTimestamp time.Time
localctx, cancel := context.WithCancel(context.Background())
defer cancel()
logChan := make(chan []byte, 1)
go logUploader(localctx, logfilePath, logChan)
ext := path.Ext(logfilePath)
head := logfilePath[:len(logfilePath)-len(ext)]
if len(head) > 0 && !strings.HasSuffix(head, "/") {
head += "."
}
reader := bufio.NewReader(r)
readingMetric := false
var metricBuffer []byte
wipeLogFile := func() {
if logFile != nil {
logFile.Close()
logFile = nil
if total > 0 {
hc.uploadToAppendLog(logFilePath, meta.name, meta.version)
}
}
total = 0
}
defer func() {
wipeLogFile()
metricExporter.Shutdown()
}()
for {
if time.Since(logFileTimestamp) > time.Minute {
wipeLogFile()
}
if logFile == nil {
logFileTimestamp = time.Now()
logFilePath = head + logFileTimestamp.UTC().Format("2006-01-02.150405") + ext
logFile, _ = os.OpenFile(logFilePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
}
buff, err := reader.ReadBytes('\n')
if err != nil {
logger.Println("ReadBytes at stdReader return err :", err, meta.name)
break
}
if readingMetric {
metricBuffer = append(metricBuffer, buff...)
} else if buff[0] == metric.METRIC_HEAD_INLINE {
readingMetric = true
metricBuffer = append(metricBuffer, buff[1:]...)
}
if readingMetric {
if metricBuffer[len(metricBuffer)-2] == metric.METRIC_TAIL_INLINE {
readingMetric = false
metricBuffer = metricBuffer[:len(metricBuffer)-2]
if metricBuffer[0] == '{' {
var desc metric.MetricDescription
if err := json.Unmarshal(metricBuffer, &desc); err != nil {
logger.Println("unmarshal metric failed :", err, string(metricBuffer))
continue
}
if desc.ConstLabels == nil {
desc.ConstLabels = make(map[string]string)
}
for k, v := range hc.config.ConstLabels {
desc.ConstLabels[k] = v
}
desc.ConstLabels["job"] = jobName
metricExporter.RegisterMetric(&desc)
} else {
key, val := metric.ReadMetricValue(metricBuffer)
metricExporter.UpdateMetric(key, val)
}
metricBuffer = metricBuffer[:0]
}
} else if logFile != nil && len(buff) > 0 {
for written := 0; written < len(buff); {
n, err := logFile.Write(buff[written:])
if err != nil {
logger.Println("write log file failed :", logfilePath, err)
break
} else {
written += n
}
}
total += len(buff)
if len(buff) > 0 {
logChan <- buff
}
}
}
errReader := func(r io.ReadCloser, logfilePath string) {
defer func() {
reco := recover()
if reco != nil {
logger.Println(reco)
}
}()
defer r.Close()
total := 0
var logFile *os.File
var logFilePath string
var logFileTimestamp time.Time
ext := path.Ext(logfilePath)
head := logfilePath[:len(logfilePath)-len(ext)]
if len(head) > 0 && !strings.HasSuffix(head, "/") {
head += "."
}
reader := bufio.NewReader(r)
wipeLogFile := func() {
if logFile != nil {
logFile.Close()
logFile = nil
if total > 0 {
hc.uploadToAppendLog(logFilePath, meta.name, meta.version)
}
}
total = 0
}
defer wipeLogFile()
for {
if time.Since(logFileTimestamp) > time.Minute {
wipeLogFile()
}
if logFile == nil {
logFileTimestamp = time.Now()
logFilePath = head + logFileTimestamp.UTC().Format("2006-01-02.150405") + ext
logFile, _ = os.OpenFile(logFilePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
}
buff, errRead := reader.ReadBytes('\n')
if errRead != nil {
logger.Println("ReadBytes at stdReader return err :", err, meta.name)
break
}
if logFile != nil && len(buff) > 0 {
for written := 0; written < len(buff); {
n, err := logFile.Write(buff[written:])
if err != nil {
logger.Println("write log file failed :", logfilePath, err)
break
} else {
written += n
}
}
total += len(buff)
}
}
}
index := 0
for overflow := 0; ; overflow++ {
key := fmt.Sprintf("%s-%d", meta.args[0], overflow)
runningFlags := hc.siblingProcIndex[key]
if runningFlags == math.MaxUint64 {
index += 64
} else {
for si := 0; si < 64; si++ {
mask := uint64(1 << si)
if runningFlags&mask == 0 {
index += si
runningFlags |= mask
break
}
}
hc.siblingProcIndex[key] = runningFlags
break
}
}
// 자체 환경 변수
customEnv := map[string]string{
"HOUSTON_SIBLIING_INDEX": fmt.Sprintf("%d", index),
"HOUSTON_PROC_TIMESTAMP": time.Now().UTC().Format("2006-01-02T15-04-05"),
}
// 프로세스 환경 변수에 반영
meta.cmd.Env = os.Environ()
for k, v := range customEnv {
meta.cmd.Env = append(meta.cmd.Env, fmt.Sprintf("%s=%s", k, v))
}
// argument 표현식 계산
meta.cmd.Args, err = evaluateArgs(meta.cmd.Args, parseEnv(meta.cmd.Env))
if err != nil {
logger.Println("evaluateArgs failed :", err)
return err
}
// 로그파일에 환경변수 적용
evalfile := os.Expand(meta.logfile, func(n string) string {
v := os.Getenv(n)
if len(v) == 0 {
return customEnv[n]
}
return v
})
if len(evalfile) > 0 {
evalfile = path.Join(logfolder, evalfile)
var evalfile string
if len(meta.logfile) > 0 {
evalfile = path.Join(logfolder, meta.logfile)
} else {
evalfile = logfolder + "/"
}
go stdReader(meta.name, stdout, index, evalfile+".log")
go errReader(stderr, evalfile+".err")
go func() {
stdReader(stdout, evalfile+".log")
logger.Println("stdReader is terminated :", meta.name)
if meta.isState(protos.ProcessState_Running) {
// state는 running인데 종료됐으면 exception처리
hc.operationChan <- &protos.OperationQueryResponse{
Operation: string(shared.Exception),
Args: map[string]string{
"id": fmt.Sprintf("%d", meta.id),
},
}
}
}()
go stdReader(stderr, evalfile+".err")
logger.Println("startChildProcess :", meta.cmd.Args)
err = meta.cmd.Start()
if err == nil {
logger.Println("process index, pid =", index, meta.cmd.Process.Pid)
set_affinity(meta.cmd.Process.Pid, index)
meta.setState(protos.ProcessState_Running)
}