Optimize ingestion from Kafka on ingesters with mixed size tenants #13924
Conversation
```go
// On cancellation (e.g., pushToStorage error), any records remaining in the buffer won't
// be processed. This is acceptable because errors trigger a retry of the entire batch,
// and the memory will be freed by GC.
recordsChannel = make(chan parsedRecord, 128)
```
Note to reviewers: this optimization is slightly impactful. The big benefit comes from not using the sequential pusher at all.
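For readers less familiar with this pattern, here's a minimal, self-contained sketch of the behaviour the comment describes: a bounded channel feeding a consumer, where cancellation abandons whatever is still buffered and relies on the whole batch being retried. The `parsedRecord` fields and helper names below are simplified stand-ins, not the Mimir implementation.

```go
package main

import (
	"context"
	"fmt"
)

// parsedRecord is a simplified stand-in for the records flowing from the Kafka parser to the pusher.
type parsedRecord struct {
	tenantID string
	err      error
}

// consume drains records until the channel is closed or ctx is cancelled.
// On cancellation, records still sitting in the buffer are simply dropped:
// the caller retries the entire batch, and the buffered values are garbage collected.
func consume(ctx context.Context, records <-chan parsedRecord) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case r, ok := <-records:
			if !ok {
				return nil
			}
			if r.err != nil {
				return r.err // Triggers a retry of the whole batch upstream.
			}
			fmt.Println("ingesting record for tenant", r.tenantID)
		}
	}
}

func main() {
	recordsChannel := make(chan parsedRecord, 128) // Bounded buffer between parser and pusher.
	go func() {
		defer close(recordsChannel)
		for _, t := range []string{"tenant-a", "tenant-b"} {
			recordsChannel <- parsedRecord{tenantID: t}
		}
	}()
	if err := consume(context.Background(), recordsChannel); err != nil {
		fmt.Println("consume failed:", err)
	}
}
```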
💻 Deploy preview deleted (Optimize ingestion from Kafka on ingesters with mixed size tenants).
```diff
  idealShards := c.idealShardsFor(userID)
  var p PusherCloser
- if idealShards <= 1 {
+ if idealShards <= 1 && c.sequentialPusherEnabled {
```
Note to reviewers: this is the optimization. We don't use the sequential pusher at all.
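For context, here's a hedged sketch of the selection this one-line change gates. The types, fields, and constructor names below are illustrative stand-ins, not the exact Mimir symbols: with the new flag disabled, even tenants that only need a single shard go through the parallel pusher.

```go
package main

import "fmt"

// PusherCloser is a stand-in for Mimir's pusher interface; everything here is illustrative.
type PusherCloser interface{ Name() string }

type namedPusher string

func (n namedPusher) Name() string { return string(n) }

type pusherConsumer struct {
	sequentialPusherEnabled bool
}

// idealShardsFor is a stub: pretend small tenants need a single shard.
func (c *pusherConsumer) idealShardsFor(userID string) int {
	if userID == "small-tenant" {
		return 1
	}
	return 4
}

// pusherFor mirrors the gating shown in the diff: the sequential pusher is used
// only when the tenant needs <= 1 shard AND the temporary flag is still enabled.
func (c *pusherConsumer) pusherFor(userID string) PusherCloser {
	idealShards := c.idealShardsFor(userID)
	if idealShards <= 1 && c.sequentialPusherEnabled {
		return namedPusher("sequential")
	}
	return namedPusher(fmt.Sprintf("parallel(%d shards)", idealShards))
}

func main() {
	for _, enabled := range []bool{true, false} {
		c := &pusherConsumer{sequentialPusherEnabled: enabled}
		fmt.Printf("flag=%v small-tenant -> %s\n", enabled, c.pusherFor("small-tenant").Name())
	}
}
```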
tacole02
left a comment
Docs look good! I left a few minor suggestions. Thank you!
```go
}
if numLarge == 0 && cfg.LargeTenants.TenantPercent > 0 {
	numLarge = 1
}
```
Fixture generator may create more tenants than requested
Low Severity
The tenant count calculation computes numLarge as the remainder (numTenants - numSmall - numMedium) before applying the "ensure at least 1 tenant" adjustments to numSmall and numMedium. When those values are later incremented, the total tenant count can exceed the requested numTenants. For example, with numTenants=2 and mixed percentages (40%/44%/16%), the initial calculation yields 0/0/2, but after adjustments becomes 1/1/2, creating 4 tenants instead of 2. This only affects the benchmark fixture generator, not production ingestion code.
This is expected. We're fine with it.
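To make the arithmetic concrete, here's a small sketch of the calculation Bugbot describes. The percentage fields and integer math are assumptions consistent with the snippet above, not the exact fixture generator code; with numTenants=2 and a 40%/44%/16% split it produces 4 tenants.

```go
package main

import "fmt"

func main() {
	const numTenants = 2
	// Illustrative small/medium/large percentages matching Bugbot's example.
	smallPercent, mediumPercent, largePercent := 40, 44, 16

	numSmall := numTenants * smallPercent / 100   // 2*40/100 = 0
	numMedium := numTenants * mediumPercent / 100 // 2*44/100 = 0
	numLarge := numTenants - numSmall - numMedium // remainder: 2

	// "Ensure at least 1 tenant" adjustments, applied after numLarge was already computed.
	if numSmall == 0 && smallPercent > 0 {
		numSmall = 1
	}
	if numMedium == 0 && mediumPercent > 0 {
		numMedium = 1
	}
	if numLarge == 0 && largePercent > 0 {
		numLarge = 1
	}

	// Prints "1 1 2 = 4": more tenants than the requested 2, as Bugbot points out.
	fmt.Println(numSmall, numMedium, numLarge, "=", numSmall+numMedium+numLarge)
}
```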
tcard
left a comment
Looks good! Just a few suggestions.
```go
tenant.nextSeriesIdx = (tenant.nextSeriesIdx + 1) % tenant.uniqueSeries
}

return &mimirpb.WriteRequest{
```
I appreciate this is meant for unit tests, but I think it's useful for those "synthetic" WriteRequests to have their BufferHolder set and holding UnsafeMutableStrings to the buffer, e.g. by marshaling and then unmarshaling. This makes them more realistic, and increases the coverage of reference leaks detection we're introducing in #13609.
Like in 2dd6c27?
Sorry, I'm only now realizing this requires the new mimirpb.Unmarshal introduced in #13609. I'll make a note to reconcile this later.
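For reference, a rough sketch of the marshal-then-unmarshal roundtrip being suggested, using the standard gogo-generated `Marshal`/`Unmarshal` methods on `mimirpb.WriteRequest`. The `BufferHolder`/`UnsafeMutableStrings` wiring depends on the `mimirpb.Unmarshal` introduced in #13609 and is only indicated by a comment here; the function name is hypothetical.

```go
package ingesttest

import "github.com/grafana/mimir/pkg/mimirpb"

// roundtripWriteRequest roundtrips a synthetic request through the wire format so its
// contents come from a decoded buffer rather than straight from Go literals.
func roundtripWriteRequest(req *mimirpb.WriteRequest) (*mimirpb.WriteRequest, error) {
	buf, err := req.Marshal() // gogo-generated marshaler
	if err != nil {
		return nil, err
	}
	out := &mimirpb.WriteRequest{}
	// With the mimirpb.Unmarshal from #13609, this is where the BufferHolder would be set
	// and label strings would become UnsafeMutableStrings pointing into buf.
	if err := out.Unmarshal(buf); err != nil {
		return nil, err
	}
	return out, nil
}
```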
Signed-off-by: Marco Pracucci <[email protected]>
Signed-off-by: Marco Pracucci <[email protected]>
Signed-off-by: Marco Pracucci <[email protected]>
Co-authored-by: Taylor C <[email protected]>
Signed-off-by: Marco Pracucci <[email protected]>
Signed-off-by: Marco Pracucci <[email protected]>
Force-pushed from bdc6faf to 2dd6c27.
seizethedave
left a comment
The change makes sense to me and I can't find anything wrong with it.
… 1 shard (#13961)

#### What this PR does

This PR is a follow-up of #13924. In this PR I'm doing a micro optimization to `parallelStorageShards` for the case there's only 1 shard. The impact of this optimization is minimal, but I think it doesn't make the code harder to follow, so it may be worth keeping. My local benchmarks are not super stable, but here you can get an idea of the impact (minimal). Keep in mind `PusherConsumer_ParallelPusher_MultiTenant` uses a mocked backend. In the real world, where metrics are actually ingested into TSDB, the impact is much smaller:

```
goos: darwin
goarch: arm64
pkg: github.com/grafana/mimir/pkg/storage/ingest
cpu: Apple M3 Pro

│ BenchmarkPusherConsumer_ParallelPusher_MultiTenant-before.txt │ BenchmarkPusherConsumer_ParallelPusher_MultiTenant-after.txt │
│ sec/op │ sec/op vs base │
PusherConsumer_ParallelPusher_MultiTenant/records=1,tenants=1-11        8.421µ ± 5%    8.862µ ± 2%    +5.24% (p=0.009 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=1,tenants=10-11       8.578µ ± 81%   8.971µ ± 46%   ~ (p=0.240 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=1,tenants=100-11      8.394µ ± 3%    9.005µ ± 1%    +7.28% (p=0.002 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=1,tenants=1000-11     8.308µ ± 2%    8.998µ ± 0%    +8.31% (p=0.002 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=10,tenants=1-11       21.22µ ± 4%    21.35µ ± 2%    ~ (p=0.937 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=10,tenants=10-11      43.02µ ± 6%    40.24µ ± 0%    -6.46% (p=0.002 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=10,tenants=100-11     43.24µ ± 1%    40.50µ ± 0%    -6.33% (p=0.002 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=10,tenants=1000-11    43.37µ ± 2%    40.73µ ± 1%    -6.10% (p=0.002 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=100,tenants=1-11      126.3µ ± 0%    120.2µ ± 1%    -4.85% (p=0.002 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=100,tenants=10-11     137.7µ ± 0%    122.0µ ± 3%    -11.40% (p=0.002 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=100,tenants=100-11    304.7µ ± 3%    275.2µ ± 0%    -9.69% (p=0.002 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=100,tenants=1000-11   303.5µ ± 2%    275.1µ ± 0%    -9.35% (p=0.002 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=1000,tenants=1-11     1.142m ± 1%    1.124m ± 1%    -1.58% (p=0.002 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=1000,tenants=10-11    1.051m ± 1%    1.115m ± 0%    +6.10% (p=0.002 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=1000,tenants=100-11   1.264m ± 1%    1.301m ± 0%    +2.91% (p=0.002 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=1000,tenants=1000-11  3.060m ± 3%    2.996m ± 4%    ~ (p=0.240 n=6)
geomean                                                                 97.29µ         95.70µ         -1.63%

│ BenchmarkPusherConsumer_ParallelPusher_MultiTenant-before.txt │ BenchmarkPusherConsumer_ParallelPusher_MultiTenant-after.txt │
│ B/op │ B/op vs base │
PusherConsumer_ParallelPusher_MultiTenant/records=1,tenants=1-11        17.40Ki ± 0%   17.44Ki ± 0%   +0.19% (p=0.002 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=1,tenants=10-11       17.40Ki ± 0%   17.43Ki ± 0%   +0.19% (p=0.035 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=1,tenants=100-11      17.39Ki ± 0%   17.43Ki ± 0%   +0.21% (p=0.002 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=1,tenants=1000-11     17.39Ki ± 0%   17.43Ki ± 0%   +0.20% (p=0.002 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=10,tenants=1-11       25.15Ki ± 0%   25.21Ki ± 0%   +0.25% (p=0.002 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=10,tenants=10-11      78.70Ki ± 0%   78.70Ki ± 0%   ~ (p=0.223 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=10,tenants=100-11     78.70Ki ± 0%   78.70Ki ± 0%   +0.01% (p=0.037 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=10,tenants=1000-11    78.69Ki ± 0%   78.70Ki ± 0%   ~ (p=0.058 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=100,tenants=1-11      120.9Ki ± 0%   121.7Ki ± 0%   +0.68% (p=0.002 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=100,tenants=10-11     158.2Ki ± 0%   159.4Ki ± 0%   +0.80% (p=0.002 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=100,tenants=100-11    707.4Ki ± 0%   708.3Ki ± 0%   +0.13% (p=0.002 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=100,tenants=1000-11   707.3Ki ± 0%   708.3Ki ± 0%   +0.14% (p=0.002 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=1000,tenants=1-11     985.3Ki ± 0%   978.7Ki ± 1%   -0.66% (p=0.041 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=1000,tenants=10-11    1.064Mi ± 0%   1.091Mi ± 0%   +2.54% (p=0.002 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=1000,tenants=100-11   1.479Mi ± 0%   1.547Mi ± 0%   +4.62% (p=0.002 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=1000,tenants=1000-11  6.935Mi ± 0%   6.947Mi ± 0%   +0.17% (p=0.002 n=6)
geomean                                                                 156.1Ki        157.0Ki        +0.59%

│ BenchmarkPusherConsumer_ParallelPusher_MultiTenant-before.txt │ BenchmarkPusherConsumer_ParallelPusher_MultiTenant-after.txt │
│ allocs/op │ allocs/op vs base │
PusherConsumer_ParallelPusher_MultiTenant/records=1,tenants=1-11        48.00 ± 0%     48.00 ± 0%     ~ (p=1.000 n=6) ¹
PusherConsumer_ParallelPusher_MultiTenant/records=1,tenants=10-11       48.00 ± 0%     48.00 ± 0%     ~ (p=1.000 n=6) ¹
PusherConsumer_ParallelPusher_MultiTenant/records=1,tenants=100-11      48.00 ± 0%     48.00 ± 0%     ~ (p=1.000 n=6) ¹
PusherConsumer_ParallelPusher_MultiTenant/records=1,tenants=1000-11     48.00 ± 0%     48.00 ± 0%     ~ (p=1.000 n=6) ¹
PusherConsumer_ParallelPusher_MultiTenant/records=10,tenants=1-11       166.0 ± 0%     166.0 ± 0%     ~ (p=1.000 n=6) ¹
PusherConsumer_ParallelPusher_MultiTenant/records=10,tenants=10-11      291.0 ± 0%     291.0 ± 0%     ~ (p=1.000 n=6) ¹
PusherConsumer_ParallelPusher_MultiTenant/records=10,tenants=100-11     291.0 ± 0%     291.0 ± 0%     ~ (p=1.000 n=6) ¹
PusherConsumer_ParallelPusher_MultiTenant/records=10,tenants=1000-11    291.0 ± 0%     291.0 ± 0%     ~ (p=1.000 n=6) ¹
PusherConsumer_ParallelPusher_MultiTenant/records=100,tenants=1-11      1.364k ± 0%    1.366k ± 0%    +0.15% (p=0.004 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=100,tenants=10-11     1.466k ± 0%    1.467k ± 0%    +0.07% (p=0.002 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=100,tenants=100-11    2.664k ± 0%    2.664k ± 0%    ~ (p=0.545 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=100,tenants=1000-11   2.663k ± 0%    2.664k ± 0%    ~ (p=0.182 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=1000,tenants=1-11     13.28k ± 0%    13.27k ± 0%    ~ (p=0.093 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=1000,tenants=10-11    13.38k ± 0%    13.41k ± 0%    +0.19% (p=0.002 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=1000,tenants=100-11   14.36k ± 0%    14.37k ± 0%    +0.08% (p=0.002 n=6)
PusherConsumer_ParallelPusher_MultiTenant/records=1000,tenants=1000-11  26.23k ± 0%    26.22k ± 0%    -0.07% (p=0.002 n=6)
geomean                                                                 784.6          784.8          +0.02%
¹ all samples are equal
```

No changelog because I will add one at the end of this work, once `-ingest-storage.kafka.ingestion-concurrency-sequential-pusher-enabled` is removed and the new behaviour becomes the default. With the default config `parallelStorageShards` is never used with only 1 shard (you need to set `-ingest-storage.kafka.ingestion-concurrency-sequential-pusher-enabled=false` to use `parallelStorageShards` even when there's only 1 shard).

#### Which issue(s) this PR fixes or relates to

N/A

#### Checklist

- [ ] Tests updated.
- [ ] Documentation added.
- [ ] `CHANGELOG.md` updated - the order of entries should be `[CHANGE]`, `[FEATURE]`, `[ENHANCEMENT]`, `[BUGFIX]`. If changelog entry is not needed, please add the `changelog-not-needed` label to the PR.
- [ ] [`about-versioning.md`](https://github.com/grafana/mimir/blob/main/docs/sources/mimir/configure/about-versioning.md) updated with experimental features.

> [!NOTE]
> Optimizes shard routing when `numShards == 1` and refines performance benchmarks.
>
> - In `parallelStorageShards.PushToStorageAndReleaseRequest`, bypasses label hashing/modulo and always targets shard `0` when there is only one shard; metadata routing now skips round-robin and random start when single-shard.
> - `BenchmarkPusherConsumer_ParallelPusher_MultiTenant`: replaces the single-parameter sweep with nested `records×tenants` cases, prebuilds records per case, and sets `IngestionConcurrencySequentialPusherEnabled=false` to exercise the parallel pusher consistently.
>
> Written by Cursor Bugbot for commit 49af667.

Signed-off-by: Marco Pracucci <[email protected]>
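As a rough illustration of the single-shard shortcut described in that follow-up: when there is only one shard, the hash-and-modulo work is skipped entirely and everything targets shard 0. The function name, hash, and label representation below are simplified stand-ins, not the actual `parallelStorageShards` code.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardFor is an illustrative stand-in for the routing done in
// parallelStorageShards.PushToStorageAndReleaseRequest: with a single shard the
// label hashing and modulo are bypassed and everything goes to shard 0.
func shardFor(seriesLabels string, numShards int) int {
	if numShards == 1 {
		return 0
	}
	h := fnv.New64a()
	_, _ = h.Write([]byte(seriesLabels)) // Placeholder for Mimir's label-adapters hash.
	return int(h.Sum64() % uint64(numShards))
}

func main() {
	fmt.Println(shardFor(`{__name__="up",job="node"}`, 1)) // always 0
	fmt.Println(shardFor(`{__name__="up",job="node"}`, 4)) // 0..3
}
```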
What this PR does
In this PR I'm adding a new temporary config option (`ingestion-concurrency-sequential-pusher-enabled`) that is enabled by default (to preserve the existing behaviour) but, when disabled, is expected to speed up the ingestion from Kafka by up to 2x on ingesters with mixed size tenants. The config option is temporary because my plan is to use it to gradually roll out the change at Grafana Labs and then, if everything goes well, remove it and always use the new behaviour (that is, remove the sequential pusher entirely).

This PR is large, but the real logic change is just a couple of lines. The rest is tests (more details below).
What's the optimization about?
When concurrent ingestion is enabled, the write request encoded in a Kafka record can be ingested in Mimir using two different paths: the sequential pusher or the parallel pusher.

When the sequential pusher is used, the write request is ingested into TSDB "as is" and synchronously. When the parallel pusher is used, the content of the write request is (optionally) sharded, appended to a batch, and the batch (when full) is ingested into TSDB asynchronously.
Since the parallel pusher has some extra work to do for sharding and batching, the old belief was that this overhead would negatively affect tenants with a small number of time series to ingest. For this reason, we dynamically chose between the sequential and parallel pusher based on the estimated number of series to ingest for a given tenant.
What we didn't consider in the past is that the parallel pusher not only shards the metrics into N shards for parallel ingestion, it also makes the ingestion asynchronous. This means that while we ingest metrics for one tenant, other tenants can be ingesting into TSDB too. On the contrary, the sequential pusher blocks on each write to TSDB. So when some tenants use the sequential pusher and others use the parallel pusher, each time we use the sequential pusher we're effectively pausing ingestion for all other tenants, even the ones that would have used the parallel pusher, because the sequential pusher is synchronous.
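To illustrate the difference, here's a toy sketch of a synchronous push next to a channel-backed asynchronous one. The names and structure are illustrative, not Mimir's actual pusher code: a single synchronous call in the consumption loop stalls every other tenant behind it, while the asynchronous path hands work off and keeps the loop moving.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// pushToTSDB stands in for the actual TSDB append, which takes non-trivial time.
func pushToTSDB(tenant string) {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("ingested request for", tenant)
}

// sequentialPush is the synchronous path: the caller is blocked for the whole append,
// so while one tenant is being ingested nothing else makes progress.
func sequentialPush(tenant string) {
	pushToTSDB(tenant)
}

// parallelPusher is a toy version of the asynchronous path: requests are handed to a
// worker via a channel, so the caller returns immediately and other tenants keep flowing.
type parallelPusher struct {
	requests chan string
	wg       sync.WaitGroup
}

func newParallelPusher() *parallelPusher {
	p := &parallelPusher{requests: make(chan string, 128)}
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		for tenant := range p.requests {
			pushToTSDB(tenant) // In Mimir this would also batch and shard before appending.
		}
	}()
	return p
}

func (p *parallelPusher) push(tenant string) { p.requests <- tenant }
func (p *parallelPusher) close()             { close(p.requests); p.wg.Wait() }

func main() {
	// Mixing the two paths: the synchronous call below pauses the consumption loop,
	// delaying tenants that would otherwise be ingested asynchronously.
	p := newParallelPusher()
	p.push("large-tenant-1")
	sequentialPush("small-tenant") // Blocks the whole loop until the append completes.
	p.push("large-tenant-2")
	p.close()
}
```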
In this PR I'm just adding an option to never use the sequential pusher at all, and always use the parallel pusher even when no sharding is required (with the parallel pusher we still take advantage of batching and asynchronous ingestion). As you will see below, in local testing I couldn't measure any performance degradation in any scenario.
Benchmarks
To benchmark it I've dumped real production data from a few selected Mimir clusters at Grafana Labs, including clusters with a single tenant and clusters with many tenants. Clusters with only 1 tenant don't see any big benefit because they were already always using the parallel pusher, but clusters with mixed size tenants (where some tenants used the sequential pusher and others used the parallel one) showed up to a 2x speed-up.
Then, based on the real production data dump, I've generated some fixtures that try to mimic the production data pattern. This will allow everyone to run these benchmarks over time.
These are the test results using the real production data dump (I've redacted the actual names of Grafana Cloud clusters):

These are the tests based on the fixtures generated from the patterns observed in production workloads:
Other notes
- Added `kafkatool dump analyse` to analyse a dump and extract the key information to configure the fixture generator.
- A further micro optimization is possible in `parallelStorageShards.PushToStorageAndReleaseRequest()` for the case there's only 1 shard, but I've already tested it and it's not very impactful.

Which issue(s) this PR fixes or relates to
N/A
Checklist
- `CHANGELOG.md` updated - the order of entries should be `[CHANGE]`, `[FEATURE]`, `[ENHANCEMENT]`, `[BUGFIX]`. If changelog entry is not needed, please add the `changelog-not-needed` label to the PR.
- `about-versioning.md` updated with experimental features.

Note
Introduces an experimental switch to prefer parallel ingestion over sequential for small-tenant workloads and adds tooling/benchmarks to validate performance.
- Adds `ingestion_concurrency_sequential_pusher_enabled` (default: true) to the Kafka ingest config; updates config descriptors, help text, docs, and defaults.
- `parallelStoragePusher`; use the sequential pusher only when enabled and `idealShards <= 1`.
- `LabelAdaptersHash`; rename writer config `disableLinger` -> `DisableLinger`.
- Fixture generator (`pkg/storage/ingest/fixture_generator*`) and extensive benchmarks for Kafka replay and the pusher consumer; test updates to pass a logger.
- `kafkatool`: new `dump analyse` command, refactored dump parsing helpers, and improved offset flag help.

Written by Cursor Bugbot for commit 2dd6c27.