리플레이 저장

- stdout pipe를 통해 houston에 리플레이 upload 요청 기능 추가
This commit is contained in:
2025-01-09 16:21:29 +09:00
parent 3ab055008c
commit 5f68795185
6 changed files with 258 additions and 4 deletions

View File

@ -6,6 +6,7 @@ import (
"fmt"
"io"
"io/fs"
"net/http"
"os"
"os/exec"
"os/signal"
@ -21,6 +22,7 @@ import (
"time"
"unsafe"
"github.com/djherbis/times"
"repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/flagx"
"repositories.action2quare.com/ayo/gocommon/logger"
@ -32,9 +34,9 @@ import (
)
type runcommand struct {
Exec string `json:"exec"`
Args []string `json:"args"`
Version string `json:"version"`
Exec string `json:"exec"`
Args []string `json:"args"`
Version string `json:"version"`
}
type clientConfig struct {
@ -95,6 +97,12 @@ func (pm *procmeta) setState(s protos.ProcessState) {
atomic.StoreInt32(&pm.state, int32(s))
}
type uploadRequest struct {
filePath string
name string
version string
}
type houstonClient struct {
childProcs []*procmeta
extraMetrics unsafe.Pointer // map[string]float32
@ -104,6 +112,7 @@ type houstonClient struct {
operationChan chan *protos.OperationQueryResponse
exitChan chan *exec.Cmd
clientChan chan *grpc.ClientConn
uploadChan chan uploadRequest
timestamp string
wg sync.WaitGroup
config clientConfig
@ -286,6 +295,7 @@ func NewClient(standalone bool) (HoustonClient, error) {
timestamp: exefi.ModTime().String(),
version: string(ver),
standalone: standalone,
uploadChan: make(chan uploadRequest, 100),
siblingProcIndex: make(map[string]uint64),
}
@ -493,6 +503,59 @@ func NewClient(standalone bool) (HoustonClient, error) {
return hc, nil
}
func uploadSafe(url, filePath, name, version string) error {
defer func() {
r := recover()
if r != nil {
logger.Error(r)
}
}()
t, err := times.Stat(filePath)
if err != nil {
return err
}
file, err := os.Open(filePath)
if err != nil {
return err
}
if file == nil {
return errors.New("upload file is missing :" + filePath)
}
defer file.Close()
// hc.config.HttpAddress+"/upload",
httpreq, err := http.NewRequest("POST", url, file)
if err != nil {
return err
}
hn, _ := os.Hostname()
// createTime := file.
httpreq.Header.Set("Houston-Service-Name", name)
httpreq.Header.Set("Houston-Service-Version", version)
httpreq.Header.Set("Houston-Service-Filename", t.BirthTime().UTC().Format(time.DateOnly)+"."+hn+path.Ext(filePath))
httpreq.Header.Set("Content-Type", "application/zip")
resp, err := http.DefaultClient.Do(httpreq)
if err != nil {
return err
}
resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("upload file failed. response code : %s, %d", filePath, resp.StatusCode)
}
if err := os.Remove(filePath); err != nil {
return err
}
return nil
}
func (hc *houstonClient) Start() {
// receive from stream
defer func() {
@ -509,6 +572,20 @@ func (hc *houstonClient) Start() {
proc.cmd.Wait()
proc.cmd.Process.Release()
}
close(hc.uploadChan)
}()
go func() {
// upload 고루틴
url := hc.config.HttpAddress + "/upload"
for req := range hc.uploadChan {
logger.Println("uploadSafe :", req)
err := uploadSafe(url, req.filePath, req.name, req.version)
if err != nil {
logger.Println("uploadSafe return err :", err)
}
}
}()
interrupt := make(chan os.Signal, 1)
@ -553,8 +630,8 @@ func (hc *houstonClient) Start() {
logger.Println("autorun success :", sr)
}
}
}
}
}
for {
select {

135
client/houston_pipe_req.go Normal file
View File

@ -0,0 +1,135 @@
package client
import (
"bytes"
"crypto/aes"
"crypto/md5"
"encoding/base64"
"encoding/hex"
"errors"
"os"
"strings"
)
var pipeReqPrefix = []byte("houston_pipe_req")
var pipeReqHandle = map[string]func(hc *houstonClient, meta *procmeta, param string) error{
"upload": handleStdOutUploadRequest,
}
func HandleHoustonPipeReq(hc *houstonClient, meta *procmeta, buff []byte) (pipeRequest bool, retErr error) {
if !bytes.HasPrefix(buff, pipeReqPrefix) {
return false, nil // Not a pipe request
}
command, param, err := parsePipeReq(buff)
if err != nil {
return true, err
}
if handler, ok := pipeReqHandle[command]; ok {
if err := handler(hc, meta, param); err != nil {
return true, err
}
}
return true, nil
}
var pipeReqDelimeter = []byte("|")
var pipeReqKey = []byte{
0x77, 0x77, 0x71, 0x3c, 0x75, 0x64, 0x22, 0x54,
0x3e, 0x41, 0x27, 0x68, 0x39, 0x6e, 0x23, 0x49,
0x5f, 0x66, 0x71, 0x50, 0x32, 0x68, 0x53, 0x43,
0x72, 0x2f, 0x62, 0x39, 0x6e, 0x22, 0x27, 0x2d,
}
var errInvalidRequestBuff = errors.New("parsePipeReq got invalid request format")
func parsePipeReq(buff []byte) (command, param string, err error) {
//buff == "houston_pipe_req|EncryptString\r\n"
parts := bytes.Split(buff, pipeReqDelimeter)
if len(parts) != 2 {
return "", "", errInvalidRequestBuff
}
//Decrypt
decryptBuff, err := decryptPipeReq(parts[1])
if err != nil {
return "", "", err
}
//buff == houston_pipe_req|command|example_paramstring|MD5
//decryptBuff == command|example_paramstring|MD5
parts = bytes.Split(decryptBuff, pipeReqDelimeter)
if len(parts) != 3 {
return "", "", errInvalidRequestBuff
}
command = string(parts[0])
param = string(parts[1])
receivedHash := string(parts[2])
if err := validatePipeReq(command, param, receivedHash); err != nil {
return "", "", err
}
return command, param, nil
}
func decryptPipeReq(encordBuff []byte) ([]byte, error) {
decordBuff, err := base64.StdEncoding.DecodeString(string(encordBuff))
if err != nil {
return nil, err
}
if len(decordBuff)%aes.BlockSize != 0 {
return nil, errors.New("parsePipeReq got encrypted data which is not a multiple of the block size")
}
aesBlock, err := aes.NewCipher(pipeReqKey)
if err != nil {
return nil, err
}
decryptBuff := make([]byte, len(decordBuff))
for start := 0; start < len(decordBuff); start += aes.BlockSize {
aesBlock.Decrypt(decryptBuff[start:start+aes.BlockSize], decordBuff[start:start+aes.BlockSize])
}
return decryptBuff, nil
}
var errValidatePipeFail = errors.New("validatePipeReq fail to check validation of buff")
func validatePipeReq(command, param, receivedHash string) error {
//Decord receivedHash
receiveHashLen := md5.Size * 2
if len(receivedHash) < receiveHashLen {
return errValidatePipeFail
}
decordHash, err := hex.DecodeString(receivedHash[0:receiveHashLen])
if err != nil {
return err
}
//Generate md5 from command and param
var reqBuilder strings.Builder
reqBuilder.WriteString(command)
reqBuilder.Write(pipeReqDelimeter)
reqBuilder.WriteString(param)
buffHashWriter := md5.New()
buffHashWriter.Write([]byte(reqBuilder.String()))
buffHash := buffHashWriter.Sum(nil)
if !bytes.Equal(decordHash, buffHash) {
return errValidatePipeFail
}
return nil
}
func handleStdOutUploadRequest(hc *houstonClient, meta *procmeta, param string) error {
uploadFullPath := param
if _, err := os.Stat(uploadFullPath); err != nil {
return err
} else {
hc.uploadToAppendFile(uploadFullPath, meta.name, meta.version)
}
return nil
}

View File

@ -44,6 +44,14 @@ func lastExecutionArgs(verpath string) []string {
return out
}
func (hc *houstonClient) uploadToAppendFile(filePath string, name string, version string) {
hc.uploadChan <- uploadRequest{
filePath: filePath,
name: name,
version: version,
}
}
var errUploadZipLogFailed = errors.New("not ok")
func (hc *houstonClient) uploadZipLogFile(zipFile string, name string, version string) error {
@ -460,6 +468,8 @@ func (hc *houstonClient) launch(meta *procmeta) error {
}
continue
} else if ok, err := HandleHoustonPipeReq(hc, meta, buff); ok && err != nil {
logger.Println("HandleHoustonStdoutReq failed :", err)
}
logWriter(buff)