package client

import (
	"context"
	"io"
	"os"
	"os/exec"
	"path"
	"reflect"
	"sort"
	"strconv"
	"sync/atomic"
	"time"
	"unsafe"

	"go-ayo/common/logger"
	"houston/shared"
	"houston/shared/protos"

	sigar "github.com/cloudfoundry/gosigar"
	"github.com/shirou/gopsutil/v3/cpu"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// HoustonClient is the public surface of the agent.
type HoustonClient interface {
	SetReportMetrics(map[string]float32)
	Shutdown()
}

// procmeta tracks one child process managed by the client.
type procmeta struct {
	cmd        *exec.Cmd
	name       string
	version    string
	state      protos.ProcessState
	stdin      io.WriteCloser
	stdPrefix  string
	stdoutSize int32 // accessed atomically
	stderrSize int32 // accessed atomically
}

type houstonClient struct {
	client       *grpc.ClientConn
	childProcs   []*procmeta
	extraMetrics unsafe.Pointer // *map[string]float32, accessed atomically
	deploys      map[string][]*protos.VersionAndArgs
	shutdownFunc context.CancelFunc
	exitChan     chan *exec.Cmd
	httpAddr     string
	timestamp    string
}

// bToMb converts bytes to mebibytes.
func bToMb(b uint64) uint32 {
	return uint32(b / 1024 / 1024)
}

// unmarshal copies values from src into the exported fields of val, matching
// map keys to field names. Only integer and string fields are populated;
// fields of any other kind are left untouched.
func unmarshal[T any](val *T, src map[string]string) {
	elem := reflect.ValueOf(val).Elem()
	typ := elem.Type()
	for i := 0; i < typ.NumField(); i++ {
		field := typ.Field(i)
		if !field.IsExported() {
			continue
		}

		arg := src[field.Name]
		fv := elem.Field(i)
		switch {
		case fv.CanInt():
			// Parse errors are ignored; a missing or non-numeric value yields 0.
			num, _ := strconv.ParseInt(arg, 10, 0)
			fv.SetInt(num)
		case fv.Kind() == reflect.String:
			fv.SetString(arg)
		}
	}
}

// gatherDeployedPrograms lists the version directories under name, sorted in
// ascending version order, together with the arguments of their last execution.
func gatherDeployedPrograms(name string) []*protos.VersionAndArgs {
	var rawvers []*protos.VersionAndArgs
	if vers, err := os.ReadDir(path.Join("./", name)); err == nil {
		for _, ver := range vers {
			if ver.IsDir() {
				args := lastExecutionArgs(path.Join(name, ver.Name()))
				rawvers = append(rawvers, &protos.VersionAndArgs{
					Version: ver.Name(),
					Args:    args,
				})
			}
		}
	}

	sort.Slice(rawvers, func(i, j int) bool {
		leftParsed := parseVersionString(rawvers[i].Version)
		rightParsed := parseVersionString(rawvers[j].Version)
		return compareVersionString(leftParsed, rightParsed) < 0
	})
	return rawvers
}

// makeOperationQueryRequest snapshots the current child processes and deploys.
//
// NOTE: this is called from both the operator goroutine and the stream
// goroutine (via checkOperation), while the operator goroutine mutates
// hc.deploys. That access is not synchronized.
func (hc *houstonClient) makeOperationQueryRequest() *protos.OperationQueryRequest {
	hn, _ := os.Hostname()

	procs := make([]*protos.ProcessDescription, 0, len(hc.childProcs))
	for _, child := range hc.childProcs {
		// cmd.Process is nil until the command has been started.
		var pid int32
		if child.cmd.Process != nil {
			pid = int32(child.cmd.Process.Pid)
		}
		procs = append(procs, &protos.ProcessDescription{
			Name:       child.name,
			Args:       child.cmd.Args,
			Version:    child.version,
			State:      child.state,
			Pid:        pid,
			StdoutSize: atomic.LoadInt32(&child.stdoutSize),
			StderrSize: atomic.LoadInt32(&child.stderrSize),
		})
	}

	var deploys []*protos.DeployedVersions
	for name, prog := range hc.deploys {
		deploys = append(deploys, &protos.DeployedVersions{
			Name:     name,
			Versions: prog,
		})
	}

	return &protos.OperationQueryRequest{
		Hostname: hn,
		Procs:    procs,
		Deploys:  deploys,
	}
}

// NewClient connects to the server at grpcAddr and starts the background
// goroutines that report metrics and process operations.
func NewClient(grpcAddr string, httpAddr string) (HoustonClient, error) {
	client, err := grpc.Dial(grpcAddr, grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return nil, err
	}

	exefile, err := os.Executable()
	if err != nil {
		client.Close()
		return nil, err
	}

	exefi, err := os.Stat(exefile)
	if err != nil {
		client.Close()
		return nil, err
	}

	// A directory counts as a deployed program if it contains an "@houston" flag file.
	deploys := make(map[string][]*protos.VersionAndArgs)
	if dirs, err := os.ReadDir("./"); err == nil {
		for _, dir := range dirs {
			if dir.IsDir() {
				flagf := path.Join(dir.Name(), "@houston")
				if _, err := os.Stat(flagf); !os.IsNotExist(err) {
					deploys[dir.Name()] = gatherDeployedPrograms(dir.Name())
				}
			}
		}
	}

	hc := &houstonClient{
		client:       client,
		extraMetrics: unsafe.Pointer(&map[string]float32{}),
		deploys:      deploys,
		httpAddr:     httpAddr,
		timestamp:    exefi.ModTime().String(),
	}

	ctx, cancel := context.WithCancel(context.Background())

	go func() {
		// Regularly send status.
		sc := protos.NewMonitorClient(client)
		hn, _ := os.Hostname()
		mem := sigar.Mem{}
		mem.Get()
		metrics := &protos.Metrics{
			Hostname: hn,
			Total:    bToMb(mem.Total),
		}

		for {
			select {
			case <-ctx.Done():
				return

			case <-time.After(5 * time.Second):
				// Refresh memory stats before reading them so Free is current.
				mem.Get()

				// cpu.Percent can fail or return an empty slice; keep the
				// previous value in that case instead of panicking.
				if percent, err := cpu.Percent(0, false); err == nil && len(percent) > 0 {
					metrics.Cpu = float32(percent[0])
				}
				metrics.Free = bToMb(mem.ActualFree)
				metrics.Metrics = *(*map[string]float32)(atomic.LoadPointer(&hc.extraMetrics))

				// Use ctx so a pending report is abandoned on Shutdown.
				sc.Report(ctx, metrics, grpc.WaitForReady(true))
			}
		}
	}()

	exitChan := make(chan *exec.Cmd, 10)
	operationChan := make(chan *protos.OperationQueryResponse, 10)

	go func() {
		// Main operator: handles child exits and operations from the server.
		op := protos.NewOperationClient(hc.client)

		for {
			select {
			case <-ctx.Done():
				return

			case exited := <-exitChan:
				for _, proc := range hc.childProcs {
					if proc.cmd == exited && proc.state != protos.ProcessState_Stopped {
						proc.state = protos.ProcessState_Stopped
						op.Refresh(ctx, hc.makeOperationQueryRequest())
						break
					}
				}

			case resp := <-operationChan:
				switch shared.Operation(resp.Operation) {
				case shared.Deploy:
					var dr shared.DeployRequest
					unmarshal(&dr, resp.Args)
					if err := hc.deploy(&dr); err != nil {
						logger.Println(err)
						break
					}
					hc.deploys[dr.Name] = gatherDeployedPrograms(dr.Name)
					op.Refresh(ctx, hc.makeOperationQueryRequest())

				case shared.Withdraw:
					var wr shared.WithdrawRequest
					unmarshal(&wr, resp.Args)
					if err := hc.withdraw(&wr); err != nil {
						logger.Println(err)
						break
					}
					hc.deploys[wr.Name] = gatherDeployedPrograms(wr.Name)
					op.Refresh(ctx, hc.makeOperationQueryRequest())

				case shared.Start:
					var sr shared.StartProcessRequest
					unmarshal(&sr, resp.Args)
					if err := hc.startChildProcess(&sr); err != nil {
						logger.Println(err)
					}

				case shared.Stop:
					var sr shared.StopProcessRequest
					unmarshal(&sr, resp.Args)
					if err := hc.stopChildProcess(&sr); err != nil {
						logger.Println(err)
					}

				case shared.Restart:
					var rr shared.RestartProcessRequest
					unmarshal(&rr, resp.Args)
					if err := hc.restartChildProcess(&rr); err != nil {
						logger.Println(err)
					}

				case shared.Upload:
					var ur shared.UploadRequest
					unmarshal(&ur, resp.Args)
					if err := hc.uploadFiles(&ur); err != nil {
						logger.Println(err)
					}
				}
			}
		}
	}()

	go func() {
		// Receive operations from the stream, reconnecting whenever it ends.
		for {
			select {
			case <-ctx.Done():
				return
			default:
				if err := hc.checkOperation(operationChan); err != nil {
					logger.Println("hc.checkOperation failed :", err)
				}
			}
		}
	}()

	hc.shutdownFunc = cancel
	hc.exitChan = exitChan

	return hc, nil
}

// Shutdown cancels the context shared by the background goroutines.
func (hc *houstonClient) Shutdown() {
	hc.shutdownFunc()
}

// checkOperation opens the operation query stream, sends the current state,
// and forwards every response to opChan until the stream returns an error.
func (hc *houstonClient) checkOperation(opChan chan<- *protos.OperationQueryResponse) error {
	defer func() {
		if r := recover(); r != nil {
			logger.Error(r)
		}
	}()

	op := protos.NewOperationClient(hc.client)
	cl, err := op.Query(context.Background(), grpc.WaitForReady(true))
	if err != nil {
		return err
	}
	// Close the send side on every exit path.
	defer cl.CloseSend()

	if err := cl.Send(hc.makeOperationQueryRequest()); err != nil {
		return err
	}

	for {
		update, err := cl.Recv()
		if err != nil {
			return err
		}
		opChan <- update
	}
}

// SetReportMetrics atomically replaces the extra metrics sent with each report.
// The caller must not modify extra after passing it in.
func (hc *houstonClient) SetReportMetrics(extra map[string]float32) {
	atomic.StorePointer(&hc.extraMetrics, unsafe.Pointer(&extra))
}