diff --git a/.gitignore b/.gitignore index 175330e..d2603d2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ - go-ayo/ +*.log +*.exe diff --git a/client/client.go b/client/client.go index e70efa0..84ed3ed 100644 --- a/client/client.go +++ b/client/client.go @@ -30,15 +30,41 @@ type HoustonClient interface { Shutdown() } +type bufferStack struct { + pool [5][]byte + cursor int32 +} + +func (bs *bufferStack) pop() []byte { + pos := atomic.LoadInt32(&bs.cursor) + for !atomic.CompareAndSwapInt32(&bs.cursor, pos, pos+1) { + pos = atomic.LoadInt32(&bs.cursor) + } + + defer func() { + bs.pool[pos] = nil + }() + + curbuf := bs.pool[pos] + if curbuf == nil { + curbuf = make([]byte, 1024) + } + return curbuf +} + +func (bs *bufferStack) push(x []byte) { + pos := atomic.AddInt32(&bs.cursor, -1) + bs.pool[pos] = x +} + type procmeta struct { - cmd *exec.Cmd - name string - version string - state protos.ProcessState - stdin io.WriteCloser - stdPrefix string - stdoutSize int32 - stderrSize int32 + cmd *exec.Cmd + name string + version string + state protos.ProcessState + stdin io.WriteCloser + logUploadChan chan *shared.UploadRequest + buffers bufferStack } type houstonClient struct { @@ -86,9 +112,9 @@ func gatherDeployedPrograms(name string) []*protos.VersionAndArgs { } } sort.Slice(rawvers, func(i, j int) bool { - leftParsed := parseVersionString(rawvers[i].Version) - rightParsed := parseVersionString(rawvers[j].Version) - return compareVersionString(leftParsed, rightParsed) < 0 + leftParsed := shared.ParseVersionString(rawvers[i].Version) + rightParsed := shared.ParseVersionString(rawvers[j].Version) + return shared.CompareVersionString(leftParsed, rightParsed) < 0 }) return rawvers } @@ -98,13 +124,11 @@ func (hc *houstonClient) makeOperationQueryRequest() *protos.OperationQueryReque procs := make([]*protos.ProcessDescription, 0, len(hc.childProcs)) for _, child := range hc.childProcs { procs = append(procs, &protos.ProcessDescription{ - Name: child.name, - Args: child.cmd.Args, - Version: child.version, - State: child.state, - Pid: int32(child.cmd.Process.Pid), - StdoutSize: atomic.LoadInt32(&child.stdoutSize), - StderrSize: atomic.LoadInt32(&child.stderrSize), + Name: child.name, + Args: child.cmd.Args, + Version: child.version, + State: child.state, + Pid: int32(child.cmd.Process.Pid), }) } diff --git a/client/operation.go b/client/operation.go index 2cae0cf..9343c14 100644 --- a/client/operation.go +++ b/client/operation.go @@ -2,22 +2,18 @@ package client import ( "archive/zip" - "bytes" "context" "encoding/json" "errors" "fmt" "io" - "io/fs" "net/http" "os" "os/exec" "path" "path/filepath" "regexp" - "strconv" - "strings" - "sync/atomic" + "runtime/debug" "syscall" "time" @@ -27,12 +23,6 @@ import ( "repositories.action2quare.com/ayo/houston/shared/protos" ) -type parsedVersionString = []string - -func parseVersionString(ver string) parsedVersionString { - return strings.Split(ver, ".") -} - func lastExecutionArgs(verpath string) []string { argf, err := os.Open(path.Join(verpath, "@args")) if os.IsNotExist(err) { @@ -49,158 +39,259 @@ func lastExecutionArgs(verpath string) []string { return out } -func compareVersionString(lhs, rhs parsedVersionString) int { - minlen := len(lhs) - if minlen > len(rhs) { - minlen = len(rhs) +func (meta *procmeta) zipLogFiles(req *shared.UploadRequest, start, except string) (string, []string, error) { + root := path.Join(req.Name, req.Version) + matches, err := filepath.Glob(path.Join(root, req.Filter)) + if err != nil { + return "", nil, err } - for i := 0; i < minlen; i++ { - if len(lhs[i]) < len(rhs[i]) { - return -1 + if len(matches) == 0 { + return "", nil, nil + } + + root = path.Join(root, path.Dir(req.Filter)) + // Create a file to write the archive to. + f, err := os.CreateTemp("", "") + if err != nil { + return "", nil, err + } + defer f.Close() + + w := zip.NewWriter(f) + defer w.Close() + + oldestFile := "" + for i, file := range matches { + file = filepath.ToSlash(file) + matches[i] = file + if file == root { + continue + } + if file >= except { + break + } + if len(start) > 0 && file < start { + continue + } + if len(oldestFile) == 0 { + oldestFile = path.Base(file) } - if len(lhs[i]) > len(rhs[i]) { - return 1 + relative := file[len(root)+1:] + fw, err := w.Create(relative) + if err != nil { + logger.Error(err) + return "", nil, err } - if lhs[i] < rhs[i] { - return -1 + src, err := os.Open(file) + if err != nil { + logger.Error(err) + return "", nil, err } + defer src.Close() - if lhs[i] > rhs[i] { - return 1 + if _, err = io.Copy(fw, src); err != nil { + logger.Error(err) + return "", nil, err } } - return len(lhs) - len(rhs) + return f.Name(), matches, nil + // defer func() { + // tempname := f.Name() + // f.Close() + + // resp, _ := http.Post(req.Url, "application/zip", f) + // if resp != nil && resp.Body != nil { + // resp.Body.Close() + // } + // os.Remove(tempname) + // if del, err := strconv.ParseBool(req.DeleteAfterUploaded); del && err == nil { + // for _, file := range matches { + // if strings.HasSuffix(file, except) { + // continue + // } + // os.Remove(file) + // } + // } + // }() + + // Create a new zip archive. + + //}(f) + + //return nil } -func findLastestVersion(root string) (string, error) { - // 최신 버전을 찾음 - entries, err := os.ReadDir(root) - if err != nil { - return "", err - } - if len(entries) == 0 { - return "", nil - } - var dironly []fs.DirEntry - for _, ent := range entries { - if ent.IsDir() { - dironly = append(dironly, ent) +func prepareProcessLaunch(req *shared.StartProcessRequest) *procmeta { + re := regexp.MustCompile(`[^\s"']+|"([^"]*)"|'([^']*)`) + args := re.FindAllString(req.Args, -1) + + verpath := path.Join("./", req.Name, req.Version) + fi, err := os.Stat(verpath) + + if err == nil && fi.IsDir() { + cmd := exec.Command("./"+args[0], args[1:]...) + cmd.Dir = verpath + stdin, _ := cmd.StdinPipe() + + return &procmeta{ + cmd: cmd, + name: req.Name, + version: req.Version, + state: protos.ProcessState_Stopped, + stdin: stdin, + logUploadChan: make(chan *shared.UploadRequest), + buffers: bufferStack{cursor: 0}, } } - latest := parseVersionString(dironly[0].Name()) - for i := 1; i < len(dironly); i++ { - next := parseVersionString(dironly[i].Name()) - if compareVersionString(latest, next) < 0 { - latest = next - } - } - return strings.Join(latest, "."), nil + return nil } -func (meta *procmeta) launch(args []string, exitChan chan<- *exec.Cmd) error { - exepath := args[0] - verpath := path.Dir(exepath) - args[0] = path.Base(exepath) - - cmd := exec.Command("./"+args[0], args[1:]...) - cmd.Dir = verpath - - stdin, err := cmd.StdinPipe() +func (hc *houstonClient) launch(meta *procmeta) error { + stdout, err := meta.cmd.StdoutPipe() if err != nil { return err } - stdout, err := cmd.StdoutPipe() - if err != nil { - return err - } - stderr, err := cmd.StderrPipe() + stderr, err := meta.cmd.StderrPipe() if err != nil { return err } - err = os.MkdirAll(path.Join(cmd.Dir, "logs"), os.ModePerm) + err = os.MkdirAll(path.Join(meta.cmd.Dir, "logs"), os.ModePerm) if err != nil { return err } - now := time.Now().UTC() - ext := path.Ext(cmd.Args[0]) - nameonly := path.Base(cmd.Args[0]) - if len(ext) > 0 { - nameonly = nameonly[:len(nameonly)-len(ext)] - } - ts := now.Format("2006-01-02T15-04-05") - stdPrefix := path.Join(cmd.Dir, "logs", fmt.Sprintf("%s_%s", nameonly, ts)) - errfile, err := os.Create(stdPrefix + ".stderr.log") - if err != nil { - return err - } - outfile, err := os.Create(stdPrefix + ".stdout.log") - if err != nil { - return err - } + relayChan := make(chan struct { + size int + buf []byte + }) go func() { + defer func() { + r := recover() + if r != nil { + logger.Println(r) + debug.PrintStack() + } + close(relayChan) + hc.exitChan <- meta.cmd + }() + + now := time.Now().UTC() + ext := path.Ext(meta.cmd.Args[0]) + nameonly := path.Base(meta.cmd.Args[0]) + if len(ext) > 0 { + nameonly = nameonly[:len(nameonly)-len(ext)] + } + ts := now.Format("2006-01-02T15-04-05") + stdPrefix := path.Join(meta.cmd.Dir, "logs", fmt.Sprintf("%s_%s", nameonly, ts)) + logfile, _ := os.Create(stdPrefix + "_0.log") + defer logfile.Close() + + logfileIdx := 0 + for { + thisFileSize := 0 + switchToNextFile := func() string { + logfileIdx++ + nextFile := fmt.Sprintf("%s_%d.log", stdPrefix, logfileIdx) + if nextLogfile, err := os.Create(nextFile); err == nil { + logfile.Close() + logfile = nextLogfile + } + thisFileSize = 0 + return nextFile + } + + uploadStartFile := "" + select { + case req := <-meta.logUploadChan: + nextFile := switchToNextFile() + startFile := uploadStartFile + uploadStartFile = nextFile + go func(startFile, nextFile string) { + zipFile, srcFiles, err := meta.zipLogFiles(req, startFile, nextFile) + if err == nil && len(zipFile) > 0 && len(srcFiles) > 0 { + zf, _ := os.Open(zipFile) + if zf != nil { + req, err := http.NewRequest("POST", hc.httpAddr+"/upload", zf) + if err != nil { + logger.Error(err) + } + req.Header.Set("Houston-Service-Name", meta.name) + req.Header.Set("Houston-Service-Version", meta.version) + req.Header.Set("Houston-Service-Filename", path.Base(srcFiles[0])+".zip") + req.Header.Set("Content-Type", "application/zip") + resp, err := http.DefaultClient.Do(req) + if err == nil { + defer resp.Body.Close() + if resp.StatusCode == http.StatusOK { + for _, oldf := range srcFiles { + os.Remove(oldf) + } + } + } + } + } + }(startFile, nextFile) + + case bt := <-relayChan: + if bt.buf == nil { + return + } + logfile.Write(bt.buf[:bt.size]) + meta.buffers.push(bt.buf) + thisFileSize += bt.size + if thisFileSize > 1024*1024 { + switchToNextFile() + } + } + } + }() + + stdReader := func(r io.Reader) { defer func() { recover() stdout.Close() - errfile.Close() }() - buff := make([]byte, 1024) for { - size, err := stderr.Read(buff) + buff := meta.buffers.pop() + size, err := r.Read(buff) if err != nil { - exitChan <- cmd - errfile.Close() + relayChan <- struct { + size int + buf []byte + }{buf: nil} break } - errfile.Write(buff[:size]) - atomic.AddInt32(&meta.stderrSize, int32(size)) - } - }() - - go func() { - defer func() { - recover() - stderr.Close() - outfile.Close() - }() - - buff := make([]byte, 1024) - for { - size, err := stdout.Read(buff) - if err != nil { - exitChan <- cmd - break + if size > 0 { + relayChan <- struct { + size int + buf []byte + }{size: size, buf: buff} } - outfile.Write(buff[:size]) - atomic.AddInt32(&meta.stdoutSize, int32(size)) } - }() - - err = cmd.Start() - if err != nil { - return err } - meta.cmd = cmd - meta.stdin = stdin - meta.stdPrefix = stdPrefix - meta.state = protos.ProcessState_Running + go stdReader(stderr) + go stdReader(stdout) - return nil + err = meta.cmd.Start() + if err == nil { + meta.state = protos.ProcessState_Running + } + return err } func (hc *houstonClient) startChildProcess(req *shared.StartProcessRequest) error { logger.Println("startChildProcess :", *req) if req.Version == "latest" { // 최신 버전을 찾음 - latest, err := findLastestVersion(path.Join("./", req.Name)) + latest, err := shared.FindLastestVersion(path.Join("./", req.Name)) if err != nil { return err } @@ -208,51 +299,29 @@ func (hc *houstonClient) startChildProcess(req *shared.StartProcessRequest) erro req.Version = latest } - meta := &procmeta{ - name: req.Name, - version: req.Version, - state: protos.ProcessState_Error, + meta := prepareProcessLaunch(req) + if err := hc.launch(meta); err != nil { + return err } - verpath := path.Join("./", req.Name, req.Version) - fi, err := os.Stat(verpath) - if err == nil && fi.IsDir() { - logger.Println("path found :", verpath) - // Define regular expression - re := regexp.MustCompile(`[^\s"']+|"([^"]*)"|'([^']*)`) - - // Split input string into array of strings - result := re.FindAllString(req.Args, -1) - for i := range result { - result[i] = strings.Trim(result[i], "\"'") - } - result[0] = path.Join(verpath, result[0]) - err := meta.launch(result, hc.exitChan) - if err != nil { - return err - } - - // launch가 성공하면 args 저장. this and parent folder - if argfile, err := os.Create(path.Join(req.Name, "@args")); err == nil { - enc := json.NewEncoder(argfile) - enc.Encode(result) - argfile.Close() - } - if argfile, err := os.Create(path.Join(verpath, "@args")); err == nil { - enc := json.NewEncoder(argfile) - enc.Encode(result) - argfile.Close() - } - - hc.childProcs = append(hc.childProcs, meta) - - op := protos.NewOperationClient(hc.client) - op.Refresh(context.Background(), hc.makeOperationQueryRequest()) - - return nil + // launch가 성공하면 args 저장. this and parent folder + if argfile, err := os.Create(path.Join(req.Name, "@args")); err == nil { + enc := json.NewEncoder(argfile) + enc.Encode(meta.cmd.Args) + argfile.Close() + } + if argfile, err := os.Create(path.Join(req.Name, req.Version, "@args")); err == nil { + enc := json.NewEncoder(argfile) + enc.Encode(meta.cmd.Args) + argfile.Close() } - return err + hc.childProcs = append(hc.childProcs, meta) + + op := protos.NewOperationClient(hc.client) + op.Refresh(context.Background(), hc.makeOperationQueryRequest()) + + return nil } var errNoRunningProcess = errors.New("no running processed") @@ -260,7 +329,7 @@ var errNoRunningProcess = errors.New("no running processed") func (hc *houstonClient) stopChildProcess(req *shared.StopProcessRequest) error { if req.Version == "latest" { // 최신 버전을 찾음 - latest, err := findLastestVersion(path.Join("./", req.Name)) + latest, err := shared.FindLastestVersion(path.Join("./", req.Name)) if err != nil { return err } @@ -327,7 +396,7 @@ func (hc *houstonClient) stopChildProcess(req *shared.StopProcessRequest) error func (hc *houstonClient) restartChildProcess(req *shared.RestartProcessRequest) error { if req.Version == "latest" { // 최신 버전을 찾음 - latest, err := findLastestVersion(path.Join("./", req.Name)) + latest, err := shared.FindLastestVersion(path.Join("./", req.Name)) if err != nil { return err } @@ -367,10 +436,7 @@ func (hc *houstonClient) restartChildProcess(req *shared.RestartProcessRequest) op.Refresh(context.Background(), hc.makeOperationQueryRequest()) for _, proc := range restarts { - args := proc.cmd.Args - args[0] = path.Join(proc.cmd.Dir, args[0]) - - if err := proc.launch(args, hc.exitChan); err != nil { + if err := hc.launch(proc); err != nil { return err } } @@ -382,7 +448,7 @@ func (hc *houstonClient) restartChildProcess(req *shared.RestartProcessRequest) func (hc *houstonClient) uploadFiles(req *shared.UploadRequest) error { if req.Version == "latest" { // 최신 버전을 찾음 - latest, err := findLastestVersion(path.Join("./", req.Name)) + latest, err := shared.FindLastestVersion(path.Join("./", req.Name)) if err != nil { return err } @@ -390,69 +456,14 @@ func (hc *houstonClient) uploadFiles(req *shared.UploadRequest) error { req.Version = latest } - root := path.Join(req.Name, req.Version) - matches, err := filepath.Glob(path.Join(root, req.Filter)) - if err != nil { - return err - } - - if len(matches) == 0 { - resp, err := http.Post(req.Url, "application/zip", bytes.NewBuffer([]byte{})) - if err != nil { - return err + for _, child := range hc.childProcs { + if child.version == req.Version && child.name == req.Name { + child.logUploadChan <- req + break } - resp.Body.Close() - return nil } - // Create a file to write the archive to. - f, err := os.CreateTemp("", "") - if err != nil { - return err - } - go func(f *os.File) { - defer func() { - tempname := f.Name() - f.Close() - - resp, _ := http.Post(req.Url, "application/zip", f) - if resp != nil && resp.Body != nil { - resp.Body.Close() - } - os.Remove(tempname) - if del, err := strconv.ParseBool(req.DeleteAfterUploaded); del && err == nil { - for _, f := range matches { - os.Remove(f) - } - } - }() - - // Create a new zip archive. - w := zip.NewWriter(f) - defer w.Close() - - for _, file := range matches { - relative := file[len(root)+1:] - fw, err := w.Create(relative) - if err != nil { - logger.Error(err) - return - } - - src, err := os.Open(file) - if err != nil { - logger.Error(err) - return - } - defer src.Close() - - if _, err = io.Copy(fw, src); err != nil { - logger.Error(err) - return - } - - } - }(f) - + // TODO : 실행 중이 아닌 폴더에서도 대상을 찾는다 + // deploys return nil } diff --git a/shared/operator.go b/shared/operator.go index 8f5178c..661f7a4 100644 --- a/shared/operator.go +++ b/shared/operator.go @@ -1,5 +1,11 @@ package shared +import ( + "io/fs" + "os" + "strings" +) + type Operation string const ( @@ -48,3 +54,61 @@ type UploadRequest struct { Filter string DeleteAfterUploaded string // true, false } + +type ParsedVersionString = []string + +func ParseVersionString(ver string) ParsedVersionString { + return strings.Split(ver, ".") +} + +func CompareVersionString(lhs, rhs ParsedVersionString) int { + minlen := len(lhs) + if minlen > len(rhs) { + minlen = len(rhs) + } + + for i := 0; i < minlen; i++ { + if len(lhs[i]) < len(rhs[i]) { + return -1 + } + + if len(lhs[i]) > len(rhs[i]) { + return 1 + } + + if lhs[i] < rhs[i] { + return -1 + } + + if lhs[i] > rhs[i] { + return 1 + } + } + + return len(lhs) - len(rhs) +} + +func FindLastestVersion(root string) (string, error) { + // 최신 버전을 찾음 + entries, err := os.ReadDir(root) + if err != nil { + return "", err + } + if len(entries) == 0 { + return "", nil + } + var dironly []fs.DirEntry + for _, ent := range entries { + if ent.IsDir() { + dironly = append(dironly, ent) + } + } + latest := ParseVersionString(dironly[0].Name()) + for i := 1; i < len(dironly); i++ { + next := ParseVersionString(dironly[i].Name()) + if CompareVersionString(latest, next) < 0 { + latest = next + } + } + return strings.Join(latest, "."), nil +}