diff --git a/client/operation.go b/client/operation.go index df9fcaf..85875cd 100644 --- a/client/operation.go +++ b/client/operation.go @@ -21,6 +21,7 @@ import ( "github.com/Knetic/govaluate" "repositories.action2quare.com/ayo/gocommon/logger" + "repositories.action2quare.com/ayo/gocommon/metric" "repositories.action2quare.com/ayo/houston/shared" "repositories.action2quare.com/ayo/houston/shared/protos" ) @@ -335,7 +336,7 @@ func (hc *houstonClient) launch(meta *procmeta) error { } } - stdReader := func(r io.ReadCloser, logfilePath string) { + stdReader := func(r io.ReadCloser, logfilePath string, verify func(buff []byte) bool) { defer func() { reco := recover() if reco != nil { @@ -358,8 +359,10 @@ func (hc *houstonClient) launch(meta *procmeta) error { break } - if len(buff) > 0 { - logChan <- buff + if verify(buff) { + if len(buff) > 0 { + logChan <- buff + } } } } @@ -372,7 +375,53 @@ func (hc *houstonClient) launch(meta *procmeta) error { } go func() { - stdReader(stdout, evalfile+".log") + metricExporter := metric.NewPrometheusExport(hc.config.MetricNamespace) + defer metricExporter.Shutdown() + + var metricBuffer []byte + readingMetric := false + stdReader(stdout, evalfile+".log", func(buff []byte) bool { + if readingMetric { + metricBuffer = append(metricBuffer, buff...) + } else if buff[0] == metric.METRIC_HEAD_INLINE { + readingMetric = true + metricBuffer = append(metricBuffer, buff[1:]...) + } + + if readingMetric { + if metricBuffer[len(metricBuffer)-2] == metric.METRIC_TAIL_INLINE { + readingMetric = false + + metricBuffer = metricBuffer[:len(metricBuffer)-2] + if metricBuffer[0] == '{' { + var desc metric.MetricDescription + if err := json.Unmarshal(metricBuffer, &desc); err != nil { + logger.Println("unmarshal metric failed :", err, string(metricBuffer)) + return false + } + + if desc.ConstLabels == nil { + desc.ConstLabels = make(map[string]string) + } + + for k, v := range hc.config.ConstLabels { + desc.ConstLabels[k] = v + } + + desc.ConstLabels["job"] = meta.name + metricExporter.RegisterMetric(&desc) + } else { + key, val := metric.ReadMetricValue(metricBuffer) + metricExporter.UpdateMetric(key, val) + } + + metricBuffer = metricBuffer[:0] + } + + return false + } + return true + }) logger.Println("stdReader is terminated :", meta.name) if meta.isState(protos.ProcessState_Running) { @@ -386,7 +435,7 @@ func (hc *houstonClient) launch(meta *procmeta) error { } }() - go stdReader(stderr, evalfile+".err") + go stdReader(stderr, evalfile+".err", func([]byte) bool { return true }) logger.Println("startChildProcess :", meta.cmd.Args)