package client import ( "context" "encoding/json" "errors" "fmt" "io" "io/fs" "os" "os/exec" "os/signal" "path" "path/filepath" "reflect" "sort" "strconv" "strings" "sync" "sync/atomic" "syscall" "time" "unsafe" "repositories.action2quare.com/ayo/gocommon" "repositories.action2quare.com/ayo/gocommon/logger" "repositories.action2quare.com/ayo/houston/shared" "repositories.action2quare.com/ayo/houston/shared/protos" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) type clientConfig struct { GrpcAddress string `json:"grpc_server_address"` HttpAddress string `json:"http_server_address"` StorageRoot string `json:"storage_path"` MetricNamespace string `json:"metric_namespace"` } func loadClientConfig() (clientConfig, error) { configFile, err := os.Open("config.json") if err != nil { return clientConfig{}, err } defer configFile.Close() var config struct { Houston *struct { Client clientConfig `json:"client"` } `json:"houston"` } dec := json.NewDecoder(configFile) err = dec.Decode(&config) if err != nil { return clientConfig{}, err } if config.Houston == nil { return clientConfig{}, errors.New(`"houston" object is missing in config.json`) } return config.Houston.Client, nil } type HoustonClient interface { Shutdown() Start() } var seq = int32(1) type procmeta struct { id int32 cmd *exec.Cmd name string args []string version string verpath string state int32 stdin io.WriteCloser } func (pm *procmeta) isState(s protos.ProcessState) bool { return atomic.LoadInt32(&pm.state) == int32(s) } func (pm *procmeta) getState() protos.ProcessState { return protos.ProcessState(atomic.LoadInt32(&pm.state)) } func (pm *procmeta) setState(s protos.ProcessState) { atomic.StoreInt32(&pm.state, int32(s)) } type houstonClient struct { childProcs []*procmeta extraMetrics unsafe.Pointer // map[string]float32 deploys map[string][]*protos.VersionAndArgs shutdownFunc context.CancelFunc ctx context.Context operationChan chan *protos.OperationQueryResponse exitChan chan *exec.Cmd clientChan chan *grpc.ClientConn timestamp string wg sync.WaitGroup config clientConfig version string standalone bool siblingProcIndex map[string]uint64 } func unmarshal[T any](val *T, src map[string]string) { argval := reflect.ValueOf(val) for i := 0; i < argval.Elem().Type().NumField(); i++ { if !argval.Elem().Type().Field(i).IsExported() { continue } arg := src[argval.Elem().Type().Field(i).Name] if argval.Elem().Field(i).CanInt() { num, _ := strconv.ParseInt(arg, 10, 0) argval.Elem().Field(i).SetInt(num) } else if argval.Elem().Field(i).Kind() == reflect.Array || argval.Elem().Field(i).Kind() == reflect.Slice { conv := strings.Split(arg, "\n") argval.Elem().Field(i).Set(reflect.ValueOf(conv)) } else { argval.Elem().Field(i).SetString(arg) } } logger.Println("operation receive :", argval.Elem().Type().Name(), *val) } func gatherDeployedPrograms(storageRoot, name string) []*protos.VersionAndArgs { var rawvers []*protos.VersionAndArgs targetPath := path.Join(storageRoot, name) if vers, err := os.ReadDir(targetPath); err == nil { for _, ver := range vers { if ver.IsDir() { args := lastExecutionArgs(path.Join(targetPath, ver.Name())) rawvers = append(rawvers, &protos.VersionAndArgs{ Version: ver.Name(), Args: args, }) } } } sort.Slice(rawvers, func(i, j int) bool { leftParsed := shared.ParseVersionString(rawvers[i].Version) rightParsed := shared.ParseVersionString(rawvers[j].Version) return shared.CompareVersionString(leftParsed, rightParsed) < 0 }) return rawvers } func (hc *houstonClient) makeOperationQueryRequest() *protos.OperationQueryRequest { var procs []*protos.ProcessDescription var deploys []*protos.DeployedVersions var selfname string var selfargs []string if hc.standalone { selfname = path.Base(filepath.ToSlash(os.Args[0])) selfargs = os.Args[1:] } else { selfname = "houston" selfargs = []string{} } if len(path.Ext(selfname)) > 0 { selfname = selfname[:len(selfname)-len(path.Ext(selfname))] } procs = append(procs, &protos.ProcessDescription{ Name: selfname, Args: selfargs, Version: hc.version, State: protos.ProcessState_Running, Pid: int32(os.Getpid()), }) deploys = append(deploys, &protos.DeployedVersions{ Name: selfname, Versions: []*protos.VersionAndArgs{ {Version: hc.version, Args: selfargs}, }, }) for _, child := range hc.childProcs { procs = append(procs, &protos.ProcessDescription{ Name: child.name, Args: child.cmd.Args, Version: child.version, State: child.getState(), Pid: int32(child.cmd.Process.Pid), }) } for name, prog := range hc.deploys { deploys = append(deploys, &protos.DeployedVersions{ Name: name, Versions: prog, }) } hn, _ := os.Hostname() return &protos.OperationQueryRequest{ Hostname: hn, Procs: procs, Deploys: deploys, } } func NewClient(standalone bool) (HoustonClient, error) { clientConfig, err := loadClientConfig() if err != nil { return nil, err } if len(clientConfig.GrpcAddress) == 0 { return nil, errors.New("client.grpc_server_address is missing") } if len(clientConfig.HttpAddress) == 0 { return nil, errors.New("client.http_server_address is missing") } exefile, err := os.Executable() if err != nil { return nil, err } exefi, err := os.Stat(exefile) if err != nil { return nil, err } sp, err := os.Stat(clientConfig.StorageRoot) if err != nil { if errors.Is(err, fs.ErrNotExist) { err = os.MkdirAll(clientConfig.StorageRoot, 0775) } } else if !sp.IsDir() { err = errors.New(clientConfig.StorageRoot + " is not directory") } if err != nil { return nil, err } deploys := make(map[string][]*protos.VersionAndArgs) if dirs, err := os.ReadDir(clientConfig.StorageRoot); err == nil { for _, dir := range dirs { if dir.IsDir() { flagf := path.Join(clientConfig.StorageRoot, dir.Name(), "@houston") if _, err := os.Stat(flagf); !os.IsNotExist(err) { deploys[dir.Name()] = gatherDeployedPrograms(clientConfig.StorageRoot, dir.Name()) } } } } ver, _ := os.ReadFile("@version") if len(ver) == 0 { ver = []byte("0.0.0") } hc := &houstonClient{ config: clientConfig, clientChan: make(chan *grpc.ClientConn), extraMetrics: unsafe.Pointer(&map[string]float32{}), deploys: deploys, timestamp: exefi.ModTime().String(), version: string(ver), standalone: standalone, siblingProcIndex: make(map[string]uint64), } ctx, cancel := context.WithCancel(context.Background()) exitChan := make(chan *exec.Cmd, 10) operationChan := make(chan *protos.OperationQueryResponse, 10) hc.wg.Add(1) go func() { defer hc.wg.Done() // 메인 operator var op protos.OperationClient myname, _ := os.Executable() myname = path.Base(filepath.ToSlash(myname)) if len(path.Ext(myname)) > 0 { myname = myname[:len(myname)-len(path.Ext(myname))] } if myname == "__debug_bin" { myname = "houston" } for { select { case <-ctx.Done(): return case newClient := <-hc.clientChan: op = protos.NewOperationClient(newClient) case exited := <-exitChan: var newprocs []*procmeta for _, proc := range hc.childProcs { if proc.cmd == exited { if proc.isState(protos.ProcessState_Running) || proc.isState(protos.ProcessState_Restart) { go func(proc *procmeta) { if err := proc.cmd.Process.Signal(syscall.SIGTERM); err != nil { proc.cmd.Process.Signal(os.Kill) } proc.cmd.Wait() proc.cmd.Process.Release() if proc.isState(protos.ProcessState_Restart) { hc.startChildProcess(&shared.StartProcessRequest{ Version: proc.version, Name: proc.name, Args: proc.args, }, op) } }(proc) } } else { newprocs = append(newprocs, proc) } } hc.childProcs = newprocs op.Refresh(ctx, hc.makeOperationQueryRequest()) case resp := <-operationChan: switch shared.Operation(resp.Operation) { case shared.Deploy: var dr shared.DeployRequest unmarshal(&dr, resp.Args) if dr.Name == myname { if srcdir, replacer, err := hc.prepareUpdateSelf(&dr); err == nil { args := []string{ fmt.Sprintf("%d", os.Getpid()), srcdir, filepath.ToSlash(os.Args[0]), } args = append(args, os.Args[1:]...) cmd := exec.Command(replacer, args...) if err := cmd.Start(); err != nil { logger.Println(err) } else { hc.shutdownFunc() } } else { logger.Println(err) } } else { hn, _ := os.Hostname() if err := hc.deploy(&dr, func(dp *protos.DeployingProgress) { dp.Hostname = hn dp.Name = dr.Name dp.Version = dr.Version op.ReportDeployingProgress(ctx, dp) }); err == nil { prog := gatherDeployedPrograms(hc.config.StorageRoot, dr.Name) hc.deploys[dr.Name] = prog op.Refresh(ctx, hc.makeOperationQueryRequest()) op.ReportDeployingProgress(ctx, &protos.DeployingProgress{ Hostname: hn, Name: dr.Name, Version: dr.Version, State: "success", Progress: 0, Total: 0, }) } else { logger.Println(err) op.ReportDeployingProgress(ctx, &protos.DeployingProgress{ Hostname: hn, Name: dr.Name, Version: dr.Version, State: "fail:" + err.Error(), Progress: 0, Total: 0, }) } } case shared.Withdraw: var wr shared.WithdrawRequest unmarshal(&wr, resp.Args) err := hc.withdraw(&wr) if err == nil { prog := gatherDeployedPrograms(hc.config.StorageRoot, wr.Name) if len(prog) == 0 { delete(hc.deploys, wr.Name) } else { hc.deploys[wr.Name] = prog } op.Refresh(ctx, hc.makeOperationQueryRequest()) } else { logger.Println(err) } case shared.Start: var sr shared.StartProcessRequest unmarshal(&sr, resp.Args) if err := hc.startChildProcess(&sr, op); err != nil { logger.Println(err) } case shared.Stop: var sr shared.StopProcessRequest unmarshal(&sr, resp.Args) if err := hc.stopChildProcess(&sr, op); err != nil { logger.Println(err) } case shared.Restart: var rr shared.RestartProcessRequest unmarshal(&rr, resp.Args) if err := hc.restartChildProcess(&rr, op); err != nil { logger.Println(err) } case shared.Upload: var ur shared.UploadRequest unmarshal(&ur, resp.Args) if err := hc.uploadFiles(&ur); err != nil { logger.Println(err) } case shared.Exception: idstr := resp.Args["id"] id64, _ := strconv.ParseInt(idstr, 10, 0) id := int32(id64) hc.childProcs = gocommon.ShrinkSlice(hc.childProcs, func(e *procmeta) bool { if e.id == id { e.cmd.Wait() e.cmd.Process.Release() return true } return false }) op.Refresh(context.Background(), hc.makeOperationQueryRequest()) } } } }() hc.shutdownFunc = func() { // child process 강제 종료 for _, procmeta := range hc.childProcs { if procmeta.cmd != nil && procmeta.cmd.Process != nil { procmeta.cmd.Process.Signal(os.Kill) } } time.Sleep(time.Second) cancel() } hc.exitChan = exitChan hc.ctx = ctx hc.operationChan = operationChan return hc, nil } func (hc *houstonClient) Start() { // receive from stream defer func() { hc.wg.Wait() for _, proc := range hc.childProcs { if err := proc.cmd.Process.Signal(syscall.SIGTERM); err != nil { proc.cmd.Process.Signal(os.Kill) proc.setState(protos.ProcessState_Stopping) } } for _, proc := range hc.childProcs { proc.cmd.Wait() proc.cmd.Process.Release() } }() interrupt := make(chan os.Signal, 1) signal.Notify(interrupt, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) go func() { c := <-interrupt logger.Println("interrupt!!!!!!!! :", c.String()) hc.shutdownFunc() }() var client *grpc.ClientConn reconnCount := 0 time.Sleep(time.Second) for { select { case <-hc.ctx.Done(): return default: if client == nil { if reconnCount == 0 { logger.Println("grpc.DialContext :", hc.config.GrpcAddress) } reconnCount++ var err error dialContext, cancelDial := context.WithTimeout(context.Background(), 15*time.Second) client, err = grpc.DialContext(dialContext, hc.config.GrpcAddress, grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials())) cancelDial() if err != nil { logger.Println("grpc.DialContext returns err :", err) } else if client != nil { reconnCount = 0 logger.Println("grpc.DialContext succeeded") hc.clientChan <- client } } if client != nil { err := hc.checkOperation(client) if err != nil { logger.Println("grpc.DialContext hc.checkOperation failed :", err) client = nil } } } } } func (hc *houstonClient) Shutdown() { hc.shutdownFunc() } func (hc *houstonClient) checkOperation(client *grpc.ClientConn) error { defer func() { r := recover() if r != nil { logger.Println(r) } }() op := protos.NewOperationClient(client) cl, err := op.Query(hc.ctx, grpc.WaitForReady(true)) if err != nil { return err } err = cl.Send(hc.makeOperationQueryRequest()) if err != nil { cl.CloseSend() return err } for { update, err := cl.Recv() if err != nil { cl.CloseSend() return err } hc.operationChan <- update } }