Files
houston/server/operation.go

394 lines
8.1 KiB
Go
Raw Normal View History

2023-05-21 23:37:54 +09:00
package server
import (
"context"
"fmt"
"reflect"
2023-06-27 10:53:14 +09:00
"strings"
2023-05-21 23:37:54 +09:00
"sync"
2023-05-22 02:13:03 +09:00
"repositories.action2quare.com/ayo/houston/shared"
"repositories.action2quare.com/ayo/houston/shared/protos"
2023-05-21 23:37:54 +09:00
)
// opdef is one operation queued on a host's channel for delivery by Query.
type opdef struct {
	// operation names the action; Query sends it to the host as a string.
	operation shared.Operation
	// args is the request value for the operation; Query flattens it with
	// marshal before sending.
	args any
}
type ProcessSnapshot struct {
2023-06-14 15:30:01 +09:00
Name string `json:"name"`
Args []string `json:"args"`
Version string `json:"version"`
State protos.ProcessState `json:"state"`
Pid int32 `json:"pid"`
2023-05-21 23:37:54 +09:00
}
type hostWithChan struct {
Hostname string
2023-06-14 15:30:01 +09:00
Procs []*protos.ProcessDescription `json:"procs"`
Deploys map[string][]*protos.VersionAndArgs `json:"deploys"`
2023-05-21 23:37:54 +09:00
opChan chan *opdef
}
// makeHostWithChan builds a pool entry from a host's request. The deploy list
// is indexed by name (a later duplicate name overwrites an earlier one). The
// returned entry has no operation channel yet; attach one with withOpChan or
// makeOpChan.
func makeHostWithChan(desc *protos.OperationQueryRequest) *hostWithChan {
	host := &hostWithChan{
		Hostname: desc.GetHostname(),
		Procs:    desc.Procs,
		Deploys:  make(map[string][]*protos.VersionAndArgs),
	}
	for _, d := range desc.Deploys {
		host.Deploys[d.Name] = d.Versions
	}
	return host
}
// withOpChan attaches the existing channel c to the entry and returns the
// same entry so calls can be chained.
func (pc *hostWithChan) withOpChan(c chan *opdef) *hostWithChan {
	pc.opChan = c
	return pc
}
// makeOpChan gives the entry a fresh operation channel with a buffer of one
// and returns the same entry so calls can be chained.
func (pc *hostWithChan) makeOpChan() *hostWithChan {
	return pc.withOpChan(make(chan *opdef, 1))
}
// hostPool is the registry of known hosts keyed by hostname. The embedded
// mutex is taken by every method that touches hosts.
type hostPool struct {
	sync.Mutex
	hosts map[string]*hostWithChan
}
// regist stores (or replaces) the pool entry for desc.Hostname and returns
// the hostname key together with the host's operation channel. If an entry
// already exists its channel is carried over to the new entry; otherwise a
// new channel is created.
func (sp *hostPool) regist(desc *protos.OperationQueryRequest) (string, chan *opdef) {
	sp.Lock()
	defer sp.Unlock()

	entry := makeHostWithChan(desc)
	if prev := sp.hosts[desc.Hostname]; prev != nil {
		entry.withOpChan(prev.opChan)
	} else {
		entry.makeOpChan()
	}
	sp.hosts[desc.Hostname] = entry

	return desc.Hostname, entry.opChan
}
// refresh replaces the stored state of an already-registered host with the
// contents of desc, keeping its operation channel. Unknown hosts are ignored.
func (sp *hostPool) refresh(desc *protos.OperationQueryRequest) {
	sp.Lock()
	defer sp.Unlock()

	prev := sp.hosts[desc.Hostname]
	if prev == nil {
		return
	}
	sp.hosts[desc.Hostname] = makeHostWithChan(desc).withOpChan(prev.opChan)
}
// unregist removes the entry stored under key, if any.
func (sp *hostPool) unregist(key string) {
	sp.Lock()
	delete(sp.hosts, key)
	sp.Unlock()
}
type hostSnapshot struct {
2023-06-14 15:33:44 +09:00
Procs []ProcessSnapshot `json:"procs"`
Deploys map[string][]*protos.VersionAndArgs `json:"deploys"`
2023-05-21 23:37:54 +09:00
}
func (sp *hostPool) allHosts() map[string]hostSnapshot {
sp.Lock()
defer sp.Unlock()
out := make(map[string]hostSnapshot)
for hn, v := range sp.hosts {
procs := make([]ProcessSnapshot, 0, len(v.Procs))
for _, p := range v.Procs {
procs = append(procs, ProcessSnapshot{
2023-06-14 15:30:01 +09:00
Name: p.Name,
Args: p.Args,
Version: p.Version,
State: p.State,
Pid: p.Pid,
2023-05-21 23:37:54 +09:00
})
}
out[hn] = hostSnapshot{
Procs: procs,
Deploys: v.Deploys,
}
}
return out
}
// query returns the registered hosts for which filter reports true. filter is
// called while the pool lock is held. The result is nil when nothing matches,
// and its order follows map iteration and is therefore unspecified.
func (sp *hostPool) query(filter func(*hostWithChan) bool) []*hostWithChan {
	sp.Lock()
	defer sp.Unlock()

	var matched []*hostWithChan
	for _, host := range sp.hosts {
		if !filter(host) {
			continue
		}
		matched = append(matched, host)
	}
	return matched
}
// operationServer implements the Operation service on top of a hostPool. It
// embeds protos.UnimplementedOperationServer and keeps its hosts in hp.
type operationServer struct {
	protos.UnimplementedOperationServer

	hp hostPool
}
// marshal flattens the exported fields of a struct (or pointer to a struct)
// into output as field-name → string pairs and returns output.
//
//   - Unexported fields are skipped.
//   - Exported embedded structs (or pointers to structs) are flattened into
//     the same map rather than stored under their own name.
//   - Arrays and slices are rendered element by element and joined with "\n".
//   - Strings, signed/unsigned integers, floats and bools are rendered as
//     their plain textual value; see marshalScalar for everything else.
//
// An invalid value, a nil pointer or a non-struct value adds nothing to
// output instead of panicking.
func marshal(argval reflect.Value, output map[string]string) map[string]string {
	// Unwrap pointers/interfaces; a nil at any level means there is nothing
	// to flatten.
	for argval.Kind() == reflect.Pointer || argval.Kind() == reflect.Interface {
		if argval.IsNil() {
			return output
		}
		argval = argval.Elem()
	}
	if argval.Kind() != reflect.Struct {
		return output
	}

	typ := argval.Type()
	for i := 0; i < typ.NumField(); i++ {
		field := typ.Field(i)
		if !field.IsExported() {
			continue
		}
		value := argval.Field(i)

		if field.Anonymous {
			// Only struct-like embeds can be flattened; an embedded scalar
			// type falls through and is stored under its own name.
			embedded := field.Type
			if embedded.Kind() == reflect.Pointer {
				embedded = embedded.Elem()
			}
			if embedded.Kind() == reflect.Struct {
				marshal(value, output)
				continue
			}
		}

		switch value.Kind() {
		case reflect.Array, reflect.Slice:
			items := make([]string, 0, value.Len())
			for j := 0; j < value.Len(); j++ {
				items = append(items, marshalScalar(value.Index(j)))
			}
			output[field.Name] = strings.Join(items, "\n")
		default:
			output[field.Name] = marshalScalar(value)
		}
	}
	return output
}

// marshalScalar renders a single value as text. Kinds it does not know fall
// back to reflect.Value.String, which yields a "<T Value>" placeholder.
func marshalScalar(v reflect.Value) string {
	switch {
	case v.Kind() == reflect.String:
		return v.String()
	case v.CanInt():
		return fmt.Sprintf("%d", v.Int())
	case v.CanUint():
		return fmt.Sprintf("%d", v.Uint())
	case v.CanFloat():
		// Formatting the reflect.Value itself lets fmt honour the float's
		// bit size, so a float32 is not printed with float64 noise digits.
		return fmt.Sprintf("%v", v)
	case v.Kind() == reflect.Bool:
		return fmt.Sprintf("%t", v.Bool())
	default:
		return v.String()
	}
}
// Query serves one host's operation stream. It reads the host's initial
// request, registers the host in the pool, and then forwards every operation
// queued on the host's channel until the stream context is done or a send
// fails. The host is unregistered on return.
//
// It returns the Recv or Send error that ended the stream, or nil when the
// stream context was cancelled.
//
// NOTE(review): regist carries the channel over when the same hostname is
// registered again, and unregist deletes by hostname. If a host reconnects
// while its previous stream is still open, the old stream's deferred unregist
// removes the entry the new stream is using — confirm whether that can happen.
func (os *operationServer) Query(svr protos.Operation_QueryServer) error {
	desc, err := svr.Recv()
	if err != nil {
		return err
	}

	key, opChan := os.hp.regist(desc)
	defer os.hp.unregist(key)

	for {
		select {
		case <-svr.Context().Done():
			return nil
		case op := <-opChan:
			resp := &protos.OperationQueryResponse{
				Operation: string(op.operation),
				Args:      marshal(reflect.ValueOf(op.args), make(map[string]string)),
			}
			// A failed send means the stream is unusable; stop instead of
			// silently dropping every following operation.
			if err := svr.Send(resp); err != nil {
				return err
			}
		}
	}
}
// Refresh updates the pool entry for the host described by desc (see
// hostPool.refresh) and always returns an empty message with a nil error.
func (os *operationServer) Refresh(ctx context.Context, desc *protos.OperationQueryRequest) (*protos.Empty, error) {
	os.hp.refresh(desc)

	empty := &protos.Empty{}
	return empty, nil
}
// Deploy queues a shared.Deploy operation carrying d on every target host.
//
// When d.hostnames is non-empty the targets are the registered hosts with one
// of those hostnames; otherwise they are the hosts that report a process
// named d.Name. Each send blocks while the host's one-slot channel is full.
func (os *operationServer) Deploy(d DeployRequest) {
	var filter func(*hostWithChan) bool
	if len(d.hostnames) > 0 {
		// Deploy to the named hosts.
		wanted := make(map[string]struct{}, len(d.hostnames))
		for _, name := range d.hostnames {
			wanted[name] = struct{}{}
		}
		filter = func(h *hostWithChan) bool {
			_, ok := wanted[h.Hostname]
			return ok
		}
	} else {
		// Deploy to every host that has the process.
		filter = func(h *hostWithChan) bool {
			for _, proc := range h.Procs {
				if proc.Name == d.Name {
					return true
				}
			}
			return false
		}
	}

	for _, target := range os.hp.query(filter) {
		target.opChan <- &opdef{operation: shared.Deploy, args: d}
	}
}
// Withdraw queues a shared.Withdraw operation carrying d on every registered
// host, or only on the hosts listed in d.hostnames when that list is
// non-empty. Hosts are not filtered by process: a withdraw may be needed even
// on a host where the process is not running. Each send blocks while the
// host's one-slot channel is full.
func (os *operationServer) Withdraw(d WithdrawRequest) {
	wanted := make(map[string]struct{}, len(d.hostnames))
	for _, name := range d.hostnames {
		wanted[name] = struct{}{}
	}

	targets := os.hp.query(func(h *hostWithChan) bool {
		if len(d.hostnames) == 0 {
			return true
		}
		// Restrict to the requested hostnames.
		_, ok := wanted[h.Hostname]
		return ok
	})

	for _, target := range targets {
		target.opChan <- &opdef{operation: shared.Withdraw, args: d}
	}
}
// StartProcess queues a shared.Start operation carrying d on every host that
// has a deployment named d.Name (being deployed is enough; the process need
// not be running). A non-empty d.hostnames further restricts the targets to
// those hostnames. Each send blocks while the host's one-slot channel is full.
func (os *operationServer) StartProcess(d StartProcessRequest) {
	wanted := make(map[string]struct{}, len(d.hostnames))
	for _, name := range d.hostnames {
		wanted[name] = struct{}{}
	}

	targets := os.hp.query(func(h *hostWithChan) bool {
		if _, deployed := h.Deploys[d.Name]; !deployed {
			return false
		}
		if len(d.hostnames) == 0 {
			return true
		}
		// Restrict to the requested hostnames.
		_, ok := wanted[h.Hostname]
		return ok
	})

	for _, target := range targets {
		target.opChan <- &opdef{operation: shared.Start, args: d}
	}
}
// StopProcess queues a shared.Stop operation carrying d on every host that
// reports a process named d.Name. A non-empty d.hostnames further restricts
// the targets to those hostnames. Each send blocks while the host's one-slot
// channel is full.
func (os *operationServer) StopProcess(d StopProcessRequest) {
	runsProcess := func(h *hostWithChan) bool {
		for _, proc := range h.Procs {
			if proc.Name == d.Name {
				return true
			}
		}
		return false
	}
	targets := os.hp.query(runsProcess)

	if len(d.hostnames) > 0 {
		// Stop only on the requested hostnames.
		wanted := make(map[string]struct{}, len(d.hostnames))
		for _, name := range d.hostnames {
			wanted[name] = struct{}{}
		}
		// targets is a fresh slice from query, so filtering in place is safe.
		kept := targets[:0]
		for _, t := range targets {
			if _, ok := wanted[t.Hostname]; ok {
				kept = append(kept, t)
			}
		}
		targets = kept
	}

	for _, target := range targets {
		target.opChan <- &opdef{operation: shared.Stop, args: d}
	}
}
func (os *operationServer) RestartProcess(d RestartProcessRequest) {
targets := os.hp.query(func(p *hostWithChan) bool {
for _, p := range p.Procs {
if p.Name == d.Name {
return true
}
}
return false
})
2023-06-26 11:26:57 +09:00
if len(d.hostnames) != 1 {
return
2023-05-21 23:37:54 +09:00
}
2023-06-26 11:26:57 +09:00
// hostname만 재시작
2023-05-21 23:37:54 +09:00
for _, t := range targets {
2023-06-26 11:26:57 +09:00
if t.Hostname == d.hostnames[0] {
t.opChan <- &opdef{
operation: shared.Restart,
args: d,
}
return
2023-05-21 23:37:54 +09:00
}
}
}
// Upload queues a shared.Upload operation carrying d on every host that has a
// deployment named d.Name (the process need not be running; being deployed is
// enough). A non-empty d.hostnames further restricts the targets to those
// hostnames. Each send blocks while the host's one-slot channel is full.
func (os *operationServer) Upload(d UploadRequest) {
	wanted := make(map[string]struct{}, len(d.hostnames))
	for _, name := range d.hostnames {
		wanted[name] = struct{}{}
	}
	// selected reports whether a hostname passes the optional hostname filter.
	selected := func(hostname string) bool {
		if len(d.hostnames) == 0 {
			return true
		}
		_, ok := wanted[hostname]
		return ok
	}

	deployed := os.hp.query(func(h *hostWithChan) bool {
		_, ok := h.Deploys[d.Name]
		return ok
	})
	for _, target := range deployed {
		if !selected(target.Hostname) {
			continue
		}
		target.opChan <- &opdef{operation: shared.Upload, args: d}
	}
}
// Hosts returns a snapshot of every registered host keyed by hostname, as
// produced by hostPool.allHosts.
func (os *operationServer) Hosts() map[string]hostSnapshot {
	snapshot := os.hp.allHosts()
	return snapshot
}
// newOperationServer returns an operationServer whose host pool starts empty.
func newOperationServer() *operationServer {
	server := &operationServer{}
	server.hp.hosts = make(map[string]*hostWithChan)
	return server
}