package client import ( "bufio" "context" "encoding/json" "io" "maps" "strconv" "strings" "github.com/prometheus/client_golang/prometheus" "repositories.action2quare.com/ayo/gocommon/logger" "repositories.action2quare.com/ayo/gocommon/metric" ) type pipeListener struct { config clientConfig registry *prometheus.Registry } func run_metric_pipe_reader(config clientConfig, registry *prometheus.Registry, ctx context.Context) { r := &pipeListener{ config: config, registry: registry, } go r.listen(ctx) } type pipeReader struct { handle io.ReadCloser constLabels map[string]string collector *metric.PrometheusCollector } func (l *pipeListener) startReader(r io.ReadCloser) { reader := pipeReader{ handle: r, constLabels: l.config.ConstLabels, collector: metric.NewPrometheusCollector(l.config.MetricNamespace, l.registry), } defer func() { reader.close(l.registry) }() scanner := bufio.NewScanner(r) for scanner.Scan() { reader.parseLine(scanner.Text()) } } func (r *pipeReader) close(registry *prometheus.Registry) { registry.Unregister(r.collector) r.handle.Close() } func (r *pipeReader) parseLine(line string) { defer func() { r := recover() if r != nil { logger.Println(r) } }() switch line[0] { case '{': var desc metric.MetricDescription if err := json.Unmarshal([]byte(line), &desc); err != nil { logger.Println("unmarshal metric failed :", err, line) return } if desc.ConstLabels == nil { desc.ConstLabels = make(map[string]string) } maps.Copy(desc.ConstLabels, r.constLabels) r.collector = r.collector.RegisterMetric(&desc) default: kv := strings.Split(line, ":") if len(kv) != 2 { return } if len(kv[1]) == 0 { r.collector = r.collector.UnregisterMetric(kv[0]) } else { if val, err := strconv.ParseFloat(kv[1], 64); err == nil { r.collector.UpdateMetric(kv[0], val) } } } }