metric 관련 코드 수정
This commit is contained in:
@ -29,7 +29,6 @@ type MetricDescription struct {
|
|||||||
type Exporter interface {
|
type Exporter interface {
|
||||||
RegisterMetric(*MetricDescription)
|
RegisterMetric(*MetricDescription)
|
||||||
UpdateMetric(string, float64)
|
UpdateMetric(string, float64)
|
||||||
Shutdown()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type MetricWriter interface {
|
type MetricWriter interface {
|
||||||
@ -118,18 +117,6 @@ func NewMetric(mt MetricType, name string, help string, constLabels map[string]s
|
|||||||
return impl
|
return impl
|
||||||
}
|
}
|
||||||
|
|
||||||
func ReadMetricValue(line []byte) (string, float64) {
|
|
||||||
if len(line) < 16 {
|
|
||||||
return "", 0
|
|
||||||
}
|
|
||||||
|
|
||||||
key := string(line[0:8])
|
|
||||||
valbits := binary.LittleEndian.Uint64(line[8:])
|
|
||||||
val := math.Float64frombits(valbits)
|
|
||||||
|
|
||||||
return key, val
|
|
||||||
}
|
|
||||||
|
|
||||||
var metricEnabled = false
|
var metricEnabled = false
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
|||||||
@ -1,7 +1,7 @@
|
|||||||
package metric
|
package metric
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"maps"
|
||||||
"math"
|
"math"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
@ -34,11 +34,6 @@ func convertValueType(in MetricType) prometheus.ValueType {
|
|||||||
return prometheus.UntypedValue
|
return prometheus.UntypedValue
|
||||||
}
|
}
|
||||||
|
|
||||||
type writeRequest struct {
|
|
||||||
key string
|
|
||||||
val float64
|
|
||||||
}
|
|
||||||
|
|
||||||
type prometheusMetricDesc struct {
|
type prometheusMetricDesc struct {
|
||||||
*prometheus.Desc
|
*prometheus.Desc
|
||||||
valueType prometheus.ValueType
|
valueType prometheus.ValueType
|
||||||
@ -46,116 +41,69 @@ type prometheusMetricDesc struct {
|
|||||||
key string
|
key string
|
||||||
}
|
}
|
||||||
|
|
||||||
type prometheusExporter struct {
|
type PrometheusCollector struct {
|
||||||
writerChan chan *writeRequest
|
|
||||||
registerChan chan *prometheusMetricDesc
|
|
||||||
namespace string
|
namespace string
|
||||||
cancel context.CancelFunc
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pe *prometheusExporter) RegisterMetric(nm *MetricDescription) {
|
|
||||||
pe.registerChan <- &prometheusMetricDesc{
|
|
||||||
Desc: prometheus.NewDesc(prometheus.BuildFQName(pe.namespace, "", nm.Name), nm.Help, nil, nm.ConstLabels),
|
|
||||||
valueType: convertValueType(nm.Type),
|
|
||||||
valptr: new(uint64),
|
|
||||||
key: nm.Key,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pe *prometheusExporter) UpdateMetric(key string, val float64) {
|
|
||||||
pe.writerChan <- &writeRequest{key: key, val: val}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pe *prometheusExporter) Shutdown() {
|
|
||||||
if pe.cancel != nil {
|
|
||||||
pe.cancel()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type prometheusCollector struct {
|
|
||||||
metrics map[string]*prometheusMetricDesc
|
metrics map[string]*prometheusMetricDesc
|
||||||
|
registry *prometheus.Registry
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pc *prometheusCollector) Describe(ch chan<- *prometheus.Desc) {
|
func (pc *PrometheusCollector) Describe(ch chan<- *prometheus.Desc) {
|
||||||
for _, v := range pc.metrics {
|
for _, v := range pc.metrics {
|
||||||
|
logger.Println("collector describe :", v.Desc.String())
|
||||||
ch <- v.Desc
|
ch <- v.Desc
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pc *prometheusCollector) Collect(ch chan<- prometheus.Metric) {
|
func (pc *PrometheusCollector) Collect(ch chan<- prometheus.Metric) {
|
||||||
for _, v := range pc.metrics {
|
for _, v := range pc.metrics {
|
||||||
cm, err := prometheus.NewConstMetric(v.Desc, v.valueType, math.Float64frombits(atomic.LoadUint64(v.valptr)))
|
logger.Println("collector collect :", v.Desc.String())
|
||||||
|
value := atomic.LoadUint64(v.valptr)
|
||||||
|
cm, err := prometheus.NewConstMetric(v.Desc, v.valueType, math.Float64frombits(value))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
ch <- cm
|
ch <- cm
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pe *prometheusExporter) loop(ctx context.Context) {
|
func (pc *PrometheusCollector) RegisterMetric(md *MetricDescription) *PrometheusCollector {
|
||||||
defer func() {
|
nm := &prometheusMetricDesc{
|
||||||
r := recover()
|
Desc: prometheus.NewDesc(prometheus.BuildFQName("ou", "", md.Name), md.Help, nil, md.ConstLabels),
|
||||||
if r != nil {
|
valueType: convertValueType(md.Type),
|
||||||
logger.Error(r)
|
valptr: new(uint64),
|
||||||
}
|
key: md.Key,
|
||||||
}()
|
|
||||||
|
|
||||||
var collector *prometheusCollector
|
|
||||||
defer func() {
|
|
||||||
if collector != nil {
|
|
||||||
prometheus.Unregister(collector)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
|
|
||||||
case req := <-pe.writerChan:
|
|
||||||
if collector != nil {
|
|
||||||
if m := collector.metrics[req.key]; m != nil {
|
|
||||||
atomic.StoreUint64(m.valptr, math.Float64bits(req.val))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
case nm := <-pe.registerChan:
|
next := NewPrometheusCollector(pc.namespace, pc.registry)
|
||||||
var nextmetrics map[string]*prometheusMetricDesc
|
maps.Copy(next.metrics, pc.metrics)
|
||||||
if collector != nil {
|
next.metrics[nm.key] = nm
|
||||||
nextmetrics = collector.metrics
|
|
||||||
prometheus.Unregister(collector)
|
|
||||||
nextmetrics[nm.key] = nm
|
|
||||||
} else {
|
|
||||||
nextmetrics = map[string]*prometheusMetricDesc{
|
|
||||||
nm.key: nm,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
nextcollector := &prometheusCollector{
|
pc.registry.Unregister(pc)
|
||||||
metrics: nextmetrics,
|
pc.registry.Register(next)
|
||||||
}
|
|
||||||
|
|
||||||
if err := prometheus.Register(nextcollector); err != nil {
|
return next
|
||||||
if _, ok := err.(prometheus.AlreadyRegisteredError); ok {
|
}
|
||||||
// 이미 등록된 metric. child process를 여럿 실행하면 발생됨
|
|
||||||
} else {
|
func (pc *PrometheusCollector) UpdateMetric(key string, val float64) {
|
||||||
logger.Error("prometheus register err :", *nm, err)
|
if m := pc.metrics[key]; m != nil {
|
||||||
}
|
atomic.StoreUint64(m.valptr, math.Float64bits(val))
|
||||||
} else {
|
|
||||||
collector = nextcollector
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPrometheusExport(namespace string) Exporter {
|
func (pc *PrometheusCollector) UnregisterMetric(key string) *PrometheusCollector {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
next := NewPrometheusCollector(pc.namespace, pc.registry)
|
||||||
exp := &prometheusExporter{
|
maps.Copy(next.metrics, pc.metrics)
|
||||||
registerChan: make(chan *prometheusMetricDesc, 10),
|
delete(next.metrics, key)
|
||||||
writerChan: make(chan *writeRequest, 100),
|
|
||||||
|
pc.registry.Unregister(pc)
|
||||||
|
pc.registry.Register(next)
|
||||||
|
|
||||||
|
return next
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPrometheusCollector(namespace string, registry *prometheus.Registry) *PrometheusCollector {
|
||||||
|
return &PrometheusCollector{
|
||||||
namespace: namespace,
|
namespace: namespace,
|
||||||
cancel: cancel,
|
metrics: make(map[string]*prometheusMetricDesc),
|
||||||
|
registry: registry,
|
||||||
}
|
}
|
||||||
|
|
||||||
go exp.loop(ctx)
|
|
||||||
return exp
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user