Add config for setting partition spec on auto table create (#108)
bryanck authored Oct 8, 2023
1 parent 5812322 commit 0f2eb9f
Showing 10 changed files with 237 additions and 49 deletions.
3 changes: 3 additions & 0 deletions README.md
@@ -26,11 +26,14 @@ The zip archive will be found under `./kafka-connect-runtime/build/distributions`
| iceberg.tables.dynamic-enabled | Set to `true` to route to a table specified in `routeField` instead of using `routeRegex`, default is `false` |
| iceberg.tables.route-field | For multi-table fan-out, the name of the field used to route records to tables |
| iceberg.tables.default-commit-branch | Default branch for commits, main is used if not specified |
+ | iceberg.tables.default-id-columns | Default comma-separated list of columns that identify a row in tables (primary key) |
+ | iceberg.tables.default-partition-by | Default comma-separated list of partition fields to use when creating tables |
| iceberg.tables.cdc-field | Name of the field containing the CDC operation, `I`, `U`, or `D`, default is none |
| iceberg.tables.upsert-mode-enabled | Set to `true` to enable upsert mode, default is `false` |
| iceberg.tables.auto-create-enabled | Set to `true` to automatically create destination tables, default is `false` |
| iceberg.tables.evolve-schema-enabled | Set to `true` to add any missing record fields to the table schema, default is `false` |
| iceberg.table.\<table name\>.id-columns | Comma-separated list of columns that identify a row in the table (primary key) |
+ | iceberg.table.\<table name\>.partition-by | Comma-separated list of partition fields to use when creating the table |
| iceberg.table.\<table name\>.route-regex | The regex used to match a record's `routeField` to a table |
| iceberg.table.\<table name\>.commit-branch | Table-specific branch for commits, use `iceberg.tables.default-commit-branch` if not specified |
| iceberg.control.topic | Name of the control topic, default is `control-iceberg` |
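Taken together, the new rows mean auto-created tables can be given a partition spec by default and overridden per table. Below is a minimal sketch of a connector configuration exercising both settings, built the same way the integration test further down builds its config; the table name `db.events` and the timestamp column `ts` are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionByConfigSketch {
  public static void main(String[] args) {
    // Hypothetical connector properties; table and column names are illustrative only.
    Map<String, String> props = new HashMap<>();
    props.put("iceberg.tables", "db.events");
    props.put("iceberg.tables.auto-create-enabled", "true");
    // Fallback spec for any auto-created table: hourly partitions on a "ts" column.
    props.put("iceberg.tables.default-partition-by", "hour(ts)");
    // Per-table override: db.events is partitioned by day instead.
    props.put("iceberg.table.db.events.partition-by", "day(ts)");
    props.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}
```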
2 changes: 1 addition & 1 deletion kafka-connect-runtime/build.gradle
@@ -22,13 +22,13 @@ configurations {
dependencies {
implementation project(":iceberg-kafka-connect")
implementation project(":iceberg-kafka-connect-transforms")
- implementation libs.bundles.iceberg.ext
implementation(libs.hadoop.common) {
exclude group: "ch.qos.reload4j"
exclude group: "com.google.guava"
exclude group: "log4j"
exclude group: "org.slf4j"
}
+ runtimeOnly libs.bundles.iceberg.ext

hive libs.iceberg.hive.metastore
hive(libs.hive.metastore) {
@@ -29,6 +29,7 @@
import java.util.List;
import java.util.Map;
import org.apache.iceberg.DataFile;
+ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
@@ -37,6 +38,7 @@
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+ import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.LongType;
@@ -134,7 +136,15 @@ public void testIcebergSinkSchemaEvolution(String branch) {
@ValueSource(strings = "test_branch")
public void testIcebergSinkAutoCreate(String branch) {
boolean useSchema = branch == null; // use a schema for one of the tests
- runTest(branch, useSchema, ImmutableMap.of("iceberg.tables.auto-create-enabled", "true"));
+
+ Map<String, String> extraConfig = Maps.newHashMap();
+ extraConfig.put("iceberg.tables.auto-create-enabled", "true");
+ if (useSchema) {
+ // partition the table for one of the tests
+ extraConfig.put("iceberg.tables.default-partition-by", "hour(ts)");
+ }
+
+ runTest(branch, useSchema, extraConfig);

List<DataFile> files = dataFiles(TABLE_IDENTIFIER, branch);
// may involve 1 or 2 workers
@@ -143,6 +153,9 @@ public void testIcebergSinkAutoCreate(String branch) {
assertSnapshotProps(TABLE_IDENTIFIER, branch);

assertGeneratedSchema(useSchema, LongType.class);
+
+ PartitionSpec spec = catalog.loadTable(TABLE_IDENTIFIER).spec();
+ assertThat(spec.isPartitioned()).isEqualTo(useSchema);
}

private void assertGeneratedSchema(boolean useSchema, Class<? extends Type> expectedIdType) {
@@ -22,7 +22,6 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
- import com.google.api.client.util.Lists;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
@@ -35,6 +34,7 @@
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.io.entity.StringEntity;
+ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.awaitility.Awaitility;
import org.testcontainers.containers.GenericContainer;
TestContext.java
@@ -30,9 +30,6 @@
import java.util.stream.Stream;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
- import org.apache.iceberg.aws.AwsClientProperties;
- import org.apache.iceberg.aws.s3.S3FileIO;
- import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.rest.RESTCatalog;
@@ -92,7 +89,7 @@ private TestContext() {
.dependsOn(minio)
.withExposedPorts(CATALOG_PORT)
.withEnv("CATALOG_WAREHOUSE", "s3://" + BUCKET + "/warehouse")
.withEnv("CATALOG_IO__IMPL", S3FileIO.class.getName())
.withEnv("CATALOG_IO__IMPL", "org.apache.iceberg.aws.s3.S3FileIO")
.withEnv("CATALOG_S3_ENDPOINT", "http://minio:9000")
.withEnv("CATALOG_S3_ACCESS__KEY__ID", AWS_ACCESS_KEY)
.withEnv("CATALOG_S3_SECRET__ACCESS__KEY", AWS_SECRET_KEY)
@@ -170,12 +167,12 @@ public Catalog initLocalCatalog() {
"local",
ImmutableMap.<String, String>builder()
.put(CatalogProperties.URI, localCatalogUri)
- .put(CatalogProperties.FILE_IO_IMPL, S3FileIO.class.getName())
- .put(S3FileIOProperties.ENDPOINT, "http://localhost:" + getLocalMinioPort())
- .put(S3FileIOProperties.ACCESS_KEY_ID, AWS_ACCESS_KEY)
- .put(S3FileIOProperties.SECRET_ACCESS_KEY, AWS_SECRET_KEY)
- .put(S3FileIOProperties.PATH_STYLE_ACCESS, "true")
- .put(AwsClientProperties.CLIENT_REGION, AWS_REGION)
+ .put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO")
+ .put("s3.endpoint", "http://localhost:" + getLocalMinioPort())
+ .put("s3.access-key-id", AWS_ACCESS_KEY)
+ .put("s3.secret-access-key", AWS_SECRET_KEY)
+ .put("s3.path-style-access", "true")
+ .put("client.region", AWS_REGION)
.build());
return result;
}
@@ -186,11 +183,14 @@ public Map<String, Object> connectorCatalogProperties() {
"iceberg.catalog." + CatalogUtil.ICEBERG_CATALOG_TYPE,
CatalogUtil.ICEBERG_CATALOG_TYPE_REST)
.put("iceberg.catalog." + CatalogProperties.URI, "http://iceberg:" + CATALOG_PORT)
.put("iceberg.catalog." + S3FileIOProperties.ENDPOINT, "http://minio:" + MINIO_PORT)
.put("iceberg.catalog." + S3FileIOProperties.ACCESS_KEY_ID, AWS_ACCESS_KEY)
.put("iceberg.catalog." + S3FileIOProperties.SECRET_ACCESS_KEY, AWS_SECRET_KEY)
.put("iceberg.catalog." + S3FileIOProperties.PATH_STYLE_ACCESS, true)
.put("iceberg.catalog." + AwsClientProperties.CLIENT_REGION, AWS_REGION)
.put(
"iceberg.catalog." + CatalogProperties.FILE_IO_IMPL,
"org.apache.iceberg.aws.s3.S3FileIO")
.put("iceberg.catalog.s3.endpoint", "http://minio:" + MINIO_PORT)
.put("iceberg.catalog.s3.access-key-id", AWS_ACCESS_KEY)
.put("iceberg.catalog.s3.secret-access-key", AWS_SECRET_KEY)
.put("iceberg.catalog.s3.path-style-access", true)
.put("iceberg.catalog.client.region", AWS_REGION)
.build();
}

IcebergSinkConfig.java
@@ -57,6 +57,7 @@ public class IcebergSinkConfig extends AbstractConfig {
"iceberg.coordinator.transactional.suffix";
private static final String ROUTE_REGEX = "route-regex";
private static final String ID_COLUMNS = "id-columns";
+ private static final String PARTITION_BY = "partition-by";
private static final String COMMIT_BRANCH = "commit-branch";

private static final String CATALOG_PROP_PREFIX = "iceberg.catalog.";
@@ -69,6 +70,8 @@ public class IcebergSinkConfig extends AbstractConfig {
private static final String TABLES_DYNAMIC_PROP = "iceberg.tables.dynamic-enabled";
private static final String TABLES_ROUTE_FIELD_PROP = "iceberg.tables.route-field";
private static final String TABLES_DEFAULT_COMMIT_BRANCH = "iceberg.tables.default-commit-branch";
+ private static final String TABLES_DEFAULT_ID_COLUMNS = "iceberg.tables.default-id-columns";
+ private static final String TABLES_DEFAULT_PARTITION_BY = "iceberg.tables.default-partition-by";
private static final String TABLES_CDC_FIELD_PROP = "iceberg.tables.cdc-field";
private static final String TABLES_UPSERT_MODE_ENABLED_PROP =
"iceberg.tables.upsert-mode-enabled";
@@ -133,6 +136,18 @@ private static ConfigDef newConfigDef() {
null,
Importance.MEDIUM,
"Default branch for commits");
+ configDef.define(
+ TABLES_DEFAULT_ID_COLUMNS,
+ Type.STRING,
+ null,
+ Importance.MEDIUM,
+ "Default ID columns for tables, comma-separated");
+ configDef.define(
+ TABLES_DEFAULT_PARTITION_BY,
+ Type.STRING,
+ null,
+ Importance.MEDIUM,
+ "Default partition spec to use when creating tables, comma-separated");
configDef.define(
TABLES_CDC_FIELD_PROP,
Type.STRING,
@@ -293,30 +308,46 @@ public String tablesDefaultCommitBranch() {
return getString(TABLES_DEFAULT_COMMIT_BRANCH);
}

+ public String tablesDefaultIdColumns() {
+ return getString(TABLES_DEFAULT_ID_COLUMNS);
+ }
+
+ public String tablesDefaultPartitionBy() {
+ return getString(TABLES_DEFAULT_PARTITION_BY);
+ }
+
public TableSinkConfig tableConfig(String tableName) {
return tableConfigMap.computeIfAbsent(
tableName,
notUsed -> {
- Map<String, String> tableProps =
+ Map<String, String> tableConfig =
PropertyUtil.propertiesWithPrefix(originalProps, TABLE_PROP_PREFIX + tableName + ".");
- String routeRegexStr = tableProps.get(ROUTE_REGEX);
+
+ String routeRegexStr = tableConfig.get(ROUTE_REGEX);
Pattern routeRegex = routeRegexStr == null ? null : Pattern.compile(routeRegexStr);

- String idColumnsStr = tableProps.get(ID_COLUMNS);
- List<String> idColumns =
- idColumnsStr == null || idColumnsStr.isEmpty()
- ? ImmutableList.of()
- : Arrays.stream(idColumnsStr.split(",")).map(String::trim).collect(toList());
+ String idColumnsStr = tableConfig.getOrDefault(ID_COLUMNS, tablesDefaultIdColumns());
+ List<String> idColumns = stringToList(idColumnsStr);

- String commitBranch = tableProps.get(COMMIT_BRANCH);
- if (commitBranch == null) {
- commitBranch = tablesDefaultCommitBranch();
- }
+ String partitionByStr =
+ tableConfig.getOrDefault(PARTITION_BY, tablesDefaultPartitionBy());
+ List<String> partitionBy = stringToList(partitionByStr);
+
+ String commitBranch =
+ tableConfig.getOrDefault(COMMIT_BRANCH, tablesDefaultCommitBranch());

- return new TableSinkConfig(routeRegex, idColumns, commitBranch);
+ return new TableSinkConfig(routeRegex, idColumns, partitionBy, commitBranch);
});
}

+ private List<String> stringToList(String value) {
+ if (value == null || value.isEmpty()) {
+ return ImmutableList.of();
+ }
+
+ return Arrays.stream(value.split(",")).map(String::trim).collect(toList());
+ }
+
public String tablesCdcField() {
return getString(TABLES_CDC_FIELD_PROP);
}
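The rewritten `tableConfig` resolves each setting in two steps: a per-table `iceberg.table.<table name>.*` property wins, otherwise the matching `iceberg.tables.default-*` value applies, and `stringToList` turns a missing or empty string into an empty list. A small standalone sketch of that resolution, with hypothetical values and the property names abbreviated to their suffixes:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ConfigFallbackSketch {
  // Mirrors IcebergSinkConfig.stringToList: null or empty resolves to an empty list.
  static List<String> stringToList(String value) {
    if (value == null || value.isEmpty()) {
      return List.of();
    }
    return Arrays.stream(value.split(",")).map(String::trim).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    String defaultPartitionBy = "hour(ts)"; // iceberg.tables.default-partition-by
    Map<String, String> tableConfig = Map.of("partition-by", "day(ts), region");

    // The per-table key wins; the default applies only when the key is absent.
    String partitionByStr = tableConfig.getOrDefault("partition-by", defaultPartitionBy);
    System.out.println(stringToList(partitionByStr)); // [day(ts), region]
  }
}
```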
TableSinkConfig.java
@@ -26,11 +26,14 @@ public class TableSinkConfig {

private final Pattern routeRegex;
private final List<String> idColumns;
+ private final List<String> partitionBy;
private final String commitBranch;

- public TableSinkConfig(Pattern routeRegex, List<String> idColumns, String commitBranch) {
+ public TableSinkConfig(
+ Pattern routeRegex, List<String> idColumns, List<String> partitionBy, String commitBranch) {
this.routeRegex = routeRegex;
this.idColumns = idColumns;
+ this.partitionBy = partitionBy;
this.commitBranch = commitBranch;
}

@@ -42,6 +45,10 @@ public List<String> idColumns() {
return idColumns;
}

+ public List<String> partitionBy() {
+ return partitionBy;
+ }
+
public Optional<String> commitBranch() {
return Optional.ofNullable(commitBranch);
}
IcebergWriterFactory.java
@@ -19,16 +19,22 @@
package io.tabular.iceberg.connect.data;

import io.tabular.iceberg.connect.IcebergSinkConfig;
- import org.apache.iceberg.Schema;
+ import java.util.List;
+ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
+ import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.types.Types.StructType;
import org.apache.kafka.connect.sink.SinkRecord;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;

public class IcebergWriterFactory {

+ private static final Logger LOG = LoggerFactory.getLogger(IcebergWriterFactory.class);
+
private final Catalog catalog;
private final IcebergSinkConfig config;

@@ -43,23 +49,48 @@ public RecordWriter createWriter(
Table table;
try {
table = catalog.loadTable(identifier);
- } catch (NoSuchTableException e) {
+ } catch (NoSuchTableException nst) {
if (ignoreMissingTable) {
return new RecordWriter() {};
} else if (!config.autoCreateEnabled()) {
- throw e;
- }
-
- StructType structType;
- if (sample.valueSchema() == null) {
- structType = SchemaUtils.inferIcebergType(sample.value()).asStructType();
- } else {
- structType = SchemaUtils.toIcebergType(sample.valueSchema()).asStructType();
+ throw nst;
}

- table = catalog.createTable(identifier, new Schema(structType.fields()));
+ table = autoCreateTable(tableName, sample);
}

return new IcebergWriter(table, tableName, config);
}
+
+ private Table autoCreateTable(String tableName, SinkRecord sample) {
+ StructType structType;
+ if (sample.valueSchema() == null) {
+ structType = SchemaUtils.inferIcebergType(sample.value()).asStructType();
+ } else {
+ structType = SchemaUtils.toIcebergType(sample.valueSchema()).asStructType();
+ }
+
+ org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(structType.fields());
+ TableIdentifier identifier = TableIdentifier.parse(tableName);
+
+ List<String> partitionBy = config.tableConfig(tableName).partitionBy();
+ PartitionSpec spec;
+ try {
+ spec = SchemaUtils.createPartitionSpec(schema, partitionBy);
+ } catch (Exception e) {
+ LOG.error(
+ "Unable to create partition spec {}, table {} will be unpartitioned",
+ partitionBy,
+ identifier,
+ e);
+ spec = PartitionSpec.unpartitioned();
+ }
+
+ try {
+ return catalog.createTable(identifier, schema, spec);
+ } catch (AlreadyExistsException e) {
+ LOG.info("Table {} was already created", identifier);
+ return catalog.loadTable(identifier);
+ }
+ }
}
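The new `autoCreateTable` delegates spec construction to `SchemaUtils.createPartitionSpec`, whose body is not part of the hunks shown here. As a rough sketch of what mapping strings such as `hour(ts)` onto a spec involves, using Iceberg's public `PartitionSpec.builderFor` API; the parsing logic is an assumption for illustration, not the connector's actual implementation:

```java
import java.util.List;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;

public class PartitionSpecSketch {
  // Illustrative only: builds a spec from entries like "region" or "hour(ts)".
  static PartitionSpec createPartitionSpec(Schema schema, List<String> partitionBy) {
    if (partitionBy.isEmpty()) {
      return PartitionSpec.unpartitioned();
    }
    PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
    for (String field : partitionBy) {
      if (field.startsWith("hour(")) {
        builder.hour(argOf(field));
      } else if (field.startsWith("day(")) {
        builder.day(argOf(field));
      } else if (field.startsWith("month(")) {
        builder.month(argOf(field));
      } else if (field.startsWith("year(")) {
        builder.year(argOf(field));
      } else {
        builder.identity(field); // plain column name
      }
    }
    return builder.build();
  }

  // Extracts "ts" from a transform string such as "hour(ts)".
  private static String argOf(String transform) {
    return transform.substring(transform.indexOf('(') + 1, transform.length() - 1).trim();
  }
}
```

Note that in the commit above a failure in this step is deliberately non-fatal: the error is logged and the table falls back to `PartitionSpec.unpartitioned()` rather than failing the write.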