package client

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"os"
	"os/exec"
	"os/signal"
	"path"
	"path/filepath"
	"reflect"
	"sort"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"syscall"
	"time"
	"unsafe"

	"repositories.action2quare.com/ayo/gocommon"
	"repositories.action2quare.com/ayo/gocommon/flagx"
	"repositories.action2quare.com/ayo/gocommon/logger"
	"repositories.action2quare.com/ayo/houston/shared"
	"repositories.action2quare.com/ayo/houston/shared/protos"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// runcommand is one entry of the "autorun" configuration map: the executable
// to launch, its arguments, the version to use, whether it is restarted
// automatically, and the log file its output goes to.
type runcommand struct {
	Exec          string   `json:"exec"`
	Args          []string `json:"args"`
	Version       string   `json:"version"`
	AutoRestart   bool     `json:"auto_restart"`
	OutputLogFile string   `json:"logfile"`
}

// easyruncommand has the same fields as runcommand but no UnmarshalJSON
// method, so it can be decoded without recursing into runcommand.UnmarshalJSON.
type easyruncommand runcommand

// UnmarshalJSON decodes b into t, pre-filling Version with "latest" and
// AutoRestart with true so that keys absent from the JSON keep those defaults.
func (t *runcommand) UnmarshalJSON(b []byte) error {
	easy := easyruncommand{
		Version:     "latest",
		AutoRestart: true,
	}
	if err := json.Unmarshal(b, &easy); err != nil {
		return err
	}
	*t = runcommand(easy)
	return nil
}

// clientConfig is the "houston.client" section of the configuration file.
type clientConfig struct {
	GrpcAddress     string                `json:"grpc_server_address"`
	HttpAddress     string                `json:"http_server_address"`
	StorageRoot     string                `json:"storage_path"`
	MetricNamespace string                `json:"metric_namespace"`
	ConstLabels     map[string]string     `json:"metric_const_labels"`
	Autorun         map[string]runcommand `json:"autorun"`
}

// autorun is the "-autorun" flag. Start parses it as "service" or
// "service/count".
var autorun = flagx.String("autorun", "", "")

// outerconfig mirrors the nesting of the configuration file so that the
// client section can be reached as houston.client.
type outerconfig struct {
	Houston *struct {
		Client clientConfig `json:"client"`
	} `json:"houston"`
}

// loadClientConfig loads the configuration through gocommon.LoadConfig and
// returns its houston.client section. It returns an error when loading fails
// or when the "houston" section is absent.
func loadClientConfig() (clientConfig, error) {
	var oc outerconfig
	err := gocommon.LoadConfig[outerconfig](&oc)
	if err != nil {
		logger.Println(err)
		return clientConfig{}, err
	}
	// Houston is a pointer and stays nil when the section is missing;
	// dereferencing it unchecked would panic.
	if oc.Houston == nil {
		return clientConfig{}, errors.New("houston section is missing in config")
	}

	return oc.Houston.Client, nil
}

// HoustonClient is the interface returned by NewClient.
type HoustonClient interface {
	Shutdown()
	Start()
}

// seq is not referenced in this file.
// NOTE(review): presumably the source of procmeta ids elsewhere in the package — confirm.
var seq = int32(1)

// procmeta is the bookkeeping record for one child process.
type procmeta struct {
	id         int32
	cmd        *exec.Cmd
	name       string
	args       []string
	version    string
	verpath    string
	recover    bool  // when true, the Exception handler starts the process again
	state      int32 // a protos.ProcessState; accessed atomically via the methods below
	stdin      io.WriteCloser
	logfile    string
	keepLatest bool // when true, a restart uses version "latest"
}

// isState reports whether the current state equals s.
func (pm *procmeta) isState(s protos.ProcessState) bool {
	return atomic.LoadInt32(&pm.state) == int32(s)
}

// getState atomically reads the current state.
func (pm *procmeta) getState() protos.ProcessState {
	return protos.ProcessState(atomic.LoadInt32(&pm.state))
}

// setState atomically stores s as the current state.
func (pm *procmeta) setState(s protos.ProcessState) {
	atomic.StoreInt32(&pm.state, int32(s))
}

// houstonClient is the HoustonClient implementation created by NewClient.
type houstonClient struct {
	childProcs       []*procmeta
	extraMetrics     unsafe.Pointer // points to a map[string]float32
	deploys          map[string][]*protos.VersionAndArgs
	shutdownFunc     context.CancelFunc
	ctx              context.Context
	operationChan    chan *protos.OperationQueryResponse
	exitChan         chan *exec.Cmd
	clientChan       chan *grpc.ClientConn
	timestamp        string
	wg               sync.WaitGroup
	config           clientConfig
	version          string
	standalone       bool
	siblingProcIndex map[string]uint64
}

// unmarshal fills the exported fields of the struct pointed to by val from
// src, looking each value up by the Go field name. Integer fields are parsed
// with strconv.ParseInt, bool fields with strconv.ParseBool, slice fields are
// set to the value split on "\n", and every other field is set as a string.
// Parse errors are ignored, leaving the zero value. Any panic raised by
// reflection (for example a field kind that cannot take the value) is
// recovered and logged; in that case the remaining fields are left untouched.
func unmarshal[T any](val *T, src map[string]string) {
	defer func() {
		if r := recover(); r != nil {
			logger.Error(r)
		}
	}()

	elem := reflect.ValueOf(val).Elem()
	typ := elem.Type()
	for i := 0; i < typ.NumField(); i++ {
		if !typ.Field(i).IsExported() {
			continue
		}

		arg := src[typ.Field(i).Name]
		field := elem.Field(i)
		switch {
		case field.CanInt():
			// Best effort: a missing or malformed value becomes 0.
			num, _ := strconv.ParseInt(arg, 10, 0)
			field.SetInt(num)
		case field.Kind() == reflect.Array || field.Kind() == reflect.Slice:
			// NOTE(review): an empty arg yields a one-element slice [""];
			// confirm that callers expect this.
			conv := strings.Split(arg, "\n")
			field.Set(reflect.ValueOf(conv))
		case field.Kind() == reflect.Bool:
			// Best effort: a missing or malformed value becomes false.
			bv, _ := strconv.ParseBool(arg)
			field.SetBool(bv)
		default:
			field.SetString(arg)
		}
	}
	logger.Println("operation receive :", typ.Name(), *val)
}

// version_args_ts pairs a deployed version with the modification time of its
// directory so that versions can be sorted by it.
type version_args_ts struct {
	*protos.VersionAndArgs
	modTime time.Time
}

// gatherDeployedPrograms lists the sub-directories of storageRoot/name as
// deployed versions, each with the arguments returned by lastExecutionArgs,
// ordered from the most recently modified directory to the oldest. It returns
// nil when the directory cannot be read. Entries whose file info cannot be
// obtained are skipped.
func gatherDeployedPrograms(storageRoot, name string) (out []*protos.VersionAndArgs) {
	var rawvers []version_args_ts
	targetPath := path.Join(storageRoot, name)
	if vers, err := os.ReadDir(targetPath); err == nil {
		for _, ver := range vers {
			if !ver.IsDir() {
				continue
			}
			fi, err := ver.Info()
			if err != nil {
				// Info fails when the entry changed or vanished after
				// ReadDir; fi would be nil here.
				logger.Println(err)
				continue
			}
			args := lastExecutionArgs(path.Join(targetPath, ver.Name()))
			rawvers = append(rawvers, version_args_ts{
				VersionAndArgs: &protos.VersionAndArgs{
					Version: ver.Name(),
					Args:    args,
				},
				modTime: fi.ModTime(),
			})
		}
	}

	sort.Slice(rawvers, func(i, j int) bool {
		return rawvers[i].modTime.After(rawvers[j].modTime)
	})

	for _, v := range rawvers {
		out = append(out, v.VersionAndArgs)
	}

	return
}

// makeOperationQueryRequest builds the status report sent to the server: this
// process itself (always listed first, as Running), every child process, and
// every deployed program, together with the hostname and the PUBIP / PRVIP
// environment variables.
//
// NOTE(review): hc.childProcs and hc.deploys are read here without
// synchronisation, and this method is called both from the event loop started
// in NewClient and from checkOperation — confirm whether that is safe.
func (hc *houstonClient) makeOperationQueryRequest() *protos.OperationQueryRequest {
	var procs []*protos.ProcessDescription
	var deploys []*protos.DeployedVersions

	var selfname string
	var selfargs []string
	if hc.standalone {
		selfname = path.Base(filepath.ToSlash(os.Args[0]))
		selfargs = os.Args[1:]
	} else {
		selfname = "houston"
		selfargs = []string{}
	}

	// Drop the file extension, if any.
	selfname = strings.TrimSuffix(selfname, path.Ext(selfname))

	procs = append(procs, &protos.ProcessDescription{
		Name:    selfname,
		Args:    selfargs,
		Version: hc.version,
		State:   protos.ProcessState_Running,
		Pid:     int32(os.Getpid()),
	})
	deploys = append(deploys, &protos.DeployedVersions{
		Name: selfname,
		Versions: []*protos.VersionAndArgs{
			{Version: hc.version, Args: selfargs},
		},
	})

	for _, child := range hc.childProcs {
		// cmd or cmd.Process may be nil; report what is available instead of
		// panicking (shutdownFunc applies the same guard).
		var childArgs []string
		var pid int32
		if child.cmd != nil {
			childArgs = child.cmd.Args
			if child.cmd.Process != nil {
				pid = int32(child.cmd.Process.Pid)
			}
		}
		procs = append(procs, &protos.ProcessDescription{
			Name:    child.name,
			Args:    childArgs,
			Version: child.version,
			State:   child.getState(),
			Pid:     pid,
		})
	}

	for name, prog := range hc.deploys {
		deploys = append(deploys, &protos.DeployedVersions{
			Name:     name,
			Versions: prog,
		})
	}

	hn, _ := os.Hostname()
	return &protos.OperationQueryRequest{
		Hostname:  hn,
		PublicIp:  os.Getenv("PUBIP"),
		PrivateIp: os.Getenv("PRVIP"),
		Procs:     procs,
		Deploys:   deploys,
	}
}

// NewClient loads the client configuration, makes sure the storage root
// exists and is a directory, collects the programs already deployed there
// (directories containing an "@houston" entry), and starts the event-loop
// goroutine that handles new gRPC connections, child-process exits and
// operations received from the server.
//
// It returns an error when the configuration cannot be loaded, when the gRPC
// or HTTP address is empty, when the executable cannot be inspected, or when
// the storage root cannot be used.
func NewClient(standalone bool) (HoustonClient, error) {
	cfg, err := loadClientConfig()
	if err != nil {
		return nil, err
	}

	if len(cfg.GrpcAddress) == 0 {
		return nil, errors.New("client.grpc_server_address is missing")
	}

	if len(cfg.HttpAddress) == 0 {
		return nil, errors.New("client.http_server_address is missing")
	}

	exefile, err := os.Executable()
	if err != nil {
		return nil, err
	}

	exefi, err := os.Stat(exefile)
	if err != nil {
		return nil, err
	}

	// Create the storage root when it does not exist; reject it when it
	// exists but is not a directory. Any other stat error is returned as is.
	sp, err := os.Stat(cfg.StorageRoot)
	if err != nil {
		if errors.Is(err, fs.ErrNotExist) {
			err = os.MkdirAll(cfg.StorageRoot, 0775)
		}
	} else if !sp.IsDir() {
		err = errors.New(cfg.StorageRoot + " is not directory")
	}
	if err != nil {
		return nil, err
	}

	deploys := make(map[string][]*protos.VersionAndArgs)
	if dirs, err := os.ReadDir(cfg.StorageRoot); err == nil {
		for _, dir := range dirs {
			if !dir.IsDir() {
				continue
			}
			flagf := path.Join(cfg.StorageRoot, dir.Name(), "@houston")
			if _, err := os.Stat(flagf); !os.IsNotExist(err) {
				deploys[dir.Name()] = gatherDeployedPrograms(cfg.StorageRoot, dir.Name())
			}
		}
	}

	// The version is read from the "@version" file in the working directory;
	// "0.0.0" is used when it is missing or empty.
	ver, _ := os.ReadFile("@version")
	if len(ver) == 0 {
		ver = []byte("0.0.0")
	}

	hc := &houstonClient{
		config:           cfg,
		clientChan:       make(chan *grpc.ClientConn),
		extraMetrics:     unsafe.Pointer(&map[string]float32{}),
		deploys:          deploys,
		timestamp:        exefi.ModTime().String(),
		version:          string(ver),
		standalone:       standalone,
		siblingProcIndex: make(map[string]uint64),
	}

	ctx, cancel := context.WithCancel(context.Background())

	exitChan := make(chan *exec.Cmd, 10)
	operationChan := make(chan *protos.OperationQueryResponse, 10)
	hc.wg.Add(1)

	// Event loop (the original comment here read "autorun handling").
	go func() {
		defer hc.wg.Done()

		// Main operator client; nil until the first connection arrives on
		// hc.clientChan.
		var op protos.OperationClient

		// refresh and report are no-ops while op is nil, so an event handled
		// before the first connection cannot call a method on a nil interface.
		refresh := func(c context.Context) {
			if op != nil {
				op.Refresh(c, hc.makeOperationQueryRequest())
			}
		}
		report := func(dp *protos.DeployingProgress) {
			if op != nil {
				op.ReportDeployingProgress(ctx, dp)
			}
		}

		myname, _ := os.Executable()
		myname = path.Base(filepath.ToSlash(myname))
		myname = strings.TrimSuffix(myname, path.Ext(myname))
		if myname == "__debug_bin" {
			myname = "houston"
		}

		for {
			select {
			case <-ctx.Done():
				return

			case newClient := <-hc.clientChan:
				op = protos.NewOperationClient(newClient)
				refresh(context.Background())

			case exited := <-exitChan:
				var newprocs []*procmeta
				for _, proc := range hc.childProcs {
					if proc.cmd != exited {
						newprocs = append(newprocs, proc)
						continue
					}

					// The matching process is dropped from the list. When it
					// was Running or marked Restart, it is terminated and
					// reaped in the background, then started again if its
					// state is Restart.
					if proc.isState(protos.ProcessState_Running) || proc.isState(protos.ProcessState_Restart) {
						go func(proc *procmeta) {
							if err := proc.cmd.Process.Signal(syscall.SIGTERM); err != nil {
								proc.cmd.Process.Signal(os.Kill)
							}
							proc.cmd.Wait()
							proc.cmd.Process.Release()

							if proc.isState(protos.ProcessState_Restart) {
								if proc.keepLatest {
									proc.version = "latest"
								}

								if err := hc.startChildProcess(&shared.StartProcessRequest{
									Version: proc.version,
									Name:    proc.name,
									Args:    proc.args,
								}); err != nil {
									logger.ErrorWithCallStack(err)
								} else {
									refresh(context.Background())
								}
							}
						}(proc)
					}
				}

				hc.childProcs = newprocs
				refresh(ctx)

			case resp := <-operationChan:
				logger.Println("houton query operation :", resp.Operation)

				switch shared.Operation(resp.Operation) {
				case shared.Deploy:
					var dr shared.DeployRequest
					unmarshal(&dr, resp.Args)
					logger.Println("args :", dr)

					if dr.Name == myname {
						// Self update: start the replacer with this pid, the
						// source directory and the original command line,
						// then shut this process down.
						if srcdir, replacer, err := hc.prepareUpdateSelf(&dr); err == nil {
							args := []string{
								fmt.Sprintf("%d", os.Getpid()),
								srcdir,
								filepath.ToSlash(os.Args[0]),
							}
							args = append(args, os.Args[1:]...)
							cmd := exec.Command(replacer, args...)
							if err := cmd.Start(); err != nil {
								logger.Println(err)
							} else {
								hc.shutdownFunc()
							}
						} else {
							logger.Println(err)
						}
					} else {
						hn, _ := os.Hostname()

						if err := hc.deploy(&dr, func(dp *protos.DeployingProgress) {
							dp.Hostname = hn
							dp.Name = dr.Name
							dp.Version = dr.Version
							report(dp)
						}); err == nil {
							prog := gatherDeployedPrograms(hc.config.StorageRoot, dr.Name)
							hc.deploys[dr.Name] = prog
							refresh(ctx)

							report(&protos.DeployingProgress{
								Hostname: hn,
								Name:     dr.Name,
								Version:  dr.Version,
								State:    "success",
								Progress: 0,
								Total:    0,
							})
						} else {
							logger.Println(err)

							report(&protos.DeployingProgress{
								Hostname: hn,
								Name:     dr.Name,
								Version:  dr.Version,
								State:    "fail:" + err.Error(),
								Progress: 0,
								Total:    0,
							})
						}
					}

				case shared.Withdraw:
					var wr shared.WithdrawRequest
					unmarshal(&wr, resp.Args)
					logger.Println("args :", wr)

					err := hc.withdraw(&wr)
					if err == nil {
						prog := gatherDeployedPrograms(hc.config.StorageRoot, wr.Name)
						if len(prog) == 0 {
							delete(hc.deploys, wr.Name)
						} else {
							hc.deploys[wr.Name] = prog
						}
						refresh(ctx)
					} else {
						logger.Println(err)
					}

				case shared.Start:
					var sr shared.StartProcessRequest
					unmarshal(&sr, resp.Args)
					logger.Println("args :", sr)

					if err := hc.startChildProcess(&sr); err != nil {
						logger.ErrorWithCallStack(err)
					} else {
						refresh(context.Background())
					}

				case shared.Stop:
					var sr shared.StopProcessRequest
					unmarshal(&sr, resp.Args)
					logger.Println("args :", sr)

					if err := hc.stopChildProcess(&sr, op); err != nil {
						logger.Println(err)
					}

				case shared.Restart:
					var rr shared.RestartProcessRequest
					unmarshal(&rr, resp.Args)
					logger.Println("args :", rr)

					if err := hc.restartChildProcess(&rr, op); err != nil {
						logger.Println(err)
					}

				case shared.Upload:
					var ur shared.UploadRequest
					unmarshal(&ur, resp.Args)
					logger.Println("args :", ur)

					if err := hc.uploadFiles(&ur); err != nil {
						logger.Println(err)
					}

				case shared.Exception:
					// Remove the process whose id is given in Args["id"],
					// reap it, and start it again when it is marked recover.
					idstr := resp.Args["id"]
					id64, _ := strconv.ParseInt(idstr, 10, 0)
					id := int32(id64)

					var found *procmeta
					hc.childProcs = gocommon.ShrinkSlice(hc.childProcs, func(e *procmeta) bool {
						if e.id == id {
							found = e
							return true
						}
						return false
					})

					if found != nil {
						found.cmd.Wait()
						found.cmd.Process.Release()

						if found.recover {
							time.Sleep(time.Second)
							sr := shared.StartProcessRequest{
								Name:          found.name,
								Version:       found.version,
								Args:          found.args,
								AutoRestart:   found.recover,
								OutputLogFile: found.logfile,
							}

							if err := hc.startChildProcess(&sr); err != nil {
								logger.Println("startChildProcess failed by autorun :", err)
								logger.ErrorWithCallStack(err)
							} else {
								logger.Println("recover success :", sr)
							}
						}
					}

					refresh(context.Background())
				}
			}
		}
	}()

	hc.shutdownFunc = func() {
		// Force-kill the child processes, wait a second, then cancel ctx.
		for _, proc := range hc.childProcs {
			if proc.cmd != nil && proc.cmd.Process != nil {
				proc.cmd.Process.Signal(os.Kill)
			}
		}

		time.Sleep(time.Second)
		cancel()
	}
	hc.exitChan = exitChan
	hc.ctx = ctx
	hc.operationChan = operationChan

	return hc, nil
}

// Start runs the client until its context is cancelled. It installs a signal
// handler that triggers shutdown, launches the processes requested by the
// -autorun flag, then keeps a gRPC connection to the server: it dials, hands
// the connection to the event loop, and forwards received operations through
// checkOperation, redialling whenever that fails.
//
// On return it waits for the event loop to finish, then signals and reaps all
// remaining child processes.
func (hc *houstonClient) Start() {
	// receive from stream
	defer func() {
		hc.wg.Wait()

		for _, proc := range hc.childProcs {
			if proc.cmd == nil || proc.cmd.Process == nil {
				continue
			}
			if err := proc.cmd.Process.Signal(syscall.SIGTERM); err != nil {
				proc.cmd.Process.Signal(os.Kill)
				// NOTE(review): the state is set to Stopping only when the
				// SIGTERM attempt fails — confirm this is intended.
				proc.setState(protos.ProcessState_Stopping)
			}
		}

		for _, proc := range hc.childProcs {
			if proc.cmd == nil || proc.cmd.Process == nil {
				continue
			}
			proc.cmd.Wait()
			proc.cmd.Process.Release()
		}
	}()

	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

	go func() {
		c := <-interrupt
		logger.Println("interrupt!!!!!!!! :", c.String())
		hc.shutdownFunc()
	}()

	var client *grpc.ClientConn
	reconnCount := 0
	time.Sleep(time.Second)

	if autorun != nil && len(*autorun) > 0 {
		// The flag is "service" or "service/count"; count defaults to 1.
		hascount := strings.Split(*autorun, "/")
		var service string
		count := 1
		if len(hascount) > 1 {
			service = hascount[0]
			if len(hascount[1]) > 0 {
				if n, err := strconv.Atoi(hascount[1]); err != nil {
					// Keep the default instead of silently starting nothing.
					logger.Println("invalid autorun count, using 1 :", hascount[1], err)
				} else {
					count = n
				}
			}
		} else {
			service = *autorun
		}

		if cmd, ok := hc.config.Autorun[service]; ok {
			// Start count instances of the configured service.
			for i := 0; i < count; i++ {
				sr := shared.StartProcessRequest{
					Name:          service,
					Version:       cmd.Version,
					Args:          append([]string{cmd.Exec}, cmd.Args...),
					AutoRestart:   cmd.AutoRestart,
					OutputLogFile: cmd.OutputLogFile,
				}

				if err := hc.startChildProcess(&sr); err != nil {
					logger.Println("startChildProcess failed by autorun :", err)
					logger.ErrorWithCallStack(err)
				} else {
					logger.Println("autorun success :", sr)
				}
			}
		}
	}

	for {
		select {
		case <-hc.ctx.Done():
			if client != nil {
				client.Close()
			}
			return

		default:
			if client == nil {
				if reconnCount == 0 {
					logger.Println("grpc.DialContext :", hc.config.GrpcAddress)
				}
				reconnCount++

				var err error
				// Derive the dial timeout from hc.ctx so that shutdown is not
				// held up by a blocking dial.
				dialContext, cancelDial := context.WithTimeout(hc.ctx, 15*time.Second)
				client, err = grpc.DialContext(dialContext, hc.config.GrpcAddress, grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials()))
				cancelDial()

				if err != nil {
					logger.Println("grpc.DialContext returns err :", err)
				} else if client != nil {
					reconnCount = 0
					logger.Println("grpc.DialContext succeeded")

					// clientChan is unbuffered and its receiver exits when
					// ctx is cancelled; never block on it past that point.
					select {
					case hc.clientChan <- client:
					case <-hc.ctx.Done():
						client.Close()
						return
					}
				}
			}

			if client != nil {
				err := hc.checkOperation(client)
				if err != nil {
					logger.Println("grpc.DialContext hc.checkOperation failed :", err)
					// Release the failed connection before redialling.
					client.Close()
					client = nil
				}
			}
		}
	}
}

// Shutdown kills the child processes and cancels the client context by
// calling the shutdown function installed by NewClient.
func (hc *houstonClient) Shutdown() {
	hc.shutdownFunc()
}

// checkOperation opens the operation query stream on client, sends the current
// status, and forwards every received operation to hc.operationChan. It
// returns only with a non-nil error: when the stream fails, when hc.ctx is
// cancelled, or when a panic was recovered.
func (hc *houstonClient) checkOperation(client *grpc.ClientConn) (err error) {
	defer func() {
		if r := recover(); r != nil {
			logger.Println(r)
			// Report the panic so that the caller drops this connection
			// instead of retrying on it straight away.
			err = fmt.Errorf("checkOperation panicked: %v", r)
		}
	}()

	op := protos.NewOperationClient(client)
	cl, err := op.Query(hc.ctx, grpc.WaitForReady(true))
	if err != nil {
		return err
	}
	defer cl.CloseSend()

	if err := cl.Send(hc.makeOperationQueryRequest()); err != nil {
		return err
	}

	for {
		update, recvErr := cl.Recv()
		if recvErr != nil {
			return recvErr
		}

		// The consumer of operationChan exits when hc.ctx is cancelled; do
		// not block on a full channel after that.
		select {
		case hc.operationChan <- update:
		case <-hc.ctx.Done():
			return hc.ctx.Err()
		}
	}
}