diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 0341152ab..99ae9779b 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -65,6 +65,7 @@ import ( pp_pkg_storage "github.com/prometheus/prometheus/pp-pkg/storage" // PP_CHANGES.md: rebuild on cpp "github.com/prometheus/prometheus/pp/go/cppbridge" // PP_CHANGES.md: rebuild on cpp "github.com/prometheus/prometheus/pp/go/relabeler/appender" // PP_CHANGES.md: rebuild on cpp + "github.com/prometheus/prometheus/pp/go/relabeler/head" // PP_CHANGES.md: rebuild on cpp "github.com/prometheus/prometheus/pp/go/relabeler/head/catalog" // PP_CHANGES.md: rebuild on cpp "github.com/prometheus/prometheus/pp/go/relabeler/head/ready" // PP_CHANGES.md: rebuild on cpp "github.com/prometheus/prometheus/pp/go/relabeler/remotewriter" // PP_CHANGES.md: rebuild on cpp @@ -493,11 +494,6 @@ func main() { a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details."). Default("").StringsVar(&cfg.featureList) - a.Flag( - "appender.copy-series-on-rotate", - "Copy active series from the current head to the new head during rotation.", - ).Hidden().Default("false").BoolVar(&appender.CopySeriesOnRotate) - promlogflag.AddFlags(a, &cfg.promlogConfig) a.Flag("write-documentation", "Generate command line documentation. Internal use.").Hidden().Action(func(ctx *kingpin.ParseContext) error { @@ -518,6 +514,8 @@ func main() { logger := promlog.New(&cfg.promlogConfig) + readPromPPFeatures(logger) + if err := cfg.setFeatureListOptions(logger); err != nil { fmt.Fprintln(os.Stderr, fmt.Errorf("Error parsing feature list: %w", err)) os.Exit(1) @@ -1925,3 +1923,47 @@ type discoveryManager interface { Run() error SyncCh() <-chan map[string][]*targetgroup.Group } + +func readPromPPFeatures(logger log.Logger) { + features := os.Getenv("PROMPP_FEATURES") + if features == "" { + return + } + + for _, feature := range strings.Split(features, ",") { + fname, fvalue, _ := strings.Cut(feature, "=") + switch strings.TrimSpace(fname) { + case "head_copy_series_on_rotate": + appender.CopySeriesOnRotate = true + level.Info(logger).Log( + "msg", + "[FEATURE] Copying active series from current head to new head during rotation is enabled.", + ) + + case "head_read_concurrency": + var ( + v = 1 + err error + ) + + if fvalue := strings.TrimSpace(fvalue); fvalue != "" { + v, err = strconv.Atoi(fvalue) + if err != nil { + level.Error(logger).Log("msg", "Error parsing head-read-concurrency value", "err", err) + continue + } + } + + head.ExtraReadConcurrency = v + level.Info(logger).Log( + "msg", + "[FEATURE] Concurrency reading is enabled.", + "extra", + v, + ) + + case "disable_coredumps": + // TODO disable-coredumps + } + } +} diff --git a/pp/go/cppbridge/entrypoint.go b/pp/go/cppbridge/entrypoint.go index 933cad87b..aff03b108 100644 --- a/pp/go/cppbridge/entrypoint.go +++ b/pp/go/cppbridge/entrypoint.go @@ -28,22 +28,6 @@ import ( ) var ( - // UnsafeCall2 - unsafeCall2Sum = util.NewUnconflictRegisterer(prometheus.DefaultRegisterer).NewCounter( - prometheus.CounterOpts{ - Name: "prompp_cppbridge_unsafecall_nanoseconds_sum", - Help: "The time duration cpp call.", - ConstLabels: prometheus.Labels{"object": "unsafe", "method": "call_2"}, - }, - ) - unsafeCall2Count = util.NewUnconflictRegisterer(prometheus.DefaultRegisterer).NewCounter( - prometheus.CounterOpts{ - Name: "prompp_cppbridge_unsafecall_nanoseconds_count", - Help: "The time duration cpp call.", - ConstLabels: prometheus.Labels{"object": "unsafe", "method": "call_2"}, - }, - ) - // input_relabeler input_relabeling inputRelabelerInputRelabelingSum = util.NewUnconflictRegisterer(prometheus.DefaultRegisterer).NewCounter( prometheus.CounterOpts{ diff --git a/pp/go/relabeler/head/head.go b/pp/go/relabeler/head/head.go index 00a8de096..48497b937 100644 --- a/pp/go/relabeler/head/head.go +++ b/pp/go/relabeler/head/head.go @@ -20,6 +20,9 @@ import ( "github.com/prometheus/prometheus/pp/go/util" ) +// ExtraReadConcurrency number of concurrency read operation, 0 - work without concurrency. +var ExtraReadConcurrency = 0 + // RelabelerData data for relabeling - inputRelabelers per shard and state. type RelabelerData struct { state *cppbridge.State @@ -173,10 +176,12 @@ type Head struct { shards []*shard lssTaskChs []chan *relabeler.GenericTask dataStorageTaskChs []chan *relabeler.GenericTask + lssMXs []*sync.RWMutex + dataStorageMXs []*sync.RWMutex numberOfShards uint16 stopc chan struct{} - wg *sync.WaitGroup + wg sync.WaitGroup // stat registerer prometheus.Registerer @@ -208,6 +213,8 @@ func New( lssTaskChs := make([]chan *relabeler.GenericTask, numberOfShards) dataStorageTaskChs := make([]chan *relabeler.GenericTask, numberOfShards) shards := make([]*shard, numberOfShards) + lssMXs := make([]*sync.RWMutex, numberOfShards) + dataStorageMXs := make([]*sync.RWMutex, numberOfShards) for shardID := uint16(0); shardID < numberOfShards; shardID++ { lssTaskChs[shardID] = make(chan *relabeler.GenericTask, chanBufferSize) @@ -230,9 +237,11 @@ func New( shards: shards, lssTaskChs: lssTaskChs, dataStorageTaskChs: dataStorageTaskChs, + lssMXs: lssMXs, + dataStorageMXs: dataStorageMXs, stopc: make(chan struct{}), - wg: &sync.WaitGroup{}, + wg: sync.WaitGroup{}, relabelersData: make(map[string]*RelabelerData, len(inputRelabelerConfigs)), numberOfShards: numberOfShards, // stat @@ -312,6 +321,13 @@ func New( ), } + if ExtraReadConcurrency != 0 { + for shardID := uint16(0); shardID < numberOfShards; shardID++ { + h.lssMXs[shardID] = &sync.RWMutex{} + h.dataStorageMXs[shardID] = &sync.RWMutex{} + } + } + if err := h.reconfigure(inputRelabelerConfigs, numberOfShards); err != nil { return nil, err } @@ -653,21 +669,6 @@ func (h *Head) stop() { h.stopc = make(chan struct{}) } -func (h *Head) run() { - h.wg.Add(2 * int(h.numberOfShards)) - for shardID := uint16(0); shardID < h.numberOfShards; shardID++ { - go func(sid uint16) { - defer h.wg.Done() - h.lssShardLoop(sid, h.lssTaskChs[sid], h.stopc) - }(shardID) - - go func(sid uint16) { - defer h.wg.Done() - h.dataStorageShardLoop(sid, h.dataStorageTaskChs[sid], h.stopc) - }(shardID) - } -} - // CreateTask create a task for operations on the head shards. func (h *Head) CreateTask( taskName string, @@ -1002,40 +1003,55 @@ func (h *Head) updateRelabelerStateStage( return nil } -// lssShardLoop run shard loop for operation with lss. -func (h *Head) lssShardLoop( - shardID uint16, - exclusiveCH chan *relabeler.GenericTask, - stopc chan struct{}, -) { - s := h.shards[shardID] - - for { - select { - case <-stopc: - return - - case task := <-exclusiveCH: - task.ExecuteOnShard(s) +// run loop for each shard. +func (h *Head) run() { + workers := 1 + ExtraReadConcurrency + h.wg.Add(2 * workers * int(h.numberOfShards)) + for shardID := uint16(0); shardID < h.numberOfShards; shardID++ { + for i := 0; i < workers; i++ { + go func(sid uint16) { + defer h.wg.Done() + h.shardLoop(h.lssTaskChs[sid], h.stopc, h.shards[sid], h.lssMXs[sid]) + }(shardID) + + go func(sid uint16) { + defer h.wg.Done() + h.shardLoop(h.dataStorageTaskChs[sid], h.stopc, h.shards[sid], h.dataStorageMXs[sid]) + }(shardID) } } } -// dataStorageShardLoop run shard loop for operation with data storage. -func (h *Head) dataStorageShardLoop( - shardID uint16, - dataStorageCH chan *relabeler.GenericTask, +// shardLoop run shard loop for operation. +func (*Head) shardLoop( + taskCH chan *relabeler.GenericTask, stopc chan struct{}, + s *shard, + mx *sync.RWMutex, ) { - s := h.shards[shardID] - - for { - select { - case <-stopc: - return - - case task := <-dataStorageCH: - task.ExecuteOnShard(s) + if ExtraReadConcurrency == 0 { + for { + select { + case <-stopc: + return + + case task := <-taskCH: + task.ExecuteOnShard(s) + } + } + } else { + for { + select { + case <-stopc: + return + + case task := <-taskCH: + if task.IsExclusive() { + task.ExecuteOnShardWithLocker(s, mx.Lock, mx.Unlock) + } else { + task.ExecuteOnShardWithLocker(s, mx.RLock, mx.RUnlock) + } + } } } } diff --git a/pp/go/relabeler/task.go b/pp/go/relabeler/task.go index 58d56cd02..f6fe1f461 100644 --- a/pp/go/relabeler/task.go +++ b/pp/go/relabeler/task.go @@ -147,8 +147,16 @@ func NewReadOnlyGenericTask( // ExecuteOnShard execute task on shard. func (t *GenericTask) ExecuteOnShard(shard Shard) { atomic.CompareAndSwapInt64(&t.executeTS, 0, time.Now().UnixMicro()) + t.errs[shard.ShardID()] = t.shardFn(shard) + t.wg.Done() +} +// ExecuteOnShardWithLocker execute task on shard with locker. +func (t *GenericTask) ExecuteOnShardWithLocker(shard Shard, lock, unlock func()) { + lock() + atomic.CompareAndSwapInt64(&t.executeTS, 0, time.Now().UnixMicro()) t.errs[shard.ShardID()] = t.shardFn(shard) + unlock() t.wg.Done() }