동일 실행파일을 여러개 실행시켰을 때 구분해서 affinity 설정

This commit is contained in:
2023-11-30 14:17:27 +09:00
parent 023a2a5194
commit 1d3266bbaf
6 changed files with 154 additions and 33 deletions

View File

@ -94,20 +94,21 @@ func (pm *procmeta) setState(s protos.ProcessState) {
}
type houstonClient struct {
childProcs []*procmeta
extraMetrics unsafe.Pointer // map[string]float32
deploys map[string][]*protos.VersionAndArgs
shutdownFunc context.CancelFunc
ctx context.Context
operationChan chan *protos.OperationQueryResponse
exitChan chan *exec.Cmd
clientChan chan *grpc.ClientConn
timestamp string
wg sync.WaitGroup
config clientConfig
version string
standalone bool
metricExporter metric.Exporter
childProcs []*procmeta
extraMetrics unsafe.Pointer // map[string]float32
deploys map[string][]*protos.VersionAndArgs
shutdownFunc context.CancelFunc
ctx context.Context
operationChan chan *protos.OperationQueryResponse
exitChan chan *exec.Cmd
clientChan chan *grpc.ClientConn
timestamp string
wg sync.WaitGroup
config clientConfig
version string
standalone bool
metricExporter metric.Exporter
siblingProcIndex map[string]uint64
}
func unmarshal[T any](val *T, src map[string]string) {
@ -260,14 +261,15 @@ func NewClient(standalone bool) (HoustonClient, error) {
}
hc := &houstonClient{
config: clientConfig,
clientChan: make(chan *grpc.ClientConn),
extraMetrics: unsafe.Pointer(&map[string]float32{}),
deploys: deploys,
timestamp: exefi.ModTime().String(),
version: string(ver),
standalone: standalone,
metricExporter: metric.NewPrometheusExport(clientConfig.MetricNamespace),
config: clientConfig,
clientChan: make(chan *grpc.ClientConn),
extraMetrics: unsafe.Pointer(&map[string]float32{}),
deploys: deploys,
timestamp: exefi.ModTime().String(),
version: string(ver),
standalone: standalone,
metricExporter: metric.NewPrometheusExport(clientConfig.MetricNamespace),
siblingProcIndex: make(map[string]uint64),
}
ctx, cancel := context.WithCancel(context.Background())

26
client/client_linux.go Normal file
View File

@ -0,0 +1,26 @@
//go:build linux
package client
import (
"golang.org/x/sys/unix"
"repositories.action2quare.com/ayo/gocommon/logger"
)
func set_affinity(pid int, cpu int) {
var cpuset unix.CPUSet
err := unix.SchedGetaffinity(pid, &cpuset)
if err != nil {
logger.Println("SchedGetaffinity failed :", err)
}
count := cpuset.Count()
cpuset.Zero()
cpuset.Set(cpu % count)
err = unix.SchedSetaffinity(pid, &cpuset)
if err != nil {
logger.Println("SchedSetaffinity failed :", err)
}
}

41
client/client_misc.go Normal file
View File

@ -0,0 +1,41 @@
//go:build !linux
package client
func set_affinity(pid int, cpu int) {
}
// package main
// import (
// "fmt"
// "syscall"
// "time"
// "unsafe"
// )
// func main() {
// var mask uintptr
// // Get the current CPU affinity of the process
// if _, _, err := syscall.RawSyscall(syscall.SYS_SCHED_GETAFFINITY, 0, uintptr(unsafe.Sizeof(mask)), uintptr(unsafe.Pointer(&mask))); err != 0 {
// fmt.Println("Failed to get CPU affinity:", err)
// return
// }
// fmt.Println("Current CPU affinity:", mask)
// // Set the new CPU affinity
// mask = 3
// if _, _, err := syscall.RawSyscall(syscall.SYS_SCHED_SETAFFINITY, 0, uintptr(unsafe.Sizeof(mask)), uintptr(unsafe.Pointer(&mask))); err != 0 {
// fmt.Println("Failed to set CPU affinity:", err)
// return
// }
// fmt.Println("New CPU affinity:", mask)
// // some code
// for {
// println("Hello, World!")
// time.Sleep(1 * time.Second)
// }
// }

View File

@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"io"
"math"
"net/http"
"os"
"os/exec"
@ -179,7 +180,7 @@ func prepareProcessLaunch(storageRoot string, req *shared.StartProcessRequest) *
return nil
}
func makeLogFilePrefix(meta *procmeta) string {
func makeLogFilePrefix(meta *procmeta, index int) string {
now := time.Now().UTC()
ext := path.Ext(meta.args[0])
nameonly := path.Base(meta.args[0])
@ -187,7 +188,11 @@ func makeLogFilePrefix(meta *procmeta) string {
nameonly = nameonly[:len(nameonly)-len(ext)]
}
ts := now.Format("2006-01-02T15-04-05")
return path.Join(meta.cmd.Dir, "logs", fmt.Sprintf("%s_%s", nameonly, ts))
if index == 0 {
return path.Join(meta.cmd.Dir, "logs", fmt.Sprintf("%s_%s", nameonly, ts))
}
return path.Join(meta.cmd.Dir, "logs", fmt.Sprintf("%s_%d_%s", nameonly, index, ts))
}
func (hc *houstonClient) launch(meta *procmeta) error {
@ -201,7 +206,7 @@ func (hc *houstonClient) launch(meta *procmeta) error {
return err
}
stdReader := func(childProcName string, r io.ReadCloser) {
stdReader := func(childProcName string, r io.ReadCloser, index int) {
defer func() {
reco := recover()
if reco != nil {
@ -209,13 +214,23 @@ func (hc *houstonClient) launch(meta *procmeta) error {
}
}()
defer func() {
overflow := index / 64
offset := index % 64
key := fmt.Sprintf("%s-%d", meta.args[0], overflow)
runningFlags := hc.siblingProcIndex[key]
mask := uint64(1 << offset)
runningFlags = runningFlags ^ mask
hc.siblingProcIndex[key] = runningFlags
}()
defer r.Close()
reader := bufio.NewReader(r)
thisFileSize := 0
logFileIndex := 0
logFileNamePrefix := makeLogFilePrefix(meta)
logFileNamePrefix := makeLogFilePrefix(meta, index)
logFileName := fmt.Sprintf("%s_%d.log", logFileNamePrefix, logFileIndex)
targetFile, err := os.Create(logFileName)
if err != nil {
@ -224,10 +239,15 @@ func (hc *houstonClient) launch(meta *procmeta) error {
}
exef, _ := os.Executable()
linkePath := path.Join(path.Dir(exef), path.Dir(logFileName), meta.name+".log")
var linkPath string
if index == 0 {
linkPath = path.Join(path.Dir(exef), path.Dir(logFileName), meta.name+".log")
} else {
linkPath = path.Join(path.Dir(exef), path.Dir(logFileName), fmt.Sprintf("%s_%d.log", meta.name, index))
}
os.Remove(linkePath)
os.Symlink(path.Base(targetFile.Name()), linkePath)
os.Remove(linkPath)
os.Symlink(path.Base(targetFile.Name()), linkPath)
defer func() {
if targetFile != nil {
@ -307,19 +327,41 @@ func (hc *houstonClient) launch(meta *procmeta) error {
} else {
targetFile.Close()
targetFile = nextTargetFile
os.Remove(linkePath)
os.Symlink(path.Base(targetFile.Name()), linkePath)
os.Remove(linkPath)
os.Symlink(path.Base(targetFile.Name()), linkPath)
thisFileSize = 0
}
}
}
}
go stdReader(meta.name, stdout)
index := 0
for overflow := 0; ; overflow++ {
key := fmt.Sprintf("%s-%d", meta.args[0], overflow)
runningFlags := hc.siblingProcIndex[key]
if runningFlags == math.MaxUint64 {
index += 64
} else {
for si := 0; si < 64; si++ {
mask := uint64(1 << si)
if runningFlags&mask == 0 {
index += si
runningFlags |= mask
break
}
}
hc.siblingProcIndex[key] = runningFlags
break
}
}
go stdReader(meta.name, stdout, index)
logger.Println("startChildProcess :", meta.cmd.Args)
meta.cmd.Env = append(meta.cmd.Env, fmt.Sprintf("HOUSTON_SIBLIING_INDEX=%d", index))
err = meta.cmd.Start()
if err == nil {
set_affinity(meta.cmd.Process.Pid, index)
meta.setState(protos.ProcessState_Running)
}