S3 bucket content searcher. Downloads compressed objects from S3, stream-decompresses them, filters their lines by regex, and routes matches to one of four pluggable output sinks: local files, an HTTP API (NDJSON), an S3 bucket, or /dev/null (void). The file, http, and s3 sinks share a single Codec abstraction (zstd / gzip / none), so the on-disk extension, the HTTP Content-Encoding header, and the bytes the encoder actually writes stay in lockstep.
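A minimal sketch of how one Codec enum can keep the codec-dependent values from drifting apart: every sink asks the same value for its file extension and its Content-Encoding header. The variant names, the parse helper, and the ".zst" / ".gz" suffixes are assumptions for illustration. The encoder half (the real CodecEncoder<W>, presumably wrapping the zstd and flate2 encoders) is left out so the example needs only std.

```rust
/// Hypothetical single source of truth for everything that depends on the
/// output codec (names are illustrative, not the crate's real API).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Codec {
    Zstd,
    Gzip,
    None,
}

impl Codec {
    /// Suffix for file names / object keys; empty when uncompressed.
    pub fn extension(self) -> &'static str {
        match self {
            Codec::Zstd => ".zst",
            Codec::Gzip => ".gz",
            Codec::None => "",
        }
    }

    /// Value for the HTTP Content-Encoding header; `None` means "omit the header".
    pub fn content_encoding(self) -> Option<&'static str> {
        match self {
            Codec::Zstd => Some("zstd"),
            Codec::Gzip => Some("gzip"),
            Codec::None => None,
        }
    }

    /// Parses a config value such as "zstd", "gzip" or "none" (case-insensitive).
    pub fn parse(s: &str) -> Option<Codec> {
        match s.to_ascii_lowercase().as_str() {
            "zstd" => Some(Codec::Zstd),
            "gzip" => Some(Codec::Gzip),
            "none" => Some(Codec::None),
            _ => None,
        }
    }
}
```

Because each method matches exhaustively, adding a variant fails to compile until every derived value is supplied, which is the lockstep property in miniature.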
Streaming pipeline with decoupled stages connected by bounded channels:
S3 GetObject stream (semaphore-bounded, range-based resume on retries)
→ async chunk loop → flume::bounded(4) → ChunkReader (impl Read)
→ spawn_blocking: stream-decompress (.gz/.zst) line-by-line
→ line_ch (flume bounded)
→ filter_worker pool (spawn_blocking, regex via grep-matcher)
→ Arc<dyn OutputSink>::ingest(prefix, line)
├─ FileOutputSink → SharedFileWriter (per-prefix Codec-encoded files)
├─ HttpOutputSink → compressor pool → uploader pool → HTTP POST (AIMD)
├─ S3OutputSink → per-prefix Codec-encoded batches → uploader pool → PutObject
└─ VoidOutputSink → atomic counters only (benchmarking)
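The stage hand-offs above can be sketched with the standard library alone. In this hypothetical miniature, std::sync::mpsc::sync_channel stands in for flume::bounded, plain threads stand in for Tokio tasks and spawn_blocking, a substring test stands in for the grep-matcher regex, the chunks are already uncompressed text, and the filter stage runs on one thread instead of a pool. The ChunkReader fields, the one-method OutputSink trait, and VoidSink are assumptions, not the crate's actual signatures.

```rust
use std::io::{self, BufRead, BufReader, Read};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{sync_channel, Receiver};
use std::sync::Arc;
use std::thread;

/// Blocking `Read` over a channel of byte chunks (hypothetical shape).
struct ChunkReader {
    rx: Receiver<Vec<u8>>,
    buf: Vec<u8>,
    pos: usize,
}

impl ChunkReader {
    fn new(rx: Receiver<Vec<u8>>) -> Self {
        ChunkReader { rx, buf: Vec::new(), pos: 0 }
    }
}

impl Read for ChunkReader {
    fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
        if out.is_empty() {
            return Ok(0);
        }
        // Pull the next chunk once the current one is used up (skips empty chunks).
        while self.pos == self.buf.len() {
            match self.rx.recv() {
                Ok(chunk) => {
                    self.buf = chunk;
                    self.pos = 0;
                }
                // Every sender dropped: the object is fully streamed, report EOF.
                Err(_) => return Ok(0),
            }
        }
        let n = out.len().min(self.buf.len() - self.pos);
        out[..n].copy_from_slice(&self.buf[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}

/// Simplified stand-in for the OutputSink trait.
trait OutputSink: Send + Sync {
    fn ingest(&self, prefix: &str, line: &str);
}

/// Void-style sink: counts what it receives and discards it.
#[derive(Default)]
struct VoidSink {
    lines: AtomicU64,
    bytes: AtomicU64,
}

impl OutputSink for VoidSink {
    fn ingest(&self, _prefix: &str, line: &str) {
        self.lines.fetch_add(1, Ordering::Relaxed);
        self.bytes.fetch_add(line.len() as u64, Ordering::Relaxed);
    }
}

/// Streams `chunks` through producer -> splitter -> filter -> sink and
/// returns how many lines contained `needle`.
fn run(chunks: Vec<Vec<u8>>, prefix: &str, needle: &str, sink: Arc<dyn OutputSink>) -> u64 {
    let (chunk_tx, chunk_rx) = sync_channel::<Vec<u8>>(4); // cf. flume::bounded(4)
    let (line_tx, line_rx) = sync_channel::<String>(1024); // cf. line_ch

    // Stage 1: stand-in for the S3 GetObject chunk loop.
    let producer = thread::spawn(move || {
        for chunk in chunks {
            if chunk_tx.send(chunk).is_err() {
                break; // downstream hung up
            }
        }
    });

    // Stage 2: split into lines (the real pipeline decompresses here first).
    let splitter = thread::spawn(move || {
        for line in BufReader::new(ChunkReader::new(chunk_rx)).lines() {
            let Ok(line) = line else { break };
            if line_tx.send(line).is_err() {
                break;
            }
        }
    });

    // Stage 3: filter on this thread and hand matches to the sink.
    let mut matched = 0u64;
    for line in line_rx {
        if line.contains(needle) {
            sink.ingest(prefix, &line);
            matched += 1;
        }
    }
    producer.join().expect("producer panicked");
    splitter.join().expect("splitter panicked");
    matched
}
```

A line split across two network chunks is reassembled by BufReader reading through ChunkReader, so the filter only ever sees whole lines. The bounded channels make a slow stage block the stage feeding it instead of buffering without limit.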
Key modules:
- src/pipeline/orchestrator.rs — pipeline orchestrator: download → decompress → filter → sink
- src/pipeline/output.rs — OutputSink trait + OutputStats
- src/pipeline/codec.rs — output codec (zstd / gzip / none) + CompressionConfig + CodecEncoder<W>
- src/pipeline/path_template.rs — {prefix}/{prefix_hash}/{seq}/{run_id}/{ext} template renderer + CollisionTracker, shared by the file and s3 sinks
- src/pipeline/http_writer.rs / http_sink.rs — HTTP output internals + sink adapter
- src/pipeline/streaming_writer.rs / file_sink.rs — file output internals + sink adapter
- src/pipeline/s3_writer.rs — S3 output sink with per-prefix batching
- src/pipeline/void_writer.rs — no-op sink with atomic counters
- src/pipeline/observer.rs — observer primitives: PipelineObserver, ChannelObserver, DownloadObserver
- src/config/output.rs — OutputConfig tagged enum + ${ENV} interpolation + template/codec validation
- src/config/resolve.rs — selects config-driven vs CLI-driven mode (mixing is a hard error)
- src/matcher.rs — LineMatcher: stateless regex wrapper around grep-matcher
- src/progress.rs — periodic structured-log progress reports with bottleneck detection
- src/config/path_formatter.rs — date/hour prefix formatting from BucketConfig path schemas
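A hypothetical sketch of the path-template idea: a single-pass renderer for the {prefix}, {prefix_hash}, {seq}, {run_id}, and {ext} placeholders, plus a tracker that rejects two prefixes rendering to the same output path. Only the placeholder names come from the module list; the parsing rules, the FNV-1a hash behind {prefix_hash}, the assumption that {ext} carries its leading dot, and the collision semantics are guesses for illustration.

```rust
use std::collections::HashMap;

/// Values for the placeholders (hypothetical struct).
struct TemplateVars<'a> {
    prefix: &'a str,
    seq: u64,
    run_id: &'a str,
    ext: &'a str, // assumed to include the leading dot, e.g. ".zst"
}

/// 32-bit FNV-1a: small and stable across runs (assumed hash for {prefix_hash}).
fn fnv1a32(s: &str) -> u32 {
    let mut h: u32 = 0x811c_9dc5;
    for b in s.bytes() {
        h ^= u32::from(b);
        h = h.wrapping_mul(0x0100_0193);
    }
    h
}

/// Renders `template` in one left-to-right pass; unknown or unclosed
/// placeholders are errors rather than being copied through.
fn render(template: &str, v: &TemplateVars) -> Result<String, String> {
    let mut out = String::with_capacity(template.len());
    let mut rest = template;
    while let Some(start) = rest.find('{') {
        out.push_str(&rest[..start]);
        let end = start + rest[start..].find('}').ok_or("unclosed '{' in template")?;
        match &rest[start + 1..end] {
            "prefix" => out.push_str(v.prefix),
            "prefix_hash" => out.push_str(&format!("{:08x}", fnv1a32(v.prefix))),
            "seq" => out.push_str(&v.seq.to_string()),
            "run_id" => out.push_str(v.run_id),
            "ext" => out.push_str(v.ext),
            other => return Err(format!("unknown placeholder {{{other}}}")),
        }
        // Continue after the closing brace; substituted text is never rescanned.
        rest = &rest[end + 1..];
    }
    out.push_str(rest);
    Ok(out)
}

/// Remembers which prefix produced each rendered path and rejects a second,
/// different prefix landing on the same path (assumed semantics).
#[derive(Default)]
struct CollisionTracker {
    owners: HashMap<String, String>,
}

impl CollisionTracker {
    fn claim(&mut self, path: &str, prefix: &str) -> Result<(), String> {
        if let Some(owner) = self.owners.get(path) {
            return if owner == prefix {
                Ok(())
            } else {
                Err(format!("{path} is rendered for both {owner} and {prefix}"))
            };
        }
        self.owners.insert(path.to_string(), prefix.to_string());
        Ok(())
    }
}
```

Scanning the template once, rather than chaining str::replace calls, avoids re-expanding placeholder-like text that happens to appear inside a substituted prefix.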
- Rust, Tokio async runtime
- aws-sdk-s3 for S3 operations (downloads + S3 output uploads)
- aws-sdk-s3-transfer-manager (developer preview, pinned =0.1.3) drives multipart uploads on the s3 sink, sharing our pre-built aws_sdk_s3::Client via Config::Builder::client(...)
- flume for bounded MPMC channels
- grep-matcher / grep-regex for line matching
- zstd / flate2 for compression
- tracing for structured logging (text or JSON via --log-format)
- clap for CLI argument parsing
- wiremock for HTTP mock tests
cargo test # unit + integration + doctests
cargo clippy # zero warnings expected

- Expected Cargo warning: unused manifest key: profile.profiling.force-frame-pointers — Cargo bug with custom profiles, not a code issue
- aws-sdk-s3-transfer-manager (TM) is a developer preview (no production-stability guarantee), pinned at =0.1.3. It declares its aws-sdk-s3 and aws-config dependencies without default-features = false, so aws-sdk-s3's default "rustls" feature is always on. That feature activates aws-smithy-runtime/tls-rustls → aws-smithy-http-client/legacy-rustls-ring, which pulls in the legacy hyper-rustls 0.24 / rustls 0.21 / rustls-webpki 0.101 chain and with it RUSTSEC-2026-0098/0099/0104 (name-constraint and CRL-parsing issues in rustls-webpki). Our own direct aws-sdk-s3 dependency already sets default-features = false + default-https-client, but Cargo unifies features additively across the dependency graph, so TM's defaults still win. Tracked upstream as awslabs/aws-s3-transfer-manager-rs#138 (open, no movement). Accepted as a deliberate tradeoff in exchange for AWS-maintained multipart code; revisit once that issue is fixed and a new TM release ships.
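Why TM's defaults win can be seen in a toy model of Cargo's additive feature unification: a shared dependency is compiled once with the union of what every dependent asks for, so a single dependent that leaves default features on enables them for everyone. Request and unify are illustrative names only. The model ignores transitive feature expansion (for example "rustls" switching on aws-smithy-runtime/tls-rustls), and it ignores the fact that the real resolver (version 2) keeps build-dependencies and inactive target-specific dependencies apart.

```rust
use std::collections::BTreeSet;

/// One dependent's entry for a shared dependency in its Cargo.toml.
struct Request<'a> {
    default_features: bool,
    features: &'a [&'a str],
}

/// Toy model of Cargo's additive feature unification: the shared dependency is
/// built once with the union of all requests. `defaults` stands for the
/// dependency's own `default = [...]` feature list.
fn unify<'a>(defaults: &[&'a str], requests: &[Request<'a>]) -> BTreeSet<&'a str> {
    let mut enabled = BTreeSet::new();
    for r in requests {
        // One `default-features = true` anywhere turns the defaults on for all dependents.
        if r.default_features {
            enabled.extend(defaults.iter().copied());
        }
        enabled.extend(r.features.iter().copied());
    }
    enabled
}
```

Run with only our manifest, "rustls" stays off; adding a dependent that keeps defaults on brings it back, regardless of what our own entry says.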