Files
houston/server/operation.go
2024-02-15 17:32:24 +09:00

527 lines
11 KiB
Go

package server
import (
"context"
"encoding/json"
"fmt"
"net"
"os"
"reflect"
"strings"
"sync"
"time"
"repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/houston/shared"
"repositories.action2quare.com/ayo/houston/shared/protos"
)
// opdef is one operation queued for delivery to an agent host: the operation
// kind plus the request value that carries its arguments.
type opdef struct {
// operation identifies which operation the agent should run.
operation shared.Operation
// args is the request value; Query flattens it with marshal before sending.
args any
}
// ProcessSnapshot is the JSON view of a single process on a host. allHosts
// fills it by copying the matching fields of a protos.ProcessDescription.
type ProcessSnapshot struct {
Name string `json:"name"`
Args []string `json:"args"`
Version string `json:"version"`
State protos.ProcessState `json:"state"`
Pid int32 `json:"pid"`
}
// hostWithChan is the server-side record of one agent host: the values taken
// from its latest protos.OperationQueryRequest plus the channel used to push
// operations to it.
type hostWithChan struct {
Hostname string
PrivateIp string
PublicIp string
// Procs is the process list exactly as reported by the agent.
Procs []*protos.ProcessDescription `json:"procs"`
// Deploys maps a deploy name to the versions reported for it.
Deploys map[string][]*protos.VersionAndArgs `json:"deploys"`
// opChan carries pending operations; Query reads from it and forwards them.
opChan chan *opdef
}
// makeHostWithChan builds a host record from an agent report. The reported
// deploy list is re-keyed by deploy name. opChan is left nil; the caller sets
// it with withOpChan or makeOpChan.
func makeHostWithChan(desc *protos.OperationQueryRequest) *hostWithChan {
	host := &hostWithChan{
		Hostname:  desc.GetHostname(),
		PrivateIp: desc.PrivateIp,
		PublicIp:  desc.PublicIp,
		Procs:     desc.Procs,
		Deploys:   make(map[string][]*protos.VersionAndArgs),
	}
	for _, d := range desc.Deploys {
		host.Deploys[d.Name] = d.Versions
	}
	return host
}
// withOpChan attaches an existing operation channel to the host and returns
// the same host so the call can be chained.
func (pc *hostWithChan) withOpChan(c chan *opdef) *hostWithChan {
	pc.opChan = c

	return pc
}
// makeOpChan gives the host a fresh operation channel with a buffer of one and
// returns the same host so the call can be chained.
func (pc *hostWithChan) makeOpChan() *hostWithChan {
	return pc.withOpChan(make(chan *opdef, 1))
}
// hostPool is the mutex-guarded registry of connected agent hosts.
type hostPool struct {
sync.Mutex
// hosts maps a hostname to that host's latest record.
hosts map[string]*hostWithChan
// exportChan carries "+address" / "-address" messages that are consumed by
// targetExportLoop.
exportChan chan string
}
// deployingProgress is a progress report tagged with the time the server
// recorded it.
type deployingProgress struct {
*protos.DeployingProgress
// Timestamp is the Unix time in seconds at which the entry was created or
// last updated.
Timestamp int64
}
// deployingBoard is the mutex-guarded list of deploy progress entries.
type deployingBoard struct {
sync.Mutex
// progs holds the current progress entries; Deploy replaces the whole slice
// and ReportDeployingProgress updates or appends single entries.
progs []deployingProgress
}
// clone returns a copy of the progress entries taken under the board's lock.
// The slice is new, but each entry still points at the same
// protos.DeployingProgress value as the board's entry.
func (db *deployingBoard) clone() (out []deployingProgress) {
	db.Lock()
	defer db.Unlock()

	return append(make([]deployingProgress, 0, len(db.progs)), db.progs...)
}
// regist stores the host described by desc and returns its hostname key and
// operation channel. If the hostname is already known, the existing channel
// is kept so operations queued on it are not lost; otherwise a new channel is
// created.
//
// In the background it probes port 9100 on the private IP, then the public
// IP, and publishes the first reachable address on exportChan with a "+"
// prefix.
func (sp *hostPool) regist(desc *protos.OperationQueryRequest) (string, chan *opdef) {
	sp.Lock()
	defer sp.Unlock()

	host := makeHostWithChan(desc)
	if prev := sp.hosts[desc.Hostname]; prev != nil {
		host.withOpChan(prev.opChan)
	} else {
		host.makeOpChan()
	}

	logger.Println("houston agent registered :", desc.Hostname, desc.PrivateIp, desc.PublicIp)

	go func(prvip string, pubip string) {
		for _, ip := range []string{prvip, pubip} {
			if len(ip) == 0 {
				continue
			}
			address := net.JoinHostPort(ip, "9100")
			// The dial error is dropped on purpose: a failed probe just
			// means this address is not exported.
			conn, _ := net.DialTimeout("tcp", address, 3*time.Second)
			if conn == nil {
				continue
			}
			conn.Close()
			sp.exportChan <- "+" + address
			return
		}
	}(desc.PrivateIp, desc.PublicIp)

	sp.hosts[desc.Hostname] = host
	return desc.Hostname, host.opChan
}
// refresh replaces the stored record of an already registered host with the
// values in desc, keeping its operation channel. Reports for unknown
// hostnames are ignored.
func (sp *hostPool) refresh(desc *protos.OperationQueryRequest) {
	sp.Lock()
	defer sp.Unlock()

	prev := sp.hosts[desc.Hostname]
	if prev == nil {
		return
	}
	sp.hosts[desc.Hostname] = makeHostWithChan(desc).withOpChan(prev.opChan)
}
// unregist removes the host stored under key and asks the export loop to drop
// its addresses.
//
// regist publishes targets as "+<ip>:9100", so the removal messages must use
// the same host:port form. A bare IP never matches the stored key and would
// leave the target in the exported list forever. Empty IPs are skipped
// because no target can have been published for them.
func (sp *hostPool) unregist(key string) {
	sp.Lock()
	defer sp.Unlock()

	host := sp.hosts[key]
	if host != nil {
		for _, ip := range []string{host.PublicIp, host.PrivateIp} {
			if len(ip) == 0 {
				continue
			}
			sp.exportChan <- "-" + net.JoinHostPort(ip, "9100")
		}
	}
	delete(sp.hosts, key)
}
// hostSnapshot is the per-host value returned by allHosts: the host's
// processes and its deploys keyed by deploy name.
type hostSnapshot struct {
Procs []ProcessSnapshot `json:"procs"`
Deploys map[string][]*protos.VersionAndArgs `json:"deploys"`
}
// allHosts returns a snapshot of every registered host keyed by hostname.
// Process entries are copied into ProcessSnapshot values; the Deploys map is
// shared with the stored host record rather than copied.
func (sp *hostPool) allHosts() map[string]hostSnapshot {
	sp.Lock()
	defer sp.Unlock()

	snapshots := make(map[string]hostSnapshot)
	for name, host := range sp.hosts {
		procs := make([]ProcessSnapshot, len(host.Procs))
		for i, p := range host.Procs {
			procs[i] = ProcessSnapshot{
				Name:    p.Name,
				Args:    p.Args,
				Version: p.Version,
				State:   p.State,
				Pid:     p.Pid,
			}
		}
		snapshots[name] = hostSnapshot{Procs: procs, Deploys: host.Deploys}
	}
	return snapshots
}
// query returns the registered hosts for which filter reports true. The
// result is nil when nothing matches, and its order follows map iteration so
// it is not stable. filter runs while the pool lock is held, so it must not
// call back into the pool.
func (sp *hostPool) query(filter func(*hostWithChan) bool) []*hostWithChan {
	sp.Lock()
	defer sp.Unlock()

	var matched []*hostWithChan
	for _, host := range sp.hosts {
		if !filter(host) {
			continue
		}
		matched = append(matched, host)
	}
	return matched
}
// operationServer implements the Operation service methods defined in this
// file and keeps the state they share.
type operationServer struct {
protos.UnimplementedOperationServer
// hp is the registry of connected agent hosts.
hp hostPool
// db holds the progress entries of the most recent deploy.
db deployingBoard
}
// marshal flattens the exported fields of a struct (or a pointer to one) into
// output as field-name -> string pairs and returns output.
//
//   - Embedded (anonymous) struct fields are flattened into the same map.
//   - Array and slice fields are rendered element by element and joined with
//     "\n".
//   - Signed and unsigned integers, booleans and floats are rendered as
//     decimal text; strings are copied as-is.
//   - Unexported fields are skipped.
//
// A nil pointer, an invalid Value or a non-struct value contributes nothing
// and does not panic. output must be non-nil.
func marshal(argval reflect.Value, output map[string]string) map[string]string {
	// Unwrap pointers. Elem on a nil pointer yields an invalid Value, which
	// ends the loop and is rejected just below.
	for argval.IsValid() && argval.Kind() == reflect.Pointer {
		argval = argval.Elem()
	}
	if !argval.IsValid() || argval.Kind() != reflect.Struct {
		return output
	}

	argtype := argval.Type()
	for i := 0; i < argtype.NumField(); i++ {
		field := argtype.Field(i)
		if !field.IsExported() {
			continue
		}
		value := argval.Field(i)
		switch {
		case field.Anonymous:
			marshal(value, output)
		case value.Kind() == reflect.Array || value.Kind() == reflect.Slice:
			conv := make([]string, 0, value.Len())
			for j := 0; j < value.Len(); j++ {
				conv = append(conv, marshalFieldValue(value.Index(j)))
			}
			output[field.Name] = strings.Join(conv, "\n")
		default:
			output[field.Name] = marshalFieldValue(value)
		}
	}
	return output
}

// marshalFieldValue renders one field or slice element as text. Kinds without
// a dedicated case fall back to reflect.Value.String, which returns the string
// itself for string kinds and a "<T Value>" placeholder for anything else.
func marshalFieldValue(v reflect.Value) string {
	switch {
	case v.CanInt():
		return fmt.Sprintf("%d", v.Int())
	case v.CanUint():
		return fmt.Sprintf("%d", v.Uint())
	case v.Kind() == reflect.Bool:
		return fmt.Sprintf("%t", v.Bool())
	case v.Kind() == reflect.Float32:
		// Format at 32-bit precision so 1.5 does not print with float64 noise.
		return fmt.Sprintf("%g", float32(v.Float()))
	case v.Kind() == reflect.Float64:
		return fmt.Sprintf("%g", v.Float())
	default:
		return v.String()
	}
}
// Query serves one agent stream. The first message received describes the
// agent and registers it in the host pool. After that, every operation queued
// on the host's channel is forwarded to the agent until the stream context is
// done. The host is unregistered when the method returns.
//
// It returns the Recv error if the first message cannot be read, the Send
// error if forwarding an operation fails (the stream is unusable at that
// point, so further operations are not consumed and dropped), and nil when
// the stream context ends.
func (os *operationServer) Query(svr protos.Operation_QueryServer) error {
	desc, err := svr.Recv()
	if err != nil {
		return err
	}

	hostname := desc.Hostname
	key, opChan := os.hp.regist(desc)
	defer func() {
		logger.Println("operationServer.Query : houston client unregistered ", hostname)
		os.hp.unregist(key)
	}()
	logger.Println("operationServer.Query : houston client registered ", hostname)

	for {
		select {
		case <-svr.Context().Done():
			return nil

		case op := <-opChan:
			resp := &protos.OperationQueryResponse{
				Operation: string(op.operation),
				Args:      marshal(reflect.ValueOf(op.args), make(map[string]string)),
			}
			if err := svr.Send(resp); err != nil {
				return err
			}
		}
	}
}
// ReportDeployingProgress records a progress report from an agent. An entry
// with the same hostname, name and version is overwritten; otherwise the
// report is appended. In both cases the entry is stamped with the current
// Unix time. It always returns an empty response and a nil error.
func (os *operationServer) ReportDeployingProgress(ctx context.Context, dp *protos.DeployingProgress) (*protos.Empty, error) {
	os.db.Lock()
	defer os.db.Unlock()

	entry := deployingProgress{
		DeployingProgress: dp,
		Timestamp:         time.Now().UTC().Unix(),
	}
	for i := range os.db.progs {
		cur := os.db.progs[i]
		if cur.Hostname != dp.Hostname || cur.Name != dp.Name || cur.Version != dp.Version {
			continue
		}
		os.db.progs[i] = entry
		return &protos.Empty{}, nil
	}

	os.db.progs = append(os.db.progs, entry)
	return &protos.Empty{}, nil
}
// Refresh updates the stored record of an already registered host from desc.
// It always returns an empty response and a nil error.
func (os *operationServer) Refresh(ctx context.Context, desc *protos.OperationQueryRequest) (*protos.Empty, error) {
	os.hp.refresh(desc)

	empty := &protos.Empty{}
	return empty, nil
}
// Deploy queues a deploy operation on every target host and resets the
// progress board to one "prepare" entry per target.
//
// If d.hostnames is non-empty, the targets are the registered hosts with those
// names. Otherwise they are all hosts that report a process named d.Name.
func (os *operationServer) Deploy(d DeployRequest) {
	var match func(*hostWithChan) bool
	if len(d.hostnames) > 0 {
		// Deploy to the explicitly named hosts.
		wanted := make(map[string]bool)
		for _, hn := range d.hostnames {
			wanted[hn] = true
		}
		match = func(h *hostWithChan) bool {
			return wanted[h.Hostname]
		}
	} else {
		// Deploy to every host that reports the process.
		match = func(h *hostWithChan) bool {
			for _, proc := range h.Procs {
				if proc.Name == d.Name {
					return true
				}
			}
			return false
		}
	}
	targets := os.hp.query(match)

	now := time.Now().UTC().Unix()
	dps := make([]deployingProgress, 0, len(targets))
	for _, t := range targets {
		dps = append(dps, deployingProgress{
			DeployingProgress: &protos.DeployingProgress{
				Hostname: t.Hostname,
				Name:     d.Name,
				Version:  d.Version,
				State:    "prepare",
			},
			Timestamp: now,
		})
		t.opChan <- &opdef{operation: shared.Deploy, args: d}
	}

	os.db.Lock()
	defer os.db.Unlock()
	os.db.progs = dps
}
// Withdraw queues a withdraw operation on every registered host, or only on
// the hosts named in d.hostnames when that list is non-empty.
func (os *operationServer) Withdraw(d WithdrawRequest) {
	restrict := len(d.hostnames) > 0
	wanted := make(map[string]bool)
	for _, hn := range d.hostnames {
		wanted[hn] = true
	}

	// Start from every host: a withdraw may be needed even on hosts where the
	// process is not running.
	everyHost := func(*hostWithChan) bool { return true }
	for _, t := range os.hp.query(everyHost) {
		if restrict && !wanted[t.Hostname] {
			// Only the named hosts are affected.
			continue
		}
		t.opChan <- &opdef{operation: shared.Withdraw, args: d}
	}
}
// StartProcess queues a start operation on every host that has d.Name
// deployed. When d.hostnames is non-empty, only those hosts are used.
func (os *operationServer) StartProcess(d StartProcessRequest) {
	// A host qualifies as soon as the process is deployed on it.
	deployed := os.hp.query(func(h *hostWithChan) bool {
		_, found := h.Deploys[d.Name]
		return found
	})

	restrict := len(d.hostnames) > 0
	wanted := make(map[string]bool)
	for _, hn := range d.hostnames {
		wanted[hn] = true
	}

	for _, t := range deployed {
		if restrict && !wanted[t.Hostname] {
			// Only the named hosts are affected.
			continue
		}
		t.opChan <- &opdef{operation: shared.Start, args: d}
	}
}
// StopProcess queues a stop operation on every host that reports a process
// named d.Name. When d.hostnames is non-empty, only those hosts are used.
func (os *operationServer) StopProcess(d StopProcessRequest) {
	// Every host that reports the process is a candidate.
	running := os.hp.query(func(h *hostWithChan) bool {
		for _, proc := range h.Procs {
			if proc.Name == d.Name {
				return true
			}
		}
		return false
	})

	restrict := len(d.hostnames) > 0
	wanted := make(map[string]bool)
	for _, hn := range d.hostnames {
		wanted[hn] = true
	}

	for _, t := range running {
		if restrict && !wanted[t.Hostname] {
			// Only the named hosts are stopped.
			continue
		}
		t.opChan <- &opdef{operation: shared.Stop, args: d}
	}
}
// RestartProcess queues a restart operation on a single host. d.hostnames must
// contain exactly one name and that host must report a process named d.Name;
// otherwise nothing is queued.
func (os *operationServer) RestartProcess(d RestartProcessRequest) {
	running := os.hp.query(func(h *hostWithChan) bool {
		for _, proc := range h.Procs {
			if proc.Name == d.Name {
				return true
			}
		}
		return false
	})

	if len(d.hostnames) != 1 {
		return
	}

	// Restart only on the single named host.
	only := d.hostnames[0]
	for _, t := range running {
		if t.Hostname != only {
			continue
		}
		t.opChan <- &opdef{operation: shared.Restart, args: d}
		return
	}
}
// Upload queues an upload operation on every host that has d.Name deployed.
// When d.hostnames is non-empty, only those hosts are used.
func (os *operationServer) Upload(d UploadRequest) {
	// A host qualifies if the process is deployed, even if it is not running.
	deployed := os.hp.query(func(h *hostWithChan) bool {
		_, found := h.Deploys[d.Name]
		return found
	})

	restrict := len(d.hostnames) > 0
	wanted := make(map[string]bool)
	for _, hn := range d.hostnames {
		wanted[hn] = true
	}

	for _, t := range deployed {
		if restrict && !wanted[t.Hostname] {
			// Only the named hosts are affected.
			continue
		}
		t.opChan <- &opdef{operation: shared.Upload, args: d}
	}
}
// Hosts returns a snapshot of all registered hosts keyed by hostname.
func (os *operationServer) Hosts() map[string]hostSnapshot {
	snapshot := os.hp.allHosts()
	return snapshot
}
// DeplyingProgress returns a copy of the current deploy progress entries.
// The misspelled name is kept as-is because callers may depend on it.
func (os *operationServer) DeplyingProgress() []deployingProgress {
	progs := os.db.clone()
	return progs
}
// targetExportLoop keeps the set of exported target addresses and rewrites
// prometheus_targets.json after every message read from in.
//
// A message is "+<address>" to add a target or "-<address>" to remove one.
// Any other prefix leaves the set unchanged but still rewrites the file.
// Empty messages are skipped instead of panicking on the prefix check. File
// errors are logged, and the loop carries on with the next message. The loop
// ends when in is closed.
func targetExportLoop(in chan string) {
	all := make(map[string]bool)
	for addr := range in {
		logger.Println("targetExportLoop :", addr)
		if len(addr) == 0 {
			continue
		}

		switch addr[0] {
		case '+':
			all[addr[1:]] = true
		case '-':
			delete(all, addr[1:])
		}

		list := make([]string, 0, len(all))
		for k := range all {
			list = append(list, k)
		}
		output := []map[string]any{{"targets": list}}

		file, err := os.Create("prometheus_targets.json")
		if err != nil {
			logger.Println("targetExportLoop : create failed :", err)
			continue
		}
		if err := json.NewEncoder(file).Encode(output); err != nil {
			logger.Println("targetExportLoop : encode failed :", err)
		}
		if err := file.Close(); err != nil {
			logger.Println("targetExportLoop : close failed :", err)
		}
	}
}
// newOperationServer creates an operationServer with an empty host pool and
// starts the goroutine that exports targets from the pool's export channel.
func newOperationServer() *operationServer {
	svr := &operationServer{}
	svr.hp.hosts = map[string]*hostWithChan{}
	svr.hp.exportChan = make(chan string)

	go targetExportLoop(svr.hp.exportChan)

	return svr
}