diff --git a/client/client.go b/client/client.go index d78479c..6602b74 100644 --- a/client/client.go +++ b/client/client.go @@ -17,7 +17,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "syscall" "time" "unsafe" @@ -64,7 +63,6 @@ func loadClientConfig() (clientConfig, error) { } type HoustonClient interface { - SetReportMetrics(map[string]float32) Shutdown() Start() } @@ -525,7 +523,3 @@ func (hc *houstonClient) checkOperation(client *grpc.ClientConn) error { hc.operationChan <- update } } - -func (hc *houstonClient) SetReportMetrics(extra map[string]float32) { - atomic.StorePointer(&hc.extraMetrics, unsafe.Pointer(&extra)) -} diff --git a/client/custom_exporter.go b/client/custom_exporter.go new file mode 100644 index 0000000..911fc51 --- /dev/null +++ b/client/custom_exporter.go @@ -0,0 +1,83 @@ +package client + +import ( + "math" + _ "net/http/pprof" + "sync/atomic" + "unsafe" + + "github.com/prometheus/client_golang/prometheus" + "repositories.action2quare.com/ayo/gocommon/metric" +) + +type metricDesc struct { + *prometheus.Desc + val *uint64 + valueType prometheus.ValueType +} + +type exporterForPrometheus struct { + metricPtr unsafe.Pointer // []metricDesc +} + +func newExporterForPrometheus() *exporterForPrometheus { + return &exporterForPrometheus{ + metricPtr: unsafe.Pointer(new([]metricDesc)), + } +} + +type metricValueAccessor struct { + ptr *uint64 +} + +func (va metricValueAccessor) set(val float64) { + atomic.StoreUint64(va.ptr, math.Float64bits(val)) +} + +func convertValueType(in metric.MetricType) prometheus.ValueType { + switch in { + case metric.MetricCounter: + return prometheus.CounterValue + + case metric.MetricGuage: + return prometheus.GaugeValue + } + + return prometheus.UntypedValue +} + +func (e *exporterForPrometheus) registMetric(namespace string, desc metric.MetricDescription) metricValueAccessor { + ptr := atomic.LoadPointer(&e.metricPtr) + container := *(*[]metricDesc)(ptr) + + newcont := make([]metricDesc, len(container)+1) + copy(newcont, container) + + newval := new(uint64) + newcont[len(container)] = metricDesc{ + Desc: prometheus.NewDesc(prometheus.BuildFQName(namespace, "", desc.Name), desc.Help, nil, desc.ConstLabels), + val: newval, + valueType: convertValueType(desc.Type), + } + + atomic.StorePointer(&e.metricPtr, unsafe.Pointer(&newcont)) + return metricValueAccessor{ + ptr: newval, + } +} + +func (e *exporterForPrometheus) Describe(ch chan<- *prometheus.Desc) { + ptr := atomic.LoadPointer(&e.metricPtr) + container := *(*[]metricDesc)(ptr) + for _, v := range container { + ch <- v.Desc + } +} + +func (e *exporterForPrometheus) Collect(ch chan<- prometheus.Metric) { + ptr := atomic.LoadPointer(&e.metricPtr) + container := *(*[]metricDesc)(ptr) + for _, v := range container { + ch <- prometheus.MustNewConstMetric(v.Desc, v.valueType, math.Float64frombits(atomic.LoadUint64(v.val))) + } +} diff --git a/client/deploy.go b/client/deploy.go index 33432fd..7689622 100644 --- a/client/deploy.go +++ b/client/deploy.go @@ -242,7 +242,7 @@ func (hc *houstonClient) makeDownloadUrl(rel string) string { return out } -func copy(src, dst string) error { +func copyfile(src, dst string) error { fi, err := os.Stat(src) if err != nil { return err @@ -311,7 +311,7 @@ func (hc *houstonClient) prepareUpdateSelf(req *shared.DeployRequest) (srcdir st selfname, _ := os.Executable() srcreplacer := path.Join(path.Dir(fname), "replacer") + path.Ext(selfname) replacer = "./" + filepath.ToSlash("replacer"+path.Ext(selfname)) - err = copy(srcreplacer, replacer) + err = copyfile(srcreplacer, replacer) if err == nil { err = os.Chmod(replacer, 0775) } diff --git a/client/operation.go b/client/operation.go index 21c482f..6bda57a 100644 --- a/client/operation.go +++ b/client/operation.go @@ -4,10 +4,12 @@ import ( "archive/zip" "bufio" "context" + "encoding/binary" "encoding/json" "errors" "fmt" "io" + "math" "net/http" "os" "os/exec" @@ -17,7 +19,9 @@ import ( "syscall" "time" + "github.com/prometheus/client_golang/prometheus" "repositories.action2quare.com/ayo/gocommon/logger" + "repositories.action2quare.com/ayo/gocommon/metric" "repositories.action2quare.com/ayo/houston/shared" "repositories.action2quare.com/ayo/houston/shared/protos" ) @@ -138,30 +142,6 @@ func zipLogFiles(storageRoot string, req *shared.UploadRequest, start, except st } return f.Name(), matches, nil - // defer func() { - // tempname := f.Name() - // f.Close() - - // resp, _ := http.Post(req.Url, "application/zip", f) - // if resp != nil && resp.Body != nil { - // resp.Body.Close() - // } - // os.Remove(tempname) - // if del, err := strconv.ParseBool(req.DeleteAfterUploaded); del && err == nil { - // for _, file := range matches { - // if strings.HasSuffix(file, except) { - // continue - // } - // os.Remove(file) - // } - // } - // }() - - // Create a new zip archive. - - //}(f) - - //return nil } func prepareProcessLaunch(storageRoot string, req *shared.StartProcessRequest) *procmeta { @@ -214,7 +194,7 @@ func (hc *houstonClient) launch(meta *procmeta) error { return err } - stdReader := func(r io.ReadCloser) { + stdReader := func(childProcName string, r io.ReadCloser) { defer func() { reco := recover() if reco != nil { @@ -248,12 +228,53 @@ func (hc *houstonClient) launch(meta *procmeta) error { } }() + readingMetric := false + var metricBuffer []byte + metricValues := make(map[string]metricValueAccessor) + for { buff, err := reader.ReadBytes('\n') if err != nil { break } + if readingMetric { + metricBuffer = append(metricBuffer, buff...) + } else if buff[0] == metric.METRIC_HEAD_INLINE { + readingMetric = true + metricBuffer = append(metricBuffer, buff[1:]...) + } + + if readingMetric { + if metricBuffer[len(metricBuffer)-2] == metric.METRIC_TAIL_INLINE { + readingMetric = false + + metricBuffer = metricBuffer[:len(metricBuffer)-2] + if metricBuffer[0] == '{' { + var metric metric.MetricDescription + json.Unmarshal(metricBuffer, &metric) + + exporter := newExporterForPrometheus() + accessor := exporter.registMetric(childProcName, metric) + prometheus.MustRegister(exporter) + + metricValues[metric.Key] = accessor + } else { + keybytes := metricBuffer[:8] + valbits := binary.BigEndian.Uint64(metricBuffer[8:]) + val := math.Float64frombits(valbits) + + if accessor, ok := metricValues[string(keybytes)]; ok { + accessor.set(val) + } + } + + metricBuffer = metricBuffer[:0] + } + + continue + } + for written := 0; written < len(buff); { n, err := targetFile.Write(buff) if err != nil { @@ -282,7 +303,7 @@ func (hc *houstonClient) launch(meta *procmeta) error { } } - go stdReader(stdout) + go stdReader(meta.name, stdout) logger.Println("startChildProcess :", meta.cmd.Args) diff --git a/main_client.go b/main_client.go index dea1350..55f0d4d 100644 --- a/main_client.go +++ b/main_client.go @@ -6,21 +6,12 @@ import ( "context" "time" + "github.com/prometheus/client_golang/prometheus/promhttp" "repositories.action2quare.com/ayo/gocommon/flagx" "repositories.action2quare.com/ayo/houston/client" "net/http" - _ "net/http/pprof" - "os" "runtime" - - "github.com/prometheus/common/promlog" - "github.com/prometheus/common/promlog/flag" - - "github.com/alecthomas/kingpin/v2" - "github.com/prometheus/common/version" - "github.com/prometheus/exporter-toolkit/web" - "github.com/prometheus/exporter-toolkit/web/kingpinflag" ) func main() { @@ -33,25 +24,13 @@ func main() { panic(err) } - var ( - toolkitFlags = kingpinflag.AddFlags(kingpin.CommandLine, ":9100") - ) - - promlogConfig := &promlog.Config{} - flag.AddFlags(kingpin.CommandLine, promlogConfig) - kingpin.Version(version.Print("node_exporter")) - kingpin.CommandLine.UsageWriter(os.Stdout) - kingpin.HelpFlag.Short('h') - kingpin.Parse() - logger := promlog.New(promlogConfig) - - http.Handle("/metrics", client.NewHandlerForNodeExporter(true, 2, logger)) - - server := &http.Server{} - go web.ListenAndServe(server, toolkitFlags, logger) + http.Handle("/metrics", promhttp.Handler()) + server := &http.Server{Addr: ":9100", Handler: nil} + go server.ListenAndServe() hc.Start() ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + server.Shutdown(ctx) cancel() }