Option to automatically create table if it doesn't exist (#97)
bryanck authored Sep 29, 2023
1 parent 2e1f475 commit ef486b1
Showing 9 changed files with 229 additions and 119 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -28,6 +28,7 @@ The zip archive will be found under `./kafka-connect-runtime/build/distributions`
| iceberg.tables.defaultCommitBranch | Default branch for commits, main is used if not specified |
| iceberg.tables.cdcField | Name of the field containing the CDC operation, `I`, `U`, or `D`, default is none |
| iceberg.tables.upsertModeEnabled | Set to `true` to enable upsert mode, default is `false` |
| iceberg.tables.autoCreateEnabled | Set to `true` to automatically create destination tables, default is `false` |
| iceberg.tables.evolveSchemaEnabled | Set to `true` to add any missing record fields to the table schema, default is `false` |
| iceberg.table.\<table name\>.idColumns | Comma-separated list of columns that identify a row in the table (primary key) |
| iceberg.table.\<table name\>.routeRegex | The regex used to match a record's `routeField` to a table |
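For context, a minimal sketch of turning the new option on, written as the kind of Java properties map the integration test below passes to the connector. Only `iceberg.tables.autoCreateEnabled` is introduced by this commit; `iceberg.tables.evolveSchemaEnabled` comes from the table above, and the remaining keys are illustrative placeholders, not taken from this change.

```java
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class AutoCreateConfigSketch {
  // Hypothetical connector properties for enabling table auto-creation.
  public static Map<String, String> connectorProps() {
    return ImmutableMap.of(
        "iceberg.tables", "default.events",            // assumed destination-table key
        "iceberg.tables.autoCreateEnabled", "true",    // new option added by this commit
        "iceberg.tables.evolveSchemaEnabled", "true"); // optional, see the table above
  }
}
```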
@@ -23,6 +23,7 @@
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

import java.time.Duration;
import java.util.List;
@@ -33,6 +34,7 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;
@@ -112,7 +114,25 @@ public void testIcebergSinkSchemaEvolution(String branch) {
assertEquals(2, files.stream().mapToLong(DataFile::recordCount).sum());
assertSnapshotProps(TABLE_IDENTIFIER, branch);

// check the schema
assertGeneratedSchema();
}

@ParameterizedTest
@NullSource
@ValueSource(strings = {"test_branch"})
public void testIcebergSinkAutoCreate(String branch) {
runTest(branch, ImmutableMap.of("iceberg.tables.autoCreateEnabled", "true"));

List<DataFile> files = getDataFiles(TABLE_IDENTIFIER, branch);
// may involve 1 or 2 workers
assertThat(files).hasSizeBetween(1, 2);
assertEquals(2, files.stream().mapToLong(DataFile::recordCount).sum());
assertSnapshotProps(TABLE_IDENTIFIER, branch);

assertGeneratedSchema();
}

private void assertGeneratedSchema() {
Schema tableSchema = catalog.loadTable(TABLE_IDENTIFIER).schema();
assertThat(tableSchema.findField("id").type()).isInstanceOf(LongType.class);
assertThat(tableSchema.findField("type").type()).isInstanceOf(StringType.class);
@@ -166,7 +186,11 @@ private void runTest(String branch, Map<String, String> extraConfig) {
}

private void assertSnapshotAdded() {
Table table = catalog.loadTable(TABLE_IDENTIFIER);
assertThat(table.snapshots()).hasSize(1);
try {
Table table = catalog.loadTable(TABLE_IDENTIFIER);
assertThat(table.snapshots()).hasSize(1);
} catch (NoSuchTableException e) {
fail();
}
}
}
@@ -71,6 +71,7 @@ public class IcebergSinkConfig extends AbstractConfig {
private static final String TABLES_DEFAULT_COMMIT_BRANCH = "iceberg.tables.defaultCommitBranch";
private static final String TABLES_CDC_FIELD_PROP = "iceberg.tables.cdcField";
private static final String TABLES_UPSERT_MODE_ENABLED_PROP = "iceberg.tables.upsertModeEnabled";
private static final String TABLES_AUTO_CREATE_ENABLED_PROP = "iceberg.tables.autoCreateEnabled";
private static final String TABLES_EVOLVE_SCHEMA_ENABLED_PROP =
"iceberg.tables.evolveSchemaEnabled";
private static final String CONTROL_TOPIC_PROP = "iceberg.control.topic";
@@ -142,6 +143,12 @@ private static ConfigDef newConfigDef() {
false,
Importance.MEDIUM,
"Set to true to treat all appends as upserts, false otherwise");
configDef.define(
TABLES_AUTO_CREATE_ENABLED_PROP,
Type.BOOLEAN,
false,
Importance.MEDIUM,
"Set to true to automatically create destination tables, false otherwise");
configDef.define(
TABLES_EVOLVE_SCHEMA_ENABLED_PROP,
Type.BOOLEAN,
@@ -348,6 +355,10 @@ public boolean isUpsertMode() {
return getBoolean(TABLES_UPSERT_MODE_ENABLED_PROP);
}

public boolean isAutoCreate() {
return getBoolean(TABLES_AUTO_CREATE_ENABLED_PROP);
}

public boolean isEvolveSchema() {
return getBoolean(TABLES_EVOLVE_SCHEMA_ENABLED_PROP);
}
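A hedged sketch of branching on the new accessor; only `isAutoCreate()` is from this commit, while the wrapper method and messages are illustrative.

```java
import io.tabular.iceberg.connect.IcebergSinkConfig;

public class AutoCreateFlagSketch {
  // Summarizes what the rest of this change implements: with the flag on, a
  // missing destination table is created from the first record's schema; with
  // it off, the table load fails (or the record is skipped for dynamic routes).
  static String describe(IcebergSinkConfig config) {
    return config.isAutoCreate()
        ? "missing tables are auto-created from the first record's schema"
        : "missing tables fail the load (or are skipped for dynamic routes)";
  }
}
```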
@@ -23,9 +23,9 @@
import static java.util.stream.Collectors.toMap;

import io.tabular.iceberg.connect.IcebergSinkConfig;
import io.tabular.iceberg.connect.data.IcebergWriter;
import io.tabular.iceberg.connect.data.IcebergWriterFactory;
import io.tabular.iceberg.connect.data.Offset;
import io.tabular.iceberg.connect.data.RecordWriter;
import io.tabular.iceberg.connect.data.Utilities;
import io.tabular.iceberg.connect.data.WriterResult;
import io.tabular.iceberg.connect.events.CommitReadyPayload;
@@ -44,7 +44,6 @@
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -60,8 +59,7 @@ public class Worker extends Channel {
private final IcebergWriterFactory writerFactory;
private final SinkTaskContext context;
private final String controlGroupId;
private final Map<String, IcebergWriter> writers;
private final Map<String, Boolean> tableExistsMap;
private final Map<String, RecordWriter> writers;
private final Map<TopicPartition, Offset> sourceOffsets;

public Worker(
@@ -79,7 +77,6 @@ public Worker(
this.context = context;
this.controlGroupId = config.getControlGroupId();
this.writers = new HashMap<>();
this.tableExistsMap = new HashMap<>();
this.sourceOffsets = new HashMap<>();
}

@@ -116,7 +113,6 @@ protected boolean receive(Envelope envelope) {
writers.values().stream().flatMap(writer -> writer.complete().stream()).collect(toList());
Map<TopicPartition, Offset> offsets = new HashMap<>(sourceOffsets);

tableExistsMap.clear();
writers.clear();
sourceOffsets.clear();

@@ -169,7 +165,7 @@ protected boolean receive(Envelope envelope) {
@Override
public void stop() {
super.stop();
writers.values().forEach(IcebergWriter::close);
writers.values().forEach(RecordWriter::close);
}

public void save(Collection<SinkRecord> sinkRecords) {
@@ -199,7 +195,7 @@ private void routeRecordStatically(SinkRecord record) {
.getTables()
.forEach(
tableName -> {
getWriterForTable(tableName).write(record);
getWriterForTable(tableName, record, false).write(record);
});

} else {
@@ -215,7 +211,7 @@ private void routeRecordStatically(SinkRecord record) {
.ifPresent(
regex -> {
if (regex.matcher(routeValue).matches()) {
getWriterForTable(tableName).write(record);
getWriterForTable(tableName, record, false).write(record);
}
}));
}
@@ -229,17 +225,10 @@ private void routeRecordDynamically(SinkRecord record) {
String routeValue = extractRouteValue(record.value(), routeField);
if (routeValue != null) {
String tableName = routeValue.toLowerCase();
if (tableExists(tableName)) {
getWriterForTable(tableName).write(record);
}
getWriterForTable(tableName, record, true).write(record);
}
}

private boolean tableExists(String tableName) {
return tableExistsMap.computeIfAbsent(
tableName, notUsed -> catalog.tableExists(TableIdentifier.parse(tableName)));
}

private String extractRouteValue(Object recordValue, String routeField) {
if (recordValue == null) {
return null;
@@ -248,7 +237,9 @@ private String extractRouteValue(Object recordValue, String routeField) {
return routeValue == null ? null : routeValue.toString();
}

private IcebergWriter getWriterForTable(String tableName) {
return writers.computeIfAbsent(tableName, notUsed -> writerFactory.createWriter(tableName));
private RecordWriter getWriterForTable(
String tableName, SinkRecord sample, boolean ignoreMissingTable) {
return writers.computeIfAbsent(
tableName, notUsed -> writerFactory.createWriter(tableName, sample, ignoreMissingTable));
}
}
@@ -22,13 +22,11 @@

import io.tabular.iceberg.connect.IcebergSinkConfig;
import io.tabular.iceberg.connect.data.SchemaUpdate.AddColumn;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.TaskWriter;
@@ -37,19 +35,17 @@
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;

public class IcebergWriter implements Closeable {
public class IcebergWriter implements RecordWriter {
private final Table table;
private final String tableName;
private final TableIdentifier tableIdentifier;
private final IcebergSinkConfig config;
private final List<WriterResult> writerResults;

private RecordConverter recordConverter;
private TaskWriter<Record> writer;

public IcebergWriter(Catalog catalog, String tableName, IcebergSinkConfig config) {
this.tableIdentifier = TableIdentifier.parse(tableName);
this.table = catalog.loadTable(tableIdentifier);
public IcebergWriter(Table table, String tableName, IcebergSinkConfig config) {
this.table = table;
this.tableName = tableName;
this.config = config;
this.writerResults = Lists.newArrayList();
@@ -61,6 +57,7 @@ private void initNewWriter() {
this.recordConverter = new RecordConverter(table, config.getJsonConverter());
}

@Override
public void write(SinkRecord record) {
try {
// TODO: config to handle tombstones instead of always ignoring?
@@ -139,12 +136,13 @@ private void flush() {

writerResults.add(
new WriterResult(
tableIdentifier,
TableIdentifier.parse(tableName),
Arrays.asList(writeResult.dataFiles()),
Arrays.asList(writeResult.deleteFiles()),
table.spec().partitionType()));
}

@Override
public List<WriterResult> complete() {
flush();

@@ -19,7 +19,13 @@
package io.tabular.iceberg.connect.data;

import io.tabular.iceberg.connect.IcebergSinkConfig;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.types.Types.StructType;
import org.apache.kafka.connect.sink.SinkRecord;

public class IcebergWriterFactory {

@@ -31,7 +37,29 @@ public IcebergWriterFactory(Catalog catalog, IcebergSinkConfig config) {
this.config = config;
}

public IcebergWriter createWriter(String tableName) {
return new IcebergWriter(catalog, tableName, config);
public RecordWriter createWriter(
String tableName, SinkRecord sample, boolean ignoreMissingTable) {
TableIdentifier identifier = TableIdentifier.parse(tableName);
Table table;
try {
table = catalog.loadTable(identifier);
} catch (NoSuchTableException e) {
if (ignoreMissingTable) {
return new RecordWriter() {};
} else if (!config.isAutoCreate()) {
throw e;
}

StructType structType;
if (sample.valueSchema() == null) {
structType = SchemaUtils.inferIcebergType(sample.value()).asStructType();
} else {
structType = SchemaUtils.toIcebergType(sample.valueSchema()).asStructType();
}

table = catalog.createTable(identifier, new Schema(structType.fields()));
}

return new IcebergWriter(table, tableName, config);
}
}
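A hypothetical usage sketch of the new factory contract; the `createWriter(tableName, sample, ignoreMissingTable)` signature is from the diff above, while the wrapper method is illustrative. The returned writer is either a real `IcebergWriter` for an existing table, a writer for a table auto-created from the sample record's schema (when `iceberg.tables.autoCreateEnabled` is `true`), or a no-op `RecordWriter` when the table is missing, auto-create is off, and `ignoreMissingTable` is `true` (the dynamic-routing case).

```java
import io.tabular.iceberg.connect.data.IcebergWriterFactory;
import io.tabular.iceberg.connect.data.RecordWriter;
import io.tabular.iceberg.connect.data.WriterResult;
import java.util.List;
import org.apache.kafka.connect.sink.SinkRecord;

public class WriterFactoryUsageSketch {
  // Writes one record and collects the resulting files, tolerating a missing
  // destination table the same way the dynamic-routing path does.
  static List<WriterResult> writeOne(
      IcebergWriterFactory factory, String tableName, SinkRecord record) {
    RecordWriter writer = factory.createWriter(tableName, record, true);
    writer.write(record); // no-op when the table is missing and auto-create is off
    List<WriterResult> results = writer.complete();
    writer.close();
    return results;
  }
}
```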
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.data;

import java.util.Collections;
import java.util.List;
import org.apache.kafka.connect.sink.SinkRecord;

public interface RecordWriter extends Cloneable {

default void write(SinkRecord record) {}

default List<WriterResult> complete() {
return Collections.emptyList();
}

default void close() {}
}
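A quick illustration (not part of the commit) of what the no-op defaults buy: `IcebergWriterFactory` returns an anonymous `new RecordWriter() {}` for a missing table when `ignoreMissingTable` is true, so the Worker can write unconditionally instead of consulting the per-table existence map it previously kept.

```java
import io.tabular.iceberg.connect.data.RecordWriter;
import io.tabular.iceberg.connect.data.WriterResult;
import java.util.List;
import org.apache.kafka.connect.sink.SinkRecord;

public class NoOpWriterSketch {
  static void demo(SinkRecord record) {
    RecordWriter noop = new RecordWriter() {}; // all methods fall back to the defaults
    noop.write(record);                        // record is silently dropped
    List<WriterResult> results = noop.complete();
    assert results.isEmpty();                  // nothing to commit
    noop.close();                              // nothing to release
  }
}
```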