2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -23,7 +23,7 @@ jobs:
   maven_verify:
     strategy:
       matrix:
-        java-version: [ 8, 11 ]
+        java-version: [ 11, 17 ]
         runs-on: [ ubuntu-latest ]
     runs-on: ${{ matrix.runs-on }}
     steps:
2 changes: 2 additions & 0 deletions .gitignore
@@ -39,3 +39,5 @@ buildNumber.properties
 .idea/
 *.iml
 
+# Mac OS
+.DS_Store
@@ -90,7 +90,7 @@ public void setUp() throws Exception {
                         "heartbeat.timeout: 60000",
                         "parallelism.default: 1"));
         jobManager =
-                new GenericContainer<>(new DockerImageName("flink:1.19.0-scala_2.12"))
+                new GenericContainer<>(new DockerImageName("flink:2.1-scala_2.12"))
                         .withCommand("jobmanager")
                         .withNetwork(NETWORK)
                         .withExtraHost("host.docker.internal", "host-gateway")
@@ -101,7 +101,7 @@ public void setUp() throws Exception {
                         .withEnv("FLINK_PROPERTIES", properties)
                         .withLogConsumer(new Slf4jLogConsumer(logger));
         taskManager =
-                new GenericContainer<>(new DockerImageName("flink:1.19.0-scala_2.12"))
+                new GenericContainer<>(new DockerImageName("flink:2.1-scala_2.12"))
                         .withCommand("taskmanager")
                         .withExtraHost("host.docker.internal", "host-gateway")
                         .withNetwork(NETWORK)
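A note on the Testcontainers calls above: the new DockerImageName("...") constructor is deprecated in recent Testcontainers releases in favor of the static DockerImageName.parse(...) factory. A minimal sketch of the jobmanager container using it, reusing the image tag and the NETWORK field from the diff:

    // Same container setup via the non-deprecated factory method.
    GenericContainer<?> jobManager =
            new GenericContainer<>(DockerImageName.parse("flink:2.1-scala_2.12"))
                    .withCommand("jobmanager")
                    .withNetwork(NETWORK)
                    .withExtraHost("host.docker.internal", "host-gateway");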
@@ -18,11 +18,11 @@
 package org.apache.flink.connector.clickhouse;
 
 import org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat;
-import org.apache.flink.connector.clickhouse.internal.ClickHouseRowDataSinkFunction;
+import org.apache.flink.connector.clickhouse.internal.ClickHouseRowDataSink;
 import org.apache.flink.connector.clickhouse.internal.options.ClickHouseDmlOptions;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
 import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.RowKind;
@@ -99,8 +99,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
                         .withPrimaryKey(primaryKeys)
                         .withPartitionKey(partitionKeys)
                         .build();
-        return SinkFunctionProvider.of(
-                new ClickHouseRowDataSinkFunction(outputFormat), options.getParallelism());
+
+        return SinkV2Provider.of(new ClickHouseRowDataSink(outputFormat));
     }
 
     @Override
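One behavioral detail in the hunk above: the removed SinkFunctionProvider.of call forwarded options.getParallelism(), which the single-argument SinkV2Provider.of does not. If the user-configured sink parallelism should still apply, SinkV2Provider offers a two-argument overload; a sketch under that assumption, using the variables from the diff:

    // Preserves the configured sink parallelism under the Sink V2 provider.
    return SinkV2Provider.of(new ClickHouseRowDataSink(outputFormat), options.getParallelism());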
@@ -20,15 +20,15 @@
 import org.apache.flink.connector.clickhouse.ClickHouseDynamicTableFactory;
 import org.apache.flink.connector.clickhouse.internal.schema.DistributedEngineFull;
 import org.apache.flink.connector.clickhouse.util.DataTypeUtil;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
 import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
-import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
@@ -296,14 +296,14 @@ public CatalogBaseTable getTable(ObjectPath tablePath)
                     e);
         }
 
-        return new CatalogTableImpl(
-                createTableSchema(databaseName, tableName),
-                getPartitionKeys(databaseName, tableName),
-                configuration,
-                "");
+        return CatalogTable.newBuilder()
+                .schema(createTableSchema(databaseName, tableName))
+                .partitionKeys(getPartitionKeys(databaseName, tableName))
+                .options(configuration)
+                .build();
     }
 
-    private synchronized TableSchema createTableSchema(String databaseName, String tableName) {
+    private synchronized Schema createTableSchema(String databaseName, String tableName) {
         // 1.Maybe has compatibility problems with the different version of clickhouse jdbc. 2. Is
         // it more appropriate to use type literals from `system.columns` to convert Flink data
         // types? 3. All queried data will be obtained before PreparedStatement is closed, so we
@@ -321,15 +321,15 @@ private synchronized TableSchema createTableSchema(String databaseName, String t
             getColMethod.setAccessible(true);
 
             List<String> primaryKeys = getPrimaryKeys(databaseName, tableName);
-            TableSchema.Builder builder = TableSchema.builder();
+            Schema.Builder builder = Schema.newBuilder();
             for (int idx = 1; idx <= metaData.getColumnCount(); idx++) {
                 ClickHouseColumn columnInfo = (ClickHouseColumn) getColMethod.invoke(metaData, idx);
                 String columnName = columnInfo.getColumnName();
                 DataType columnType = DataTypeUtil.toFlinkType(columnInfo);
                 if (primaryKeys.contains(columnName)) {
                     columnType = columnType.notNull();
                 }
-                builder.field(columnName, columnType);
+                builder.column(columnName, columnType);
             }
 
             if (!primaryKeys.isEmpty()) {
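The hunk above is cut off just after the primary-key check. For orientation only: the new Schema.Builder exposes primaryKey(String...) much like the old TableSchema.Builder did, so the truncated branch plausibly finishes along these lines (builder and primaryKeys are the variables from the diff; this is a sketch, not the PR's exact code):

    // Mark the ClickHouse primary-key columns as Flink's primary key, then build the schema.
    if (!primaryKeys.isEmpty()) {
        builder.primaryKey(primaryKeys.toArray(new String[0]));
    }
    return builder.build();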
@@ -75,6 +75,8 @@ public abstract class AbstractClickHouseOutputFormat extends RichOutputFormat<Ro

     public AbstractClickHouseOutputFormat() {}
 
+    protected abstract void open() throws IOException;
+
     @Override
     public void configure(Configuration parameters) {}

@@ -66,7 +66,7 @@ protected ClickHouseBatchOutputFormat(
     }
 
     @Override
-    public void open(int taskNumber, int numTasks) throws IOException {
+    protected void open() throws IOException {
         try {
             // TODO Distributed tables don't support update and delete statements.
             executor =
@@ -80,7 +80,6 @@ public void open(int taskNumber, int numTasks) throws IOException {
                             fieldTypes,
                             options);
             executor.prepareStatement(connectionProvider);
-            executor.setRuntimeContext(getRuntimeContext());
 
             long flushIntervalMillis = options.getFlushInterval().toMillis();
             scheduledFlush(flushIntervalMillis, "clickhouse-batch-output-format");
@@ -89,6 +88,12 @@
         }
     }
 
+    /** @deprecated Replaced by the no-arg {@link #open()}. */
+    @Override
+    public void open(InitializationContext initializationContext) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public synchronized void writeRecord(RowData record) throws IOException {
         checkFlushException();
@@ -18,12 +18,12 @@
 package org.apache.flink.connector.clickhouse.internal;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Preconditions;
 
@@ -33,28 +33,17 @@

 /** A rich sink function to write {@link RowData} records into ClickHouse. */
 @Internal
-public class ClickHouseRowDataSinkFunction extends RichSinkFunction<RowData>
-        implements CheckpointedFunction {
+public class ClickHouseRowDataSink implements Sink<RowData>, CheckpointedFunction {
 
     private final AbstractClickHouseOutputFormat outputFormat;
 
-    public ClickHouseRowDataSinkFunction(@Nonnull AbstractClickHouseOutputFormat outputFormat) {
+    public ClickHouseRowDataSink(@Nonnull AbstractClickHouseOutputFormat outputFormat) {
         this.outputFormat = Preconditions.checkNotNull(outputFormat);
     }
 
     @Override
-    public void open(Configuration parameters) throws Exception {
-        outputFormat.configure(parameters);
-        RuntimeContext runtimeContext = getRuntimeContext();
-        outputFormat.setRuntimeContext(runtimeContext);
-        outputFormat.open(
-                runtimeContext.getIndexOfThisSubtask(),
-                runtimeContext.getNumberOfParallelSubtasks());
-    }
-
-    @Override
-    public void invoke(RowData value, Context context) throws IOException {
-        outputFormat.writeRecord(value);
+    public SinkWriter<RowData> createWriter(WriterInitContext initContext) throws IOException {
+        return new ClickHouseRowDataSinkWriter(initContext, outputFormat);
     }
 
     @Override
@@ -64,9 +53,30 @@ public void initializeState(FunctionInitializationContext context) {}
     public void snapshotState(FunctionSnapshotContext context) throws Exception {
         outputFormat.flush();
     }
+}
+
+class ClickHouseRowDataSinkWriter implements SinkWriter<RowData> {
+    private final AbstractClickHouseOutputFormat outputFormat;
+
+    public ClickHouseRowDataSinkWriter(
+            WriterInitContext context, AbstractClickHouseOutputFormat outputFormat)
+            throws IOException {
+        this.outputFormat = outputFormat;
+        this.outputFormat.open();
+    }
 
     @Override
-    public void close() {
+    public void write(RowData value, Context context) throws IOException, InterruptedException {
+        outputFormat.writeRecord(value);
+    }
+
+    @Override
+    public void close() throws Exception {
         outputFormat.close();
     }
+
+    @Override
+    public void flush(boolean b) throws IOException, InterruptedException {
+        outputFormat.flush();
+    }
 }
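Taken together, the two classes above follow the standard Sink V2 lifecycle: the planner instantiates one ClickHouseRowDataSink, and the runtime calls createWriter once per subtask, write per record, flush at each checkpoint, and close at end of input. A minimal sketch of that sequence (initContext, row, and writeContext are hypothetical stand-ins for runtime-provided objects):

    // Sketch of how the runtime drives the new sink; not production code.
    Sink<RowData> sink = new ClickHouseRowDataSink(outputFormat);
    SinkWriter<RowData> writer = sink.createWriter(initContext); // opens the output format
    writer.write(row, writeContext);  // buffers a single record
    writer.flush(false);              // checkpoint: push buffered rows to ClickHouse
    writer.close();                   // end of input: release statements and connections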
@@ -87,7 +87,7 @@ protected ClickHouseShardOutputFormat(
     }
 
     @Override
-    public void open(int taskNumber, int numTasks) throws IOException {
+    public void open() throws IOException {
         try {
             Map<Integer, ClickHouseConnection> connectionMap =
                     connectionProvider.createShardConnections(clusterSpec);
@@ -114,6 +114,12 @@ public void open(int taskNumber, int numTasks) throws IOException {
         }
     }
 
+    /** @deprecated Replaced by the no-arg {@link #open()}. */
+    @Override
+    public void open(InitializationContext initializationContext) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public synchronized void writeRecord(RowData record) throws IOException {
         checkFlushException();
2 changes: 1 addition & 1 deletion pom.xml
@@ -60,7 +60,7 @@ limitations under the License.
         <junit.version>4.13.2</junit.version>
         <clickhouse-jdbc.version>0.6.4</clickhouse-jdbc.version>
         <scala.binary.version>2.12</scala.binary.version>
-        <flink.version>1.19.0</flink.version>
+        <flink.version>2.1.0</flink.version>
         <commons-lang3.version>3.13.0</commons-lang3.version>
         <testcontainer.version>1.19.8</testcontainer.version>
         <httpclient5.version>5.2.1</httpclient5.version>