로그 저장 로직 단순화

This commit is contained in:
2023-11-13 16:43:56 +09:00
parent 61d2fbf709
commit 394466e216
5 changed files with 89 additions and 132 deletions

View File

@ -71,41 +71,13 @@ type HoustonClient interface {
Start()
}
type bufferStack struct {
pool [5][]byte
cursor int32
}
func (bs *bufferStack) pop() []byte {
pos := atomic.LoadInt32(&bs.cursor)
for !atomic.CompareAndSwapInt32(&bs.cursor, pos, pos+1) {
pos = atomic.LoadInt32(&bs.cursor)
}
defer func() {
bs.pool[pos] = nil
}()
curbuf := bs.pool[pos]
if curbuf == nil {
curbuf = make([]byte, 1024)
}
return curbuf
}
func (bs *bufferStack) push(x []byte) {
pos := atomic.AddInt32(&bs.cursor, -1)
bs.pool[pos] = x
}
type procmeta struct {
cmd *exec.Cmd
name string
version string
state protos.ProcessState
stdin io.WriteCloser
logUploadChan chan *shared.UploadRequest
buffers bufferStack
cmd *exec.Cmd
name string
args []string
version string
state protos.ProcessState
stdin io.WriteCloser
}
type houstonClient struct {
@ -197,7 +169,7 @@ func (hc *houstonClient) makeOperationQueryRequest() *protos.OperationQueryReque
for _, child := range hc.childProcs {
procs = append(procs, &protos.ProcessDescription{
Name: child.name,
Args: child.cmd.Args,
Args: child.args,
Version: child.version,
State: child.state,
Pid: int32(child.cmd.Process.Pid),
@ -327,7 +299,7 @@ func NewClient(standalone bool) (HoustonClient, error) {
hc.startChildProcess(&shared.StartProcessRequest{
Version: proc.version,
Name: proc.name,
Args: proc.cmd.Args,
Args: proc.args,
}, op)
}
}(proc)

View File

@ -4,6 +4,7 @@ package client
import (
"archive/zip"
"bufio"
"context"
"encoding/json"
"errors"
@ -14,7 +15,6 @@ import (
"os/exec"
"path"
"path/filepath"
"runtime/debug"
"strings"
"syscall"
"time"
@ -183,18 +183,29 @@ func prepareProcessLaunch(storageRoot string, req *shared.StartProcessRequest) *
stdin, _ := cmd.StdinPipe()
return &procmeta{
cmd: cmd,
name: req.Name,
version: req.Version,
state: protos.ProcessState_Stopped,
stdin: stdin,
logUploadChan: make(chan *shared.UploadRequest),
buffers: bufferStack{cursor: 0},
cmd: cmd,
name: req.Name,
args: req.Args,
version: req.Version,
state: protos.ProcessState_Stopped,
stdin: stdin,
}
}
return nil
}
func makeLogFile(meta *procmeta, idx int) string {
now := time.Now().UTC()
ext := path.Ext(meta.args[0])
nameonly := path.Base(meta.args[0])
if len(ext) > 0 {
nameonly = nameonly[:len(nameonly)-len(ext)]
}
ts := now.Format("2006-01-02T15-04-05")
stdPrefix := path.Join(meta.cmd.Dir, "logs", fmt.Sprintf("%s_%s", nameonly, ts))
return fmt.Sprintf("%s_%d.log", stdPrefix, idx)
}
func (hc *houstonClient) launch(meta *procmeta) error {
stdout, err := meta.cmd.StdoutPipe()
if err != nil {
@ -210,104 +221,61 @@ func (hc *houstonClient) launch(meta *procmeta) error {
return err
}
relayChan := make(chan struct {
size int
buf []byte
})
go func() {
stdReader := func(r io.ReadCloser) {
defer func() {
r := recover()
if r != nil {
logger.Println(r)
debug.PrintStack()
reco := recover()
if reco != nil {
logger.Println(reco)
}
close(relayChan)
hc.exitChan <- meta.cmd
}()
now := time.Now().UTC()
ext := path.Ext(meta.cmd.Args[0])
nameonly := path.Base(meta.cmd.Args[0])
if len(ext) > 0 {
nameonly = nameonly[:len(nameonly)-len(ext)]
defer r.Close()
reader := bufio.NewReader(r)
thisFileSize := 0
logFileIndex := 0
logFileName := makeLogFile(meta, logFileIndex)
targetFile, err := os.Create(logFileName)
if err != nil {
logger.Println("failed to create log file :", logFileName)
return
}
ts := now.Format("2006-01-02T15-04-05")
stdPrefix := path.Join(meta.cmd.Dir, "logs", fmt.Sprintf("%s_%s", nameonly, ts))
logfile, _ := os.Create(stdPrefix + "_0.log")
defer logfile.Close()
logfileIdx := 0
for {
thisFileSize := 0
switchToNextFile := func() string {
logfileIdx++
nextFile := fmt.Sprintf("%s_%d.log", stdPrefix, logfileIdx)
if nextLogfile, err := os.Create(nextFile); err == nil {
logfile.Close()
logfile = nextLogfile
}
thisFileSize = 0
return nextFile
}
uploadStartFile := ""
select {
case req := <-meta.logUploadChan:
nextFile := switchToNextFile()
startFile := uploadStartFile
uploadStartFile = nextFile
go func(startFile, nextFile string) {
zipFile, srcFiles, err := zipLogFiles(hc.config.StorageRoot, req, startFile, nextFile)
if err == nil && len(zipFile) > 0 && len(srcFiles) > 0 {
if err = hc.uploadZipLogFile(zipFile, meta.name, meta.version); err == nil {
for _, oldf := range srcFiles {
os.Remove(oldf)
}
} else {
logger.Println("uploadZipLogFile failed :", err)
}
} else if err != nil {
logger.Println("zipLogFiles failed :", err)
}
}(startFile, nextFile)
case bt := <-relayChan:
if bt.buf == nil {
return
}
logfile.Write(bt.buf[:bt.size])
logfile.Sync()
meta.buffers.push(bt.buf)
thisFileSize += bt.size
if thisFileSize > 10*1024*1024 {
switchToNextFile()
}
}
}
}()
stdReader := func(r io.Reader) {
defer func() {
recover()
stdout.Close()
if targetFile != nil {
targetFile.Close()
}
}()
for {
buff := meta.buffers.pop()
size, err := r.Read(buff)
buff, err := reader.ReadBytes('\n')
if err != nil {
relayChan <- struct {
size int
buf []byte
}{buf: nil}
break
}
if size > 0 {
relayChan <- struct {
size int
buf []byte
}{size: size, buf: buff}
for written := 0; written < len(buff); {
n, err := targetFile.Write(buff)
if err != nil {
logger.Println("write log file failed :", logFileName, err)
break
} else {
written += n
thisFileSize += n
}
}
if thisFileSize > 10*1024*1024 {
logFileIndex++
logFileName := makeLogFile(meta, logFileIndex)
nextTargetFile, err := os.Create(logFileName)
if err != nil {
logger.Println("failed to create log file :", logFileName)
} else {
targetFile.Close()
targetFile = nextTargetFile
thisFileSize = 0
}
}
}
}
@ -349,7 +317,7 @@ func (hc *houstonClient) startChildProcess(req *shared.StartProcessRequest, op p
vers := hc.deploys[req.Name]
for _, ver := range vers {
if ver.Version == req.Version {
ver.Args = meta.cmd.Args
ver.Args = meta.args
}
}
@ -476,7 +444,23 @@ func (hc *houstonClient) uploadFiles(req *shared.UploadRequest) error {
for _, child := range hc.childProcs {
if child.version == req.Version && child.name == req.Name {
logger.Println("uploadFiles found :", child.version, child.name)
child.logUploadChan <- req
go func() {
zipFile, srcFiles, err := zipLogFiles(hc.config.StorageRoot, req, "", "")
if err == nil && len(zipFile) > 0 && len(srcFiles) > 0 {
if err = hc.uploadZipLogFile(zipFile, child.name, child.version); err == nil {
// 마지막거 빼고 삭제
for i := 0; i < len(srcFiles)-1; i++ {
os.Remove(srcFiles[i])
}
} else {
logger.Println("uploadZipLogFile failed :", err)
}
} else if err != nil {
logger.Println("zipLogFiles failed :", err)
}
}()
return nil
}
}