532 lines
12 KiB
Go
532 lines
12 KiB
Go
package client
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"io/fs"
|
|
"os"
|
|
"os/exec"
|
|
"os/signal"
|
|
"path"
|
|
"path/filepath"
|
|
"reflect"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"syscall"
|
|
"time"
|
|
"unsafe"
|
|
|
|
"repositories.action2quare.com/ayo/gocommon/logger"
|
|
"repositories.action2quare.com/ayo/houston/shared"
|
|
"repositories.action2quare.com/ayo/houston/shared/protos"
|
|
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
)
|
|
|
|
// clientConfig mirrors the "client" object that lives under the top-level
// "houston" object of config.json.
type clientConfig struct {
	// GrpcAddress is the gRPC server address the client dials.
	// NewClient rejects a configuration where it is empty.
	GrpcAddress string `json:"grpc_server_address"`

	// HttpAddress is the HTTP server address.
	// NewClient rejects a configuration where it is empty.
	HttpAddress string `json:"http_server_address"`

	// StorageRoot is the directory scanned for deployed programs.
	// NewClient creates it when it does not exist.
	StorageRoot string `json:"storage_path"`

	// RunNodeExporter is read from "run_node_exporter".
	// NOTE(review): its consumer is not in this file — presumably toggles a
	// node_exporter child process; confirm against the caller.
	RunNodeExporter bool `json:"run_node_exporter"`
}
|
|
|
|
func loadClientConfig() (clientConfig, error) {
|
|
configFile, err := os.Open("config.json")
|
|
if err != nil {
|
|
return clientConfig{}, err
|
|
}
|
|
defer configFile.Close()
|
|
|
|
var config struct {
|
|
Houston *struct {
|
|
Client clientConfig `json:"client"`
|
|
} `json:"houston"`
|
|
}
|
|
|
|
dec := json.NewDecoder(configFile)
|
|
err = dec.Decode(&config)
|
|
if err != nil {
|
|
return clientConfig{}, err
|
|
}
|
|
|
|
if config.Houston == nil {
|
|
return clientConfig{}, errors.New(`"houston" object is missing in config.json`)
|
|
}
|
|
|
|
return config.Houston.Client, nil
|
|
}
|
|
|
|
// HoustonClient is the handle returned by NewClient.
type HoustonClient interface {
	// SetReportMetrics replaces the set of extra metrics kept by the client.
	SetReportMetrics(map[string]float32)

	// Shutdown asks the client to stop.
	Shutdown()

	// Start runs the client; the implementation in this file blocks until
	// the client has been shut down.
	Start()
}
|
|
|
|
type procmeta struct {
|
|
cmd *exec.Cmd
|
|
name string
|
|
args []string
|
|
version string
|
|
state protos.ProcessState
|
|
stdin io.WriteCloser
|
|
}
|
|
|
|
type houstonClient struct {
|
|
childProcs []*procmeta
|
|
extraMetrics unsafe.Pointer // map[string]float32
|
|
deploys map[string][]*protos.VersionAndArgs
|
|
shutdownFunc context.CancelFunc
|
|
ctx context.Context
|
|
operationChan chan *protos.OperationQueryResponse
|
|
exitChan chan *exec.Cmd
|
|
clientChan chan *grpc.ClientConn
|
|
timestamp string
|
|
wg sync.WaitGroup
|
|
config clientConfig
|
|
version string
|
|
standalone bool
|
|
}
|
|
|
|
func unmarshal[T any](val *T, src map[string]string) {
|
|
argval := reflect.ValueOf(val)
|
|
logger.Println("operation receive :", argval.Type().Name(), src)
|
|
for i := 0; i < argval.Elem().Type().NumField(); i++ {
|
|
if !argval.Elem().Type().Field(i).IsExported() {
|
|
continue
|
|
}
|
|
arg := src[argval.Elem().Type().Field(i).Name]
|
|
if argval.Elem().Field(i).CanInt() {
|
|
num, _ := strconv.ParseInt(arg, 10, 0)
|
|
argval.Elem().Field(i).SetInt(num)
|
|
} else if argval.Elem().Field(i).Kind() == reflect.Array || argval.Elem().Field(i).Kind() == reflect.Slice {
|
|
conv := strings.Split(arg, "\n")
|
|
argval.Elem().Field(i).Set(reflect.ValueOf(conv))
|
|
} else {
|
|
argval.Elem().Field(i).SetString(arg)
|
|
}
|
|
}
|
|
}
|
|
|
|
func gatherDeployedPrograms(storageRoot, name string) []*protos.VersionAndArgs {
|
|
var rawvers []*protos.VersionAndArgs
|
|
targetPath := path.Join(storageRoot, name)
|
|
if vers, err := os.ReadDir(targetPath); err == nil {
|
|
for _, ver := range vers {
|
|
if ver.IsDir() {
|
|
args := lastExecutionArgs(path.Join(targetPath, ver.Name()))
|
|
rawvers = append(rawvers, &protos.VersionAndArgs{
|
|
Version: ver.Name(),
|
|
Args: args,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
sort.Slice(rawvers, func(i, j int) bool {
|
|
leftParsed := shared.ParseVersionString(rawvers[i].Version)
|
|
rightParsed := shared.ParseVersionString(rawvers[j].Version)
|
|
return shared.CompareVersionString(leftParsed, rightParsed) < 0
|
|
})
|
|
return rawvers
|
|
}
|
|
|
|
func (hc *houstonClient) makeOperationQueryRequest() *protos.OperationQueryRequest {
|
|
var procs []*protos.ProcessDescription
|
|
var deploys []*protos.DeployedVersions
|
|
|
|
var selfname string
|
|
var selfargs []string
|
|
if hc.standalone {
|
|
selfname = path.Base(os.Args[0])
|
|
selfargs = os.Args[1:]
|
|
} else {
|
|
selfname = "houston"
|
|
selfargs = []string{}
|
|
}
|
|
|
|
procs = append(procs, &protos.ProcessDescription{
|
|
Name: selfname,
|
|
Args: selfargs,
|
|
Version: hc.version,
|
|
State: protos.ProcessState_Running,
|
|
Pid: int32(os.Getpid()),
|
|
})
|
|
deploys = append(deploys, &protos.DeployedVersions{
|
|
Name: selfname,
|
|
Versions: []*protos.VersionAndArgs{
|
|
{Version: hc.version, Args: selfargs},
|
|
},
|
|
})
|
|
|
|
for _, child := range hc.childProcs {
|
|
procs = append(procs, &protos.ProcessDescription{
|
|
Name: child.name,
|
|
Args: child.args,
|
|
Version: child.version,
|
|
State: child.state,
|
|
Pid: int32(child.cmd.Process.Pid),
|
|
})
|
|
}
|
|
|
|
for name, prog := range hc.deploys {
|
|
deploys = append(deploys, &protos.DeployedVersions{
|
|
Name: name,
|
|
Versions: prog,
|
|
})
|
|
}
|
|
|
|
hn, _ := os.Hostname()
|
|
return &protos.OperationQueryRequest{
|
|
Hostname: hn,
|
|
Procs: procs,
|
|
Deploys: deploys,
|
|
}
|
|
}
|
|
|
|
func NewClient(standalone bool) (HoustonClient, error) {
|
|
clientConfig, err := loadClientConfig()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(clientConfig.GrpcAddress) == 0 {
|
|
return nil, errors.New("client.grpc_server_address is missing")
|
|
}
|
|
|
|
if len(clientConfig.HttpAddress) == 0 {
|
|
return nil, errors.New("client.http_server_address is missing")
|
|
}
|
|
|
|
exefile, err := os.Executable()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
exefi, err := os.Stat(exefile)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
sp, err := os.Stat(clientConfig.StorageRoot)
|
|
if err != nil {
|
|
if errors.Is(err, fs.ErrNotExist) {
|
|
err = os.MkdirAll(clientConfig.StorageRoot, 0775)
|
|
}
|
|
} else if !sp.IsDir() {
|
|
err = errors.New(clientConfig.StorageRoot + " is not directory")
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
deploys := make(map[string][]*protos.VersionAndArgs)
|
|
if dirs, err := os.ReadDir(clientConfig.StorageRoot); err == nil {
|
|
for _, dir := range dirs {
|
|
if dir.IsDir() {
|
|
flagf := path.Join(clientConfig.StorageRoot, dir.Name(), "@houston")
|
|
if _, err := os.Stat(flagf); !os.IsNotExist(err) {
|
|
deploys[dir.Name()] = gatherDeployedPrograms(clientConfig.StorageRoot, dir.Name())
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
ver, _ := os.ReadFile("@version")
|
|
if len(ver) == 0 {
|
|
ver = []byte("0.0.0")
|
|
}
|
|
|
|
hc := &houstonClient{
|
|
config: clientConfig,
|
|
clientChan: make(chan *grpc.ClientConn),
|
|
extraMetrics: unsafe.Pointer(&map[string]float32{}),
|
|
deploys: deploys,
|
|
timestamp: exefi.ModTime().String(),
|
|
version: string(ver),
|
|
standalone: standalone,
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
exitChan := make(chan *exec.Cmd, 10)
|
|
operationChan := make(chan *protos.OperationQueryResponse, 10)
|
|
hc.wg.Add(1)
|
|
|
|
go func() {
|
|
defer hc.wg.Done()
|
|
|
|
// 메인 operator
|
|
var op protos.OperationClient
|
|
myname, _ := os.Executable()
|
|
myname = path.Base(filepath.ToSlash(myname))
|
|
if len(path.Ext(myname)) > 0 {
|
|
myname = myname[:len(myname)-len(path.Ext(myname))]
|
|
}
|
|
if myname == "__debug_bin" {
|
|
myname = "houston"
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
|
|
case newClient := <-hc.clientChan:
|
|
op = protos.NewOperationClient(newClient)
|
|
|
|
case exited := <-exitChan:
|
|
var newprocs []*procmeta
|
|
for _, proc := range hc.childProcs {
|
|
if proc.cmd == exited {
|
|
if proc.state == protos.ProcessState_Running || proc.state == protos.ProcessState_Restart {
|
|
go func(proc *procmeta) {
|
|
if err := proc.cmd.Process.Signal(syscall.SIGTERM); err != nil {
|
|
proc.cmd.Process.Signal(os.Kill)
|
|
}
|
|
proc.cmd.Wait()
|
|
proc.cmd.Process.Release()
|
|
|
|
if proc.state == protos.ProcessState_Restart {
|
|
hc.startChildProcess(&shared.StartProcessRequest{
|
|
Version: proc.version,
|
|
Name: proc.name,
|
|
Args: proc.args,
|
|
}, op)
|
|
}
|
|
}(proc)
|
|
}
|
|
} else {
|
|
newprocs = append(newprocs, proc)
|
|
}
|
|
}
|
|
hc.childProcs = newprocs
|
|
op.Refresh(ctx, hc.makeOperationQueryRequest())
|
|
|
|
case resp := <-operationChan:
|
|
switch shared.Operation(resp.Operation) {
|
|
case shared.Deploy:
|
|
var dr shared.DeployRequest
|
|
unmarshal(&dr, resp.Args)
|
|
if dr.Name == myname {
|
|
if srcdir, replacer, err := hc.prepareUpdateSelf(&dr); err == nil {
|
|
args := []string{
|
|
fmt.Sprintf("%d", os.Getpid()),
|
|
srcdir,
|
|
filepath.ToSlash(os.Args[0]),
|
|
}
|
|
args = append(args, os.Args[1:]...)
|
|
cmd := exec.Command(replacer, args...)
|
|
if err := cmd.Start(); err != nil {
|
|
logger.Println(err)
|
|
} else {
|
|
hc.shutdownFunc()
|
|
}
|
|
} else {
|
|
logger.Println(err)
|
|
}
|
|
} else {
|
|
hn, _ := os.Hostname()
|
|
|
|
if err := hc.deploy(&dr, func(dp *protos.DeployingProgress) {
|
|
dp.Hostname = hn
|
|
dp.Name = dr.Name
|
|
dp.Version = dr.Version
|
|
op.ReportDeployingProgress(ctx, dp)
|
|
}); err == nil {
|
|
prog := gatherDeployedPrograms(hc.config.StorageRoot, dr.Name)
|
|
hc.deploys[dr.Name] = prog
|
|
op.Refresh(ctx, hc.makeOperationQueryRequest())
|
|
|
|
op.ReportDeployingProgress(ctx, &protos.DeployingProgress{
|
|
Hostname: hn,
|
|
Name: dr.Name,
|
|
Version: dr.Version,
|
|
State: "success",
|
|
Progress: 0,
|
|
Total: 0,
|
|
})
|
|
} else {
|
|
logger.Println(err)
|
|
|
|
op.ReportDeployingProgress(ctx, &protos.DeployingProgress{
|
|
Hostname: hn,
|
|
Name: dr.Name,
|
|
Version: dr.Version,
|
|
State: "fail:" + err.Error(),
|
|
Progress: 0,
|
|
Total: 0,
|
|
})
|
|
}
|
|
}
|
|
|
|
case shared.Withdraw:
|
|
var wr shared.WithdrawRequest
|
|
unmarshal(&wr, resp.Args)
|
|
err := hc.withdraw(&wr)
|
|
if err == nil {
|
|
prog := gatherDeployedPrograms(hc.config.StorageRoot, wr.Name)
|
|
if len(prog) == 0 {
|
|
delete(hc.deploys, wr.Name)
|
|
} else {
|
|
hc.deploys[wr.Name] = prog
|
|
}
|
|
op.Refresh(ctx, hc.makeOperationQueryRequest())
|
|
} else {
|
|
logger.Println(err)
|
|
}
|
|
|
|
case shared.Start:
|
|
var sr shared.StartProcessRequest
|
|
unmarshal(&sr, resp.Args)
|
|
if err := hc.startChildProcess(&sr, op); err != nil {
|
|
logger.Println(err)
|
|
}
|
|
|
|
case shared.Stop:
|
|
var sr shared.StopProcessRequest
|
|
unmarshal(&sr, resp.Args)
|
|
if err := hc.stopChildProcess(&sr, op); err != nil {
|
|
logger.Println(err)
|
|
}
|
|
|
|
case shared.Restart:
|
|
var rr shared.RestartProcessRequest
|
|
unmarshal(&rr, resp.Args)
|
|
if err := hc.restartChildProcess(&rr, op); err != nil {
|
|
logger.Println(err)
|
|
}
|
|
|
|
case shared.Upload:
|
|
var ur shared.UploadRequest
|
|
unmarshal(&ur, resp.Args)
|
|
if err := hc.uploadFiles(&ur); err != nil {
|
|
logger.Println(err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
hc.shutdownFunc = cancel
|
|
hc.exitChan = exitChan
|
|
hc.ctx = ctx
|
|
hc.operationChan = operationChan
|
|
|
|
return hc, nil
|
|
}
|
|
|
|
func (hc *houstonClient) Start() {
|
|
// receive from stream
|
|
defer func() {
|
|
hc.wg.Wait()
|
|
|
|
for _, proc := range hc.childProcs {
|
|
if err := proc.cmd.Process.Signal(syscall.SIGTERM); err != nil {
|
|
proc.cmd.Process.Signal(os.Kill)
|
|
proc.state = protos.ProcessState_Stopping
|
|
}
|
|
}
|
|
|
|
for _, proc := range hc.childProcs {
|
|
proc.cmd.Wait()
|
|
proc.cmd.Process.Release()
|
|
}
|
|
}()
|
|
|
|
interrupt := make(chan os.Signal, 1)
|
|
signal.Notify(interrupt, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
|
|
|
|
go func() {
|
|
c := <-interrupt
|
|
logger.Println("interrupt!!!!!!!! :", c.String())
|
|
hc.shutdownFunc()
|
|
}()
|
|
|
|
var client *grpc.ClientConn
|
|
reconnCount := 0
|
|
time.Sleep(time.Second)
|
|
|
|
for {
|
|
select {
|
|
case <-hc.ctx.Done():
|
|
return
|
|
|
|
default:
|
|
if client == nil {
|
|
if reconnCount == 0 {
|
|
logger.Println("grpc.DialContext :", hc.config.GrpcAddress)
|
|
}
|
|
|
|
reconnCount++
|
|
|
|
var err error
|
|
dialContext, cancelDial := context.WithTimeout(context.Background(), 15*time.Second)
|
|
client, err = grpc.DialContext(dialContext, hc.config.GrpcAddress, grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
cancelDial()
|
|
|
|
if err != nil {
|
|
logger.Println("grpc.DialContext returns err :", err)
|
|
} else if client != nil {
|
|
reconnCount = 0
|
|
logger.Println("grpc.DialContext succeeded")
|
|
hc.clientChan <- client
|
|
}
|
|
}
|
|
|
|
if client != nil {
|
|
err := hc.checkOperation(client)
|
|
if err != nil {
|
|
logger.Println("hc.checkUpdate failed :", err)
|
|
|
|
client = nil
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (hc *houstonClient) Shutdown() {
|
|
hc.shutdownFunc()
|
|
}
|
|
|
|
func (hc *houstonClient) checkOperation(client *grpc.ClientConn) error {
|
|
defer func() {
|
|
r := recover()
|
|
if r != nil {
|
|
logger.Println(r)
|
|
}
|
|
}()
|
|
|
|
op := protos.NewOperationClient(client)
|
|
cl, err := op.Query(hc.ctx, grpc.WaitForReady(true))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = cl.Send(hc.makeOperationQueryRequest())
|
|
if err != nil {
|
|
cl.CloseSend()
|
|
return err
|
|
}
|
|
|
|
for {
|
|
update, err := cl.Recv()
|
|
if err != nil {
|
|
cl.CloseSend()
|
|
return err
|
|
}
|
|
hc.operationChan <- update
|
|
}
|
|
}
|
|
|
|
func (hc *houstonClient) SetReportMetrics(extra map[string]float32) {
|
|
atomic.StorePointer(&hc.extraMetrics, unsafe.Pointer(&extra))
|
|
}
|