Update to Iceberg 1.4.0-rc.2 (#101)
bryanck authored Oct 4, 2023
1 parent c17c6f7 commit 03dcf40
Showing 54 changed files with 488 additions and 484 deletions.
1 change: 0 additions & 1 deletion .java-version

This file was deleted.

76 changes: 38 additions & 38 deletions README.md
@@ -20,33 +20,33 @@ The zip archive will be found under `./kafka-connect-runtime/build/distributions`

# Configuration

| Property | Description |
|-------------------------------------------|---------------------------------------------------------------------------------------------------------------|
| iceberg.tables | Comma-separated list of destination tables |
| iceberg.tables.dynamic.enabled | Set to `true` to route to a table specified in `routeField` instead of using `routeRegex`, default is `false` |
| iceberg.tables.routeField | For multi-table fan-out, the name of the field used to route records to tables |
| iceberg.tables.defaultCommitBranch | Default branch for commits, main is used if not specified |
| iceberg.tables.cdcField | Name of the field containing the CDC operation, `I`, `U`, or `D`, default is none |
| iceberg.tables.upsertModeEnabled | Set to `true` to enable upsert mode, default is `false` |
| iceberg.tables.autoCreateEnabled | Set to `true` to automatically create destination tables, default is `false` |
| iceberg.tables.evolveSchemaEnabled | Set to `true` to add any missing record fields to the table schema, default is `false` |
| iceberg.table.\<table name\>.idColumns | Comma-separated list of columns that identify a row in the table (primary key) |
| iceberg.table.\<table name\>.routeRegex | The regex used to match a record's `routeField` to a table |
| iceberg.table.\<table name\>.commitBranch | Table-specific branch for commits, use `iceberg.tables.defaultCommitBranch` if not specified |
| iceberg.control.topic | Name of the control topic, default is `control-iceberg` |
| iceberg.control.group.id | Name of the consumer group to store offsets, default is `cg-control-<connector name>` |
| iceberg.control.commitIntervalMs | Commit interval in msec, default is 300,000 (5 min) |
| iceberg.control.commitTimeoutMs | Commit timeout interval in msec, default is 30,000 (30 sec) |
| iceberg.control.commitThreads | Number of threads to use for commits, default is (cores * 2) |
| iceberg.catalog | Name of the catalog, default is `iceberg` |
| iceberg.catalog.* | Properties passed through to Iceberg catalog initialization |
| iceberg.hadoop-conf-dir | If specified, Hadoop config files in this directory will be loaded |
| iceberg.hadoop.* | Properties passed through to the Hadoop configuration |
| iceberg.kafka.* | Properties passed through to control topic Kafka client initialization |

If `iceberg.tables.dynamic.enabled` is `false` (the default) then you must specify `iceberg.tables`. If
`iceberg.tables.dynamic.enabled` is `true` then you must specify `iceberg.tables.routeField` which will
contain the name of the table. Enabling `iceberg.tables.upsertModeEnabled` will cause all appends to be
| Property | Description |
|--------------------------------------------|---------------------------------------------------------------------------------------------------------------|
| iceberg.tables | Comma-separated list of destination tables |
| iceberg.tables.dynamic-enabled             | Set to `true` to route to a table specified in `route-field` instead of using `route-regex`, default is `false` |
| iceberg.tables.route-field                 | For multi-table fan-out, the name of the field used to route records to tables                                  |
| iceberg.tables.default-commit-branch       | Default branch for commits; `main` is used if not specified                                                     |
| iceberg.tables.cdc-field                   | Name of the field containing the CDC operation, `I`, `U`, or `D`, default is none                               |
| iceberg.tables.upsert-mode-enabled         | Set to `true` to enable upsert mode, default is `false`                                                         |
| iceberg.tables.auto-create-enabled         | Set to `true` to automatically create destination tables, default is `false`                                    |
| iceberg.tables.evolve-schema-enabled       | Set to `true` to add any missing record fields to the table schema, default is `false`                          |
| iceberg.table.\<table name\>.id-columns    | Comma-separated list of columns that identify a row in the table (primary key)                                  |
| iceberg.table.\<table name\>.route-regex   | The regex used to match a record's `route-field` value to a table                                               |
| iceberg.table.\<table name\>.commit-branch | Table-specific branch for commits; `iceberg.tables.default-commit-branch` is used if not specified              |
| iceberg.control.topic | Name of the control topic, default is `control-iceberg` |
| iceberg.control.group-id | Name of the consumer group to store offsets, default is `cg-control-<connector name>` |
| iceberg.control.commit.interval-ms | Commit interval in msec, default is 300,000 (5 min) |
| iceberg.control.commit.timeout-ms | Commit timeout interval in msec, default is 30,000 (30 sec) |
| iceberg.control.commit.threads | Number of threads to use for commits, default is (cores * 2) |
| iceberg.catalog | Name of the catalog, default is `iceberg` |
| iceberg.catalog.* | Properties passed through to Iceberg catalog initialization |
| iceberg.hadoop-conf-dir | If specified, Hadoop config files in this directory will be loaded |
| iceberg.hadoop.* | Properties passed through to the Hadoop configuration |
| iceberg.kafka.* | Properties passed through to control topic Kafka client initialization |

If `iceberg.tables.dynamic-enabled` is `false` (the default), you must specify `iceberg.tables`. If
`iceberg.tables.dynamic-enabled` is `true`, you must specify `iceberg.tables.route-field`, which will
contain the name of the table. Enabling `iceberg.tables.upsert-mode-enabled` will cause all appends to be
preceded by an equality delete. Both CDC and upsert mode require an Iceberg V2 table with identity fields
defined.
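
For orientation, a minimal static-routing config using the renamed properties might look like the sketch below. The topic, table, and catalog values are illustrative placeholders in the style of the examples later in this README, not values taken from this commit:

```json
{
  "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
  "tasks.max": "2",
  "topics": "events",
  "iceberg.tables": "default.events",
  "iceberg.catalog.type": "rest",
  "iceberg.catalog.uri": "https://localhost",
  "iceberg.catalog.credential": "<credential>"
}
```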

@@ -59,7 +59,7 @@ can be set explicitly using `iceberg.kafka.*` properties.
### Source topic offsets

Source topic offsets are stored in two different consumer groups. The first is the sink-managed consumer
group defined by the `iceberg.control.group.id` property. The second is the Kafka Connect managed
group defined by the `iceberg.control.group-id` property. The second is the Kafka Connect managed
consumer group which is named `connect-<connector name>` by default. The sink-managed consumer
group is used by the sink to achieve exactly-once processing. The Kafka Connect consumer group is
only used as a fallback if the sink-managed consumer group is missing. To reset the offsets,
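(The reset procedure continues in the collapsed hunk below.) As a hedged sketch, resetting offsets for both consumer groups with the standard Kafka `Admin` API might look like the following; the connector name `events-sink`, and therefore both group names, is an assumption, and both groups must be empty (connector stopped) before offsets can be altered:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ResetSinkOffsets {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    // Rewind partition 0 of the source topic to offset 0. Apply the same
    // offsets to BOTH groups so the sink-managed group and the fallback
    // Kafka Connect group stay consistent.
    Map<TopicPartition, OffsetAndMetadata> offsets =
        Collections.singletonMap(new TopicPartition("events", 0), new OffsetAndMetadata(0L));

    try (Admin admin = Admin.create(props)) {
      // Sink-managed group, default cg-control-<connector name> (assumed name)
      admin.alterConsumerGroupOffsets("cg-control-events-sink", offsets).all().get();
      // Kafka Connect managed group, default connect-<connector name> (assumed name)
      admin.alterConsumerGroupOffsets("connect-events-sink", offsets).all().get();
    }
  }
}
```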
@@ -215,9 +215,9 @@ PARTITIONED BY (hours(ts));
"tasks.max": "2",
"topics": "events",
"iceberg.tables": "default.events_list,default.events_create",
"iceberg.tables.routeField": "type",
"iceberg.table.default.events_list.routeRegex": "list",
"iceberg.table.default.events_create.routeRegex": "create",
"iceberg.tables.route-field": "type",
"iceberg.table.default.events_list.route-regex": "list",
"iceberg.table.default.events_create.route-regex": "create",
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://localhost",
"iceberg.catalog.credential": "<credential>",
@@ -242,8 +242,8 @@ See above for creating two tables.
"connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
"tasks.max": "2",
"topics": "events",
"iceberg.tables.dynamic.enabled": "true",
"iceberg.tables.routeField": "db_table",
"iceberg.tables.dynamic-enabled": "true",
"iceberg.tables.route-field": "db_table",
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://localhost",
"iceberg.catalog.credential": "<credential>",
@@ -257,7 +257,7 @@ This example applies inserts, updates, and deletes based on the value of a field
For example, if the `_cdc_op` field is set to `I` then the record is inserted, if `U` then it is
upserted, and if `D` then it is deleted. This requires that the table be in Iceberg v2 format.
The Iceberg identifier field(s) are used to identify a row; if that is not set for the table,
then the `iceberg.tables.idColumns` configuration can be set instead. CDC can be combined with
then the `iceberg.tables.id-columns` configuration can be set instead. CDC can be combined with
multi-table fan-out.

### Create the destination table
@@ -272,7 +272,7 @@ See above for creating the table
"tasks.max": "2",
"topics": "events",
"iceberg.tables": "default.events",
"iceberg.tables.cdcField": "_cdc_op",
"iceberg.tables.cdc-field": "_cdc_op",
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://localhost",
"iceberg.catalog.credential": "<credential>",
@@ -304,9 +304,9 @@ Here is an example config that uses this transform to apply updates to an Iceberg
"transforms": "dms",
"transforms.dms.type": "io.tabular.iceberg.connect.transforms.DmsTransform",
"iceberg.tables": "default.dms_test",
"iceberg.tables.cdcField": "_cdc_op",
"iceberg.tables.routeField": "_cdc_table",
"iceberg.table.default.dms_test.routeRegex": "src_db.src_table",
"iceberg.tables.cdc-field": "_cdc_op",
"iceberg.tables.route-field": "_cdc_table",
"iceberg.table.default.dms_test.route-regex": "src_db.src_table",
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://localhost",
"iceberg.catalog.credential": "<credential>",
5 changes: 4 additions & 1 deletion build.gradle
@@ -22,10 +22,13 @@ subprojects {

repositories {
mavenCentral()

maven {
url = "https://tabular-repository-public.s3.amazonaws.com/releases"
}
// FIXME!! remove when Iceberg 1.4 is released...
maven {
url = "https://repository.apache.org/content/repositories/orgapacheiceberg-1146"
}
}

sourceCompatibility = "1.8"
8 changes: 4 additions & 4 deletions gradle/libs.versions.toml
@@ -1,16 +1,16 @@
[versions]
assertj-ver = "3.23.1"
assertj-ver = "3.24.2"
avro-ver = "1.11.1"
awaitility-ver = "4.2.0"
hadoop-ver = "3.3.6"
hive-ver = "2.3.9"
http-client-ver = "5.2.1"
iceberg-ver = "1.3.1-tabular.31"
iceberg-ver = "1.4.0"
jackson-ver = "2.14.2"
junit-ver = "5.9.2"
junit-ver = "5.10.0"
kafka-ver = "3.5.1"
slf4j-ver = "1.7.36"
testcontainers-ver = "1.18.1"
testcontainers-ver = "1.17.6"


[libraries]
CommitCompletePayload.java
@@ -26,7 +26,7 @@ public class CommitCompletePayload implements Payload {

private UUID commitId;
private Long vtts;
private Schema avroSchema;
private final Schema avroSchema;

private static final Schema AVRO_SCHEMA =
SchemaBuilder.builder()
@@ -44,6 +44,7 @@ public class CommitCompletePayload implements Payload {
.noDefault()
.endRecord();

// Used by Avro reflection to instantiate this class when reading events
public CommitCompletePayload(Schema avroSchema) {
this.avroSchema = avroSchema;
}
@@ -54,11 +55,11 @@ public CommitCompletePayload(UUID commitId, Long vtts) {
this.avroSchema = AVRO_SCHEMA;
}

public UUID getCommitId() {
public UUID commitId() {
return commitId;
}

public Long getVtts() {
public Long vtts() {
return vtts;
}

CommitReadyPayload.java
@@ -27,7 +27,7 @@ public class CommitReadyPayload implements Payload {

private UUID commitId;
private List<TopicPartitionOffset> assignments;
private Schema avroSchema;
private final Schema avroSchema;

private static final Schema AVRO_SCHEMA =
SchemaBuilder.builder()
@@ -46,6 +46,7 @@ public class CommitReadyPayload implements Payload {
.noDefault()
.endRecord();

// Used by Avro reflection to instantiate this class when reading events
public CommitReadyPayload(Schema avroSchema) {
this.avroSchema = avroSchema;
}
@@ -56,11 +57,11 @@ public CommitReadyPayload(UUID commitId, List<TopicPartitionOffset> assignments) {
this.avroSchema = AVRO_SCHEMA;
}

public UUID getCommitId() {
public UUID commitId() {
return commitId;
}

public List<TopicPartitionOffset> getAssignments() {
public List<TopicPartitionOffset> assignments() {
return assignments;
}

CommitRequestPayload.java
@@ -25,7 +25,7 @@
public class CommitRequestPayload implements Payload {

private UUID commitId;
private Schema avroSchema;
private final Schema avroSchema;

public static final Schema AVRO_SCHEMA =
SchemaBuilder.builder()
@@ -37,6 +37,7 @@ public class CommitRequestPayload implements Payload {
.noDefault()
.endRecord();

// Used by Avro reflection to instantiate this class when reading events
public CommitRequestPayload(Schema avroSchema) {
this.avroSchema = avroSchema;
}
@@ -46,7 +47,7 @@ public CommitRequestPayload(UUID commitId) {
this.avroSchema = AVRO_SCHEMA;
}

public UUID getCommitId() {
public UUID commitId() {
return commitId;
}

CommitResponsePayload.java
@@ -35,8 +35,9 @@ public class CommitResponsePayload implements Payload {
private TableName tableName;
private List<DataFile> dataFiles;
private List<DeleteFile> deleteFiles;
private Schema avroSchema;
private final Schema avroSchema;

// Used by Avro reflection to instantiate this class when reading events
public CommitResponsePayload(Schema avroSchema) {
this.avroSchema = avroSchema;
}
@@ -93,19 +94,19 @@ public CommitResponsePayload(
.endRecord();
}

public UUID getCommitId() {
public UUID commitId() {
return commitId;
}

public TableName getTableName() {
public TableName tableName() {
return tableName;
}

public List<DataFile> getDataFiles() {
public List<DataFile> dataFiles() {
return dataFiles;
}

public List<DeleteFile> getDeleteFiles() {
public List<DeleteFile> deleteFiles() {
return deleteFiles;
}

CommitTablePayload.java
@@ -28,7 +28,7 @@ public class CommitTablePayload implements Payload {
private TableName tableName;
private Long snapshotId;
private Long vtts;
private Schema avroSchema;
private final Schema avroSchema;

private static final Schema AVRO_SCHEMA =
SchemaBuilder.builder()
@@ -56,6 +56,7 @@ public class CommitTablePayload implements Payload {
.noDefault()
.endRecord();

// Used by Avro reflection to instantiate this class when reading events
public CommitTablePayload(Schema avroSchema) {
this.avroSchema = avroSchema;
}
@@ -68,19 +69,19 @@ public CommitTablePayload(UUID commitId, TableName tableName, Long snapshotId, Long vtts) {
this.avroSchema = AVRO_SCHEMA;
}

public UUID getCommitId() {
public UUID commitId() {
return commitId;
}

public TableName getTableName() {
public TableName tableName() {
return tableName;
}

public Long getSnapshotId() {
public Long snapshotId() {
return snapshotId;
}

public Long getVtts() {
public Long vtts() {
return vtts;
}

Event.java
@@ -35,9 +35,9 @@ public class Event implements Element {
private Long timestamp;
private String groupId;
private Payload payload;
private Schema avroSchema;
private final Schema avroSchema;

private static final ThreadLocal<Map<?, ?>> DECODER_CACHES = getDecoderCaches();
private static final ThreadLocal<Map<?, ?>> DECODER_CACHES = decoderCaches();

public static byte[] encode(Event event) {
try {
@@ -58,6 +58,7 @@ public static Event decode(byte[] bytes) {
}
}

// Used by Avro reflection to instantiate this class when reading events
public Event(Schema avroSchema) {
this.avroSchema = avroSchema;
}
@@ -99,23 +100,23 @@ public Event(String groupId, EventType type, Payload payload) {
.endRecord();
}

public UUID getId() {
public UUID id() {
return id;
}

public EventType getType() {
public EventType type() {
return type;
}

public Long getTimestamp() {
public Long timestamp() {
return timestamp;
}

public Payload getPayload() {
public Payload payload() {
return payload;
}

public String getGroupId() {
public String groupId() {
return groupId;
}

@@ -153,7 +154,7 @@ public Object get(int i) {
case 0:
return id;
case 1:
return type == null ? null : type.getId();
return type == null ? null : type.id();
case 2:
return timestamp;
case 3:
@@ -166,7 +167,7 @@ public Object get(int i) {
}

@SuppressWarnings("unchecked")
private static ThreadLocal<Map<?, ?>> getDecoderCaches() {
private static ThreadLocal<Map<?, ?>> decoderCaches() {
return (ThreadLocal<Map<?, ?>>)
DynFields.builder().hiddenImpl(DecoderResolver.class, "DECODER_CACHES").buildStatic().get();
}
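Since `Event` keeps its static `encode`/`decode` helpers, the renamed accessors compose as in the sketch below. This is illustrative only: the `EventType.COMMIT_REQUEST` constant, the group-id value, and the availability of these classes on the classpath are assumptions not shown in this diff.

```java
import java.util.UUID;

public class EventRoundTripExample {
  public static void main(String[] args) {
    // Build a control-topic event using the three-arg constructor shown in this
    // diff; EventType.COMMIT_REQUEST is an assumed enum constant.
    Event event =
        new Event(
            "cg-control-events-sink",
            EventType.COMMIT_REQUEST,
            new CommitRequestPayload(UUID.randomUUID()));

    // Avro round trip via the static helpers shown above.
    byte[] bytes = Event.encode(event);
    Event decoded = Event.decode(bytes);

    // Accessors after this commit drop the get prefix:
    // getId() -> id(), getType() -> type(), getGroupId() -> groupId().
    System.out.println(decoded.id() + " " + decoded.type() + " " + decoded.groupId());
  }
}
```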
(Diff truncated: 44 of the 54 changed files are not shown.)
