custom metric 추가
This commit is contained in:
@ -17,7 +17,6 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
"unsafe"
|
"unsafe"
|
||||||
@ -64,7 +63,6 @@ func loadClientConfig() (clientConfig, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type HoustonClient interface {
|
type HoustonClient interface {
|
||||||
SetReportMetrics(map[string]float32)
|
|
||||||
Shutdown()
|
Shutdown()
|
||||||
Start()
|
Start()
|
||||||
}
|
}
|
||||||
@ -525,7 +523,3 @@ func (hc *houstonClient) checkOperation(client *grpc.ClientConn) error {
|
|||||||
hc.operationChan <- update
|
hc.operationChan <- update
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hc *houstonClient) SetReportMetrics(extra map[string]float32) {
|
|
||||||
atomic.StorePointer(&hc.extraMetrics, unsafe.Pointer(&extra))
|
|
||||||
}
|
|
||||||
|
|||||||
83
client/custom_exporter.go
Normal file
83
client/custom_exporter.go
Normal file
@ -0,0 +1,83 @@
|
|||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math"
|
||||||
|
_ "net/http/pprof"
|
||||||
|
"sync/atomic"
|
||||||
|
"unsafe"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"repositories.action2quare.com/ayo/gocommon/metric"
|
||||||
|
)
|
||||||
|
|
||||||
|
type metricDesc struct {
|
||||||
|
*prometheus.Desc
|
||||||
|
val *uint64
|
||||||
|
valueType prometheus.ValueType
|
||||||
|
}
|
||||||
|
|
||||||
|
type exporterForPrometheus struct {
|
||||||
|
metricPtr unsafe.Pointer // []metricDesc
|
||||||
|
}
|
||||||
|
|
||||||
|
func newExporterForPrometheus() *exporterForPrometheus {
|
||||||
|
return &exporterForPrometheus{
|
||||||
|
metricPtr: unsafe.Pointer(new([]metricDesc)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type metricValueAccessor struct {
|
||||||
|
ptr *uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (va metricValueAccessor) set(val float64) {
|
||||||
|
atomic.StoreUint64(va.ptr, math.Float64bits(val))
|
||||||
|
}
|
||||||
|
|
||||||
|
func convertValueType(in metric.MetricType) prometheus.ValueType {
|
||||||
|
switch in {
|
||||||
|
case metric.MetricCounter:
|
||||||
|
return prometheus.CounterValue
|
||||||
|
|
||||||
|
case metric.MetricGuage:
|
||||||
|
return prometheus.GaugeValue
|
||||||
|
}
|
||||||
|
|
||||||
|
return prometheus.UntypedValue
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *exporterForPrometheus) registMetric(namespace string, desc metric.MetricDescription) metricValueAccessor {
|
||||||
|
ptr := atomic.LoadPointer(&e.metricPtr)
|
||||||
|
container := *(*[]metricDesc)(ptr)
|
||||||
|
|
||||||
|
newcont := make([]metricDesc, len(container)+1)
|
||||||
|
copy(newcont, container)
|
||||||
|
|
||||||
|
newval := new(uint64)
|
||||||
|
newcont[len(container)] = metricDesc{
|
||||||
|
Desc: prometheus.NewDesc(prometheus.BuildFQName(namespace, "", desc.Name), desc.Help, nil, desc.ConstLabels),
|
||||||
|
val: newval,
|
||||||
|
valueType: convertValueType(desc.Type),
|
||||||
|
}
|
||||||
|
|
||||||
|
atomic.StorePointer(&e.metricPtr, unsafe.Pointer(&newcont))
|
||||||
|
return metricValueAccessor{
|
||||||
|
ptr: newval,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *exporterForPrometheus) Describe(ch chan<- *prometheus.Desc) {
|
||||||
|
ptr := atomic.LoadPointer(&e.metricPtr)
|
||||||
|
container := *(*[]metricDesc)(ptr)
|
||||||
|
for _, v := range container {
|
||||||
|
ch <- v.Desc
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *exporterForPrometheus) Collect(ch chan<- prometheus.Metric) {
|
||||||
|
ptr := atomic.LoadPointer(&e.metricPtr)
|
||||||
|
container := *(*[]metricDesc)(ptr)
|
||||||
|
for _, v := range container {
|
||||||
|
ch <- prometheus.MustNewConstMetric(v.Desc, v.valueType, math.Float64frombits(atomic.LoadUint64(v.val)))
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -242,7 +242,7 @@ func (hc *houstonClient) makeDownloadUrl(rel string) string {
|
|||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
func copy(src, dst string) error {
|
func copyfile(src, dst string) error {
|
||||||
fi, err := os.Stat(src)
|
fi, err := os.Stat(src)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -311,7 +311,7 @@ func (hc *houstonClient) prepareUpdateSelf(req *shared.DeployRequest) (srcdir st
|
|||||||
selfname, _ := os.Executable()
|
selfname, _ := os.Executable()
|
||||||
srcreplacer := path.Join(path.Dir(fname), "replacer") + path.Ext(selfname)
|
srcreplacer := path.Join(path.Dir(fname), "replacer") + path.Ext(selfname)
|
||||||
replacer = "./" + filepath.ToSlash("replacer"+path.Ext(selfname))
|
replacer = "./" + filepath.ToSlash("replacer"+path.Ext(selfname))
|
||||||
err = copy(srcreplacer, replacer)
|
err = copyfile(srcreplacer, replacer)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
err = os.Chmod(replacer, 0775)
|
err = os.Chmod(replacer, 0775)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -4,10 +4,12 @@ import (
|
|||||||
"archive/zip"
|
"archive/zip"
|
||||||
"bufio"
|
"bufio"
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/binary"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"math"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
@ -17,7 +19,9 @@ import (
|
|||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"repositories.action2quare.com/ayo/gocommon/logger"
|
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||||
|
"repositories.action2quare.com/ayo/gocommon/metric"
|
||||||
"repositories.action2quare.com/ayo/houston/shared"
|
"repositories.action2quare.com/ayo/houston/shared"
|
||||||
"repositories.action2quare.com/ayo/houston/shared/protos"
|
"repositories.action2quare.com/ayo/houston/shared/protos"
|
||||||
)
|
)
|
||||||
@ -138,30 +142,6 @@ func zipLogFiles(storageRoot string, req *shared.UploadRequest, start, except st
|
|||||||
}
|
}
|
||||||
|
|
||||||
return f.Name(), matches, nil
|
return f.Name(), matches, nil
|
||||||
// defer func() {
|
|
||||||
// tempname := f.Name()
|
|
||||||
// f.Close()
|
|
||||||
|
|
||||||
// resp, _ := http.Post(req.Url, "application/zip", f)
|
|
||||||
// if resp != nil && resp.Body != nil {
|
|
||||||
// resp.Body.Close()
|
|
||||||
// }
|
|
||||||
// os.Remove(tempname)
|
|
||||||
// if del, err := strconv.ParseBool(req.DeleteAfterUploaded); del && err == nil {
|
|
||||||
// for _, file := range matches {
|
|
||||||
// if strings.HasSuffix(file, except) {
|
|
||||||
// continue
|
|
||||||
// }
|
|
||||||
// os.Remove(file)
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }()
|
|
||||||
|
|
||||||
// Create a new zip archive.
|
|
||||||
|
|
||||||
//}(f)
|
|
||||||
|
|
||||||
//return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func prepareProcessLaunch(storageRoot string, req *shared.StartProcessRequest) *procmeta {
|
func prepareProcessLaunch(storageRoot string, req *shared.StartProcessRequest) *procmeta {
|
||||||
@ -214,7 +194,7 @@ func (hc *houstonClient) launch(meta *procmeta) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
stdReader := func(r io.ReadCloser) {
|
stdReader := func(childProcName string, r io.ReadCloser) {
|
||||||
defer func() {
|
defer func() {
|
||||||
reco := recover()
|
reco := recover()
|
||||||
if reco != nil {
|
if reco != nil {
|
||||||
@ -248,12 +228,53 @@ func (hc *houstonClient) launch(meta *procmeta) error {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
readingMetric := false
|
||||||
|
var metricBuffer []byte
|
||||||
|
metricValues := make(map[string]metricValueAccessor)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
buff, err := reader.ReadBytes('\n')
|
buff, err := reader.ReadBytes('\n')
|
||||||
if err != nil {
|
if err != nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if readingMetric {
|
||||||
|
metricBuffer = append(metricBuffer, buff...)
|
||||||
|
} else if buff[0] == metric.METRIC_HEAD_INLINE {
|
||||||
|
readingMetric = true
|
||||||
|
metricBuffer = append(metricBuffer, buff[1:]...)
|
||||||
|
}
|
||||||
|
|
||||||
|
if readingMetric {
|
||||||
|
if metricBuffer[len(metricBuffer)-2] == metric.METRIC_TAIL_INLINE {
|
||||||
|
readingMetric = false
|
||||||
|
|
||||||
|
metricBuffer = metricBuffer[:len(metricBuffer)-2]
|
||||||
|
if metricBuffer[0] == '{' {
|
||||||
|
var metric metric.MetricDescription
|
||||||
|
json.Unmarshal(metricBuffer, &metric)
|
||||||
|
|
||||||
|
exporter := newExporterForPrometheus()
|
||||||
|
accessor := exporter.registMetric(childProcName, metric)
|
||||||
|
prometheus.MustRegister(exporter)
|
||||||
|
|
||||||
|
metricValues[metric.Key] = accessor
|
||||||
|
} else {
|
||||||
|
keybytes := metricBuffer[:8]
|
||||||
|
valbits := binary.BigEndian.Uint64(metricBuffer[8:])
|
||||||
|
val := math.Float64frombits(valbits)
|
||||||
|
|
||||||
|
if accessor, ok := metricValues[string(keybytes)]; ok {
|
||||||
|
accessor.set(val)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
metricBuffer = metricBuffer[:0]
|
||||||
|
}
|
||||||
|
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
for written := 0; written < len(buff); {
|
for written := 0; written < len(buff); {
|
||||||
n, err := targetFile.Write(buff)
|
n, err := targetFile.Write(buff)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -282,7 +303,7 @@ func (hc *houstonClient) launch(meta *procmeta) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
go stdReader(stdout)
|
go stdReader(meta.name, stdout)
|
||||||
|
|
||||||
logger.Println("startChildProcess :", meta.cmd.Args)
|
logger.Println("startChildProcess :", meta.cmd.Args)
|
||||||
|
|
||||||
|
|||||||
@ -6,21 +6,12 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
"repositories.action2quare.com/ayo/gocommon/flagx"
|
"repositories.action2quare.com/ayo/gocommon/flagx"
|
||||||
"repositories.action2quare.com/ayo/houston/client"
|
"repositories.action2quare.com/ayo/houston/client"
|
||||||
|
|
||||||
"net/http"
|
"net/http"
|
||||||
_ "net/http/pprof"
|
|
||||||
"os"
|
|
||||||
"runtime"
|
"runtime"
|
||||||
|
|
||||||
"github.com/prometheus/common/promlog"
|
|
||||||
"github.com/prometheus/common/promlog/flag"
|
|
||||||
|
|
||||||
"github.com/alecthomas/kingpin/v2"
|
|
||||||
"github.com/prometheus/common/version"
|
|
||||||
"github.com/prometheus/exporter-toolkit/web"
|
|
||||||
"github.com/prometheus/exporter-toolkit/web/kingpinflag"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
@ -33,25 +24,13 @@ func main() {
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
http.Handle("/metrics", promhttp.Handler())
|
||||||
toolkitFlags = kingpinflag.AddFlags(kingpin.CommandLine, ":9100")
|
server := &http.Server{Addr: ":9100", Handler: nil}
|
||||||
)
|
go server.ListenAndServe()
|
||||||
|
|
||||||
promlogConfig := &promlog.Config{}
|
|
||||||
flag.AddFlags(kingpin.CommandLine, promlogConfig)
|
|
||||||
kingpin.Version(version.Print("node_exporter"))
|
|
||||||
kingpin.CommandLine.UsageWriter(os.Stdout)
|
|
||||||
kingpin.HelpFlag.Short('h')
|
|
||||||
kingpin.Parse()
|
|
||||||
logger := promlog.New(promlogConfig)
|
|
||||||
|
|
||||||
http.Handle("/metrics", client.NewHandlerForNodeExporter(true, 2, logger))
|
|
||||||
|
|
||||||
server := &http.Server{}
|
|
||||||
go web.ListenAndServe(server, toolkitFlags, logger)
|
|
||||||
|
|
||||||
hc.Start()
|
hc.Start()
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
|
||||||
server.Shutdown(ctx)
|
server.Shutdown(ctx)
|
||||||
cancel()
|
cancel()
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user