feat(aws_s3 sink): Add Apache Parquet encoder support #24372
base: master
Conversation
All contributors have signed the CLA ✍️ ✅

I have read the CLA Document and I hereby sign the CLA
Force-pushed from 7c16cdd to 672999d ("…ut defaulted to off")
thomasqueirozb left a comment
Hey @rorylshanks, thanks for your contribution! It looks like there are failing checks (run `make check-clippy`, for example). This is also failing to compile after merging `master`, because you removed `BatchSerializerConfig::build`, which is used by the `clickhouse` sink. I'll circle back to this PR and give it a review once I see commits pushed to this branch.
Hey Thomas! Thanks for the review so far. I have made some changes to fix the check issues, and also to change how the docs are templated, as I think this codec only makes sense for AWS S3 and other S3-compatible APIs, and I don't really have time to test that the parquet implementation works for every sink. So I limited the docs to only include the parquet format for S3 and similar sinks, and not for others. But please correct me if this is the wrong approach!
Hi @rorylshanks, is buffer support added for back-pressure handling? Nice work taking this up.
```ruby
    return sort_hash_nested(unwrapped_resolved_schema)
  end

  PARQUET_ALLOWED_SINKS = %w[aws_s3 gcp_cloud_storage azure_blob].freeze
```
Is there any particular reason for this? I assume it'd be possible to use the parquet codec with the `file` sink also, no? In general we don't "restrict" codecs like this; sometimes they are incredibly useful for debugging. (One recent example that comes to mind is the `otlp` codec, which is not useful at all outside of a few select sinks, notably the `opentelemetry` sink, but we still support it for all sinks.)
Suggested change:

```diff
-1. **Explicit Schema**: Define the exact structure and data types for your Parquet files
-2. **Automatic Schema Inference**: Let Vector automatically infer the schema from your event data
+1. **Explicit Schema**: Define the exact structure and data types for your Parquet files.
+2. **Automatic Schema Inference**: Let Vector automatically infer the schema from your event data.
```
```markdown
### allow_nullable_fields

When enabled, missing or incompatible values will be encoded as NULL even for fields that
would normally be non-nullable. This is useful when working with downstream systems that
can handle NULL values through defaults or computed columns.
```

This can be removed, as we already have the automatically generated doc for this option.
```markdown
**Per-column Bloom filter settings:**

- **bloom_filter**: Enable Bloom filter for this column (default: `false`)
- **bloom_filter_num_distinct_values**: Expected number of distinct values for this column's Bloom filter
  - Low cardinality (countries, states): `1,000` - `100,000`
  - Medium cardinality (cities, products): `100,000` - `1,000,000`
  - High cardinality (user IDs, UUIDs): `10,000,000+`
  - If not specified, defaults to `1,000,000`
  - Automatically capped to the `row_group_size` value
- **bloom_filter_false_positive_pct**: False positive probability for this column's Bloom filter
  - `0.05` (5%): Good balance for general use
  - `0.01` (1%): Better for high-selectivity queries where precision matters
  - `0.10` (10%): Smaller filters when storage is a concern
  - If not specified, defaults to `0.05`

A false positive means the Bloom filter indicates a value *might* be in a row group when it
actually isn't, requiring the engine to read and filter that row group. Lower FPP means fewer
unnecessary reads but larger Bloom filters.
```
This info should be present in the automatically generated documentation for these fields instead
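For illustration only, here is a rough sketch of how these per-column settings might appear in a sink config. The field names come from the quoted documentation above, but the exact nesting under `schema` is my assumption, not taken from the PR:

```yaml
encoding:
  codec: parquet
  parquet:
    schema:
      user_id:
        type: utf8
        bloom_filter: true                           # enable for a high-cardinality column
        bloom_filter_num_distinct_values: 10000000   # expected distinct values (per quoted docs)
        bloom_filter_false_positive_pct: 0.01        # tighter filter for selective queries
```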
```yaml
encoding:
  codec: parquet
  parquet:
    schema:
```
Is there a `.parquet` format (or something similar) so that the user can point to a file instead of defining this inside the configuration YAML? In these scenarios we usually opt for a config file instead. See `parse_proto` and `validate_json_schema`.
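To make the idea concrete, a purely hypothetical file-based variant might look like this; `schema_file` is an invented option name, not something the PR implements:

```yaml
encoding:
  codec: parquet
  parquet:
    # Hypothetical: load the schema from a file instead of defining it inline.
    schema_file: /etc/vector/parquet_schema.json
```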
```markdown
#### max_columns

Maximum number of columns to encode when using automatic schema inference. Additional
columns beyond this limit will be silently dropped. Columns are selected in the order
they appear in the first event.

This protects against accidentally creating Parquet files with too many columns, which
can cause performance issues in query engines.

**Only applies when `infer_schema` is enabled**. Ignored when using explicit schema.

**Default**: `1000`

**Recommended values:**
- Standard use cases: `1000` (default)
- Wide tables: `500` - `1000`
- Performance-critical: `100` - `500`
```
Same comment
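As a quick illustration, a sketch of how `max_columns` might sit alongside `infer_schema` in the encoding block; the nesting is assumed from the YAML excerpt quoted earlier:

```yaml
encoding:
  codec: parquet
  parquet:
    infer_schema: true
    max_columns: 500   # assumed placement; per the quoted docs, ignored with an explicit schema
```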
In general we should avoid including parameter descriptions and examples in this file. These should live in the source code instead, so that the documentation is easier to maintain and we avoid drift.
```rust
        .to_compression(config.compression_level)
        .map_err(vector_common::Error::from)?;

    tracing::debug!(
```
Suggested change:

```diff
-    tracing::debug!(
+    debug!(
```
Nit: we usually avoid using the `tracing::` prefix and opt to import this at the start of the file instead.
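A minimal sketch of that style, assuming nothing else about the surrounding code:

```rust
use tracing::debug;

fn log_encoded_batch(bytes_written: usize) {
    // Same macro as before, just without the `tracing::` path prefix.
    debug!(message = "Encoded Parquet batch.", bytes_written);
}
```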
```rust
let fpp = bloom_config.fpp.unwrap_or(0.05); // Default 5% false positive rate
let mut ndv = bloom_config.ndv.unwrap_or(1_000_000); // Default 1M distinct values
```
Please use a static constant instead. Magic values tend to fall out of date with docs
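A minimal sketch of that suggestion applied to the two lines quoted above; the constant names (and their types) are mine, not from the PR:

```rust
/// Default Bloom filter false positive probability (5%).
const DEFAULT_BLOOM_FILTER_FPP: f64 = 0.05;
/// Default expected number of distinct values per column Bloom filter.
const DEFAULT_BLOOM_FILTER_NDV: u64 = 1_000_000;

// ...the defaults then live in one place and cannot silently drift from the docs:
let fpp = bloom_config.fpp.unwrap_or(DEFAULT_BLOOM_FILTER_FPP);
let mut ndv = bloom_config.ndv.unwrap_or(DEFAULT_BLOOM_FILTER_NDV);
```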
````rust
/// **Example:**
/// ```yaml
/// sorting_columns:
///   - column: timestamp
///     descending: true
///   - column: user_id
///     descending: false
/// ```
///
/// If not specified, rows are written in the order they appear in the batch.
````
I think that using `#[configurable(metadata(docs::examples = "foo bar"))]` works here. I'm not sure how this would render with multi-line examples, but if the rendering doesn't work properly I'll make sure to fix it in a separate PR.
```rust
/// Mutually exclusive with `infer_schema`. Must specify either `schema` or `infer_schema: true`.
///
/// Supported types: utf8, int8, int16, int32, int64, uint8, uint16, uint32, uint64,
/// float32, float64, boolean, binary, timestamp_second, timestamp_millisecond,
/// timestamp_microsecond, timestamp_nanosecond, date32, date64, and more.
#[serde(default)]
#[configurable(metadata(docs::examples = "schema_example()"))]
pub schema: Option<SchemaDefinition>,

/// Automatically infer schema from event data
///
/// When enabled, the schema is inferred from each batch of events independently.
/// The schema is determined by examining the types of values in the events.
///
/// **Type mapping:**
/// - String values → `utf8`
/// - Integer values → `int64`
/// - Float values → `float64`
/// - Boolean values → `boolean`
/// - Timestamp values → `timestamp_microsecond`
/// - Arrays/Objects → `utf8` (serialized as JSON)
///
/// **Type conflicts:** If a field has different types across events in the same batch,
/// it will be encoded as `utf8` (string) and all values will be converted to strings.
///
/// **Important:** Schema consistency across batches is the operator's responsibility.
/// Use VRL transforms to ensure consistent types if needed. Each batch may produce
/// a different schema if event structure varies.
///
/// **Bloom filters:** Not supported with inferred schemas. Use explicit schema for Bloom filters.
///
/// Mutually exclusive with `schema`. Must specify either `schema` or `infer_schema: true`.
#[serde(default)]
#[configurable(metadata(docs::examples = true))]
pub infer_schema: bool,
```
This can be expressed with an enum, which will simplify a lot of the checks. It will also render the docs in a way that makes it clear which options are available with a schema and which are available without.

Something like:

```rust
enum Schema {
    Inferred { infer_schema: bool, exclude_columns: Option<Vec<String>> },
    Schema { schema: SchemaDefinition },
}

/// Configuration for Parquet serialization
#[configurable_component]
pub struct ParquetSerializerConfig {
    #[serde(flatten)]
    schema: Schema,
}
```

This is very rough and leaves out all the doc comments, but it should work. I'm sure you can find examples of how to do this properly in the Vector code base.
I wouldn't expect anyone to test every sink! I think we can do just a quick sanity check and write to a file using the file sink, or even a socket using the socket sink, and verify that it is working correctly. We can then add a note to the codec docs specifying which sinks it should be used with. As I mentioned before, we usually show all codecs, even if it is very unusual for anyone to use some of them.
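For that kind of sanity check, a minimal config sketch, assuming the codec ends up enabled for the `file` sink and using the `infer_schema` option discussed above:

```yaml
sources:
  demo:
    type: demo_logs          # any source works; demo_logs keeps the check self-contained
    format: json

sinks:
  parquet_smoke_test:
    type: file
    inputs: ["demo"]
    path: /tmp/vector-smoke-test.parquet
    encoding:
      codec: parquet
      parquet:
        infer_schema: true
```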
Don't worry about this for now. Worst case scenario, I can fix this myself before merging :)
Summary
This PR adds Apache Parquet encoding support to the AWS S3 sink, enabling Vector to write columnar Parquet files optimized for analytics workloads.
Parquet is a columnar storage format that provides efficient compression and encoding, making it ideal for long-term storage and query performance with tools like AWS Athena, Apache Spark, and Presto. This implementation allows users to write properly formatted Parquet files with configurable schemas, compression, and row group sizing.
Key features:
Vector configuration
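The PR's own configuration example is collapsed in this view. As a rough sketch only, a config combining the options discussed in this thread might look like the following; the bucket, region, input name, and the exact shape of `schema` are assumptions on my part, not copied from the PR:

```yaml
sinks:
  s3_parquet:
    type: aws_s3
    inputs: ["kafka_in"]            # placeholder input
    bucket: my-analytics-bucket     # placeholder
    region: us-east-1               # placeholder
    key_prefix: "logs/date=%Y-%m-%d/"
    encoding:
      codec: parquet
      parquet:
        schema:                     # assumed shape of an explicit schema definition
          timestamp:
            type: timestamp_microsecond
          message:
            type: utf8
```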
How did you test this PR?
I tested it against production Kafka data, and it produced correctly formatted Parquet files in S3.
Change Type
Is this a breaking change?
Does this PR include user facing changes?
If not, add the `no-changelog` label to this PR.

References
- `parquet` columnar format in the `aws_s3` sink #1374
- `parquet` codec #17395