394 lines
8.1 KiB
Go
394 lines
8.1 KiB
Go
package server
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"reflect"
|
|
"strings"
|
|
"sync"
|
|
|
|
"repositories.action2quare.com/ayo/houston/shared"
|
|
"repositories.action2quare.com/ayo/houston/shared/protos"
|
|
)
|
|
|
|
type opdef struct {
|
|
operation shared.Operation
|
|
args any
|
|
}
|
|
|
|
type ProcessSnapshot struct {
|
|
Name string `json:"name"`
|
|
Args []string `json:"args"`
|
|
Version string `json:"version"`
|
|
State protos.ProcessState `json:"state"`
|
|
Pid int32 `json:"pid"`
|
|
}
|
|
|
|
type hostWithChan struct {
|
|
Hostname string
|
|
Procs []*protos.ProcessDescription `json:"procs"`
|
|
Deploys map[string][]*protos.VersionAndArgs `json:"deploys"`
|
|
opChan chan *opdef
|
|
}
|
|
|
|
func makeHostWithChan(desc *protos.OperationQueryRequest) *hostWithChan {
|
|
newdeploys := make(map[string][]*protos.VersionAndArgs)
|
|
for _, deploy := range desc.Deploys {
|
|
newdeploys[deploy.Name] = deploy.Versions
|
|
}
|
|
|
|
return &hostWithChan{
|
|
Hostname: desc.GetHostname(),
|
|
Procs: desc.Procs,
|
|
Deploys: newdeploys,
|
|
}
|
|
}
|
|
|
|
func (pc *hostWithChan) withOpChan(c chan *opdef) *hostWithChan {
|
|
pc.opChan = c
|
|
return pc
|
|
}
|
|
|
|
func (pc *hostWithChan) makeOpChan() *hostWithChan {
|
|
pc.opChan = make(chan *opdef, 1)
|
|
return pc
|
|
}
|
|
|
|
type hostPool struct {
|
|
sync.Mutex
|
|
hosts map[string]*hostWithChan
|
|
}
|
|
|
|
func (sp *hostPool) regist(desc *protos.OperationQueryRequest) (string, chan *opdef) {
|
|
sp.Lock()
|
|
defer sp.Unlock()
|
|
host := sp.hosts[desc.Hostname]
|
|
if host == nil {
|
|
host = makeHostWithChan(desc).makeOpChan()
|
|
} else {
|
|
host = makeHostWithChan(desc).withOpChan(host.opChan)
|
|
}
|
|
sp.hosts[desc.Hostname] = host
|
|
return desc.Hostname, host.opChan
|
|
}
|
|
|
|
func (sp *hostPool) refresh(desc *protos.OperationQueryRequest) {
|
|
sp.Lock()
|
|
defer sp.Unlock()
|
|
|
|
host := sp.hosts[desc.Hostname]
|
|
if host != nil {
|
|
host = makeHostWithChan(desc).withOpChan(host.opChan)
|
|
sp.hosts[desc.Hostname] = host
|
|
}
|
|
}
|
|
|
|
func (sp *hostPool) unregist(key string) {
|
|
sp.Lock()
|
|
defer sp.Unlock()
|
|
|
|
delete(sp.hosts, key)
|
|
}
|
|
|
|
type hostSnapshot struct {
|
|
Procs []ProcessSnapshot `json:"procs"`
|
|
Deploys map[string][]*protos.VersionAndArgs `json:"deploys"`
|
|
}
|
|
|
|
func (sp *hostPool) allHosts() map[string]hostSnapshot {
|
|
sp.Lock()
|
|
defer sp.Unlock()
|
|
|
|
out := make(map[string]hostSnapshot)
|
|
for hn, v := range sp.hosts {
|
|
procs := make([]ProcessSnapshot, 0, len(v.Procs))
|
|
for _, p := range v.Procs {
|
|
procs = append(procs, ProcessSnapshot{
|
|
Name: p.Name,
|
|
Args: p.Args,
|
|
Version: p.Version,
|
|
State: p.State,
|
|
Pid: p.Pid,
|
|
})
|
|
}
|
|
out[hn] = hostSnapshot{
|
|
Procs: procs,
|
|
Deploys: v.Deploys,
|
|
}
|
|
}
|
|
return out
|
|
}
|
|
|
|
func (sp *hostPool) query(filter func(*hostWithChan) bool) []*hostWithChan {
|
|
sp.Lock()
|
|
defer sp.Unlock()
|
|
|
|
var targets []*hostWithChan
|
|
for _, v := range sp.hosts {
|
|
if filter(v) {
|
|
targets = append(targets, v)
|
|
}
|
|
}
|
|
return targets
|
|
}
|
|
|
|
type operationServer struct {
|
|
protos.UnimplementedOperationServer
|
|
hp hostPool
|
|
}
|
|
|
|
func marshal(argval reflect.Value, output map[string]string) map[string]string {
|
|
if argval.Kind() == reflect.Pointer {
|
|
argval = argval.Elem()
|
|
}
|
|
|
|
for i := 0; i < argval.Type().NumField(); i++ {
|
|
if !argval.Type().Field(i).IsExported() {
|
|
continue
|
|
}
|
|
|
|
if argval.Type().Field(i).Anonymous {
|
|
marshal(argval.Field(i), output)
|
|
} else if argval.Field(i).CanInt() {
|
|
output[argval.Type().Field(i).Name] = fmt.Sprintf("%d", argval.Field(i).Int())
|
|
} else if argval.Field(i).Kind() == reflect.Array || argval.Field(i).Kind() == reflect.Slice {
|
|
var conv []string
|
|
for j := 0; j < argval.Field(i).Len(); j++ {
|
|
conv = append(conv, argval.Field(i).Index(j).String())
|
|
}
|
|
output[argval.Type().Field(i).Name] = strings.Join(conv, "\n")
|
|
} else {
|
|
output[argval.Type().Field(i).Name] = argval.Field(i).String()
|
|
}
|
|
}
|
|
return output
|
|
}
|
|
|
|
func (os *operationServer) Query(svr protos.Operation_QueryServer) error {
|
|
desc, err := svr.Recv()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
key, opChan := os.hp.regist(desc)
|
|
defer os.hp.unregist(key)
|
|
|
|
Outer:
|
|
for {
|
|
select {
|
|
case <-svr.Context().Done():
|
|
break Outer
|
|
|
|
case opdef := <-opChan:
|
|
svr.Send(&protos.OperationQueryResponse{
|
|
Operation: string(opdef.operation),
|
|
Args: marshal(reflect.ValueOf(opdef.args), make(map[string]string)),
|
|
})
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (os *operationServer) Refresh(ctx context.Context, desc *protos.OperationQueryRequest) (*protos.Empty, error) {
|
|
os.hp.refresh(desc)
|
|
return &protos.Empty{}, nil
|
|
}
|
|
|
|
func (os *operationServer) Deploy(d DeployRequest) {
|
|
var targets []*hostWithChan
|
|
if len(d.hostnames) > 0 {
|
|
// hostname에 배포
|
|
conv := make(map[string]bool)
|
|
for _, hn := range d.hostnames {
|
|
conv[hn] = true
|
|
}
|
|
targets = os.hp.query(func(p *hostWithChan) bool {
|
|
_, ok := conv[p.Hostname]
|
|
return ok
|
|
})
|
|
} else {
|
|
// d.process에 모두 배포
|
|
targets = os.hp.query(func(p *hostWithChan) bool {
|
|
for _, p := range p.Procs {
|
|
if p.Name == d.Name {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
})
|
|
}
|
|
|
|
for _, t := range targets {
|
|
t.opChan <- &opdef{
|
|
operation: shared.Deploy,
|
|
args: d,
|
|
}
|
|
}
|
|
}
|
|
|
|
func (os *operationServer) Withdraw(d WithdrawRequest) {
|
|
// 프로세스가 안돌고 있는 호스트에서도 회수해야 할 수 있다.
|
|
targets := os.hp.query(func(p *hostWithChan) bool {
|
|
return true
|
|
})
|
|
|
|
if len(d.hostnames) > 0 {
|
|
// hostname만 정지
|
|
var final []*hostWithChan
|
|
conv := make(map[string]bool)
|
|
for _, hn := range d.hostnames {
|
|
conv[hn] = true
|
|
}
|
|
|
|
for _, t := range targets {
|
|
if _, ok := conv[t.Hostname]; ok {
|
|
final = append(final, t)
|
|
}
|
|
}
|
|
targets = final
|
|
}
|
|
|
|
for _, t := range targets {
|
|
t.opChan <- &opdef{
|
|
operation: shared.Withdraw,
|
|
args: d,
|
|
}
|
|
}
|
|
}
|
|
|
|
func (os *operationServer) StartProcess(d StartProcessRequest) {
|
|
targets := os.hp.query(func(p *hostWithChan) bool {
|
|
// 디플로이만 되어있어도 해당
|
|
_, ok := p.Deploys[d.Name]
|
|
return ok
|
|
})
|
|
|
|
if len(d.hostnames) > 0 {
|
|
// hostname만 업로드
|
|
var final []*hostWithChan
|
|
conv := make(map[string]bool)
|
|
for _, hn := range d.hostnames {
|
|
conv[hn] = true
|
|
}
|
|
|
|
for _, t := range targets {
|
|
if _, ok := conv[t.Hostname]; ok {
|
|
final = append(final, t)
|
|
}
|
|
}
|
|
targets = final
|
|
}
|
|
|
|
for _, t := range targets {
|
|
t.opChan <- &opdef{
|
|
operation: shared.Start,
|
|
args: d,
|
|
}
|
|
}
|
|
}
|
|
|
|
func (os *operationServer) StopProcess(d StopProcessRequest) {
|
|
// d.process 모두 정지
|
|
targets := os.hp.query(func(p *hostWithChan) bool {
|
|
for _, p := range p.Procs {
|
|
if p.Name == d.Name {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
})
|
|
|
|
if len(d.hostnames) > 0 {
|
|
// hostname만 정지
|
|
var final []*hostWithChan
|
|
conv := make(map[string]bool)
|
|
for _, hn := range d.hostnames {
|
|
conv[hn] = true
|
|
}
|
|
|
|
for _, t := range targets {
|
|
if _, ok := conv[t.Hostname]; ok {
|
|
final = append(final, t)
|
|
}
|
|
}
|
|
targets = final
|
|
}
|
|
|
|
for _, t := range targets {
|
|
t.opChan <- &opdef{
|
|
operation: shared.Stop,
|
|
args: d,
|
|
}
|
|
}
|
|
}
|
|
|
|
func (os *operationServer) RestartProcess(d RestartProcessRequest) {
|
|
targets := os.hp.query(func(p *hostWithChan) bool {
|
|
for _, p := range p.Procs {
|
|
if p.Name == d.Name {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
})
|
|
|
|
if len(d.hostnames) != 1 {
|
|
return
|
|
}
|
|
|
|
// hostname만 재시작
|
|
for _, t := range targets {
|
|
if t.Hostname == d.hostnames[0] {
|
|
t.opChan <- &opdef{
|
|
operation: shared.Restart,
|
|
args: d,
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (os *operationServer) Upload(d UploadRequest) {
|
|
targets := os.hp.query(func(p *hostWithChan) bool {
|
|
// 실행 중이 아니라 디플로이만 되어있어도 해당
|
|
_, ok := p.Deploys[d.Name]
|
|
return ok
|
|
})
|
|
|
|
if len(d.hostnames) > 0 {
|
|
// hostname만 업로드
|
|
var final []*hostWithChan
|
|
conv := make(map[string]bool)
|
|
for _, hn := range d.hostnames {
|
|
conv[hn] = true
|
|
}
|
|
|
|
for _, t := range targets {
|
|
if _, ok := conv[t.Hostname]; ok {
|
|
final = append(final, t)
|
|
}
|
|
}
|
|
targets = final
|
|
}
|
|
|
|
for _, t := range targets {
|
|
t.opChan <- &opdef{
|
|
operation: shared.Upload,
|
|
args: d,
|
|
}
|
|
}
|
|
}
|
|
|
|
func (os *operationServer) Hosts() map[string]hostSnapshot {
|
|
return os.hp.allHosts()
|
|
}
|
|
|
|
func newOperationServer() *operationServer {
|
|
return &operationServer{
|
|
hp: hostPool{
|
|
hosts: map[string]*hostWithChan{},
|
|
},
|
|
}
|
|
}
|