Skip to content
Merged
Show file tree
Hide file tree
Changes from 44 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
7356200
rebuild
u-veles-a May 21, 2025
7991cce
tuning
u-veles-a May 21, 2025
69959a0
tuning
u-veles-a May 21, 2025
bbe1c21
tuning
u-veles-a May 22, 2025
19076b7
tuning
u-veles-a May 22, 2025
c56b213
rebuild
u-veles-a May 23, 2025
0d6e6ed
chunk Read
u-veles-a May 23, 2025
4500bac
clearing
u-veles-a May 23, 2025
df9789c
Merge branch 'pp' into append_rlock
u-veles-a May 23, 2025
e698043
add on lss snapshot
u-veles-a May 23, 2025
47b2c54
Merge branch 'one_lss_snapshot' into rebuild_generic_tasks
u-veles-a May 23, 2025
6da4dc3
fix merge
u-veles-a May 23, 2025
55d425a
for save
u-veles-a May 26, 2025
1208fe7
clearing
u-veles-a May 26, 2025
05d3e79
Merge branch 'append_rlock' into rebuild_generic_tasks
u-veles-a May 26, 2025
2f5dc02
for save
u-veles-a May 26, 2025
e49ea6a
for save
u-veles-a May 26, 2025
81b2faf
for save
u-veles-a May 26, 2025
a13ffbb
for save
u-veles-a May 27, 2025
3b4f2e1
clearing
u-veles-a May 28, 2025
e49ecec
Merge branch 'pp' into rebuild_generic_tasks
u-veles-a May 28, 2025
daf7edc
for save
u-veles-a May 28, 2025
d6ffa0c
task optimization
u-veles-a May 29, 2025
753eb12
rebuild
u-veles-a May 30, 2025
43121c3
for save
u-veles-a May 30, 2025
99e21b6
split lss ds
u-veles-a Jun 2, 2025
e2d43fa
rename
u-veles-a Jun 2, 2025
aef0f2c
clearing
u-veles-a Jun 2, 2025
1b3a61f
fix snapshots
u-veles-a Jun 2, 2025
9f92af3
for save
u-veles-a Jun 2, 2025
d687145
rebuild to lssds
u-veles-a Jun 2, 2025
8d9c715
fix
u-veles-a Jun 2, 2025
3e32c11
some fix
u-veles-a Jun 2, 2025
0d1e2f8
fix test
u-veles-a Jun 2, 2025
1083a8b
rebuild
u-veles-a Jun 3, 2025
65264ff
clearing
u-veles-a Jun 3, 2025
cf5c33b
fix revew
u-veles-a Jun 3, 2025
39a1126
Merge branch 'pp' into rebuild_generic_tasks
u-veles-a Jun 3, 2025
ee313be
Merge branch 'rebuild_generic_tasks' into rebuild_forexhshard
u-veles-a Jun 3, 2025
806b059
rebuild task
u-veles-a Jun 3, 2025
00f5d28
add flags and read conc
u-veles-a Jun 4, 2025
fca79f5
fix
u-veles-a Jun 4, 2025
bf4baa1
clearing
u-veles-a Jun 4, 2025
9fca915
clearing
u-veles-a Jun 4, 2025
6a4d43e
fix snake_case
u-veles-a Jun 5, 2025
9d62e3f
Merge branch 'pp' into feature_flags
vporoshok Jun 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 47 additions & 5 deletions cmd/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import (
pp_pkg_storage "github.com/prometheus/prometheus/pp-pkg/storage" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp/go/cppbridge" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp/go/relabeler/appender" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp/go/relabeler/head" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp/go/relabeler/head/catalog" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp/go/relabeler/head/ready" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp/go/relabeler/remotewriter" // PP_CHANGES.md: rebuild on cpp
Expand Down Expand Up @@ -493,11 +494,6 @@ func main() {
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
Default("").StringsVar(&cfg.featureList)

a.Flag(
"appender.copy-series-on-rotate",
"Copy active series from the current head to the new head during rotation.",
).Hidden().Default("false").BoolVar(&appender.CopySeriesOnRotate)

promlogflag.AddFlags(a, &cfg.promlogConfig)

a.Flag("write-documentation", "Generate command line documentation. Internal use.").Hidden().Action(func(ctx *kingpin.ParseContext) error {
Expand All @@ -518,6 +514,8 @@ func main() {

logger := promlog.New(&cfg.promlogConfig)

readPromPPFeatures(logger)

if err := cfg.setFeatureListOptions(logger); err != nil {
fmt.Fprintln(os.Stderr, fmt.Errorf("Error parsing feature list: %w", err))
os.Exit(1)
Expand Down Expand Up @@ -1925,3 +1923,47 @@ type discoveryManager interface {
Run() error
SyncCh() <-chan map[string][]*targetgroup.Group
}

func readPromPPFeatures(logger log.Logger) {
features := os.Getenv("PROMPP_FEATURES")
if features == "" {
return
}

for _, feature := range strings.Split(features, ",") {
fname, fvalue, _ := strings.Cut(feature, "=")
switch strings.TrimSpace(fname) {
case "head-copy-series-on-rotate":
appender.CopySeriesOnRotate = true
level.Info(logger).Log(
"msg",
"[FEATURE] Copying active series from current head to new head during rotation is enabled.",
)

case "head-read-concurrency":
var (
v = 1
err error
)

if fvalue := strings.TrimSpace(fvalue); fvalue != "" {
v, err = strconv.Atoi(fvalue)
if err != nil {
level.Error(logger).Log("msg", "Error parsing head-read-concurrency value", "err", err)
continue
}
}

head.ExtraReadConcurrency = v
level.Info(logger).Log(
"msg",
"[FEATURE] Concurrency reading is enabled.",
"extra",
v,
)

case "disable-coredumps":
// TODO disable-coredumps
}
}
}
14 changes: 11 additions & 3 deletions cmd/prompptool/walpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,17 @@ func (cmd *cmdWALPPToBlock) Do(
h.Stop()

level.Debug(logger).Log("msg", "write block", "id", headRecord.ID(), "dir", headRecord.Dir())
if err = h.ForEachShard(func(shard relabeler.Shard) error {
return bw.Write(relabeler.NewBlock(shard.LSS().Raw(), shard.DataStorage().Raw()))
}); err != nil {

tBlockWrite := h.CreateTask(
relabeler.BlockWrite,
func(shard relabeler.Shard) error {
return bw.Write(relabeler.NewBlock(shard.LSS().Raw(), shard.DataStorage().Raw()))
},
relabeler.ForLSSTask,
relabeler.ExclusiveTask,
)
h.Enqueue(tBlockWrite)
if err = tBlockWrite.Wait(); err != nil {
return fmt.Errorf("failed to write tsdb block [id: %s, dir: %s]: %w", headRecord.ID(), headRecord.Dir(), err)
}

Expand Down
24 changes: 24 additions & 0 deletions pp/entrypoint/head_status.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,30 @@ extern "C" void prompp_get_head_status(void* args, void* res) {
head::StatusGetter<entrypoint::head::QueryableEncodingBimap, Status>{lss, *in->data_storage, in->limit}.get(*static_cast<Status*>(res));
}

extern "C" void prompp_get_head_status_lss(void* args, void* res) {
struct Arguments {
LssVariantPtr lss;
size_t limit;
};

const auto in = static_cast<const Arguments*>(args);
const auto& lss = std::get<entrypoint::head::QueryableEncodingBimap>(*in->lss);

head::StatusGetterLSS<entrypoint::head::QueryableEncodingBimap, Status>{lss, in->limit}.get(*static_cast<Status*>(res));
}

extern "C" void prompp_get_head_status_data_storage(void* args, void* res) {
struct Arguments {
DataStoragePtr data_storage;
};

const auto in = static_cast<const Arguments*>(args);
Status* status = static_cast<Status*>(res);

status->min_max_timestamp = series_data::Decoder::get_time_interval(*in->data_storage);
status->chunk_count = in->data_storage->chunks().non_empty_chunk_count();
}

extern "C" void prompp_free_head_status(void* args) {
static_cast<Status*>(args)->~Status();
}
56 changes: 56 additions & 0 deletions pp/entrypoint/head_status.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,62 @@ void prompp_get_head_status(void* args, void* res);
*/
void prompp_free_head_status(void* args);

/**
* @brief Return head status from lss.
*
* @param args {
* lss uintptr // pointer to constructed lss
* limit int // statistics limit
* }
*
* @param res {
* status struct { // head status
* label_value_count_by_label_name []struct {
* name string
* count uint32
* }
* series_count_by_metric_name []struct {
* name string
* count uint32
* }
* memory_in_bytes_by_label_name []struct {
* name string
* size uint32
* }
* series_count_by_label_value_pair [] struct {
* name string
* value string
* count uint32
* }
* num_series uint32
* num_label_pairs uint32
* rule_queried_series uint32
* federate_queried_series uint32
* other_queried_series uint32
* }
* }
*/
void prompp_get_head_status_lss(void* args, void* res);

/**
* @brief Return head status from lss.
*
* @param args {
* dataStorage uintptr // pointer to constructed data storage
* }
*
* @param res {
* status struct { // head status
* time_interval struct {
* min int64
* max int64
* }
* chunk_count uint32
* }
* }
*/
void prompp_get_head_status_data_storage(void* args, void* res);

#ifdef __cplusplus
} // extern "C"
#endif
70 changes: 48 additions & 22 deletions pp/entrypoint/prometheus_relabeler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,21 +114,23 @@ extern "C" void prompp_prometheus_relabeled_series_dtor(void* args) {
//

extern "C" void prompp_prometheus_relabeler_state_update_ctor(void* args) {
using PromPP::Prometheus::Relabel::RelabelerStateUpdate;
struct Arguments {
PromPP::Prometheus::Relabel::RelabelerStateUpdate* relabeler_state_update;
RelabelerStateUpdate* relabeler_state_update;
};

auto* in = static_cast<Arguments*>(args);

new (in->relabeler_state_update) PromPP::Prometheus::Relabel::RelabelerStateUpdate();
new (in->relabeler_state_update) RelabelerStateUpdate();
}

extern "C" void prompp_prometheus_relabeler_state_update_dtor(void* args) {
using PromPP::Prometheus::Relabel::RelabelerStateUpdate;
struct Arguments {
PromPP::Prometheus::Relabel::RelabelerStateUpdate* relabeler_state_update;
RelabelerStateUpdate* relabeler_state_update;
};

static_cast<Arguments*>(args)->relabeler_state_update->~vector();
static_cast<Arguments*>(args)->relabeler_state_update->~RelabelerStateUpdate();
}

//
Expand Down Expand Up @@ -169,18 +171,6 @@ extern "C" void prompp_prometheus_per_shard_relabeler_dtor(void* args) {
static_cast<Arguments*>(args)->~Arguments();
}

extern "C" void prompp_prometheus_per_shard_relabeler_cache_allocated_memory(void* args, void* res) {
struct Arguments {
PerShardRelabelerPtr per_shard_relabeler;
};
struct Result {
size_t allocated_memory;
};

const auto* in = static_cast<Arguments*>(args);
new (res) Result{.allocated_memory = in->per_shard_relabeler->cache_allocated_memory()};
}

extern "C" void prompp_prometheus_per_shard_relabeler_input_relabeling(void* args, void* res) {
struct Arguments {
PromPP::Primitives::Go::SliceView<PromPP::Prometheus::Relabel::InnerSeries*> shards_inner_series;
Expand Down Expand Up @@ -319,9 +309,9 @@ extern "C" void prompp_prometheus_per_shard_relabeler_input_collect_stalenans(vo

extern "C" void prompp_prometheus_per_shard_relabeler_append_relabeler_series(void* args, void* res) {
struct Arguments {
PromPP::Prometheus::Relabel::InnerSeries* inner_series;
PromPP::Prometheus::Relabel::RelabeledSeries* relabeled_series;
PromPP::Prometheus::Relabel::RelabelerStateUpdate* relabeler_state_update;
PromPP::Primitives::Go::SliceView<PromPP::Prometheus::Relabel::InnerSeries*> shards_inner_series;
PromPP::Primitives::Go::SliceView<PromPP::Prometheus::Relabel::RelabeledSeries*> shards_relabeled_series;
PromPP::Primitives::Go::SliceView<PromPP::Prometheus::Relabel::RelabelerStateUpdate*> shards_relabeler_state_update;
PerShardRelabelerPtr per_shard_relabeler;
LssVariantPtr lss;
};
Expand All @@ -335,9 +325,17 @@ extern "C" void prompp_prometheus_per_shard_relabeler_append_relabeler_series(vo

try {
auto& lss = std::get<entrypoint::head::QueryableEncodingBimap>(*in->lss);

entrypoint::head::lss_memory::has_reallocations = false;
in->per_shard_relabeler->append_relabeler_series(lss, in->inner_series, in->relabeled_series, in->relabeler_state_update);

for (size_t id = 0; id != in->shards_relabeled_series.size(); ++id) {
if (in->shards_relabeled_series[id] == nullptr || in->shards_relabeled_series[id]->size() == 0) {
continue;
}

in->per_shard_relabeler->append_relabeler_series(lss, in->shards_inner_series[id], in->shards_relabeled_series[id],
in->shards_relabeler_state_update[id]);
}

std::vector<uint32_t> ids;
lss.sort_series_ids(ids);
out->target_lss_has_reallocations = entrypoint::head::lss_memory::has_reallocations;
Expand All @@ -347,7 +345,7 @@ extern "C" void prompp_prometheus_per_shard_relabeler_append_relabeler_series(vo
}
}

extern "C" void prompp_prometheus_per_shard_relabeler_update_relabeler_state(void* args, void* res) {
extern "C" void prompp_prometheus_per_shard_singe_relabeler_update_relabeler_state(void* args, void* res) {
struct Arguments {
PromPP::Prometheus::Relabel::RelabelerStateUpdate* relabeler_state_update;
PerShardRelabelerPtr per_shard_relabeler;
Expand All @@ -369,6 +367,34 @@ extern "C" void prompp_prometheus_per_shard_relabeler_update_relabeler_state(voi
}
}

extern "C" void prompp_prometheus_per_shard_relabeler_update_relabeler_state(void* args, void* res) {
struct Arguments {
PromPP::Primitives::Go::SliceView<PromPP::Prometheus::Relabel::RelabelerStateUpdate*> shards_relabeler_state_update;
PerShardRelabelerPtr per_shard_relabeler;
CachePtr cache;
uint16_t relabeled_shard_id;
};
struct Result {
PromPP::Primitives::Go::Slice<char> error;
};

const auto* in = static_cast<Arguments*>(args);

try {
for (size_t id = 0; id != in->shards_relabeler_state_update.size(); ++id) {
if (in->shards_relabeler_state_update[id] == nullptr || in->shards_relabeler_state_update[id]->size() == 0) {
continue;
}

in->per_shard_relabeler->update_relabeler_state(*in->cache, in->shards_relabeler_state_update[id], id);
}
} catch (...) {
auto* out = new (res) Result();
auto err_stream = PromPP::Primitives::Go::BytesStream(&out->error);
entrypoint::handle_current_exception(err_stream);
}
}

extern "C" void prompp_prometheus_per_shard_relabeler_output_relabeling(void* args, void* res) {
struct Arguments {
PromPP::Prometheus::Relabel::RelabeledSeries* relabeled_series;
Expand Down
43 changes: 23 additions & 20 deletions pp/entrypoint/prometheus_relabeler.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,19 +139,6 @@ void prompp_prometheus_per_shard_relabeler_ctor(void* args, void* res);
*/
void prompp_prometheus_per_shard_relabeler_dtor(void* args);

/**
* @brief return size of allocated memory for cache map.
*
* @param args {
* per_shard_relabeler uintptr // pointer to constructed per shard relabeler;
* }
*
* @param res {
* allocated_memory uint64 // size of allocated memory for label sets;
* }
*/
void prompp_prometheus_per_shard_relabeler_cache_allocated_memory(void* args, void* res);

/**
* @brief relabeling incomig hashdex(first stage).
*
Expand Down Expand Up @@ -253,16 +240,16 @@ void prompp_prometheus_per_shard_relabeler_input_collect_stalenans(void* args, v
* @brief add relabeled ls to lss, add to result and add to cache update(second stage).
*
* @param args {
* inner_series *InnerSeries // go InnerSeries per shard;
* relabeled_series *RelabeledSeries // go RelabeledSeries per shard;
* relabeler_state_update *RelabelerStateUpdate // pointer to RelabelerStateUpdate;
* per_shard_relabeler uintptr // pointer to constructed per shard relabeler;
* lss uintptr // pointer to constructed label sets;
* shards_inner_series []*InnerSeries // go InnerSeries per source shard;
* shards_relabeled_series []*RelabeledSeries // go RelabeledSeries per source shard;
* shards_relabeler_state_update []*RelabelerStateUpdate // pointer to RelabelerStateUpdate per source shard;
* per_shard_relabeler uintptr // pointer to constructed per shard relabeler;
* lss uintptr // pointer to constructed label sets;
* }
*
* @param res {
* error []byte // error string if thrown
* target_lss_has_reallocations bool // true if target lss has reallocations
* error []byte // error string if thrown
* target_lss_has_reallocations bool // true if target lss has reallocations
* }
*/
void prompp_prometheus_per_shard_relabeler_append_relabeler_series(void* args, void* res);
Expand All @@ -281,6 +268,22 @@ void prompp_prometheus_per_shard_relabeler_append_relabeler_series(void* args, v
* error []byte // error string if thrown;
* }
*/
void prompp_prometheus_per_shard_singe_relabeler_update_relabeler_state(void* args, void* res);

/**
* @brief add to cache relabled data(third stage).
*
* @param args {
* shards_relabeler_state_update []*RelabelerStateUpdate // pointer to RelabelerStateUpdate per source shard;
* per_shard_relabeler uintptr // pointer to constructed per shard relabeler;
* cache uintptr // pointer to constructed Cache;
* relabeled_shard_id uint16 // relabeled shard id;
* }
*
* @param res {
* error []byte // error string if thrown;
* }
*/
void prompp_prometheus_per_shard_relabeler_update_relabeler_state(void* args, void* res);

/**
Expand Down
Loading
Loading