diff --git a/client/client.go b/client/client.go index 695dc6d..e95ce76 100644 --- a/client/client.go +++ b/client/client.go @@ -71,41 +71,13 @@ type HoustonClient interface { Start() } -type bufferStack struct { - pool [5][]byte - cursor int32 -} - -func (bs *bufferStack) pop() []byte { - pos := atomic.LoadInt32(&bs.cursor) - for !atomic.CompareAndSwapInt32(&bs.cursor, pos, pos+1) { - pos = atomic.LoadInt32(&bs.cursor) - } - - defer func() { - bs.pool[pos] = nil - }() - - curbuf := bs.pool[pos] - if curbuf == nil { - curbuf = make([]byte, 1024) - } - return curbuf -} - -func (bs *bufferStack) push(x []byte) { - pos := atomic.AddInt32(&bs.cursor, -1) - bs.pool[pos] = x -} - type procmeta struct { - cmd *exec.Cmd - name string - version string - state protos.ProcessState - stdin io.WriteCloser - logUploadChan chan *shared.UploadRequest - buffers bufferStack + cmd *exec.Cmd + name string + args []string + version string + state protos.ProcessState + stdin io.WriteCloser } type houstonClient struct { @@ -197,7 +169,7 @@ func (hc *houstonClient) makeOperationQueryRequest() *protos.OperationQueryReque for _, child := range hc.childProcs { procs = append(procs, &protos.ProcessDescription{ Name: child.name, - Args: child.cmd.Args, + Args: child.args, Version: child.version, State: child.state, Pid: int32(child.cmd.Process.Pid), @@ -327,7 +299,7 @@ func NewClient(standalone bool) (HoustonClient, error) { hc.startChildProcess(&shared.StartProcessRequest{ Version: proc.version, Name: proc.name, - Args: proc.cmd.Args, + Args: proc.args, }, op) } }(proc) diff --git a/client/operation.go b/client/operation.go index 78947c3..6920c57 100644 --- a/client/operation.go +++ b/client/operation.go @@ -4,6 +4,7 @@ package client import ( "archive/zip" + "bufio" "context" "encoding/json" "errors" @@ -14,7 +15,6 @@ import ( "os/exec" "path" "path/filepath" - "runtime/debug" "strings" "syscall" "time" @@ -183,18 +183,29 @@ func prepareProcessLaunch(storageRoot string, req *shared.StartProcessRequest) * stdin, _ := cmd.StdinPipe() return &procmeta{ - cmd: cmd, - name: req.Name, - version: req.Version, - state: protos.ProcessState_Stopped, - stdin: stdin, - logUploadChan: make(chan *shared.UploadRequest), - buffers: bufferStack{cursor: 0}, + cmd: cmd, + name: req.Name, + args: req.Args, + version: req.Version, + state: protos.ProcessState_Stopped, + stdin: stdin, } } return nil } +func makeLogFile(meta *procmeta, idx int) string { + now := time.Now().UTC() + ext := path.Ext(meta.args[0]) + nameonly := path.Base(meta.args[0]) + if len(ext) > 0 { + nameonly = nameonly[:len(nameonly)-len(ext)] + } + ts := now.Format("2006-01-02T15-04-05") + stdPrefix := path.Join(meta.cmd.Dir, "logs", fmt.Sprintf("%s_%s", nameonly, ts)) + return fmt.Sprintf("%s_%d.log", stdPrefix, idx) +} + func (hc *houstonClient) launch(meta *procmeta) error { stdout, err := meta.cmd.StdoutPipe() if err != nil { @@ -210,104 +221,61 @@ func (hc *houstonClient) launch(meta *procmeta) error { return err } - relayChan := make(chan struct { - size int - buf []byte - }) - - go func() { + stdReader := func(r io.ReadCloser) { defer func() { - r := recover() - if r != nil { - logger.Println(r) - debug.PrintStack() + reco := recover() + if reco != nil { + logger.Println(reco) } - close(relayChan) - hc.exitChan <- meta.cmd }() - now := time.Now().UTC() - ext := path.Ext(meta.cmd.Args[0]) - nameonly := path.Base(meta.cmd.Args[0]) - if len(ext) > 0 { - nameonly = nameonly[:len(nameonly)-len(ext)] + defer r.Close() + + reader := bufio.NewReader(r) + thisFileSize := 0 + logFileIndex := 0 + + logFileName := makeLogFile(meta, logFileIndex) + targetFile, err := os.Create(logFileName) + if err != nil { + logger.Println("failed to create log file :", logFileName) + return } - ts := now.Format("2006-01-02T15-04-05") - stdPrefix := path.Join(meta.cmd.Dir, "logs", fmt.Sprintf("%s_%s", nameonly, ts)) - logfile, _ := os.Create(stdPrefix + "_0.log") - defer logfile.Close() - logfileIdx := 0 - for { - thisFileSize := 0 - switchToNextFile := func() string { - logfileIdx++ - nextFile := fmt.Sprintf("%s_%d.log", stdPrefix, logfileIdx) - if nextLogfile, err := os.Create(nextFile); err == nil { - logfile.Close() - logfile = nextLogfile - } - thisFileSize = 0 - return nextFile - } - - uploadStartFile := "" - select { - case req := <-meta.logUploadChan: - nextFile := switchToNextFile() - startFile := uploadStartFile - uploadStartFile = nextFile - go func(startFile, nextFile string) { - zipFile, srcFiles, err := zipLogFiles(hc.config.StorageRoot, req, startFile, nextFile) - if err == nil && len(zipFile) > 0 && len(srcFiles) > 0 { - if err = hc.uploadZipLogFile(zipFile, meta.name, meta.version); err == nil { - for _, oldf := range srcFiles { - os.Remove(oldf) - } - } else { - logger.Println("uploadZipLogFile failed :", err) - } - } else if err != nil { - logger.Println("zipLogFiles failed :", err) - } - }(startFile, nextFile) - - case bt := <-relayChan: - if bt.buf == nil { - return - } - logfile.Write(bt.buf[:bt.size]) - logfile.Sync() - meta.buffers.push(bt.buf) - thisFileSize += bt.size - if thisFileSize > 10*1024*1024 { - switchToNextFile() - } - } - } - }() - - stdReader := func(r io.Reader) { defer func() { - recover() - stdout.Close() + if targetFile != nil { + targetFile.Close() + } }() for { - buff := meta.buffers.pop() - size, err := r.Read(buff) + buff, err := reader.ReadBytes('\n') if err != nil { - relayChan <- struct { - size int - buf []byte - }{buf: nil} break } - if size > 0 { - relayChan <- struct { - size int - buf []byte - }{size: size, buf: buff} + + for written := 0; written < len(buff); { + n, err := targetFile.Write(buff) + if err != nil { + logger.Println("write log file failed :", logFileName, err) + break + } else { + written += n + thisFileSize += n + } + } + + if thisFileSize > 10*1024*1024 { + logFileIndex++ + logFileName := makeLogFile(meta, logFileIndex) + nextTargetFile, err := os.Create(logFileName) + if err != nil { + logger.Println("failed to create log file :", logFileName) + } else { + targetFile.Close() + targetFile = nextTargetFile + thisFileSize = 0 + } } } } @@ -349,7 +317,7 @@ func (hc *houstonClient) startChildProcess(req *shared.StartProcessRequest, op p vers := hc.deploys[req.Name] for _, ver := range vers { if ver.Version == req.Version { - ver.Args = meta.cmd.Args + ver.Args = meta.args } } @@ -476,7 +444,23 @@ func (hc *houstonClient) uploadFiles(req *shared.UploadRequest) error { for _, child := range hc.childProcs { if child.version == req.Version && child.name == req.Name { logger.Println("uploadFiles found :", child.version, child.name) - child.logUploadChan <- req + + go func() { + zipFile, srcFiles, err := zipLogFiles(hc.config.StorageRoot, req, "", "") + if err == nil && len(zipFile) > 0 && len(srcFiles) > 0 { + if err = hc.uploadZipLogFile(zipFile, child.name, child.version); err == nil { + // 마지막거 빼고 삭제 + for i := 0; i < len(srcFiles)-1; i++ { + os.Remove(srcFiles[i]) + } + } else { + logger.Println("uploadZipLogFile failed :", err) + } + } else if err != nil { + logger.Println("zipLogFiles failed :", err) + } + }() + return nil } } diff --git a/main_client.go b/main_client.go index 7849cec..dea1350 100644 --- a/main_client.go +++ b/main_client.go @@ -31,7 +31,6 @@ func main() { hc, err := client.NewClient(true) if err != nil { panic(err) - return } var ( diff --git a/main.go b/main_server.go similarity index 100% rename from main.go rename to main_server.go diff --git a/server/server.go b/server/server.go index 9fcb378..0b0df32 100644 --- a/server/server.go +++ b/server/server.go @@ -1,3 +1,5 @@ +//go:build !client + package server import (