metric.Export 인터페이스 변경
This commit is contained in:
@ -29,6 +29,7 @@ type MetricDescription struct {
|
|||||||
type Exporter interface {
|
type Exporter interface {
|
||||||
RegisterMetric(*MetricDescription)
|
RegisterMetric(*MetricDescription)
|
||||||
UpdateMetric(string, float64)
|
UpdateMetric(string, float64)
|
||||||
|
Shutdown()
|
||||||
}
|
}
|
||||||
|
|
||||||
type MetricWriter interface {
|
type MetricWriter interface {
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
package metric
|
package metric
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"math"
|
"math"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
@ -49,6 +50,7 @@ type prometheusExporter struct {
|
|||||||
writerChan chan *writeRequest
|
writerChan chan *writeRequest
|
||||||
registerChan chan *prometheusMetricDesc
|
registerChan chan *prometheusMetricDesc
|
||||||
namespace string
|
namespace string
|
||||||
|
cancel context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pe *prometheusExporter) RegisterMetric(nm *MetricDescription) {
|
func (pe *prometheusExporter) RegisterMetric(nm *MetricDescription) {
|
||||||
@ -64,6 +66,12 @@ func (pe *prometheusExporter) UpdateMetric(key string, val float64) {
|
|||||||
pe.writerChan <- &writeRequest{key: key, val: val}
|
pe.writerChan <- &writeRequest{key: key, val: val}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (pe *prometheusExporter) Shutdown() {
|
||||||
|
if pe.cancel != nil {
|
||||||
|
pe.cancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type prometheusCollector struct {
|
type prometheusCollector struct {
|
||||||
metrics map[string]*prometheusMetricDesc
|
metrics map[string]*prometheusMetricDesc
|
||||||
}
|
}
|
||||||
@ -83,7 +91,7 @@ func (pc *prometheusCollector) Collect(ch chan<- prometheus.Metric) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pe *prometheusExporter) loop() {
|
func (pe *prometheusExporter) loop(ctx context.Context) {
|
||||||
defer func() {
|
defer func() {
|
||||||
r := recover()
|
r := recover()
|
||||||
if r != nil {
|
if r != nil {
|
||||||
@ -92,9 +100,17 @@ func (pe *prometheusExporter) loop() {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
var collector *prometheusCollector
|
var collector *prometheusCollector
|
||||||
|
defer func() {
|
||||||
|
if collector != nil {
|
||||||
|
prometheus.Unregister(collector)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
|
||||||
case req := <-pe.writerChan:
|
case req := <-pe.writerChan:
|
||||||
if m := collector.metrics[req.key]; m != nil {
|
if m := collector.metrics[req.key]; m != nil {
|
||||||
atomic.StoreUint64(m.valptr, math.Float64bits(req.val))
|
atomic.StoreUint64(m.valptr, math.Float64bits(req.val))
|
||||||
@ -128,12 +144,14 @@ func (pe *prometheusExporter) loop() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewPrometheusExport(namespace string) Exporter {
|
func NewPrometheusExport(namespace string) Exporter {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
exp := &prometheusExporter{
|
exp := &prometheusExporter{
|
||||||
registerChan: make(chan *prometheusMetricDesc, 10),
|
registerChan: make(chan *prometheusMetricDesc, 10),
|
||||||
writerChan: make(chan *writeRequest, 100),
|
writerChan: make(chan *writeRequest, 100),
|
||||||
namespace: namespace,
|
namespace: namespace,
|
||||||
|
cancel: cancel,
|
||||||
}
|
}
|
||||||
|
|
||||||
go exp.loop()
|
go exp.loop(ctx)
|
||||||
return exp
|
return exp
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user