Skip to content

Commit 9ae21e8

Browse files
feat(progress): disambiguate filter bottleneck via sink-busy gauge + S3 inflight
The progress reporter's `bottleneck` field had been lying for a while: `filter` triggered whenever the decompressed-line channel was full, but downstream of that channel sits a lot more than the regex matcher — `sink.ingest` does the codec compression, framing, per-prefix mutex handoff, and (for S3) an mpsc send that blocks when TM can't drain. A no-regex S3 run getting reported as `bottleneck=filter` was the prompt: the label couldn't tell the user whether their CPU was pegged on zstd or whether S3 was the floor. Fix it with two new signals, both cheap: 1. `workers_in_ingest` — sink-agnostic AtomicUsize gauge. Filter workers bump it via an RAII guard around every `sink.ingest` call; the progress reporter samples it instantaneously at each tick. ≥ half the filter workers inside ingest means the sink is the lid; < half means workers are spending their time on regex / channel receive / upstream stages. 2. S3-specific drill-down — when the gauge says "sink is the lid", look at the S3 sink's `inflight_bytes` (resident bytes across the per-upload mpsc channels and reader pending buffers) and `active_uploads` count. High inflight per upload means TM is slow to drain parts → network-bound. Low inflight means the codec is the producer-side cost → CPU-bound. Wired through a new `SinkObservability` struct on the `OutputSink` trait. Default is empty; S3 implements; file / http / void return empty (they don't have meaningful internal buffers to expose at this level — for file, `iostat` is the next step). New label set, all underscore-separated, no parens: download dc channel mostly empty filter dc full, sink not busy sink_s3_codec dc full, sink busy, S3 mpsc empty sink_s3_network dc full, sink busy, S3 mpsc backed up sink_file dc full, sink busy, file sink sink_http dc full, sink busy, HTTP sink (unusual) sink_void dc full, sink busy, void sink (~never) sink_busy fallback for unknown sink kinds compress (HTTP) http line channel saturated [unchanged] upload (HTTP) http batch channel saturated [unchanged] Existing HTTP path inherits the gauge for the same disambiguation: its old `filter` label is now split into `filter` vs `sink_http` the same way. The `Search progress` log line now also carries `workers_in_ingest` and (when applicable) `sink_inflight_bytes` + `sink_active_uploads` as structured fields so post-run log analysis has the raw signals alongside the classifier's verdict. Documentation: new "Reading the `bottleneck` label" section in README.md enumerates every label, the signals it's computed from, and operator guidance for what to investigate next per label. Tests: 10 new classifier tests + 1 RAII gauge test cover every label transition, the half-of-workers threshold for sink-busy, the conservative fall-back to `sink_s3_codec` on missing inflight signal, and the older HTTP priority order under the new signature. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent d6c3fef commit 9ae21e8

6 files changed

Lines changed: 427 additions & 32 deletions

File tree

README.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,3 +325,33 @@ Usage: bucket-scrapper [OPTIONS] --start <START>
325325
**CPU**: Use samply with the profiling build profile.
326326
327327
**Memory**: `cargo build --profile profiling --features dhat-heap`, then submit the generated `profiler.json` to [dh_view](https://nnethercote.github.io/dh_view/dh_view.html).
328+
329+
## Reading the `bottleneck` label
330+
331+
Every `Search progress` log record carries a `bottleneck` field that names the stage most likely limiting throughput at the time the report was emitted. The classifier reads several channel-fill percentages plus a sink-busy gauge; the label is a one-word summary of the dominant signal.
332+
333+
### Signals it looks at
334+
335+
- **`dc_pct`** — fill percentage of the decompressed-line channel between the download/decompress stage and the filter workers. High means the downstream stages can't keep up with what's being decompressed.
336+
- **`workers_in_ingest`** — instantaneous count of filter workers currently inside `sink.ingest`. Compared against the total filter-worker count: ≥ half means the sink is where time is going (codec compression, framing, sink-internal queues), < half means filter-side work (regex / channel receive) is what's eating the workers.
337+
- **`sink_inflight_bytes`** + **`sink_active_uploads`** — S3 sink only. Bytes resident in the per-upload mpsc channels and reader pending buffers, scaled by how many uploads are currently open. High means TM is slow to drain parts to S3; low while workers are stuck in `sink.ingest` means the codec is the producer-side cost.
338+
- HTTP-mode only: **`line_pct`** and **`batch_pct`** — fill of the HTTP writer's internal line and batch channels. Provide direct visibility into the compressor and uploader stages.
339+
340+
### Label meanings
341+
342+
- **`download`** — `dc_pct` is low. The download stage isn't filling the line channel fast enough. Causes: S3 per-connection throughput, network bandwidth, low `--max-parallel`, or storage class.
343+
- **`filter`** — `dc_pct` high, `workers_in_ingest` low. Filter workers are spending their time on regex matching or waiting on channel receives. Causes: an expensive regex, lots of non-matching lines, or simply too few filter workers (`--filter-tasks`).
344+
- **`sink_s3_codec`** — `dc_pct` high, `workers_in_ingest` high, sink mpsc is roughly empty. Workers are in `sink.ingest` but bytes are leaving fast — the codec (zstd / gzip) is the producer-side cost. Try `--compression-format none`, raise the level only with eyes on this label, or look at per-prefix lock contention if you have many concurrent prefixes hitting the same per-prefix mutex.
345+
- **`sink_s3_network`** — `dc_pct` high, `workers_in_ingest` high, sink mpsc is backed up. `ChannelWriter::blocking_send` is waiting because TM / S3 isn't accepting parts fast enough. Causes: network bandwidth ceiling, `multipart_concurrency` set too low, S3 throttling.
346+
- **`sink_file`** — `dc_pct` high, `workers_in_ingest` high, file sink. Codec or the OS write path is the lid. The file sink doesn't have an internal queue to look at — reach for `iostat`/`vmstat`/`dmesg` to see whether it's the filesystem, dm-crypt, an NBD/EBS volume, or just disk pressure.
347+
- **`sink_http`** — `dc_pct` high, `workers_in_ingest` high, HTTP sink, but the HTTP writer's own channels aren't full. Unusual; usually you'd see `compress` or `upload` for HTTP-bound runs.
348+
- **`compress`** *(HTTP only)* — the HTTP writer's line channel is full. The compressor task pool isn't keeping up. Raise `--http-compressor-tasks` (or rely on the auto-inferred default).
349+
- **`upload`** *(HTTP only)* — the HTTP writer's batch channel is full. The uploader pool can't get batches out to the API fast enough. Raise `--http-upload-tasks`, raise `--max-upload-rate`, or check the upstream API.
350+
- **`sink_void`** — should never realistically appear; void's ingest is a counter bump. If it shows up, something is wrong.
351+
- **`sink_busy`** — generic fallback if the sink kind doesn't match a known label. Means: filter workers are stuck inside `sink.ingest` but we couldn't be more specific.
352+
353+
### Caveats
354+
355+
- The classifier reports the *dominant* signal at sampling time. A flapping pipeline (e.g. download bursts followed by sink bursts) will rotate labels across consecutive reports — that's a useful signal in itself.
356+
- The threshold for "channel saturated" is 80% fill; for "sink busy" it's ≥ half of filter workers inside `sink.ingest`. Both are heuristics and may need adjustment as workloads shift.
357+
- For the S3 sink, the codec-vs-network drill-down doesn't see *inside* the AWS transfer manager — once bytes leave our `ChannelWriter`, we lose visibility. If TM has its own internal queueing under pressure, we'd report `sink_s3_codec` (low local inflight) even though the actual bottleneck is downstream. Use the upload throughput numbers and `multipart_concurrency` setting alongside the label.

src/pipeline/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ pub use http_sink::HttpOutputSink;
2020
pub use http_writer::{HttpResultWriter, HttpWriterConfig, HttpWriterStats};
2121
pub use observer::{ChannelObserver, DownloadObserver, PipelineObserver};
2222
pub use orchestrator::{StreamingDownloader, StreamingDownloaderConfig};
23-
pub use output::{OutputSink, OutputStats};
23+
pub use output::{OutputSink, OutputStats, SinkObservability};
2424
pub use s3_writer::S3OutputSink;
2525
pub use streaming_writer::{FileWriterStats, SharedFileWriter};
2626
pub use void_writer::VoidOutputSink;

src/pipeline/orchestrator.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,13 @@ impl StreamingDownloader {
217217
let filter_lines_in = Arc::new(AtomicUsize::new(0));
218218
let filter_bytes_in = Arc::new(AtomicUsize::new(0));
219219
let workers_alive = Arc::new(AtomicUsize::new(self.config.filter_tasks));
220+
// Sink-agnostic "how many filter workers are inside sink.ingest
221+
// right now" gauge. Filter workers bump it via a RAII guard
222+
// around each `sink.ingest` call; the progress reporter samples
223+
// it instantaneously to distinguish `filter` vs `sink_*` labels.
224+
let workers_in_ingest: crate::progress::IngestGauge = Arc::new(AtomicUsize::new(0));
225+
let sink_obs = sink.sink_observability();
226+
let sink_kind = sink.type_name();
220227

221228
let progress = Arc::new(Mutex::new(PipelineProgress::new(
222229
objects.len(),
@@ -230,6 +237,10 @@ impl StreamingDownloader {
230237
filter_lines_in.clone(),
231238
filter_bytes_in.clone(),
232239
workers_alive.clone(),
240+
self.config.filter_tasks,
241+
workers_in_ingest.clone(),
242+
sink_obs,
243+
sink_kind,
233244
)));
234245

235246
// Emit initial progress at t=0 so charts always have a starting point
@@ -282,6 +293,7 @@ impl StreamingDownloader {
282293
let filter_bytes_in = filter_bytes_in.clone();
283294
let fe = fatal_error.clone();
284295
let wa = workers_alive.clone();
296+
let wii = workers_in_ingest.clone();
285297

286298
worker_set.spawn(async move {
287299
let result = Self::filter_worker(
@@ -294,6 +306,7 @@ impl StreamingDownloader {
294306
filter_lines_in,
295307
filter_bytes_in,
296308
fe,
309+
wii,
297310
)
298311
.await;
299312
wa.fetch_sub(1, Ordering::Relaxed);
@@ -854,6 +867,7 @@ impl StreamingDownloader {
854867
filter_lines_in: Arc<AtomicUsize>,
855868
filter_bytes_in: Arc<AtomicUsize>,
856869
fatal_error: Option<Arc<AtomicBool>>,
870+
workers_in_ingest: crate::progress::IngestGauge,
857871
) -> Result<usize> {
858872
let result = tokio::task::spawn_blocking(move || -> Result<usize> {
859873
let mut local = 0usize;
@@ -875,6 +889,12 @@ impl StreamingDownloader {
875889
filter_bytes_in.fetch_add(line_len, Ordering::Relaxed);
876890

877891
if searcher.matches_line(&line.data) {
892+
// RAII gauge so the progress reporter can tell whether
893+
// filter workers are stuck inside the sink (codec /
894+
// mpsc send / I/O) or actually doing filter-side work.
895+
// Drop on the same line frees the slot regardless of
896+
// panics / early returns.
897+
let _ingest_guard = crate::progress::IngestGuard::new(&workers_in_ingest);
878898
sink.ingest(&line.source.prefix, &line.data)?;
879899
local += 1;
880900
match_count.fetch_add(1, Ordering::Relaxed);

src/pipeline/output.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use anyhow::Result;
1010
use serde_json::{Map, Value};
1111
use std::future::Future;
1212
use std::pin::Pin;
13-
use std::sync::atomic::AtomicBool;
13+
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize};
1414
use std::sync::Arc;
1515

1616
/// Boxed future used by [`OutputSink::finish`] to make the trait `dyn`-safe
@@ -89,4 +89,34 @@ pub trait OutputSink: Send + Sync {
8989

9090
/// One-line label describing the sink for end-of-run logging.
9191
fn type_name(&self) -> &'static str;
92+
93+
/// Optional handle to per-sink internal-state metrics, sampled by
94+
/// the progress reporter. Used to disambiguate the bottleneck label:
95+
/// when filter workers are stuck inside `sink.ingest`, this tells us
96+
/// *why* — is the sink's own outbound queue full (network-bound) or
97+
/// is the queue empty because the producer side is slow (codec-bound)?
98+
///
99+
/// Default returns empty; sinks with meaningful internal buffering
100+
/// (currently just S3) override.
101+
fn sink_observability(&self) -> SinkObservability {
102+
SinkObservability::default()
103+
}
104+
}
105+
106+
/// Per-sink internal-state metrics exposed to the progress reporter.
107+
///
108+
/// Both fields are optional: a sink that doesn't have a meaningful
109+
/// internal queue (file, void) returns `None` for both, and the
110+
/// progress reporter treats the absence of the signal conservatively
111+
/// — see `classify_bottleneck_non_http` for the exact rules.
112+
#[derive(Debug, Clone, Default)]
113+
pub struct SinkObservability {
114+
/// Bytes currently resident in sink-internal channels and reader
115+
/// pending buffers. Read by the progress reporter on each tick.
116+
pub inflight_bytes: Option<Arc<AtomicU64>>,
117+
/// Count of upload contexts currently open — used to scale the
118+
/// per-upload "backed up" threshold against. For the S3 sink this
119+
/// is the number of multipart uploads with a still-living
120+
/// `ChannelWriter` or a still-running TM driver task.
121+
pub active_uploads: Option<Arc<AtomicUsize>>,
92122
}

src/pipeline/s3_writer.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ use aws_sdk_s3_transfer_manager as tm;
110110
use bytes::Bytes;
111111
use serde_json::json;
112112
use std::collections::HashMap;
113-
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
113+
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
114114
use std::sync::{Arc, Mutex};
115115
use tokio::runtime::Handle;
116116
use tokio::task::JoinHandle;
@@ -223,6 +223,12 @@ struct Inner {
223223
/// `OutputStats.extras` so the e2e suite can assert the sink doesn't
224224
/// buffer the whole run in memory.
225225
peak_inflight_bytes: Arc<AtomicU64>,
226+
/// Count of currently-open TM upload contexts. Incremented when
227+
/// `open_upload` spawns a driver task; decremented at the end of
228+
/// that driver task (i.e. once TM finalizes the multipart). Sampled
229+
/// by the progress reporter via `SinkObservability` to scale the
230+
/// per-upload "channel backed up" threshold.
231+
active_uploads: Arc<AtomicUsize>,
226232
fatal: Arc<AtomicBool>,
227233
/// Once `true`, the sink is finalized and `ingest` returns an error
228234
/// instead of opening a fresh upload. Prevents `flush_batch`-style
@@ -295,6 +301,7 @@ impl S3OutputSink {
295301
objects_written: AtomicU64::new(0),
296302
inflight_bytes: Arc::new(AtomicU64::new(0)),
297303
peak_inflight_bytes: Arc::new(AtomicU64::new(0)),
304+
active_uploads: Arc::new(AtomicUsize::new(0)),
298305
fatal: Arc::new(AtomicBool::new(false)),
299306
finished: AtomicBool::new(false),
300307
});
@@ -389,6 +396,13 @@ impl S3OutputSink {
389396
.initiate()
390397
.map_err(|e| anyhow!("S3 sink: failed to initiate upload for `{key}`: {e}"))?;
391398

399+
// From this moment until the TM driver task exits, this upload
400+
// contributes to `inflight_bytes` (channel + reader pending) and
401+
// counts as an "active upload" for the progress reporter's
402+
// backpressure heuristic. Decrement in the driver task's
403+
// closure below.
404+
inner.active_uploads.fetch_add(1, Ordering::Relaxed);
405+
392406
let batch_stats = Arc::new(BatchStats::default());
393407

394408
// Spawn a tiny driver task that awaits TM completion, then folds
@@ -401,6 +415,9 @@ impl S3OutputSink {
401415
let bytes_sent_for_task = bytes_sent.clone();
402416
async move {
403417
let result = upload_handle.join().await;
418+
// Whatever TM's outcome, this upload is no longer active —
419+
// its mpsc is fully drained and its driver is exiting.
420+
inner.active_uploads.fetch_sub(1, Ordering::Relaxed);
404421
let lines = stats.lines.load(Ordering::Relaxed);
405422
let plaintext = stats.plaintext.load(Ordering::Relaxed);
406423
let bytes = bytes_sent_for_task.load(Ordering::Relaxed);
@@ -745,6 +762,13 @@ impl OutputSink for S3OutputSink {
745762
fn type_name(&self) -> &'static str {
746763
"s3"
747764
}
765+
766+
fn sink_observability(&self) -> crate::pipeline::SinkObservability {
767+
crate::pipeline::SinkObservability {
768+
inflight_bytes: Some(self.inner.inflight_bytes.clone()),
769+
active_uploads: Some(self.inner.active_uploads.clone()),
770+
}
771+
}
748772
}
749773

750774
#[cfg(test)]

0 commit comments

Comments
 (0)