From 73e55f4422fa5219c04b2c3b6b36aef09788afc9 Mon Sep 17 00:00:00 2001 From: Bryan Keller Date: Sun, 9 Jul 2023 21:00:16 -0700 Subject: [PATCH] Support catalog type property and setting catalog name (#41) --- README.md | 13 +++++++------ build.gradle | 2 +- .../iceberg/connect/IntegrationCdcTest.java | 6 ++++-- .../connect/IntegrationDynamicTableTest.java | 6 ++++-- .../connect/IntegrationMultiTableTest.java | 6 ++++-- .../tabular/iceberg/connect/IntegrationTest.java | 6 ++++-- .../iceberg/connect/IcebergSinkConfig.java | 15 +++++++++++---- .../tabular/iceberg/connect/data/Utilities.java | 4 ++-- .../iceberg/connect/IcebergSinkConfigTest.java | 5 ++--- .../iceberg/connect/IcebergSinkTaskTest.java | 2 +- 10 files changed, 40 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index 1066347e..b90aefc3 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,7 @@ The zip archive will be found under `./kafka-connect-runtime/build/distributions | iceberg.control.commitIntervalMs | Commit interval in msec, default is 300,000 (5 min) | | iceberg.control.commitTimeoutMs | Commit timeout interval in msec, default is 30,000 (30 sec) | | iceberg.control.commitThreads | Number of threads to use for commits, default is (cores * 2) | +| iceberg.catalog | Name of the catalog, default is `iceberg` | | iceberg.catalog.* | Properties passed through to Iceberg catalog initialization | | iceberg.kafka.* | Properties passed through to control topic Kafka client initialization | @@ -59,7 +60,7 @@ otherwise you will need to include that yourself. ### REST example ``` -"iceberg.catalog": "org.apache.iceberg.rest.RESTCatalog", +"iceberg.catalog.type": "rest", "iceberg.catalog.uri": "https://catalog-service", "iceberg.catalog.credential": "", "iceberg.catalog.warehouse": "", @@ -69,7 +70,7 @@ otherwise you will need to include that yourself. NOTE: Use the distribution that includes the HMS client (or include the HMS client yourself). Use `S3FileIO` when using S3 for storage (the default is `HadoopFileIO` with `HiveCatalog`). ``` -"iceberg.catalog":"org.apache.iceberg.hive.HiveCatalog", +"iceberg.catalog.tyoe":"hive", "iceberg.catalog.uri":"thrift://hive:9083", "iceberg.catalog.io-impl":"org.apache.iceberg.aws.s3.S3FileIO", "iceberg.catalog.warehouse":"s3a://bucket/warehouse", @@ -131,7 +132,7 @@ This example config connects to a Iceberg REST catalog. "tasks.max": "2", "topics": "events", "iceberg.tables": "default.events", - "iceberg.catalog": "org.apache.iceberg.rest.RESTCatalog", + "iceberg.catalog.type": "rest", "iceberg.catalog.uri": "https://localhost", "iceberg.catalog.credential": "", "iceberg.catalog.warehouse": "" @@ -173,7 +174,7 @@ PARTITIONED BY (hours(ts)); "iceberg.tables.routeField": "type", "iceberg.table.default.events_list.routeRegex": "list", "iceberg.table.default.events_create.routeRegex": "create", - "iceberg.catalog": "org.apache.iceberg.rest.RESTCatalog", + "iceberg.catalog.type": "rest", "iceberg.catalog.uri": "https://localhost", "iceberg.catalog.credential": "", "iceberg.catalog.warehouse": "" @@ -199,7 +200,7 @@ See above for creating two tables. "topics": "events", "iceberg.tables.dynamic.enabled": "true", "iceberg.tables.routeField": "db_table", - "iceberg.catalog": "org.apache.iceberg.rest.RESTCatalog", + "iceberg.catalog.type": "rest", "iceberg.catalog.uri": "https://localhost", "iceberg.catalog.credential": "", "iceberg.catalog.warehouse": "" @@ -226,7 +227,7 @@ See above for creating the table "topics": "events", "iceberg.tables": "default.events", "iceberg.tables.cdcField": "_cdc_op", - "iceberg.catalog": "org.apache.iceberg.rest.RESTCatalog", + "iceberg.catalog.type": "rest", "iceberg.catalog.uri": "https://localhost", "iceberg.catalog.credential": "", "iceberg.catalog.warehouse": "" diff --git a/build.gradle b/build.gradle index 0877a797..fbb6316b 100644 --- a/build.gradle +++ b/build.gradle @@ -13,7 +13,7 @@ subprojects { apply plugin: "maven-publish" group "io.tabular.connect" - version "0.4.3-SNAPSHOT" + version "0.4.3" repositories { mavenCentral() diff --git a/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationCdcTest.java b/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationCdcTest.java index 88f8fcfa..4afe9f7e 100644 --- a/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationCdcTest.java +++ b/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationCdcTest.java @@ -31,6 +31,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.PartitionSpec; @@ -44,7 +45,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.rest.RESTCatalog; import org.apache.iceberg.types.Types; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; @@ -105,7 +105,9 @@ public void testIcebergSink() throws Exception { .config("iceberg.tables.cdcField", "op") .config("iceberg.control.commitIntervalMs", 1000) .config("iceberg.control.commitTimeoutMs", Integer.MAX_VALUE) - .config("iceberg.catalog", RESTCatalog.class.getName()) + .config( + "iceberg.catalog." + CatalogUtil.ICEBERG_CATALOG_TYPE, + CatalogUtil.ICEBERG_CATALOG_TYPE_REST) .config("iceberg.catalog." + CatalogProperties.URI, "http://iceberg:8181") .config("iceberg.catalog." + S3FileIOProperties.ENDPOINT, "http://minio:9000") .config("iceberg.catalog." + S3FileIOProperties.ACCESS_KEY_ID, AWS_ACCESS_KEY) diff --git a/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationDynamicTableTest.java b/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationDynamicTableTest.java index 91be8513..b9661ed8 100644 --- a/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationDynamicTableTest.java +++ b/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationDynamicTableTest.java @@ -29,6 +29,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.DataFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -38,7 +39,6 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.rest.RESTCatalog; import org.apache.iceberg.types.Types; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; @@ -99,7 +99,9 @@ public void testIcebergSink() { .config("iceberg.tables.routeField", "payload") .config("iceberg.control.commitIntervalMs", 1000) .config("iceberg.control.commitTimeoutMs", Integer.MAX_VALUE) - .config("iceberg.catalog", RESTCatalog.class.getName()) + .config( + "iceberg.catalog." + CatalogUtil.ICEBERG_CATALOG_TYPE, + CatalogUtil.ICEBERG_CATALOG_TYPE_REST) .config("iceberg.catalog." + CatalogProperties.URI, "http://iceberg:8181") .config("iceberg.catalog." + S3FileIOProperties.ENDPOINT, "http://minio:9000") .config("iceberg.catalog." + S3FileIOProperties.ACCESS_KEY_ID, AWS_ACCESS_KEY) diff --git a/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationMultiTableTest.java b/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationMultiTableTest.java index 5b3eeeb5..f074e1ba 100644 --- a/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationMultiTableTest.java +++ b/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationMultiTableTest.java @@ -29,6 +29,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.DataFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -38,7 +39,6 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.rest.RESTCatalog; import org.apache.iceberg.types.Types; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; @@ -103,7 +103,9 @@ public void testIcebergSink() { .config(format("iceberg.table.%s.%s.routeRegex", TEST_DB, TEST_TABLE2), "type2") .config("iceberg.control.commitIntervalMs", 1000) .config("iceberg.control.commitTimeoutMs", Integer.MAX_VALUE) - .config("iceberg.catalog", RESTCatalog.class.getName()) + .config( + "iceberg.catalog." + CatalogUtil.ICEBERG_CATALOG_TYPE, + CatalogUtil.ICEBERG_CATALOG_TYPE_REST) .config("iceberg.catalog." + CatalogProperties.URI, "http://iceberg:8181") .config("iceberg.catalog." + S3FileIOProperties.ENDPOINT, "http://minio:9000") .config("iceberg.catalog." + S3FileIOProperties.ACCESS_KEY_ID, AWS_ACCESS_KEY) diff --git a/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationTest.java b/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationTest.java index 926c0e4f..7bf48a34 100644 --- a/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationTest.java +++ b/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationTest.java @@ -30,6 +30,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.DataFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -39,7 +40,6 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.rest.RESTCatalog; import org.apache.iceberg.types.Types; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; @@ -96,7 +96,9 @@ public void testIcebergSink() throws Exception { .config("iceberg.tables", format("%s.%s", TEST_DB, TEST_TABLE)) .config("iceberg.control.commitIntervalMs", 1000) .config("iceberg.control.commitTimeoutMs", Integer.MAX_VALUE) - .config("iceberg.catalog", RESTCatalog.class.getName()) + .config( + "iceberg.catalog." + CatalogUtil.ICEBERG_CATALOG_TYPE, + CatalogUtil.ICEBERG_CATALOG_TYPE_REST) .config("iceberg.catalog." + CatalogProperties.URI, "http://iceberg:8181") .config("iceberg.catalog." + S3FileIOProperties.ENDPOINT, "http://minio:9000") .config("iceberg.catalog." + S3FileIOProperties.ACCESS_KEY_ID, AWS_ACCESS_KEY) diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/IcebergSinkConfig.java index b08708aa..0b7f6378 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/IcebergSinkConfig.java @@ -58,7 +58,7 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String KAFKA_PROP_PREFIX = "iceberg.kafka."; private static final String TABLE_PROP_PREFIX = "iceberg.table."; - private static final String CATALOG_IMPL_PROP = "iceberg.catalog"; + private static final String CATALOG_NAME_PROP = "iceberg.catalog"; private static final String TABLES_PROP = "iceberg.tables"; private static final String TABLES_DYNAMIC_PROP = "iceberg.tables.dynamic.enabled"; private static final String TABLES_ROUTE_FIELD_PROP = "iceberg.tables.routeField"; @@ -75,6 +75,7 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String NAME_PROP = "name"; private static final String BOOTSTRAP_SERVERS_PROP = "bootstrap.servers"; + private static final String DEFAULT_CATALOG_NAME = "iceberg"; private static final String DEFAULT_CONTROL_TOPIC = "control-iceberg"; public static final String DEFAULT_CONTROL_GROUP_PREFIX = "cg-control-"; @@ -125,7 +126,12 @@ private static ConfigDef newConfigDef() { false, Importance.MEDIUM, "Set to true to treat all appends as upserts, false otherwise"); - configDef.define(CATALOG_IMPL_PROP, Type.STRING, Importance.HIGH, "Iceberg catalog class name"); + configDef.define( + CATALOG_NAME_PROP, + Type.STRING, + DEFAULT_CATALOG_NAME, + Importance.MEDIUM, + "Iceberg catalog name"); configDef.define( CONTROL_TOPIC_PROP, Type.STRING, @@ -186,6 +192,7 @@ public IcebergSinkConfig(Map originalProps) { } private void validate() { + checkState(!getCatalogProps().isEmpty(), "Must specify Iceberg catalog properties"); if (getTables() != null) { checkState(!getDynamicTablesEnabled(), "Cannot specify both static and dynamic table names"); } else if (getDynamicTablesEnabled()) { @@ -223,8 +230,8 @@ public Map getKafkaProps() { return kafkaProps; } - public String getCatalogImpl() { - return getString(CATALOG_IMPL_PROP); + public String getCatalogName() { + return getString(CATALOG_NAME_PROP); } public List getTables() { diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Utilities.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Utilities.java index bd8b4cea..856a099d 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Utilities.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Utilities.java @@ -48,8 +48,8 @@ public class Utilities { private static final Logger LOG = LoggerFactory.getLogger(Utilities.class.getName()); public static Catalog loadCatalog(IcebergSinkConfig config) { - return CatalogUtil.loadCatalog( - config.getCatalogImpl(), "iceberg", config.getCatalogProps(), getHadoopConfig()); + return CatalogUtil.buildIcebergCatalog( + config.getCatalogName(), config.getCatalogProps(), getHadoopConfig()); } private static Object getHadoopConfig() { diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/IcebergSinkConfigTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/IcebergSinkConfigTest.java index 2fa8d53f..fcebd6c9 100644 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/IcebergSinkConfigTest.java +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/IcebergSinkConfigTest.java @@ -24,7 +24,6 @@ import java.util.Map; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.rest.RESTCatalog; import org.apache.kafka.common.config.ConfigException; import org.junit.jupiter.api.Test; @@ -47,7 +46,7 @@ public void testInvalid() { Map props = ImmutableMap.of( "topics", "source-topic", - "iceberg.catalog", RESTCatalog.class.getName(), + "iceberg.catalog.type", "rest", "iceberg.tables", "db.landing", "iceberg.tables.dynamic.enabled", "true"); assertThatExceptionOfType(ConfigException.class).isThrownBy(() -> new IcebergSinkConfig(props)); @@ -57,7 +56,7 @@ public void testInvalid() { public void testGetDefault() { Map props = ImmutableMap.of( - "iceberg.catalog", RESTCatalog.class.getName(), + "iceberg.catalog.type", "rest", "topics", "source-topic", "iceberg.tables", "db.landing"); IcebergSinkConfig config = new IcebergSinkConfig(props); diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/IcebergSinkTaskTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/IcebergSinkTaskTest.java index 6e9a7db2..00b04a0a 100644 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/IcebergSinkTaskTest.java +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/IcebergSinkTaskTest.java @@ -35,7 +35,7 @@ public void testIsLeader() { task.start( ImmutableMap.of( "topics", "topic1, topic2", - "iceberg.catalog", "catalog", + "iceberg.catalog.type", "rest", "iceberg.tables", "table")); List assignments =