Skip to content

Commit 2cc2cde

Browse files
iceberg: add configurable Parquet string encoding for Redshift Spectrum compatibility (#4401)
* iceberg: add configurable Parquet string encoding for Redshift Spectrum compatibility
1 parent 4c25d02 commit 2cc2cde

9 files changed

Lines changed: 74 additions & 12 deletions

File tree

docs/modules/components/pages/outputs/iceberg.adoc

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,8 @@ output:
153153
manifest_merge_enabled: true
154154
max_snapshot_age: 24h
155155
max_retries: 3
156+
parquet:
157+
string_encoding: delta_length_byte_array
156158
batching:
157159
count: 0
158160
byte_size: 0
@@ -925,6 +927,28 @@ Maximum number of times to retry a failed transaction commit.
925927
926928
*Default*: `3`
927929
930+
=== `parquet`
931+
932+
Parquet writer configuration.
933+
934+
935+
*Type*: `object`
936+
937+
938+
=== `parquet.string_encoding`
939+
940+
The encoding to use for string and binary columns. Use `plain` for compatibility with readers that do not support `DELTA_LENGTH_BYTE_ARRAY` encoding, such as AWS Redshift Spectrum.
941+
942+
943+
*Type*: `string`
944+
945+
*Default*: `"delta_length_byte_array"`
946+
947+
Options:
948+
`plain`
949+
, `delta_length_byte_array`
950+
.
951+
928952
=== `batching`
929953
930954
Allows you to configure a xref:configuration:batching.adoc[batching policy].

internal/impl/iceberg/config.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ const (
8585
// Performance fields
8686
ioFieldBatching = "batching"
8787
ioFieldMaxInFlight = "max_in_flight"
88+
89+
// Parquet writer fields
90+
ioFieldParquet = "parquet"
91+
ioFieldParquetStringEncoding = "string_encoding"
8892
)
8993

9094
// icebergOutputConfig returns the configuration spec for the Iceberg output.
@@ -347,6 +351,15 @@ array:list
347351
Advanced().
348352
Optional(),
349353

354+
// Parquet writer configuration
355+
service.NewObjectField(ioFieldParquet,
356+
service.NewStringEnumField(ioFieldParquetStringEncoding, "plain", "delta_length_byte_array").
357+
Description("The encoding to use for string and binary columns. Use `plain` for compatibility with readers that do not support `DELTA_LENGTH_BYTE_ARRAY` encoding, such as AWS Redshift Spectrum.").
358+
Default("delta_length_byte_array"),
359+
).Description("Parquet writer configuration.").
360+
Advanced().
361+
Optional(),
362+
350363
// Batching
351364
service.NewBatchPolicyField(ioFieldBatching),
352365
service.NewOutputMaxInFlightField().Default(4),

internal/impl/iceberg/e2e/glue/e2e_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func newRouter(t *testing.T, namespace, table string, schemaEvo bool) *icebergim
7777
Enabled: schemaEvo,
7878
TableLocation: fmt.Sprintf("s3://%s/", *glueBucket),
7979
}
80-
router := icebergimpl.NewRouter(catalogConfig(), namespaceStr, tableStr, true, schemaEvoCfg, commitCfg, logger)
80+
router := icebergimpl.NewRouter(catalogConfig(), namespaceStr, tableStr, true, schemaEvoCfg, commitCfg, nil, logger)
8181
t.Cleanup(func() { router.Close() })
8282
return router
8383
}

internal/impl/iceberg/e2e/polaris-aws/e2e_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ func newRouter(t *testing.T, catalogCfg catalogx.Config, namespace, tableName st
202202
schemaEvoCfg := icebergimpl.SchemaEvolutionConfig{
203203
Enabled: schemaEvo,
204204
}
205-
router := icebergimpl.NewRouter(catalogCfg, namespaceStr, tableStr, true, schemaEvoCfg, commitCfg, logger)
205+
router := icebergimpl.NewRouter(catalogCfg, namespaceStr, tableStr, true, schemaEvoCfg, commitCfg, nil, logger)
206206
t.Cleanup(func() { router.Close() })
207207
return router
208208
}

internal/impl/iceberg/e2e/polaris-azure/e2e_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ func newRouter(t *testing.T, catalogCfg catalogx.Config, namespace, table string
192192
schemaEvoCfg := icebergimpl.SchemaEvolutionConfig{
193193
Enabled: schemaEvo,
194194
}
195-
router := icebergimpl.NewRouter(catalogCfg, namespaceStr, tableStr, true, schemaEvoCfg, commitCfg, logger)
195+
router := icebergimpl.NewRouter(catalogCfg, namespaceStr, tableStr, true, schemaEvoCfg, commitCfg, nil, logger)
196196
t.Cleanup(func() { router.Close() })
197197
return router
198198
}

internal/impl/iceberg/integration/test_helpers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ func (infra *testInfrastructure) NewRouter(
138138
MaxSnapshotAge: 24 * time.Hour,
139139
MaxRetries: 3,
140140
}
141-
router := icebergimpl.NewRouter(infra.CatalogConfig(), namespaceStr, tableStr, o.caseSensitive, o.schemaEvoCfg, commitCfg, logger)
141+
router := icebergimpl.NewRouter(infra.CatalogConfig(), namespaceStr, tableStr, o.caseSensitive, o.schemaEvoCfg, commitCfg, nil, logger)
142142
t.Cleanup(func() { router.Close() })
143143
return router
144144
}

internal/impl/iceberg/output_iceberg.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/apache/iceberg-go"
1818
"github.com/apache/iceberg-go/io"
1919
_ "github.com/apache/iceberg-go/io/gocloud"
20+
"github.com/parquet-go/parquet-go"
2021

2122
"github.com/redpanda-data/benthos/v4/public/service"
2223

@@ -102,9 +103,24 @@ func newIcebergOutputFromConfig(conf *service.ParsedConfig, mgr *service.Resourc
102103
return nil, fmt.Errorf("parsing commit config: %w", err)
103104
}
104105

105-
// Create router
106-
rtr := NewRouter(catalogCfg, namespaceStr, tableStr, caseSensitive, schemaEvoCfg, commitCfg, mgr.Logger())
106+
// Parse parquet config
107+
var writerOpts []parquet.WriterOption
108+
if conf.Contains(ioFieldParquet) {
109+
strEnc, err := conf.FieldString(ioFieldParquet, ioFieldParquetStringEncoding)
110+
if err != nil {
111+
return nil, fmt.Errorf("parsing %s: %w", ioFieldParquetStringEncoding, err)
112+
}
113+
switch strEnc {
114+
case "plain":
115+
writerOpts = append(writerOpts, parquet.DefaultEncodingFor(parquet.ByteArray, &parquet.Plain))
116+
case "delta_length_byte_array":
117+
// default - noop
118+
default:
119+
return nil, fmt.Errorf("unsupported %s value: %q, please consider raising an issue to request support for feature gap", ioFieldParquetStringEncoding, strEnc)
120+
}
121+
}
107122

123+
rtr := NewRouter(catalogCfg, namespaceStr, tableStr, caseSensitive, schemaEvoCfg, commitCfg, writerOpts, mgr.Logger())
108124
return &icebergOutput{
109125
router: rtr,
110126
logger: mgr.Logger(),

internal/impl/iceberg/router.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/apache/iceberg-go"
2121
"github.com/apache/iceberg-go/catalog"
2222
"github.com/apache/iceberg-go/table"
23+
"github.com/parquet-go/parquet-go"
2324

2425
"github.com/redpanda-data/benthos/v4/public/bloblang"
2526
"github.com/redpanda-data/benthos/v4/public/service"
@@ -71,6 +72,7 @@ type Router struct {
7172
caseSensitive bool
7273
schemaEvoCfg SchemaEvolutionConfig
7374
commitCfg CommitConfig
75+
writerOpts []parquet.WriterOption
7476
resolver *typeResolver
7577

7678
entries sync.Map // tableKey -> *tableEntry
@@ -89,6 +91,7 @@ func NewRouter(
8991
caseSensitive bool,
9092
schemaEvoCfg SchemaEvolutionConfig,
9193
commitCfg CommitConfig,
94+
writerOpts []parquet.WriterOption,
9295
logger *service.Logger,
9396
) *Router {
9497
return &Router{
@@ -98,6 +101,7 @@ func NewRouter(
98101
caseSensitive: caseSensitive,
99102
schemaEvoCfg: schemaEvoCfg,
100103
commitCfg: commitCfg,
104+
writerOpts: writerOpts,
101105
resolver: newTypeResolver(schemaEvoCfg.SchemaMetadata, schemaEvoCfg.NewColumnTypeMapping, caseSensitive, logger),
102106
logger: logger,
103107
}
@@ -660,7 +664,7 @@ func (r *Router) createWriter(ctx context.Context, key tableKey) (*writer, error
660664
}
661665

662666
// Create writer with its own table reference and the committer
663-
w := NewWriter(writerTbl, comm, r.caseSensitive, r.logger)
667+
w := NewWriter(writerTbl, comm, r.caseSensitive, r.writerOpts, r.logger)
664668
r.logger.Debugf("Created writer for table %s.%s", key.namespace, key.table)
665669

666670
return w, nil

internal/impl/iceberg/writer.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,20 @@ type writer struct {
3333
table *table.Table
3434
committer *committer
3535
caseSensitive bool
36+
writerOpts []parquet.WriterOption
3637
logger *service.Logger
3738
}
3839

3940
// NewWriter creates a new writer for a specific table.
4041
// The table and committer should use separate table references since they
4142
// operate in different goroutines and the table object is mutable.
4243
// caseSensitive controls how message keys are matched against the schema.
43-
func NewWriter(tbl *table.Table, comm *committer, caseSensitive bool, logger *service.Logger) *writer {
44+
func NewWriter(tbl *table.Table, comm *committer, caseSensitive bool, writerOpts []parquet.WriterOption, logger *service.Logger) *writer {
4445
return &writer{
4546
table: tbl,
4647
committer: comm,
4748
caseSensitive: caseSensitive,
49+
writerOpts: writerOpts,
4850
logger: logger,
4951
}
5052
}
@@ -189,7 +191,7 @@ func (w *writer) messagesToParquet(batch service.MessageBatch) ([]partitionFile,
189191

190192
// For unpartitioned tables, use a single writer
191193
if spec.IsUnpartitioned() {
192-
sink := newParquetSink(pqSchema, fieldToCol, w.caseSensitive)
194+
sink := newParquetSink(pqSchema, fieldToCol, w.caseSensitive, w.writerOpts...)
193195

194196
for _, msg := range batch {
195197
structured, err := msg.AsStructured()
@@ -268,7 +270,7 @@ func (w *writer) messagesToParquet(batch service.MessageBatch) ([]partitionFile,
268270
} else {
269271
entry = &partitionEntry{
270272
key: partitionKey,
271-
sink: newParquetSink(pqSchema, fieldToCol, w.caseSensitive),
273+
sink: newParquetSink(pqSchema, fieldToCol, w.caseSensitive, w.writerOpts...),
272274
}
273275
// Insert at sorted position
274276
partitions = slices.Insert(partitions, idx, entry)
@@ -323,9 +325,12 @@ type parquetSink struct {
323325
seenFields map[string]struct{} // dedup by full path
324326
}
325327

326-
func newParquetSink(pqSchema *parquet.Schema, fieldToCol map[int]int, caseSensitive bool) *parquetSink {
328+
func newParquetSink(pqSchema *parquet.Schema, fieldToCol map[int]int, caseSensitive bool, writerOpts ...parquet.WriterOption) *parquetSink {
327329
buf := bytes.NewBuffer(nil)
328-
pw := parquet.NewGenericWriter[any](buf, pqSchema)
330+
allOpts := make([]parquet.WriterOption, 0, 1+len(writerOpts))
331+
allOpts = append(allOpts, pqSchema)
332+
allOpts = append(allOpts, writerOpts...)
333+
pw := parquet.NewGenericWriter[any](buf, allOpts...)
329334
colWriters := pw.ColumnWriters()
330335

331336
columns := make(map[int]*parquetColumn, len(fieldToCol))

0 commit comments

Comments
 (0)