Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions docs/modules/components/pages/outputs/iceberg.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ output:
manifest_merge_enabled: true
max_snapshot_age: 24h
max_retries: 3
parquet:
string_encoding: delta_length_byte_array
batching:
count: 0
byte_size: 0
Expand Down Expand Up @@ -925,6 +927,28 @@ Maximum number of times to retry a failed transaction commit.

*Default*: `3`

=== `parquet`

Parquet writer configuration.


*Type*: `object`


=== `parquet.string_encoding`

The encoding to use for string and binary columns. Use `plain` for compatibility with readers that do not support `DELTA_LENGTH_BYTE_ARRAY` encoding, such as AWS Redshift Spectrum.


*Type*: `string`

*Default*: `"delta_length_byte_array"`

Options:
`plain`
, `delta_length_byte_array`
.

=== `batching`

Allows you to configure a xref:configuration:batching.adoc[batching policy].
Expand Down
13 changes: 13 additions & 0 deletions internal/impl/iceberg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ const (
// Performance fields
ioFieldBatching = "batching"
ioFieldMaxInFlight = "max_in_flight"

// Parquet writer fields
ioFieldParquet = "parquet"
ioFieldParquetStringEncoding = "string_encoding"
)

// icebergOutputConfig returns the configuration spec for the Iceberg output.
Expand Down Expand Up @@ -347,6 +351,15 @@ array:list
Advanced().
Optional(),

// Parquet writer configuration
service.NewObjectField(ioFieldParquet,
service.NewStringEnumField(ioFieldParquetStringEncoding, "plain", "delta_length_byte_array").
Description("The encoding to use for string and binary columns. Use `plain` for compatibility with readers that do not support `DELTA_LENGTH_BYTE_ARRAY` encoding, such as AWS Redshift Spectrum.").
Default("delta_length_byte_array"),
).Description("Parquet writer configuration.").
Advanced().
Optional(),

// Batching
service.NewBatchPolicyField(ioFieldBatching),
service.NewOutputMaxInFlightField().Default(4),
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/iceberg/e2e/glue/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func newRouter(t *testing.T, namespace, table string, schemaEvo bool) *icebergim
Enabled: schemaEvo,
TableLocation: fmt.Sprintf("s3://%s/", *glueBucket),
}
router := icebergimpl.NewRouter(catalogConfig(), namespaceStr, tableStr, true, schemaEvoCfg, commitCfg, logger)
router := icebergimpl.NewRouter(catalogConfig(), namespaceStr, tableStr, true, schemaEvoCfg, commitCfg, nil, logger)
t.Cleanup(func() { router.Close() })
return router
}
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/iceberg/e2e/polaris-aws/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func newRouter(t *testing.T, catalogCfg catalogx.Config, namespace, tableName st
schemaEvoCfg := icebergimpl.SchemaEvolutionConfig{
Enabled: schemaEvo,
}
router := icebergimpl.NewRouter(catalogCfg, namespaceStr, tableStr, true, schemaEvoCfg, commitCfg, logger)
router := icebergimpl.NewRouter(catalogCfg, namespaceStr, tableStr, true, schemaEvoCfg, commitCfg, nil, logger)
t.Cleanup(func() { router.Close() })
return router
}
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/iceberg/e2e/polaris-azure/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func newRouter(t *testing.T, catalogCfg catalogx.Config, namespace, table string
schemaEvoCfg := icebergimpl.SchemaEvolutionConfig{
Enabled: schemaEvo,
}
router := icebergimpl.NewRouter(catalogCfg, namespaceStr, tableStr, true, schemaEvoCfg, commitCfg, logger)
router := icebergimpl.NewRouter(catalogCfg, namespaceStr, tableStr, true, schemaEvoCfg, commitCfg, nil, logger)
t.Cleanup(func() { router.Close() })
return router
}
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/iceberg/integration/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (infra *testInfrastructure) NewRouter(
MaxSnapshotAge: 24 * time.Hour,
MaxRetries: 3,
}
router := icebergimpl.NewRouter(infra.CatalogConfig(), namespaceStr, tableStr, o.caseSensitive, o.schemaEvoCfg, commitCfg, logger)
router := icebergimpl.NewRouter(infra.CatalogConfig(), namespaceStr, tableStr, o.caseSensitive, o.schemaEvoCfg, commitCfg, nil, logger)
t.Cleanup(func() { router.Close() })
return router
}
Expand Down
20 changes: 18 additions & 2 deletions internal/impl/iceberg/output_iceberg.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/io"
_ "github.com/apache/iceberg-go/io/gocloud"
"github.com/parquet-go/parquet-go"

"github.com/redpanda-data/benthos/v4/public/service"

Expand Down Expand Up @@ -102,9 +103,24 @@ func newIcebergOutputFromConfig(conf *service.ParsedConfig, mgr *service.Resourc
return nil, fmt.Errorf("parsing commit config: %w", err)
}

// Create router
rtr := NewRouter(catalogCfg, namespaceStr, tableStr, caseSensitive, schemaEvoCfg, commitCfg, mgr.Logger())
// Parse parquet config
var writerOpts []parquet.WriterOption
if conf.Contains(ioFieldParquet) {
strEnc, err := conf.FieldString(ioFieldParquet, ioFieldParquetStringEncoding)
if err != nil {
return nil, fmt.Errorf("parsing %s: %w", ioFieldParquetStringEncoding, err)
}
switch strEnc {
case "plain":
writerOpts = append(writerOpts, parquet.DefaultEncodingFor(parquet.ByteArray, &parquet.Plain))
case "delta_length_byte_array":
// default - noop
default:
return nil, fmt.Errorf("unsupported %s value: %q, please consider raising an issue to request support for feature gap", ioFieldParquetStringEncoding, strEnc)
}
}

Comment on lines +106 to 122
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new parquet.string_encoding config field is introduced without any test coverage. Per the project's test patterns, changed code should be accompanied by tests — at minimum a config-parsing/linting test exercising both plain and delta_length_byte_array (and ideally a writer test verifying the option is actually plumbed through to the underlying parquet writer). The existing writer_test.go and router_test.go could be extended for this.

rtr := NewRouter(catalogCfg, namespaceStr, tableStr, caseSensitive, schemaEvoCfg, commitCfg, writerOpts, mgr.Logger())
return &icebergOutput{
router: rtr,
logger: mgr.Logger(),
Expand Down
6 changes: 5 additions & 1 deletion internal/impl/iceberg/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/catalog"
"github.com/apache/iceberg-go/table"
"github.com/parquet-go/parquet-go"

"github.com/redpanda-data/benthos/v4/public/bloblang"
"github.com/redpanda-data/benthos/v4/public/service"
Expand Down Expand Up @@ -71,6 +72,7 @@ type Router struct {
caseSensitive bool
schemaEvoCfg SchemaEvolutionConfig
commitCfg CommitConfig
writerOpts []parquet.WriterOption
resolver *typeResolver

entries sync.Map // tableKey -> *tableEntry
Expand All @@ -89,6 +91,7 @@ func NewRouter(
caseSensitive bool,
schemaEvoCfg SchemaEvolutionConfig,
commitCfg CommitConfig,
writerOpts []parquet.WriterOption,
logger *service.Logger,
) *Router {
return &Router{
Expand All @@ -98,6 +101,7 @@ func NewRouter(
caseSensitive: caseSensitive,
schemaEvoCfg: schemaEvoCfg,
commitCfg: commitCfg,
writerOpts: writerOpts,
resolver: newTypeResolver(schemaEvoCfg.SchemaMetadata, schemaEvoCfg.NewColumnTypeMapping, caseSensitive, logger),
logger: logger,
}
Expand Down Expand Up @@ -660,7 +664,7 @@ func (r *Router) createWriter(ctx context.Context, key tableKey) (*writer, error
}

// Create writer with its own table reference and the committer
w := NewWriter(writerTbl, comm, r.caseSensitive, r.logger)
w := NewWriter(writerTbl, comm, r.caseSensitive, r.writerOpts, r.logger)
r.logger.Debugf("Created writer for table %s.%s", key.namespace, key.table)

return w, nil
Expand Down
15 changes: 10 additions & 5 deletions internal/impl/iceberg/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,20 @@ type writer struct {
table *table.Table
committer *committer
caseSensitive bool
writerOpts []parquet.WriterOption
logger *service.Logger
}

// NewWriter creates a new writer for a specific table.
// The table and committer should use separate table references since they
// operate in different goroutines and the table object is mutable.
// caseSensitive controls how message keys are matched against the schema.
func NewWriter(tbl *table.Table, comm *committer, caseSensitive bool, logger *service.Logger) *writer {
func NewWriter(tbl *table.Table, comm *committer, caseSensitive bool, writerOpts []parquet.WriterOption, logger *service.Logger) *writer {
return &writer{
table: tbl,
committer: comm,
caseSensitive: caseSensitive,
writerOpts: writerOpts,
logger: logger,
}
}
Expand Down Expand Up @@ -189,7 +191,7 @@ func (w *writer) messagesToParquet(batch service.MessageBatch) ([]partitionFile,

// For unpartitioned tables, use a single writer
if spec.IsUnpartitioned() {
sink := newParquetSink(pqSchema, fieldToCol, w.caseSensitive)
sink := newParquetSink(pqSchema, fieldToCol, w.caseSensitive, w.writerOpts...)

for _, msg := range batch {
structured, err := msg.AsStructured()
Expand Down Expand Up @@ -268,7 +270,7 @@ func (w *writer) messagesToParquet(batch service.MessageBatch) ([]partitionFile,
} else {
entry = &partitionEntry{
key: partitionKey,
sink: newParquetSink(pqSchema, fieldToCol, w.caseSensitive),
sink: newParquetSink(pqSchema, fieldToCol, w.caseSensitive, w.writerOpts...),
}
// Insert at sorted position
partitions = slices.Insert(partitions, idx, entry)
Expand Down Expand Up @@ -323,9 +325,12 @@ type parquetSink struct {
seenFields map[string]struct{} // dedup by full path
}

func newParquetSink(pqSchema *parquet.Schema, fieldToCol map[int]int, caseSensitive bool) *parquetSink {
func newParquetSink(pqSchema *parquet.Schema, fieldToCol map[int]int, caseSensitive bool, writerOpts ...parquet.WriterOption) *parquetSink {
buf := bytes.NewBuffer(nil)
pw := parquet.NewGenericWriter[any](buf, pqSchema)
allOpts := make([]parquet.WriterOption, 0, 1+len(writerOpts))
allOpts = append(allOpts, pqSchema)
allOpts = append(allOpts, writerOpts...)
pw := parquet.NewGenericWriter[any](buf, allOpts...)
colWriters := pw.ColumnWriters()

columns := make(map[int]*parquetColumn, len(fieldToCol))
Expand Down
Loading