Files
houston/client/client.go

521 lines
12 KiB
Go

package client
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/fs"
"os"
"os/exec"
"os/signal"
"path"
"path/filepath"
"reflect"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
"unsafe"
"repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/houston/shared"
"repositories.action2quare.com/ayo/houston/shared/protos"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// clientConfig holds the client settings decoded from the "houston.client"
// object of config.json (see loadClientConfig).
type clientConfig struct {
	// GrpcAddress is the gRPC server address dialed by Start.
	GrpcAddress string `json:"grpc_server_address"`
	// HttpAddress is the HTTP server address; NewClient requires it to be non-empty.
	HttpAddress string `json:"http_server_address"`
	// StorageRoot is the directory NewClient creates if missing and scans for
	// deployed programs.
	StorageRoot string `json:"storage_path"`
}
// loadClientConfig reads config.json from the current working directory and
// returns the value of its "houston" -> "client" object.
//
// It returns an error when the file cannot be opened, when its content is not
// decodable JSON, or when the top-level "houston" object is absent. On error
// the returned clientConfig is the zero value.
func loadClientConfig() (clientConfig, error) {
	var none clientConfig

	f, openErr := os.Open("config.json")
	if openErr != nil {
		return none, openErr
	}
	defer f.Close()

	// Houston is a pointer so that a missing "houston" key can be told apart
	// from an empty one.
	type houstonSection struct {
		Client clientConfig `json:"client"`
	}
	var root struct {
		Houston *houstonSection `json:"houston"`
	}

	if decodeErr := json.NewDecoder(f).Decode(&root); decodeErr != nil {
		return none, decodeErr
	}
	if root.Houston == nil {
		return none, errors.New(`"houston" object is missing in config.json`)
	}
	return root.Houston.Client, nil
}
// HoustonClient is the handle returned by NewClient.
type HoustonClient interface {
	// Start runs the client; the implementation in this file blocks until
	// the client is shut down.
	Start()
	// Shutdown asks a running client to stop.
	Shutdown()
	// SetReportMetrics replaces the extra metrics map held by the client.
	SetReportMetrics(map[string]float32)
}
// bufferStack is a small fixed-capacity stack of reusable byte buffers.
//
// cursor is the index of the next slot pop will take. Slots below cursor are
// checked out (and set to nil); slots at or above cursor hold buffers that
// were returned with push, or nil if that slot was never filled.
//
// The cursor is moved with atomic operations. The slot contents themselves
// are read and written with plain loads and stores, as in the previous
// implementation.
type bufferStack struct {
	pool   [5][]byte
	cursor int32
}

// pop hands out a buffer.
//
// While slots remain it takes the slot at cursor, advances the cursor and
// returns the buffer stored there, or a fresh 1024-byte buffer if the slot was
// empty. When every slot is already checked out it returns a fresh 1024-byte
// buffer without moving the cursor, instead of indexing past the end of pool.
func (bs *bufferStack) pop() []byte {
	for {
		pos := atomic.LoadInt32(&bs.cursor)
		if pos < 0 || int(pos) >= len(bs.pool) {
			// Stack exhausted: serve the caller from the heap.
			return make([]byte, 1024)
		}
		if !atomic.CompareAndSwapInt32(&bs.cursor, pos, pos+1) {
			// Another caller moved the cursor first; retry with the new value.
			continue
		}
		buf := bs.pool[pos]
		bs.pool[pos] = nil
		if buf == nil {
			buf = make([]byte, 1024)
		}
		return buf
	}
}

// push returns a buffer to the stack.
//
// It moves the cursor back by one and stores x in that slot. When no slot is
// checked out (cursor is already 0) the buffer is dropped and left to the
// garbage collector, instead of writing to index -1.
func (bs *bufferStack) push(x []byte) {
	for {
		pos := atomic.LoadInt32(&bs.cursor)
		if pos <= 0 {
			return
		}
		if atomic.CompareAndSwapInt32(&bs.cursor, pos, pos-1) {
			bs.pool[pos-1] = x
			return
		}
	}
}
// procmeta is the bookkeeping record the client keeps for one child process.
type procmeta struct {
	// cmd is the child's command; its Args and Process.Pid are reported to
	// the server, and it is matched against values received on exitChan.
	cmd *exec.Cmd
	// name and version are reported to the server and reused when the child
	// is started again after a restart.
	name    string
	version string
	// state is the process state reported to the server.
	state protos.ProcessState
	// stdin is presumably the write end of the child's standard input; it is
	// not used in the visible part of this file — TODO confirm.
	stdin io.WriteCloser
	// logUploadChan is not used in the visible part of this file; presumably
	// it carries upload requests for the child's logs — TODO confirm.
	logUploadChan chan *shared.UploadRequest
	// buffers is a stack of reusable byte buffers (see bufferStack).
	buffers bufferStack
}
// houstonClient is the HoustonClient implementation built by NewClient.
type houstonClient struct {
	// childProcs lists the child processes currently tracked by the client.
	childProcs []*procmeta
	// extraMetrics points to a map[string]float32; it is replaced atomically
	// by SetReportMetrics.
	extraMetrics unsafe.Pointer // map[string]float32
	// deploys maps a program name to its deployed versions.
	deploys map[string][]*protos.VersionAndArgs
	// shutdownFunc cancels ctx.
	shutdownFunc context.CancelFunc
	// ctx is cancelled when the client shuts down.
	ctx context.Context
	// operationChan carries operations received from the server stream
	// (checkOperation) to the operator goroutine started in NewClient.
	operationChan chan *protos.OperationQueryResponse
	// exitChan is read by the operator goroutine to drop a child from
	// childProcs; the senders are outside the visible part of this file.
	exitChan chan *exec.Cmd
	// clientChan hands each newly dialed gRPC connection from Start to the
	// operator goroutine.
	clientChan chan *grpc.ClientConn
	// timestamp is the modification time of the running executable, as text.
	timestamp string
	// wg tracks the operator goroutine; Start waits on it before cleaning up.
	wg sync.WaitGroup
	// config is the configuration loaded by NewClient.
	config clientConfig
	// version is the content of the "@version" file, or "0.0.0" when that
	// file is missing or empty.
	version string
	// standalone selects how the client describes itself in
	// makeOperationQueryRequest.
	standalone bool
}
// unmarshal fills the exported fields of the struct pointed to by val from
// src, looking each value up under the Go field name.
//
// Field handling by kind:
//   - signed integers: the value is parsed as base-10 and stored; a missing or
//     unparsable value stores what strconv.ParseInt returned (0 on syntax
//     errors), and an unparsable non-empty value is logged;
//   - slices that a []string can be assigned to: the value is split on "\n";
//   - strings: the value is stored as is.
//
// Fields of any other kind are logged and left untouched rather than
// panicking inside reflect. A nil val, or a T that is not a struct, leaves
// nothing modified.
//
// NOTE(review): a missing or empty slice value yields a one-element slice
// holding "" (strings.Split behaviour); kept as is — confirm callers rely on
// it.
func unmarshal[T any](val *T, src map[string]string) {
	if val == nil {
		return
	}
	target := reflect.ValueOf(val).Elem()
	targetType := target.Type()
	// Log the struct type, not the pointer type: pointer types are unnamed,
	// so their Name() is always empty.
	logger.Println("operation receive :", targetType.Name(), src)
	if targetType.Kind() != reflect.Struct {
		return
	}

	for i := 0; i < targetType.NumField(); i++ {
		meta := targetType.Field(i)
		if !meta.IsExported() {
			continue
		}
		field := target.Field(i)
		arg := src[meta.Name]

		switch {
		case field.CanInt():
			num, err := strconv.ParseInt(arg, 10, 0)
			if err != nil && arg != "" {
				logger.Println("unmarshal :", meta.Name, "is not a valid integer :", err)
			}
			field.SetInt(num)
		case field.Kind() == reflect.Slice:
			parts := reflect.ValueOf(strings.Split(arg, "\n"))
			if !parts.Type().AssignableTo(field.Type()) {
				logger.Println("unmarshal : unsupported slice field", meta.Name, "of type", field.Type().String())
				continue
			}
			field.Set(parts)
		case field.Kind() == reflect.String:
			field.SetString(arg)
		default:
			logger.Println("unmarshal : unsupported field", meta.Name, "of kind", field.Kind().String())
		}
	}
}
// gatherDeployedPrograms lists the versions of program name found under
// storageRoot.
//
// Every sub-directory of storageRoot/name counts as one version: the directory
// name is the version string and lastExecutionArgs supplies its Args. The
// result is sorted so that a version precedes another when
// shared.CompareVersionString reports a negative value for the pair. If the
// program directory cannot be read the result is nil.
func gatherDeployedPrograms(storageRoot, name string) []*protos.VersionAndArgs {
	programDir := path.Join(storageRoot, name)

	entries, err := os.ReadDir(programDir)
	if err != nil {
		return nil
	}

	var versions []*protos.VersionAndArgs
	for _, entry := range entries {
		if !entry.IsDir() {
			continue
		}
		versionDir := path.Join(programDir, entry.Name())
		versions = append(versions, &protos.VersionAndArgs{
			Version: entry.Name(),
			Args:    lastExecutionArgs(versionDir),
		})
	}

	sort.Slice(versions, func(a, b int) bool {
		lhs := shared.ParseVersionString(versions[a].Version)
		rhs := shared.ParseVersionString(versions[b].Version)
		return shared.CompareVersionString(lhs, rhs) < 0
	})
	return versions
}
// makeOperationQueryRequest builds the status report sent to the server: the
// host name, a description of this process followed by one per tracked child,
// and the deployed versions of every known program.
//
// In standalone mode this process is described by its own command line;
// otherwise it is reported under the fixed name "houston" with no arguments.
func (hc *houstonClient) makeOperationQueryRequest() *protos.OperationQueryRequest {
	hostname, _ := os.Hostname()

	// Description of this process; standalone mode overrides name and args.
	self := &protos.ProcessDescription{
		Name:    "houston",
		Args:    []string{},
		Version: hc.version,
		State:   protos.ProcessState_Running,
		Pid:     int32(os.Getpid()),
	}
	if hc.standalone {
		self.Name = os.Args[0]
		self.Args = os.Args[1:]
	}

	descriptions := make([]*protos.ProcessDescription, 0, len(hc.childProcs)+1)
	descriptions = append(descriptions, self)
	for _, child := range hc.childProcs {
		// Assumes every tracked child has a started process (cmd.Process set).
		descriptions = append(descriptions, &protos.ProcessDescription{
			Name:    child.name,
			Args:    child.cmd.Args,
			Version: child.version,
			State:   child.state,
			Pid:     int32(child.cmd.Process.Pid),
		})
	}

	var deployed []*protos.DeployedVersions
	for program, versions := range hc.deploys {
		deployed = append(deployed, &protos.DeployedVersions{
			Name:     program,
			Versions: versions,
		})
	}

	return &protos.OperationQueryRequest{
		Hostname: hostname,
		Procs:    descriptions,
		Deploys:  deployed,
	}
}
// NewClient builds a houston client from config.json in the working directory
// and starts its operator goroutine.
//
// It checks that both server addresses are configured, makes sure the storage
// root exists and is a directory (creating it when missing), collects the
// deployed programs found under the storage root (sub-directories for which an
// "@houston" entry is not reported as missing) and reads the client version
// from the "@version" file, falling back to "0.0.0".
//
// standalone only affects how the client describes itself in
// makeOperationQueryRequest. The gRPC connection itself is dialed by Start.
//
// An error is returned when the configuration cannot be loaded or is
// incomplete, when the executable cannot be located or inspected, or when the
// storage root is unusable.
func NewClient(standalone bool) (HoustonClient, error) {
	clientConfig, err := loadClientConfig()
	if err != nil {
		return nil, err
	}
	if len(clientConfig.GrpcAddress) == 0 {
		return nil, errors.New("client.grpc_server_address is missing")
	}
	if len(clientConfig.HttpAddress) == 0 {
		return nil, errors.New("client.http_server_address is missing")
	}
	// The executable's modification time becomes the client's timestamp.
	exefile, err := os.Executable()
	if err != nil {
		return nil, err
	}
	exefi, err := os.Stat(exefile)
	if err != nil {
		return nil, err
	}
	// Ensure the storage root is a directory. A "not exist" stat error is
	// replaced by the result of MkdirAll; any other stat error is returned.
	sp, err := os.Stat(clientConfig.StorageRoot)
	if err != nil {
		if errors.Is(err, fs.ErrNotExist) {
			err = os.MkdirAll(clientConfig.StorageRoot, 0775)
		}
	} else if !sp.IsDir() {
		err = errors.New(clientConfig.StorageRoot + " is not directory")
	}
	if err != nil {
		return nil, err
	}
	// Collect already deployed programs. A failure to read the storage root
	// is ignored and leaves the map empty.
	deploys := make(map[string][]*protos.VersionAndArgs)
	if dirs, err := os.ReadDir(clientConfig.StorageRoot); err == nil {
		for _, dir := range dirs {
			if dir.IsDir() {
				flagf := path.Join(clientConfig.StorageRoot, dir.Name(), "@houston")
				if _, err := os.Stat(flagf); !os.IsNotExist(err) {
					deploys[dir.Name()] = gatherDeployedPrograms(clientConfig.StorageRoot, dir.Name())
				}
			}
		}
	}
	// NOTE(review): the file content is used verbatim, including any trailing
	// newline — confirm that is intended.
	ver, _ := os.ReadFile("@version")
	if len(ver) == 0 {
		ver = []byte("0.0.0")
	}
	hc := &houstonClient{
		config:       clientConfig,
		clientChan:   make(chan *grpc.ClientConn),
		extraMetrics: unsafe.Pointer(&map[string]float32{}),
		deploys:      deploys,
		timestamp:    exefi.ModTime().String(),
		version:      string(ver),
		standalone:   standalone,
	}
	ctx, cancel := context.WithCancel(context.Background())
	exitChan := make(chan *exec.Cmd, 10)
	operationChan := make(chan *protos.OperationQueryResponse, 10)
	hc.wg.Add(1)
	go func() {
		defer hc.wg.Done()
		// main operator
		// NOTE(review): op stays nil until a connection arrives on
		// hc.clientChan; the op.Refresh calls below would panic if an exit or
		// operation were handled before that — confirm the ordering guarantees.
		var op protos.OperationClient
		// myname is the executable's base name without its extension; it is
		// compared with the deploy target to detect a self-update.
		myname, _ := os.Executable()
		myname = path.Base(filepath.ToSlash(myname))
		if len(path.Ext(myname)) > 0 {
			myname = myname[:len(myname)-len(path.Ext(myname))]
		}
		// "__debug_bin" is treated as "houston" (presumably the binary name
		// used when running under a debugger — TODO confirm).
		if myname == "__debug_bin" {
			myname = "houston"
		}
		for {
			select {
			case <-ctx.Done():
				return
			case newClient := <-hc.clientChan:
				// A new connection replaces the operation client.
				op = protos.NewOperationClient(newClient)
			case exited := <-exitChan:
				// Drop the matching child from childProcs; when it was
				// Running or in Restart state, terminate and reap it in the
				// background, and start it again if it was in Restart state.
				var newprocs []*procmeta
				for _, proc := range hc.childProcs {
					if proc.cmd == exited {
						if proc.state == protos.ProcessState_Running || proc.state == protos.ProcessState_Restart {
							go func(proc *procmeta) {
								if err := proc.cmd.Process.Signal(syscall.SIGTERM); err != nil {
									proc.cmd.Process.Signal(os.Kill)
								}
								proc.cmd.Wait()
								proc.cmd.Process.Release()
								if proc.state == protos.ProcessState_Restart {
									hc.startChildProcess(&shared.StartProcessRequest{
										Version: proc.version,
										Name:    proc.name,
										Args:    proc.cmd.Args,
									}, op)
								}
							}(proc)
						}
					} else {
						newprocs = append(newprocs, proc)
					}
				}
				hc.childProcs = newprocs
				op.Refresh(ctx, hc.makeOperationQueryRequest())
			case resp := <-operationChan:
				switch shared.Operation(resp.Operation) {
				case shared.Deploy:
					var dr shared.DeployRequest
					unmarshal(&dr, resp.Args)
					if dr.Name == myname {
						// Self-update: launch the replacer with this process's
						// pid, the prepared source directory and the current
						// command line, then shut this client down.
						if srcdir, replacer, err := hc.prepareUpdateSelf(&dr); err == nil {
							args := []string{
								fmt.Sprintf("%d", os.Getpid()),
								srcdir,
								filepath.ToSlash(os.Args[0]),
							}
							args = append(args, os.Args[1:]...)
							cmd := exec.Command(replacer, args...)
							if err := cmd.Start(); err != nil {
								logger.Println(err)
							} else {
								hc.shutdownFunc()
							}
						} else {
							logger.Println(err)
						}
					} else {
						// Regular deploy: rescan the program's versions and
						// report the new state.
						if err := hc.deploy(&dr); err == nil {
							prog := gatherDeployedPrograms(hc.config.StorageRoot, dr.Name)
							hc.deploys[dr.Name] = prog
							op.Refresh(ctx, hc.makeOperationQueryRequest())
						} else {
							logger.Println(err)
						}
					}
				case shared.Withdraw:
					var wr shared.WithdrawRequest
					unmarshal(&wr, resp.Args)
					err := hc.withdraw(&wr)
					if err == nil {
						// Forget the program entirely once no version is left.
						prog := gatherDeployedPrograms(hc.config.StorageRoot, wr.Name)
						if len(prog) == 0 {
							delete(hc.deploys, wr.Name)
						} else {
							hc.deploys[wr.Name] = prog
						}
						op.Refresh(ctx, hc.makeOperationQueryRequest())
					} else {
						logger.Println(err)
					}
				case shared.Start:
					var sr shared.StartProcessRequest
					unmarshal(&sr, resp.Args)
					if err := hc.startChildProcess(&sr, op); err != nil {
						logger.Println(err)
					}
				case shared.Stop:
					var sr shared.StopProcessRequest
					unmarshal(&sr, resp.Args)
					if err := hc.stopChildProcess(&sr, op); err != nil {
						logger.Println(err)
					}
				case shared.Restart:
					var rr shared.RestartProcessRequest
					unmarshal(&rr, resp.Args)
					if err := hc.restartChildProcess(&rr, op); err != nil {
						logger.Println(err)
					}
				case shared.Upload:
					var ur shared.UploadRequest
					unmarshal(&ur, resp.Args)
					if err := hc.uploadFiles(&ur); err != nil {
						logger.Println(err)
					}
				}
			}
		}
	}()
	// These fields are set after the goroutine is launched; the goroutine
	// itself selects on the local ctx, exitChan and operationChan variables.
	hc.shutdownFunc = cancel
	hc.exitChan = exitChan
	hc.ctx = ctx
	hc.operationChan = operationChan
	return hc, nil
}
// Start runs the client and blocks until it is shut down.
//
// It installs a handler for SIGINT, SIGTERM and SIGQUIT that triggers
// shutdown, then loops: dial the gRPC server (blocking, at most 15 seconds per
// attempt), hand the new connection to the operator goroutine, and stream
// operations with checkOperation until that fails, after which a new
// connection is dialed.
//
// On return it waits for the operator goroutine, signals every remaining child
// process (SIGTERM, falling back to Kill) and waits for them to exit. The
// signal handler is removed last.
//
// NOTE(review): a connection whose stream failed is dropped without Close.
// It is left open here because operation clients built from it may still be
// in use elsewhere — confirm before closing it.
func (hc *houstonClient) Start() {
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	// Deferred first so it runs last: signals stay captured while the child
	// processes are being stopped below.
	defer signal.Stop(interrupt)

	defer func() {
		hc.wg.Wait()
		for _, proc := range hc.childProcs {
			if err := proc.cmd.Process.Signal(syscall.SIGTERM); err != nil {
				proc.cmd.Process.Signal(os.Kill)
				// NOTE(review): the state is only updated when SIGTERM could
				// not be delivered — confirm that is intended.
				proc.state = protos.ProcessState_Stopping
			}
		}
		for _, proc := range hc.childProcs {
			proc.cmd.Wait()
			proc.cmd.Process.Release()
		}
	}()

	// Turn the first signal into a shutdown. The goroutine also ends when the
	// client is shut down by other means, so it does not outlive Start.
	go func() {
		select {
		case c := <-interrupt:
			logger.Println("interrupt!!!!!!!! :", c.String())
			hc.shutdownFunc()
		case <-hc.ctx.Done():
		}
	}()

	// receive from stream
	var client *grpc.ClientConn
	reconnCount := 0
	time.Sleep(time.Second)
	for {
		select {
		case <-hc.ctx.Done():
			return
		default:
		}

		if client == nil {
			if reconnCount == 0 {
				logger.Println("grpc.DialContext :", hc.config.GrpcAddress)
			}
			reconnCount++
			// Derived from hc.ctx so that a shutdown aborts a pending dial
			// instead of waiting for the timeout. Once the dial has returned,
			// cancelling this context no longer affects the connection.
			dialContext, cancelDial := context.WithTimeout(hc.ctx, 15*time.Second)
			client, _ = grpc.DialContext(dialContext, hc.config.GrpcAddress, grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials()))
			cancelDial()
			if client == nil {
				// Dial failed or timed out; try again.
				continue
			}
			reconnCount = 0
			logger.Println("grpc.DialContext succeeded")
			// The operator goroutine stops receiving once the context is
			// cancelled, so the hand-over must not block past that point.
			select {
			case hc.clientChan <- client:
			case <-hc.ctx.Done():
				// Never handed over, so nothing else can be using it.
				client.Close()
				return
			}
		}

		if err := hc.checkOperation(client); err != nil {
			logger.Println("hc.checkOperation failed :", err)
			client = nil
		}
	}
}
// Shutdown stops the client by invoking the cancel function of its context.
func (hc *houstonClient) Shutdown() {
	cancel := hc.shutdownFunc
	cancel()
}
// checkOperation opens the operation query stream on client, sends the
// current status report and then forwards every received operation to the
// operator goroutine through hc.operationChan.
//
// It returns when the stream cannot be opened, when sending or receiving
// fails, or when the client's context is cancelled while an operation is
// waiting to be forwarded. The send side of the stream is closed on every
// return path once the stream has been opened.
//
// A panic raised while running is recovered and logged; in that case the
// function returns nil.
func (hc *houstonClient) checkOperation(client *grpc.ClientConn) error {
	defer func() {
		if r := recover(); r != nil {
			logger.Println(r)
		}
	}()

	op := protos.NewOperationClient(client)
	cl, err := op.Query(hc.ctx, grpc.WaitForReady(true))
	if err != nil {
		return err
	}
	defer cl.CloseSend()

	if err := cl.Send(hc.makeOperationQueryRequest()); err != nil {
		return err
	}

	for {
		update, err := cl.Recv()
		if err != nil {
			return err
		}
		// The operator goroutine stops draining the channel once the context
		// is cancelled; do not block on a full channel past that point.
		select {
		case hc.operationChan <- update:
		case <-hc.ctx.Done():
			return hc.ctx.Err()
		}
	}
}
// SetReportMetrics atomically replaces the client's extra metrics with extra.
// The map is stored by reference, not copied.
func (hc *houstonClient) SetReportMetrics(extra map[string]float32) {
	metrics := &extra
	atomic.StorePointer(&hc.extraMetrics, unsafe.Pointer(metrics))
}