From 2c4adddb634a9b90e2bc2ddb66ce540d7b098ab8 Mon Sep 17 00:00:00 2001 From: "Mikhail f. Shiryaev" Date: Sun, 27 Jun 2021 23:14:48 +0200 Subject: [PATCH 1/4] Fix go-vet warning about mutex copying --- receiver/base.go | 28 +++++++++++++++++----------- receiver/receiver.go | 24 +++++++++++++----------- 2 files changed, 30 insertions(+), 22 deletions(-) diff --git a/receiver/base.go b/receiver/base.go index 595b745e..005c8676 100644 --- a/receiver/base.go +++ b/receiver/base.go @@ -28,18 +28,18 @@ type Base struct { pastDropped uint64 // atomic tooLongDropped uint64 // atomic } - droppedList [droppedListSize]string - droppedListNext int - droppedListMu sync.Mutex - parseThreads int - dropFutureSeconds uint32 - dropPastSeconds uint32 - dropTooLongLimit uint16 + droppedList [droppedListSize]string + droppedListNext int + droppedListMu sync.Mutex + parseThreads int + dropFutureSeconds uint32 + dropPastSeconds uint32 + dropTooLongLimit uint16 readTimeoutSeconds uint32 - writeChan chan *RowBinary.WriteBuffer - logger *zap.Logger - Tags tags.TagConfig - concatCharacter string + writeChan chan *RowBinary.WriteBuffer + logger *zap.Logger + Tags tags.TagConfig + concatCharacter string } func NewBase(logger *zap.Logger, config tags.TagConfig) Base { @@ -56,6 +56,12 @@ func sendInt64Gauge(send func(metric string, value float64), metric string, valu send(metric, float64(atomic.LoadInt64(value))) } +func (base *Base) applyOptions(opts ...Option) { + for _, applyOption := range opts { + applyOption(base) + } +} + func (base *Base) isDrop(nowTime uint32, metricTime uint32) bool { if base.dropFutureSeconds != 0 && (metricTime > (nowTime + base.dropFutureSeconds)) { atomic.AddUint64(&base.stat.futureDropped, 1) diff --git a/receiver/receiver.go b/receiver/receiver.go index fcab776b..f7005c30 100644 --- a/receiver/receiver.go +++ b/receiver/receiver.go @@ -97,11 +97,7 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) { return nil, err } - base := NewBase(zapwriter.Logger(strings.Replace(u.Scheme, "+", "_", -1)), config) - - for _, optApply := range opts { - optApply(&base) - } + logger := zapwriter.Logger(strings.Replace(u.Scheme, "+", "_", -1)) if u.Scheme == "tcp" { addr, err := net.ResolveTCPAddr("tcp", u.Host) @@ -110,9 +106,10 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) { } r := &TCP{ - Base: base, + Base: NewBase(logger, config), parseChan: make(chan *Buffer), } + r.applyOptions(opts...) if err = r.Listen(addr); err != nil { return nil, err @@ -128,9 +125,10 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) { } r := &Pickle{ - Base: base, + Base: NewBase(logger, config), parseChan: make(chan []byte), } + r.applyOptions(opts...) if err = r.Listen(addr); err != nil { return nil, err @@ -146,9 +144,10 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) { } r := &UDP{ - Base: base, + Base: NewBase(logger, config), parseChan: make(chan *Buffer), } + r.applyOptions(opts...) if err = r.Listen(addr); err != nil { return nil, err @@ -164,8 +163,9 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) { } r := &GRPC{ - Base: base, + Base: NewBase(logger, config), } + r.applyOptions(opts...) if err = r.Listen(addr); err != nil { return nil, err @@ -181,8 +181,9 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) { } r := &PrometheusRemoteWrite{ - Base: base, + Base: NewBase(logger, config), } + r.applyOptions(opts...) if err = r.Listen(addr); err != nil { return nil, err @@ -198,8 +199,9 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) { } r := &TelegrafHttpJson{ - Base: base, + Base: NewBase(logger, config), } + r.applyOptions(opts...) if err = r.Listen(addr); err != nil { return nil, err From e5be3d22b50ddafc6d6eb7a20358ac7a17a21b74 Mon Sep 17 00:00:00 2001 From: "Mikhail f. Shiryaev" Date: Thu, 1 Jul 2021 23:39:07 +0200 Subject: [PATCH 2/4] Fix golint warnings regarding lowercase --- carbon/app.go | 34 +++++++++++++++++----------------- carbon/config.go | 20 ++++++++++---------- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/carbon/app.go b/carbon/app.go index 341345d6..e40d0604 100644 --- a/carbon/app.go +++ b/carbon/app.go @@ -246,16 +246,16 @@ func (app *App) Start() (err error) { /* UPLOADER end */ /* RECEIVER start */ - if conf.Tcp.Enabled { + if conf.TCP.Enabled { app.TCP, err = receiver.New( - "tcp://"+conf.Tcp.Listen, + "tcp://"+conf.TCP.Listen, app.Config.TagDesc, receiver.ParseThreads(runtime.GOMAXPROCS(-1)*2), receiver.WriteChan(app.writeChan), - receiver.DropFuture(uint32(conf.Tcp.DropFuture.Value().Seconds())), - receiver.DropPast(uint32(conf.Tcp.DropPast.Value().Seconds())), - receiver.DropLongerThan(conf.Tcp.DropLongerThan), - receiver.ReadTimeout(uint32(conf.Tcp.ReadTimeout.Value().Seconds())), + receiver.DropFuture(uint32(conf.TCP.DropFuture.Value().Seconds())), + receiver.DropPast(uint32(conf.TCP.DropPast.Value().Seconds())), + receiver.DropLongerThan(conf.TCP.DropLongerThan), + receiver.ReadTimeout(uint32(conf.TCP.ReadTimeout.Value().Seconds())), ) if err != nil { @@ -265,15 +265,15 @@ func (app *App) Start() (err error) { http.HandleFunc("/debug/receive/tcp/dropped/", app.TCP.DroppedHandler) } - if conf.Udp.Enabled { + if conf.UDP.Enabled { app.UDP, err = receiver.New( - "udp://"+conf.Udp.Listen, + "udp://"+conf.UDP.Listen, app.Config.TagDesc, receiver.ParseThreads(runtime.GOMAXPROCS(-1)*2), receiver.WriteChan(app.writeChan), - receiver.DropFuture(uint32(conf.Udp.DropFuture.Value().Seconds())), - receiver.DropPast(uint32(conf.Udp.DropPast.Value().Seconds())), - receiver.DropLongerThan(conf.Udp.DropLongerThan), + receiver.DropFuture(uint32(conf.UDP.DropFuture.Value().Seconds())), + receiver.DropPast(uint32(conf.UDP.DropPast.Value().Seconds())), + receiver.DropLongerThan(conf.UDP.DropLongerThan), ) if err != nil { @@ -335,15 +335,15 @@ func (app *App) Start() (err error) { http.HandleFunc("/debug/receive/prometheus/dropped/", app.Prometheus.DroppedHandler) } - if conf.TelegrafHttpJson.Enabled { + if conf.TelegrafHTTPJSON.Enabled { app.TelegrafHttpJson, err = receiver.New( - "telegraf+http+json://"+conf.TelegrafHttpJson.Listen, + "telegraf+http+json://"+conf.TelegrafHTTPJSON.Listen, app.Config.TagDesc, receiver.WriteChan(app.writeChan), - receiver.DropFuture(uint32(conf.TelegrafHttpJson.DropFuture.Value().Seconds())), - receiver.DropPast(uint32(conf.TelegrafHttpJson.DropPast.Value().Seconds())), - receiver.DropLongerThan(conf.TelegrafHttpJson.DropLongerThan), - receiver.ConcatChar(conf.TelegrafHttpJson.Concat), + receiver.DropFuture(uint32(conf.TelegrafHTTPJSON.DropFuture.Value().Seconds())), + receiver.DropPast(uint32(conf.TelegrafHTTPJSON.DropPast.Value().Seconds())), + receiver.DropLongerThan(conf.TelegrafHTTPJSON.DropLongerThan), + receiver.ConcatChar(conf.TelegrafHTTPJSON.Concat), ) if err != nil { diff --git a/carbon/config.go b/carbon/config.go index d56156c4..eccfd9a0 100644 --- a/carbon/config.go +++ b/carbon/config.go @@ -27,7 +27,7 @@ type commonConfig struct { } type clickhouseConfig struct { - Url string `toml:"url"` + URL string `toml:"url"` } type udpConfig struct { @@ -72,7 +72,7 @@ type promConfig struct { DropLongerThan uint16 `toml:"drop-longer-than"` } -type telegrafHttpJsonConfig struct { +type telegrafHTTPJSONConfig struct { Listen string `toml:"listen"` Enabled bool `toml:"enabled"` DropFuture *config.Duration `toml:"drop-future"` @@ -100,12 +100,12 @@ type Config struct { Common commonConfig `toml:"common"` Data dataConfig `toml:"data"` Upload map[string]*uploader.Config `toml:"upload"` - Udp udpConfig `toml:"udp"` - Tcp tcpConfig `toml:"tcp"` + UDP udpConfig `toml:"udp"` + TCP tcpConfig `toml:"tcp"` Pickle pickleConfig `toml:"pickle"` Grpc grpcConfig `toml:"grpc"` Prometheus promConfig `toml:"prometheus"` - TelegrafHttpJson telegrafHttpJsonConfig `toml:"telegraf_http_json"` + TelegrafHTTPJSON telegrafHTTPJSONConfig `toml:"telegraf_http_json"` Pprof pprofConfig `toml:"pprof"` Logging []zapwriter.Config `toml:"logging"` TagDesc tags.TagConfig `toml:"convert_to_tagged"` @@ -133,7 +133,7 @@ func NewConfig() *Config { CompAlgo: &config.Compression{CompAlgo: config.CompAlgoNone}, CompLevel: 0, }, - Udp: udpConfig{ + UDP: udpConfig{ Listen: ":2003", Enabled: true, LogIncomplete: false, @@ -141,7 +141,7 @@ func NewConfig() *Config { DropPast: &config.Duration{}, DropLongerThan: 0, }, - Tcp: tcpConfig{ + TCP: tcpConfig{ Listen: ":2003", Enabled: true, DropFuture: &config.Duration{}, @@ -172,7 +172,7 @@ func NewConfig() *Config { DropPast: &config.Duration{}, DropLongerThan: 0, }, - TelegrafHttpJson: telegrafHttpJsonConfig{ + TelegrafHTTPJSON: telegrafHTTPJSONConfig{ Listen: ":2007", Enabled: false, DropFuture: &config.Duration{}, @@ -212,7 +212,7 @@ func PrintDefaultConfig() error { } cfg.Upload = map[string]*uploader.Config{ - "graphite": &uploader.Config{ + "graphite": { Type: "points", Timeout: &config.Duration{ Duration: time.Minute, @@ -221,7 +221,7 @@ func PrintDefaultConfig() error { TableName: "graphite", URL: "http://localhost:8123/", }, - "graphite_tree": &uploader.Config{ + "graphite_tree": { Type: "tree", Timeout: &config.Duration{ Duration: time.Minute, From d0b41bef754e9f437dcfe09a693bd8c595365c44 Mon Sep 17 00:00:00 2001 From: "Mikhail f. Shiryaev" Date: Thu, 1 Jul 2021 23:51:25 +0200 Subject: [PATCH 3/4] Fix more golint warnings --- carbon/app.go | 12 ++++++------ carbon/collector.go | 4 ++-- receiver/receiver.go | 2 +- receiver/telegraf_http_json.go | 18 +++++++++--------- 4 files changed, 18 insertions(+), 18 deletions(-) diff --git a/carbon/app.go b/carbon/app.go index e40d0604..994fe327 100644 --- a/carbon/app.go +++ b/carbon/app.go @@ -29,7 +29,7 @@ type App struct { Pickle receiver.Receiver Grpc receiver.Receiver Prometheus receiver.Receiver - TelegrafHttpJson receiver.Receiver + TelegrafHTTPJSON receiver.Receiver Collector *Collector // (!!!) Should be re-created on every change config/modules writeChan chan *RowBinary.WriteBuffer exit chan bool @@ -129,9 +129,9 @@ func (app *App) stopListeners() { logger.Debug("finished", zap.String("module", "prometheus")) } - if app.TelegrafHttpJson != nil { - app.TelegrafHttpJson.Stop() - app.TelegrafHttpJson = nil + if app.TelegrafHTTPJSON != nil { + app.TelegrafHTTPJSON.Stop() + app.TelegrafHTTPJSON = nil logger.Debug("finished", zap.String("module", "telegraf_http_json")) } } @@ -336,7 +336,7 @@ func (app *App) Start() (err error) { } if conf.TelegrafHTTPJSON.Enabled { - app.TelegrafHttpJson, err = receiver.New( + app.TelegrafHTTPJSON, err = receiver.New( "telegraf+http+json://"+conf.TelegrafHTTPJSON.Listen, app.Config.TagDesc, receiver.WriteChan(app.writeChan), @@ -350,7 +350,7 @@ func (app *App) Start() (err error) { return } - http.HandleFunc("/debug/receive/telegraf_http_json/dropped/", app.TelegrafHttpJson.DroppedHandler) + http.HandleFunc("/debug/receive/telegraf_http_json/dropped/", app.TelegrafHTTPJSON.DroppedHandler) } /* RECEIVER end */ diff --git a/carbon/collector.go b/carbon/collector.go index 3b637461..9a592dbc 100644 --- a/carbon/collector.go +++ b/carbon/collector.go @@ -101,8 +101,8 @@ func NewCollector(app *App) *Collector { c.stats = append(c.stats, moduleCallback("prometheus", app.Prometheus)) } - if app.TelegrafHttpJson != nil { - c.stats = append(c.stats, moduleCallback("telegraf_http_json", app.TelegrafHttpJson)) + if app.TelegrafHTTPJSON != nil { + c.stats = append(c.stats, moduleCallback("telegraf_http_json", app.TelegrafHTTPJSON)) } for n, u := range app.Uploaders { diff --git a/receiver/receiver.go b/receiver/receiver.go index f7005c30..2855467c 100644 --- a/receiver/receiver.go +++ b/receiver/receiver.go @@ -198,7 +198,7 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) { return nil, err } - r := &TelegrafHttpJson{ + r := &TelegrafHTTPJSON{ Base: NewBase(logger, config), } r.applyOptions(opts...) diff --git a/receiver/telegraf_http_json.go b/receiver/telegraf_http_json.go index 803efe7f..1b5c0242 100644 --- a/receiver/telegraf_http_json.go +++ b/receiver/telegraf_http_json.go @@ -17,18 +17,18 @@ import ( "go.uber.org/zap" ) -type TelegrafHttpMetric struct { +type TelegrafHTTPMetric struct { Name string `json:"name"` Timestamp int64 `json:"timestamp"` Fields map[string]interface{} `json:"fields"` Tags map[string]string `json:"tags"` } -type TelegrafHttpPayload struct { - Metrics []TelegrafHttpMetric `json:"metrics"` +type TelegrafHTTPPayload struct { + Metrics []TelegrafHTTPMetric `json:"metrics"` } -type TelegrafHttpJson struct { +type TelegrafHTTPJSON struct { Base listener *net.TCPListener } @@ -73,14 +73,14 @@ func TelegrafEncodeTags(tags map[string]string) string { return res.String() } -func (rcv *TelegrafHttpJson) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (rcv *TelegrafHTTPJSON) ServeHTTP(w http.ResponseWriter, r *http.Request) { body, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - var data TelegrafHttpPayload + var data TelegrafHTTPPayload err = json.Unmarshal(body, &data) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -144,19 +144,19 @@ metricsLoop: } // Addr returns binded socket address. For bind port 0 in tests -func (rcv *TelegrafHttpJson) Addr() net.Addr { +func (rcv *TelegrafHTTPJSON) Addr() net.Addr { if rcv.listener == nil { return nil } return rcv.listener.Addr() } -func (rcv *TelegrafHttpJson) Stat(send func(metric string, value float64)) { +func (rcv *TelegrafHTTPJSON) Stat(send func(metric string, value float64)) { rcv.SendStat(send, "samplesReceived", "errors", "futureDropped", "pastDropped", "tooLongDropped") } // Listen bind port. Receive messages and send to out channel -func (rcv *TelegrafHttpJson) Listen(addr *net.TCPAddr) error { +func (rcv *TelegrafHTTPJSON) Listen(addr *net.TCPAddr) error { return rcv.StartFunc(func() error { tcpListener, err := net.ListenTCP("tcp", addr) From 18a9ff4c750cbde3c940ed24a53eb4c7d282b3ee Mon Sep 17 00:00:00 2001 From: "Mikhail f. Shiryaev" Date: Wed, 29 Sep 2021 21:07:18 +0200 Subject: [PATCH 4/4] Fix linters in carbon package --- carbon/app.go | 9 +++++++-- carbon/collector.go | 2 +- carbon/config.go | 7 +++++-- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/carbon/app.go b/carbon/app.go index 994fe327..ab9739ba 100644 --- a/carbon/app.go +++ b/carbon/app.go @@ -19,6 +19,7 @@ import ( "github.com/lomik/zapwriter" ) +// App is an application object used in main type App struct { sync.RWMutex Config *Config @@ -214,7 +215,9 @@ func (app *App) Start() (err error) { uploaders, nil, ) - app.Writer.Start() + if err := app.Writer.Start(); err != nil { + return err + } /* WRITER end */ /* UPLOADER start */ @@ -241,7 +244,9 @@ func (app *App) Start() (err error) { } for _, uploader := range app.Uploaders { - uploader.Start() + if err := uploader.Start(); err != nil { + return err + } } /* UPLOADER end */ diff --git a/carbon/collector.go b/carbon/collector.go index 9a592dbc..96b2d5e0 100644 --- a/carbon/collector.go +++ b/carbon/collector.go @@ -226,7 +226,7 @@ func (c *Collector) local(ctx context.Context) { func (c *Collector) chunked(ctx context.Context, chunkSize int, callback func([]byte)) { for { points := c.readData(ctx) - if points == nil || len(points) == 0 { + if len(points) == 0 { // exit closed return } diff --git a/carbon/config.go b/carbon/config.go index eccfd9a0..15645e32 100644 --- a/carbon/config.go +++ b/carbon/config.go @@ -15,6 +15,7 @@ import ( ) const ( + // MetricEndpointLocal used to send metrics in the carbon-clickhouse itself MetricEndpointLocal = "local" ) @@ -26,7 +27,8 @@ type commonConfig struct { Enabled bool `toml:"enabled"` } -type clickhouseConfig struct { +// ClickhouseConfig is TODO: use one ClickhouseConfig in all uploaders +type ClickhouseConfig struct { URL string `toml:"url"` } @@ -192,13 +194,14 @@ func NewConfig() *Config { return cfg } +// NewLoggingConfig returns the zapwriter.Config with logging into "/var/log/carbon-clickhouse/carbon-clickhouse.log" func NewLoggingConfig() zapwriter.Config { cfg := zapwriter.NewConfig() cfg.File = "/var/log/carbon-clickhouse/carbon-clickhouse.log" return cfg } -// PrintConfig ... +// PrintDefaultConfig ... func PrintDefaultConfig() error { cfg := NewConfig() buf := new(bytes.Buffer)