리팩토링

This commit is contained in:
2023-05-23 10:57:24 +09:00
parent 0f7b5c22fc
commit d3d451685f
4 changed files with 341 additions and 241 deletions

3
.gitignore vendored
View File

@ -1,2 +1,3 @@
go-ayo/
*.log
*.exe

View File

@ -30,15 +30,41 @@ type HoustonClient interface {
Shutdown()
}
type bufferStack struct {
pool [5][]byte
cursor int32
}
func (bs *bufferStack) pop() []byte {
pos := atomic.LoadInt32(&bs.cursor)
for !atomic.CompareAndSwapInt32(&bs.cursor, pos, pos+1) {
pos = atomic.LoadInt32(&bs.cursor)
}
defer func() {
bs.pool[pos] = nil
}()
curbuf := bs.pool[pos]
if curbuf == nil {
curbuf = make([]byte, 1024)
}
return curbuf
}
func (bs *bufferStack) push(x []byte) {
pos := atomic.AddInt32(&bs.cursor, -1)
bs.pool[pos] = x
}
type procmeta struct {
cmd *exec.Cmd
name string
version string
state protos.ProcessState
stdin io.WriteCloser
stdPrefix string
stdoutSize int32
stderrSize int32
cmd *exec.Cmd
name string
version string
state protos.ProcessState
stdin io.WriteCloser
logUploadChan chan *shared.UploadRequest
buffers bufferStack
}
type houstonClient struct {
@ -86,9 +112,9 @@ func gatherDeployedPrograms(name string) []*protos.VersionAndArgs {
}
}
sort.Slice(rawvers, func(i, j int) bool {
leftParsed := parseVersionString(rawvers[i].Version)
rightParsed := parseVersionString(rawvers[j].Version)
return compareVersionString(leftParsed, rightParsed) < 0
leftParsed := shared.ParseVersionString(rawvers[i].Version)
rightParsed := shared.ParseVersionString(rawvers[j].Version)
return shared.CompareVersionString(leftParsed, rightParsed) < 0
})
return rawvers
}
@ -98,13 +124,11 @@ func (hc *houstonClient) makeOperationQueryRequest() *protos.OperationQueryReque
procs := make([]*protos.ProcessDescription, 0, len(hc.childProcs))
for _, child := range hc.childProcs {
procs = append(procs, &protos.ProcessDescription{
Name: child.name,
Args: child.cmd.Args,
Version: child.version,
State: child.state,
Pid: int32(child.cmd.Process.Pid),
StdoutSize: atomic.LoadInt32(&child.stdoutSize),
StderrSize: atomic.LoadInt32(&child.stderrSize),
Name: child.name,
Args: child.cmd.Args,
Version: child.version,
State: child.state,
Pid: int32(child.cmd.Process.Pid),
})
}

View File

@ -2,22 +2,18 @@ package client
import (
"archive/zip"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/fs"
"net/http"
"os"
"os/exec"
"path"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync/atomic"
"runtime/debug"
"syscall"
"time"
@ -27,12 +23,6 @@ import (
"repositories.action2quare.com/ayo/houston/shared/protos"
)
type parsedVersionString = []string
func parseVersionString(ver string) parsedVersionString {
return strings.Split(ver, ".")
}
func lastExecutionArgs(verpath string) []string {
argf, err := os.Open(path.Join(verpath, "@args"))
if os.IsNotExist(err) {
@ -49,158 +39,259 @@ func lastExecutionArgs(verpath string) []string {
return out
}
func compareVersionString(lhs, rhs parsedVersionString) int {
minlen := len(lhs)
if minlen > len(rhs) {
minlen = len(rhs)
func (meta *procmeta) zipLogFiles(req *shared.UploadRequest, start, except string) (string, []string, error) {
root := path.Join(req.Name, req.Version)
matches, err := filepath.Glob(path.Join(root, req.Filter))
if err != nil {
return "", nil, err
}
for i := 0; i < minlen; i++ {
if len(lhs[i]) < len(rhs[i]) {
return -1
if len(matches) == 0 {
return "", nil, nil
}
root = path.Join(root, path.Dir(req.Filter))
// Create a file to write the archive to.
f, err := os.CreateTemp("", "")
if err != nil {
return "", nil, err
}
defer f.Close()
w := zip.NewWriter(f)
defer w.Close()
oldestFile := ""
for i, file := range matches {
file = filepath.ToSlash(file)
matches[i] = file
if file == root {
continue
}
if file >= except {
break
}
if len(start) > 0 && file < start {
continue
}
if len(oldestFile) == 0 {
oldestFile = path.Base(file)
}
if len(lhs[i]) > len(rhs[i]) {
return 1
relative := file[len(root)+1:]
fw, err := w.Create(relative)
if err != nil {
logger.Error(err)
return "", nil, err
}
if lhs[i] < rhs[i] {
return -1
src, err := os.Open(file)
if err != nil {
logger.Error(err)
return "", nil, err
}
defer src.Close()
if lhs[i] > rhs[i] {
return 1
if _, err = io.Copy(fw, src); err != nil {
logger.Error(err)
return "", nil, err
}
}
return len(lhs) - len(rhs)
return f.Name(), matches, nil
// defer func() {
// tempname := f.Name()
// f.Close()
// resp, _ := http.Post(req.Url, "application/zip", f)
// if resp != nil && resp.Body != nil {
// resp.Body.Close()
// }
// os.Remove(tempname)
// if del, err := strconv.ParseBool(req.DeleteAfterUploaded); del && err == nil {
// for _, file := range matches {
// if strings.HasSuffix(file, except) {
// continue
// }
// os.Remove(file)
// }
// }
// }()
// Create a new zip archive.
//}(f)
//return nil
}
func findLastestVersion(root string) (string, error) {
// 최신 버전을 찾음
entries, err := os.ReadDir(root)
if err != nil {
return "", err
}
if len(entries) == 0 {
return "", nil
}
var dironly []fs.DirEntry
for _, ent := range entries {
if ent.IsDir() {
dironly = append(dironly, ent)
func prepareProcessLaunch(req *shared.StartProcessRequest) *procmeta {
re := regexp.MustCompile(`[^\s"']+|"([^"]*)"|'([^']*)`)
args := re.FindAllString(req.Args, -1)
verpath := path.Join("./", req.Name, req.Version)
fi, err := os.Stat(verpath)
if err == nil && fi.IsDir() {
cmd := exec.Command("./"+args[0], args[1:]...)
cmd.Dir = verpath
stdin, _ := cmd.StdinPipe()
return &procmeta{
cmd: cmd,
name: req.Name,
version: req.Version,
state: protos.ProcessState_Stopped,
stdin: stdin,
logUploadChan: make(chan *shared.UploadRequest),
buffers: bufferStack{cursor: 0},
}
}
latest := parseVersionString(dironly[0].Name())
for i := 1; i < len(dironly); i++ {
next := parseVersionString(dironly[i].Name())
if compareVersionString(latest, next) < 0 {
latest = next
}
}
return strings.Join(latest, "."), nil
return nil
}
func (meta *procmeta) launch(args []string, exitChan chan<- *exec.Cmd) error {
exepath := args[0]
verpath := path.Dir(exepath)
args[0] = path.Base(exepath)
cmd := exec.Command("./"+args[0], args[1:]...)
cmd.Dir = verpath
stdin, err := cmd.StdinPipe()
func (hc *houstonClient) launch(meta *procmeta) error {
stdout, err := meta.cmd.StdoutPipe()
if err != nil {
return err
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return err
}
stderr, err := cmd.StderrPipe()
stderr, err := meta.cmd.StderrPipe()
if err != nil {
return err
}
err = os.MkdirAll(path.Join(cmd.Dir, "logs"), os.ModePerm)
err = os.MkdirAll(path.Join(meta.cmd.Dir, "logs"), os.ModePerm)
if err != nil {
return err
}
now := time.Now().UTC()
ext := path.Ext(cmd.Args[0])
nameonly := path.Base(cmd.Args[0])
if len(ext) > 0 {
nameonly = nameonly[:len(nameonly)-len(ext)]
}
ts := now.Format("2006-01-02T15-04-05")
stdPrefix := path.Join(cmd.Dir, "logs", fmt.Sprintf("%s_%s", nameonly, ts))
errfile, err := os.Create(stdPrefix + ".stderr.log")
if err != nil {
return err
}
outfile, err := os.Create(stdPrefix + ".stdout.log")
if err != nil {
return err
}
relayChan := make(chan struct {
size int
buf []byte
})
go func() {
defer func() {
r := recover()
if r != nil {
logger.Println(r)
debug.PrintStack()
}
close(relayChan)
hc.exitChan <- meta.cmd
}()
now := time.Now().UTC()
ext := path.Ext(meta.cmd.Args[0])
nameonly := path.Base(meta.cmd.Args[0])
if len(ext) > 0 {
nameonly = nameonly[:len(nameonly)-len(ext)]
}
ts := now.Format("2006-01-02T15-04-05")
stdPrefix := path.Join(meta.cmd.Dir, "logs", fmt.Sprintf("%s_%s", nameonly, ts))
logfile, _ := os.Create(stdPrefix + "_0.log")
defer logfile.Close()
logfileIdx := 0
for {
thisFileSize := 0
switchToNextFile := func() string {
logfileIdx++
nextFile := fmt.Sprintf("%s_%d.log", stdPrefix, logfileIdx)
if nextLogfile, err := os.Create(nextFile); err == nil {
logfile.Close()
logfile = nextLogfile
}
thisFileSize = 0
return nextFile
}
uploadStartFile := ""
select {
case req := <-meta.logUploadChan:
nextFile := switchToNextFile()
startFile := uploadStartFile
uploadStartFile = nextFile
go func(startFile, nextFile string) {
zipFile, srcFiles, err := meta.zipLogFiles(req, startFile, nextFile)
if err == nil && len(zipFile) > 0 && len(srcFiles) > 0 {
zf, _ := os.Open(zipFile)
if zf != nil {
req, err := http.NewRequest("POST", hc.httpAddr+"/upload", zf)
if err != nil {
logger.Error(err)
}
req.Header.Set("Houston-Service-Name", meta.name)
req.Header.Set("Houston-Service-Version", meta.version)
req.Header.Set("Houston-Service-Filename", path.Base(srcFiles[0])+".zip")
req.Header.Set("Content-Type", "application/zip")
resp, err := http.DefaultClient.Do(req)
if err == nil {
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
for _, oldf := range srcFiles {
os.Remove(oldf)
}
}
}
}
}
}(startFile, nextFile)
case bt := <-relayChan:
if bt.buf == nil {
return
}
logfile.Write(bt.buf[:bt.size])
meta.buffers.push(bt.buf)
thisFileSize += bt.size
if thisFileSize > 1024*1024 {
switchToNextFile()
}
}
}
}()
stdReader := func(r io.Reader) {
defer func() {
recover()
stdout.Close()
errfile.Close()
}()
buff := make([]byte, 1024)
for {
size, err := stderr.Read(buff)
buff := meta.buffers.pop()
size, err := r.Read(buff)
if err != nil {
exitChan <- cmd
errfile.Close()
relayChan <- struct {
size int
buf []byte
}{buf: nil}
break
}
errfile.Write(buff[:size])
atomic.AddInt32(&meta.stderrSize, int32(size))
}
}()
go func() {
defer func() {
recover()
stderr.Close()
outfile.Close()
}()
buff := make([]byte, 1024)
for {
size, err := stdout.Read(buff)
if err != nil {
exitChan <- cmd
break
if size > 0 {
relayChan <- struct {
size int
buf []byte
}{size: size, buf: buff}
}
outfile.Write(buff[:size])
atomic.AddInt32(&meta.stdoutSize, int32(size))
}
}()
err = cmd.Start()
if err != nil {
return err
}
meta.cmd = cmd
meta.stdin = stdin
meta.stdPrefix = stdPrefix
meta.state = protos.ProcessState_Running
go stdReader(stderr)
go stdReader(stdout)
return nil
err = meta.cmd.Start()
if err == nil {
meta.state = protos.ProcessState_Running
}
return err
}
func (hc *houstonClient) startChildProcess(req *shared.StartProcessRequest) error {
logger.Println("startChildProcess :", *req)
if req.Version == "latest" {
// 최신 버전을 찾음
latest, err := findLastestVersion(path.Join("./", req.Name))
latest, err := shared.FindLastestVersion(path.Join("./", req.Name))
if err != nil {
return err
}
@ -208,51 +299,29 @@ func (hc *houstonClient) startChildProcess(req *shared.StartProcessRequest) erro
req.Version = latest
}
meta := &procmeta{
name: req.Name,
version: req.Version,
state: protos.ProcessState_Error,
meta := prepareProcessLaunch(req)
if err := hc.launch(meta); err != nil {
return err
}
verpath := path.Join("./", req.Name, req.Version)
fi, err := os.Stat(verpath)
if err == nil && fi.IsDir() {
logger.Println("path found :", verpath)
// Define regular expression
re := regexp.MustCompile(`[^\s"']+|"([^"]*)"|'([^']*)`)
// Split input string into array of strings
result := re.FindAllString(req.Args, -1)
for i := range result {
result[i] = strings.Trim(result[i], "\"'")
}
result[0] = path.Join(verpath, result[0])
err := meta.launch(result, hc.exitChan)
if err != nil {
return err
}
// launch가 성공하면 args 저장. this and parent folder
if argfile, err := os.Create(path.Join(req.Name, "@args")); err == nil {
enc := json.NewEncoder(argfile)
enc.Encode(result)
argfile.Close()
}
if argfile, err := os.Create(path.Join(verpath, "@args")); err == nil {
enc := json.NewEncoder(argfile)
enc.Encode(result)
argfile.Close()
}
hc.childProcs = append(hc.childProcs, meta)
op := protos.NewOperationClient(hc.client)
op.Refresh(context.Background(), hc.makeOperationQueryRequest())
return nil
// launch가 성공하면 args 저장. this and parent folder
if argfile, err := os.Create(path.Join(req.Name, "@args")); err == nil {
enc := json.NewEncoder(argfile)
enc.Encode(meta.cmd.Args)
argfile.Close()
}
if argfile, err := os.Create(path.Join(req.Name, req.Version, "@args")); err == nil {
enc := json.NewEncoder(argfile)
enc.Encode(meta.cmd.Args)
argfile.Close()
}
return err
hc.childProcs = append(hc.childProcs, meta)
op := protos.NewOperationClient(hc.client)
op.Refresh(context.Background(), hc.makeOperationQueryRequest())
return nil
}
var errNoRunningProcess = errors.New("no running processed")
@ -260,7 +329,7 @@ var errNoRunningProcess = errors.New("no running processed")
func (hc *houstonClient) stopChildProcess(req *shared.StopProcessRequest) error {
if req.Version == "latest" {
// 최신 버전을 찾음
latest, err := findLastestVersion(path.Join("./", req.Name))
latest, err := shared.FindLastestVersion(path.Join("./", req.Name))
if err != nil {
return err
}
@ -327,7 +396,7 @@ func (hc *houstonClient) stopChildProcess(req *shared.StopProcessRequest) error
func (hc *houstonClient) restartChildProcess(req *shared.RestartProcessRequest) error {
if req.Version == "latest" {
// 최신 버전을 찾음
latest, err := findLastestVersion(path.Join("./", req.Name))
latest, err := shared.FindLastestVersion(path.Join("./", req.Name))
if err != nil {
return err
}
@ -367,10 +436,7 @@ func (hc *houstonClient) restartChildProcess(req *shared.RestartProcessRequest)
op.Refresh(context.Background(), hc.makeOperationQueryRequest())
for _, proc := range restarts {
args := proc.cmd.Args
args[0] = path.Join(proc.cmd.Dir, args[0])
if err := proc.launch(args, hc.exitChan); err != nil {
if err := hc.launch(proc); err != nil {
return err
}
}
@ -382,7 +448,7 @@ func (hc *houstonClient) restartChildProcess(req *shared.RestartProcessRequest)
func (hc *houstonClient) uploadFiles(req *shared.UploadRequest) error {
if req.Version == "latest" {
// 최신 버전을 찾음
latest, err := findLastestVersion(path.Join("./", req.Name))
latest, err := shared.FindLastestVersion(path.Join("./", req.Name))
if err != nil {
return err
}
@ -390,69 +456,14 @@ func (hc *houstonClient) uploadFiles(req *shared.UploadRequest) error {
req.Version = latest
}
root := path.Join(req.Name, req.Version)
matches, err := filepath.Glob(path.Join(root, req.Filter))
if err != nil {
return err
}
if len(matches) == 0 {
resp, err := http.Post(req.Url, "application/zip", bytes.NewBuffer([]byte{}))
if err != nil {
return err
for _, child := range hc.childProcs {
if child.version == req.Version && child.name == req.Name {
child.logUploadChan <- req
break
}
resp.Body.Close()
return nil
}
// Create a file to write the archive to.
f, err := os.CreateTemp("", "")
if err != nil {
return err
}
go func(f *os.File) {
defer func() {
tempname := f.Name()
f.Close()
resp, _ := http.Post(req.Url, "application/zip", f)
if resp != nil && resp.Body != nil {
resp.Body.Close()
}
os.Remove(tempname)
if del, err := strconv.ParseBool(req.DeleteAfterUploaded); del && err == nil {
for _, f := range matches {
os.Remove(f)
}
}
}()
// Create a new zip archive.
w := zip.NewWriter(f)
defer w.Close()
for _, file := range matches {
relative := file[len(root)+1:]
fw, err := w.Create(relative)
if err != nil {
logger.Error(err)
return
}
src, err := os.Open(file)
if err != nil {
logger.Error(err)
return
}
defer src.Close()
if _, err = io.Copy(fw, src); err != nil {
logger.Error(err)
return
}
}
}(f)
// TODO : 실행 중이 아닌 폴더에서도 대상을 찾는다
// deploys
return nil
}

View File

@ -1,5 +1,11 @@
package shared
import (
"io/fs"
"os"
"strings"
)
type Operation string
const (
@ -48,3 +54,61 @@ type UploadRequest struct {
Filter string
DeleteAfterUploaded string // true, false
}
type ParsedVersionString = []string
func ParseVersionString(ver string) ParsedVersionString {
return strings.Split(ver, ".")
}
func CompareVersionString(lhs, rhs ParsedVersionString) int {
minlen := len(lhs)
if minlen > len(rhs) {
minlen = len(rhs)
}
for i := 0; i < minlen; i++ {
if len(lhs[i]) < len(rhs[i]) {
return -1
}
if len(lhs[i]) > len(rhs[i]) {
return 1
}
if lhs[i] < rhs[i] {
return -1
}
if lhs[i] > rhs[i] {
return 1
}
}
return len(lhs) - len(rhs)
}
func FindLastestVersion(root string) (string, error) {
// 최신 버전을 찾음
entries, err := os.ReadDir(root)
if err != nil {
return "", err
}
if len(entries) == 0 {
return "", nil
}
var dironly []fs.DirEntry
for _, ent := range entries {
if ent.IsDir() {
dironly = append(dironly, ent)
}
}
latest := ParseVersionString(dironly[0].Name())
for i := 1; i < len(dironly); i++ {
next := ParseVersionString(dironly[i].Name())
if CompareVersionString(latest, next) < 0 {
latest = next
}
}
return strings.Join(latest, "."), nil
}