Merged

Commits (46)
7356200 rebuild (u-veles-a, May 21, 2025)
7991cce tuning (u-veles-a, May 21, 2025)
69959a0 tuning (u-veles-a, May 21, 2025)
bbe1c21 tuning (u-veles-a, May 22, 2025)
19076b7 tuning (u-veles-a, May 22, 2025)
c56b213 rebuild (u-veles-a, May 23, 2025)
0d6e6ed chunk Read (u-veles-a, May 23, 2025)
4500bac clearing (u-veles-a, May 23, 2025)
df9789c Merge branch 'pp' into append_rlock (u-veles-a, May 23, 2025)
e698043 add on lss snapshot (u-veles-a, May 23, 2025)
47b2c54 Merge branch 'one_lss_snapshot' into rebuild_generic_tasks (u-veles-a, May 23, 2025)
6da4dc3 fix merge (u-veles-a, May 23, 2025)
55d425a for save (u-veles-a, May 26, 2025)
1208fe7 clearing (u-veles-a, May 26, 2025)
05d3e79 Merge branch 'append_rlock' into rebuild_generic_tasks (u-veles-a, May 26, 2025)
2f5dc02 for save (u-veles-a, May 26, 2025)
e49ea6a for save (u-veles-a, May 26, 2025)
81b2faf for save (u-veles-a, May 26, 2025)
a13ffbb for save (u-veles-a, May 27, 2025)
3b4f2e1 clearing (u-veles-a, May 28, 2025)
e49ecec Merge branch 'pp' into rebuild_generic_tasks (u-veles-a, May 28, 2025)
daf7edc for save (u-veles-a, May 28, 2025)
d6ffa0c task optimization (u-veles-a, May 29, 2025)
753eb12 rebuild (u-veles-a, May 30, 2025)
43121c3 for save (u-veles-a, May 30, 2025)
99e21b6 split lss ds (u-veles-a, Jun 2, 2025)
e2d43fa rename (u-veles-a, Jun 2, 2025)
aef0f2c clearing (u-veles-a, Jun 2, 2025)
1b3a61f fix snapshots (u-veles-a, Jun 2, 2025)
9f92af3 for save (u-veles-a, Jun 2, 2025)
d687145 rebuild to lssds (u-veles-a, Jun 2, 2025)
8d9c715 fix (u-veles-a, Jun 2, 2025)
3e32c11 some fix (u-veles-a, Jun 2, 2025)
0d1e2f8 fix test (u-veles-a, Jun 2, 2025)
1083a8b rebuild (u-veles-a, Jun 3, 2025)
65264ff clearing (u-veles-a, Jun 3, 2025)
cf5c33b fix revew (u-veles-a, Jun 3, 2025)
39a1126 Merge branch 'pp' into rebuild_generic_tasks (u-veles-a, Jun 3, 2025)
ee313be Merge branch 'rebuild_generic_tasks' into rebuild_forexhshard (u-veles-a, Jun 3, 2025)
806b059 rebuild task (u-veles-a, Jun 3, 2025)
00f5d28 add flags and read conc (u-veles-a, Jun 4, 2025)
fca79f5 fix (u-veles-a, Jun 4, 2025)
bf4baa1 clearing (u-veles-a, Jun 4, 2025)
9fca915 clearing (u-veles-a, Jun 4, 2025)
6a4d43e fix snake_case (u-veles-a, Jun 5, 2025)
9d62e3f Merge branch 'pp' into feature_flags (vporoshok, Jun 5, 2025)
52 changes: 47 additions & 5 deletions cmd/prometheus/main.go
@@ -65,6 +65,7 @@ import (
pp_pkg_storage "github.com/prometheus/prometheus/pp-pkg/storage" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp/go/cppbridge" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp/go/relabeler/appender" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp/go/relabeler/head" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp/go/relabeler/head/catalog" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp/go/relabeler/head/ready" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp/go/relabeler/remotewriter" // PP_CHANGES.md: rebuild on cpp
@@ -493,11 +494,6 @@ func main() {
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
Default("").StringsVar(&cfg.featureList)

a.Flag(
"appender.copy-series-on-rotate",
"Copy active series from the current head to the new head during rotation.",
).Hidden().Default("false").BoolVar(&appender.CopySeriesOnRotate)

promlogflag.AddFlags(a, &cfg.promlogConfig)

a.Flag("write-documentation", "Generate command line documentation. Internal use.").Hidden().Action(func(ctx *kingpin.ParseContext) error {
@@ -518,6 +514,8 @@ func main() {

logger := promlog.New(&cfg.promlogConfig)

readPromPPFeatures(logger)

if err := cfg.setFeatureListOptions(logger); err != nil {
fmt.Fprintln(os.Stderr, fmt.Errorf("Error parsing feature list: %w", err))
os.Exit(1)
@@ -1925,3 +1923,47 @@ type discoveryManager interface {
Run() error
SyncCh() <-chan map[string][]*targetgroup.Group
}

func readPromPPFeatures(logger log.Logger) {
features := os.Getenv("PROMPP_FEATURES")
if features == "" {
return
}

for _, feature := range strings.Split(features, ",") {
fname, fvalue, _ := strings.Cut(feature, "=")
switch strings.TrimSpace(fname) {
case "head_copy_series_on_rotate":
appender.CopySeriesOnRotate = true
level.Info(logger).Log(
"msg",
"[FEATURE] Copying active series from current head to new head during rotation is enabled.",
)

case "head_read_concurrency":
var (
v = 1
err error
)

if fvalue := strings.TrimSpace(fvalue); fvalue != "" {
v, err = strconv.Atoi(fvalue)
if err != nil {
level.Error(logger).Log("msg", "Error parsing head_read_concurrency value", "err", err)
continue
}
}

head.ExtraReadConcurrency = v
level.Info(logger).Log(
"msg",
"[FEATURE] Concurrency reading is enabled.",
"extra",
v,
)

case "disable_coredumps":
// TODO disable-coredumps
}
}
}
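
For reviewers: the PROMPP_FEATURES contract above is a comma-separated list of feature names, each optionally carrying an "=value" suffix. A minimal standalone sketch of the same parsing behavior (a hypothetical harness; only the two feature names shown in this diff are real, the printed messages are illustrative):

package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
)

// Sketch of the PROMPP_FEATURES parsing contract used by readPromPPFeatures:
// comma-separated feature names, optionally with an "=value" suffix.
func main() {
	os.Setenv("PROMPP_FEATURES", "head_copy_series_on_rotate,head_read_concurrency=4")
	for _, feature := range strings.Split(os.Getenv("PROMPP_FEATURES"), ",") {
		fname, fvalue, _ := strings.Cut(feature, "=")
		switch strings.TrimSpace(fname) {
		case "head_copy_series_on_rotate":
			fmt.Println("copy series on rotate: enabled")
		case "head_read_concurrency":
			v := 1 // default when the value is omitted, matching the code above
			if s := strings.TrimSpace(fvalue); s != "" {
				if parsed, err := strconv.Atoi(s); err == nil {
					v = parsed
				}
			}
			fmt.Println("extra read concurrency:", v)
		}
	}
}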
16 changes: 0 additions & 16 deletions pp/go/cppbridge/entrypoint.go
@@ -28,22 +28,6 @@ import (
)

var (
// UnsafeCall2
unsafeCall2Sum = util.NewUnconflictRegisterer(prometheus.DefaultRegisterer).NewCounter(
prometheus.CounterOpts{
Name: "prompp_cppbridge_unsafecall_nanoseconds_sum",
Help: "The time duration cpp call.",
ConstLabels: prometheus.Labels{"object": "unsafe", "method": "call_2"},
},
)
unsafeCall2Count = util.NewUnconflictRegisterer(prometheus.DefaultRegisterer).NewCounter(
prometheus.CounterOpts{
Name: "prompp_cppbridge_unsafecall_nanoseconds_count",
Help: "The time duration cpp call.",
ConstLabels: prometheus.Labels{"object": "unsafe", "method": "call_2"},
},
)

// input_relabeler input_relabeling
inputRelabelerInputRelabelingSum = util.NewUnconflictRegisterer(prometheus.DefaultRegisterer).NewCounter(
prometheus.CounterOpts{
106 changes: 61 additions & 45 deletions pp/go/relabeler/head/head.go
@@ -20,6 +20,9 @@ import (
"github.com/prometheus/prometheus/pp/go/util"
)

// ExtraReadConcurrency is the number of extra concurrent read workers per shard; 0 means reads run without extra concurrency.
var ExtraReadConcurrency = 0

// RelabelerData data for relabeling - inputRelabelers per shard and state.
type RelabelerData struct {
state *cppbridge.State
@@ -173,10 +176,12 @@ type Head struct {
shards []*shard
lssTaskChs []chan *relabeler.GenericTask
dataStorageTaskChs []chan *relabeler.GenericTask
lssMXs []*sync.RWMutex
dataStorageMXs []*sync.RWMutex

numberOfShards uint16
stopc chan struct{}
wg *sync.WaitGroup
wg sync.WaitGroup

// stat
registerer prometheus.Registerer
@@ -208,6 +213,8 @@ func New(
lssTaskChs := make([]chan *relabeler.GenericTask, numberOfShards)
dataStorageTaskChs := make([]chan *relabeler.GenericTask, numberOfShards)
shards := make([]*shard, numberOfShards)
lssMXs := make([]*sync.RWMutex, numberOfShards)
dataStorageMXs := make([]*sync.RWMutex, numberOfShards)

for shardID := uint16(0); shardID < numberOfShards; shardID++ {
lssTaskChs[shardID] = make(chan *relabeler.GenericTask, chanBufferSize)
@@ -230,9 +237,11 @@
shards: shards,
lssTaskChs: lssTaskChs,
dataStorageTaskChs: dataStorageTaskChs,
lssMXs: lssMXs,
dataStorageMXs: dataStorageMXs,

stopc: make(chan struct{}),
wg: &sync.WaitGroup{},
wg: sync.WaitGroup{},
relabelersData: make(map[string]*RelabelerData, len(inputRelabelerConfigs)),
numberOfShards: numberOfShards,
// stat
@@ -312,6 +321,13 @@ func New(
),
}

if ExtraReadConcurrency != 0 {
for shardID := uint16(0); shardID < numberOfShards; shardID++ {
h.lssMXs[shardID] = &sync.RWMutex{}
h.dataStorageMXs[shardID] = &sync.RWMutex{}
}
}

if err := h.reconfigure(inputRelabelerConfigs, numberOfShards); err != nil {
return nil, err
}
@@ -653,21 +669,6 @@ func (h *Head) stop() {
h.stopc = make(chan struct{})
}

func (h *Head) run() {
h.wg.Add(2 * int(h.numberOfShards))
for shardID := uint16(0); shardID < h.numberOfShards; shardID++ {
go func(sid uint16) {
defer h.wg.Done()
h.lssShardLoop(sid, h.lssTaskChs[sid], h.stopc)
}(shardID)

go func(sid uint16) {
defer h.wg.Done()
h.dataStorageShardLoop(sid, h.dataStorageTaskChs[sid], h.stopc)
}(shardID)
}
}

// CreateTask creates a task for operations on the head shards.
func (h *Head) CreateTask(
taskName string,
@@ -1002,40 +1003,55 @@ func (h *Head) updateRelabelerStateStage(
return nil
}

// lssShardLoop run shard loop for operation with lss.
func (h *Head) lssShardLoop(
shardID uint16,
exclusiveCH chan *relabeler.GenericTask,
stopc chan struct{},
) {
s := h.shards[shardID]

for {
select {
case <-stopc:
return

case task := <-exclusiveCH:
task.ExecuteOnShard(s)
// run starts the worker loops for each shard.
func (h *Head) run() {
workers := 1 + ExtraReadConcurrency
h.wg.Add(2 * workers * int(h.numberOfShards))
for shardID := uint16(0); shardID < h.numberOfShards; shardID++ {
for i := 0; i < workers; i++ {
go func(sid uint16) {
defer h.wg.Done()
h.shardLoop(h.lssTaskChs[sid], h.stopc, h.shards[sid], h.lssMXs[sid])
}(shardID)

go func(sid uint16) {
defer h.wg.Done()
h.shardLoop(h.dataStorageTaskChs[sid], h.stopc, h.shards[sid], h.dataStorageMXs[sid])
}(shardID)
}
}
}

// dataStorageShardLoop run shard loop for operation with data storage.
func (h *Head) dataStorageShardLoop(
shardID uint16,
dataStorageCH chan *relabeler.GenericTask,
// shardLoop runs the task-processing loop for a single shard.
func (*Head) shardLoop(
taskCH chan *relabeler.GenericTask,
stopc chan struct{},
s *shard,
mx *sync.RWMutex,
) {
s := h.shards[shardID]

for {
select {
case <-stopc:
return

case task := <-dataStorageCH:
task.ExecuteOnShard(s)
if ExtraReadConcurrency == 0 {
for {
select {
case <-stopc:
return

case task := <-taskCH:
task.ExecuteOnShard(s)
}
}
} else {
for {
select {
case <-stopc:
return

case task := <-taskCH:
if task.IsExclusive() {
task.ExecuteOnShardWithLocker(s, mx.Lock, mx.Unlock)
} else {
task.ExecuteOnShardWithLocker(s, mx.RLock, mx.RUnlock)
}
}
}
}
}
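
Design note on the change above: the two dedicated loops (lssShardLoop, dataStorageShardLoop) collapse into one generic shardLoop, and when ExtraReadConcurrency > 0 each task channel is drained by 1 + ExtraReadConcurrency workers, so read-only tasks can proceed in parallel under RLock while exclusive tasks serialize under Lock. A minimal standalone sketch of that worker pattern (simplified stand-in types, not the PR's GenericTask):

package main

import (
	"fmt"
	"sync"
)

// task is a simplified stand-in for relabeler.GenericTask.
type task struct {
	exclusive bool
	run       func()
}

// shardLoop drains tasks: read-only tasks share the read lock and may run
// in parallel across workers; exclusive tasks take the write lock.
func shardLoop(tasks <-chan task, stop <-chan struct{}, mx *sync.RWMutex) {
	for {
		select {
		case <-stop:
			return
		case t := <-tasks:
			if t.exclusive {
				mx.Lock()
				t.run()
				mx.Unlock()
			} else {
				mx.RLock()
				t.run()
				mx.RUnlock()
			}
		}
	}
}

func main() {
	const extraReadConcurrency = 3
	tasks := make(chan task, 8)
	stop := make(chan struct{})
	var mx sync.RWMutex
	var wg sync.WaitGroup

	// 1 + extraReadConcurrency workers share one shard's channel.
	for i := 0; i < 1+extraReadConcurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			shardLoop(tasks, stop, &mx)
		}()
	}

	var done sync.WaitGroup
	done.Add(2)
	tasks <- task{run: func() { fmt.Println("read"); done.Done() }}
	tasks <- task{exclusive: true, run: func() { fmt.Println("write"); done.Done() }}
	done.Wait()
	close(stop)
	wg.Wait()
}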
8 changes: 8 additions & 0 deletions pp/go/relabeler/task.go
@@ -147,8 +147,16 @@ func NewReadOnlyGenericTask(
// ExecuteOnShard executes the task on the given shard.
func (t *GenericTask) ExecuteOnShard(shard Shard) {
atomic.CompareAndSwapInt64(&t.executeTS, 0, time.Now().UnixMicro())
t.errs[shard.ShardID()] = t.shardFn(shard)
t.wg.Done()
}

// ExecuteOnShardWithLocker executes the task on the shard under the supplied lock/unlock pair.
func (t *GenericTask) ExecuteOnShardWithLocker(shard Shard, lock, unlock func()) {
lock()
atomic.CompareAndSwapInt64(&t.executeTS, 0, time.Now().UnixMicro())
t.errs[shard.ShardID()] = t.shardFn(shard)
unlock()
t.wg.Done()
}
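
A note on this API: accepting lock and unlock as plain func() values keeps GenericTask free of any locking policy; the caller chooses the pair, as the shardLoop branch in head.go does:

	// Read-only task: may run concurrently with other readers.
	task.ExecuteOnShardWithLocker(s, mx.RLock, mx.RUnlock)
	// Exclusive task: serialized against all other tasks.
	task.ExecuteOnShardWithLocker(s, mx.Lock, mx.Unlock)

(Here mx is the shard's *sync.RWMutex from the Head; the two calls mirror the diff above.)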
