
Commit 26c7356

Allow null values to be written as null for JsonFormat
Description: When JsonFormat is used to write records "from" Debezium to Kafka and the S3 sink connector then reads from Kafka and saves to S3, null values are always stored as their default values. This commit adds a new config property (defaulting to the old behavior for backwards compatibility) so the value converter inside the S3 sink connector can be configured to preserve nulls. Tests for the configuration and an integration test have been added as well. This addresses #716, but for JSON instead of Avro.
1 parent b1efca8 commit 26c7356
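With this change, a pipeline that needs true nulls in S3 can opt out of default substitution. A minimal sketch of the relevant sink properties, built in Java; the property name and its default come from this commit, while the surrounding connector settings are illustrative placeholders:

```java
import java.util.HashMap;
import java.util.Map;

public class S3SinkNullHandlingExample {
  // Sketch of the properties a deployment would set; "connector.class" and
  // "format.class" are the standard S3 sink values, the rest is placeholder.
  public static Map<String, String> s3SinkProps() {
    Map<String, String> props = new HashMap<>();
    props.put("connector.class", "io.confluent.connect.s3.S3SinkConnector");
    props.put("format.class", "io.confluent.connect.s3.format.json.JsonFormat");
    // New in this commit: defaults to "true" (old behavior, for backwards
    // compatibility); "false" stores null fields as null instead of their
    // schema default values.
    props.put("json.replace.null.with.default", "false");
    return props;
  }
}
```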

6 files changed: +171 −100 lines changed

kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java

Lines changed: 22 additions & 3 deletions
```diff
@@ -24,7 +24,6 @@
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.s3.model.CannedAccessControlList;
 import com.amazonaws.services.s3.model.SSEAlgorithm;
-import io.confluent.connect.storage.common.util.StringUtils;
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
@@ -67,6 +66,7 @@
 import io.confluent.connect.storage.common.GenericRecommender;
 import io.confluent.connect.storage.common.ParentValueRecommender;
 import io.confluent.connect.storage.common.StorageCommonConfig;
+import io.confluent.connect.storage.common.util.StringUtils;
 import io.confluent.connect.storage.format.Format;
 import io.confluent.connect.storage.partitioner.DailyPartitioner;
 import io.confluent.connect.storage.partitioner.DefaultPartitioner;
@@ -184,6 +184,13 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig {
       + " This value is case insensitive and can be either 'BASE64' (default) or 'NUMERIC'";
   private static final String DECIMAL_FORMAT_DISPLAY = "Decimal Format";

+  public static final String REPLACE_NULL_WITH_DEFAULT_CONFIG = "json.replace.null.with.default";
+  public static final boolean REPLACE_NULL_WITH_DEFAULT_DEFAULT = true;
+  private static final String REPLACE_NULL_WITH_DEFAULT_DOC = "Whether to replace fields that"
+      + " have a default value and that are null to the default value."
+      + " When set to true, the default value is used, otherwise null is used.";
+  private static final String REPLACE_NULL_WITH_DEFAULT_DISPLAY = "Replace null with default";
+
   public static final String STORE_KAFKA_KEYS_CONFIG = "store.kafka.keys";
   public static final String STORE_KAFKA_HEADERS_CONFIG = "store.kafka.headers";
   public static final String KEYS_FORMAT_CLASS_CONFIG = "keys.format.class";
@@ -300,11 +307,11 @@ public static ConfigDef newConfigDef() {
     configDef.define(
         S3_BUCKET_CONFIG,
         Type.STRING,
-        Importance.HIGH,
+        ConfigDef.Importance.HIGH,
         "The S3 Bucket.",
         group,
         ++orderInGroup,
-        Width.LONG,
+        ConfigDef.Width.LONG,
         "S3 Bucket"
     );

@@ -537,6 +544,18 @@ public static ConfigDef newConfigDef() {
         DECIMAL_FORMAT_DISPLAY
     );

+    configDef.define(
+        REPLACE_NULL_WITH_DEFAULT_CONFIG,
+        Type.BOOLEAN,
+        REPLACE_NULL_WITH_DEFAULT_DEFAULT,
+        Importance.LOW,
+        REPLACE_NULL_WITH_DEFAULT_DOC,
+        group,
+        ++orderInGroup,
+        Width.SHORT,
+        REPLACE_NULL_WITH_DEFAULT_DISPLAY
+    );
+
     configDef.define(
         S3_PART_RETRIES_CONFIG,
         Type.INT,
```
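The define() call above is what makes the flag backwards compatible: a BOOLEAN config with a default of true changes nothing for existing deployments. A self-contained sketch of that round trip (constants inlined, documentation shortened):

```java
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import java.util.Collections;

public class ReplaceNullConfigSketch {
  static final ConfigDef DEF = new ConfigDef().define(
      "json.replace.null.with.default",   // REPLACE_NULL_WITH_DEFAULT_CONFIG
      ConfigDef.Type.BOOLEAN,
      true,                               // REPLACE_NULL_WITH_DEFAULT_DEFAULT
      ConfigDef.Importance.LOW,
      "Whether to replace null fields that have a default value with that default.");

  public static void main(String[] args) {
    // No value supplied: the default applies, so existing configs are unaffected.
    AbstractConfig implicit = new AbstractConfig(DEF, Collections.emptyMap());
    System.out.println(implicit.getBoolean("json.replace.null.with.default")); // true

    // Explicitly opting out preserves nulls.
    AbstractConfig explicit = new AbstractConfig(DEF,
        Collections.singletonMap("json.replace.null.with.default", "false"));
    System.out.println(explicit.getBoolean("json.replace.null.with.default")); // false
  }
}
```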

kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/json/JsonFormat.java

Lines changed: 7 additions & 2 deletions
```diff
@@ -16,11 +16,11 @@
 package io.confluent.connect.s3.format.json;

 import org.apache.kafka.connect.json.JsonConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.util.HashMap;
 import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 import io.confluent.connect.s3.S3SinkConnectorConfig;
 import io.confluent.connect.s3.storage.S3Storage;
@@ -46,6 +46,11 @@ public JsonFormat(S3Storage storage) {
         "decimal.format",
         String.valueOf(storage.conf().getJsonDecimalFormat())
     );
+    converterConfig.put(
+        "replace.null.with.default",
+        storage.conf().getBoolean(S3SinkConnectorConfig.REPLACE_NULL_WITH_DEFAULT_CONFIG)
+    );
+
     this.converter.configure(converterConfig, false);
   }
```
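JsonFormat simply forwards the sink-level flag to the underlying JsonConverter as its replace.null.with.default property. A standalone sketch of the effect, assuming a Kafka version whose JsonConverter supports that property:

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class JsonConverterNullDemo {
  public static void main(String[] args) {
    // An optional boolean field with a default of true, set to null.
    Schema schema = SchemaBuilder.struct()
        .field("withDefault", SchemaBuilder.bool().optional().defaultValue(true).build())
        .build();
    Struct value = new Struct(schema).put("withDefault", null);

    JsonConverter converter = new JsonConverter();
    Map<String, Object> config = new HashMap<>();
    config.put("schemas.enable", "false");
    // The property JsonFormat now sets from the sink config.
    config.put("replace.null.with.default", "false");
    converter.configure(config, false);

    byte[] json = converter.fromConnectData("topic", schema, value);
    // Expected: {"withDefault":null}; with the property left at its default
    // of true, this would instead serialize as {"withDefault":true}.
    System.out.println(new String(json, StandardCharsets.UTF_8));
  }
}
```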
kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java

Lines changed: 15 additions & 10 deletions
```diff
@@ -17,16 +17,12 @@

 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.auth.AWSCredentialsProvider;
-
-import io.confluent.connect.s3.format.bytearray.ByteArrayFormat;
-import io.confluent.connect.s3.format.parquet.ParquetFormat;
-
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.connect.json.DecimalFormat;
 import org.apache.kafka.connect.sink.SinkRecord;
-import org.junit.After;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

@@ -38,9 +34,12 @@
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;

+import io.confluent.connect.avro.AvroDataConfig;
 import io.confluent.connect.s3.auth.AwsAssumeRoleCredentialsProvider;
 import io.confluent.connect.s3.format.avro.AvroFormat;
+import io.confluent.connect.s3.format.bytearray.ByteArrayFormat;
 import io.confluent.connect.s3.format.json.JsonFormat;
+import io.confluent.connect.s3.format.parquet.ParquetFormat;
 import io.confluent.connect.s3.storage.S3Storage;
 import io.confluent.connect.storage.common.StorageCommonConfig;
 import io.confluent.connect.storage.partitioner.DailyPartitioner;
@@ -50,20 +49,16 @@
 import io.confluent.connect.storage.partitioner.Partitioner;
 import io.confluent.connect.storage.partitioner.PartitionerConfig;
 import io.confluent.connect.storage.partitioner.TimeBasedPartitioner;
-import io.confluent.connect.avro.AvroDataConfig;

 import static io.confluent.connect.s3.S3SinkConnectorConfig.AffixType;
-import static io.confluent.connect.s3.S3SinkConnectorConfig.DECIMAL_FORMAT_CONFIG;
-import static io.confluent.connect.s3.S3SinkConnectorConfig.DECIMAL_FORMAT_DEFAULT;
 import static io.confluent.connect.s3.S3SinkConnectorConfig.HEADERS_FORMAT_CLASS_CONFIG;
 import static io.confluent.connect.s3.S3SinkConnectorConfig.KEYS_FORMAT_CLASS_CONFIG;
 import static io.confluent.connect.s3.S3SinkConnectorConfig.SCHEMA_PARTITION_AFFIX_TYPE_CONFIG;
-
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;

 public class S3SinkConnectorConfigTest extends S3SinkConnectorTestBase {

@@ -458,6 +453,16 @@ public void testJsonDecimalFormat() {
     assertEquals(DecimalFormat.NUMERIC.name(), connectorConfig.getJsonDecimalFormat());
   }

+  @Test
+  public void testJsonReplaceNullWithDefault() {
+    connectorConfig = new S3SinkConnectorConfig(properties);
+    assertTrue(connectorConfig.getBoolean("json.replace.null.with.default"));
+
+    properties.put(S3SinkConnectorConfig.REPLACE_NULL_WITH_DEFAULT_CONFIG, "false");
+    connectorConfig = new S3SinkConnectorConfig(properties);
+    assertFalse(connectorConfig.getBoolean("json.replace.null.with.default"));
+  }
+
   @Test
   public void testValidCompressionLevels() {
     IntStream.range(-1, 9).boxed().forEach(i -> {
```

kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/BaseConnectorIT.java

Lines changed: 61 additions & 38 deletions
```diff
@@ -15,10 +15,6 @@

 package io.confluent.connect.s3.integration;

-import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_ACCESS_KEY_ID_CONFIG;
-import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_SECRET_ACCESS_KEY_CONFIG;
-import static io.confluent.kafka.schemaregistry.ClusterTestHarness.KAFKASTORE_TOPIC;
-
 import com.amazonaws.auth.AWSStaticCredentialsProvider;
 import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.services.s3.AmazonS3;
@@ -29,27 +25,8 @@
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.NullNode;
 import com.google.common.collect.ImmutableMap;
-import io.confluent.common.utils.IntegrationTest;
-import io.confluent.kafka.schemaregistry.CompatibilityLevel;
-import io.confluent.kafka.schemaregistry.RestApp;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import io.confluent.connect.s3.util.S3Utils;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
@@ -79,6 +56,30 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import io.confluent.common.utils.IntegrationTest;
+import io.confluent.connect.s3.util.S3Utils;
+import io.confluent.kafka.schemaregistry.CompatibilityLevel;
+import io.confluent.kafka.schemaregistry.RestApp;
+
+import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_ACCESS_KEY_ID_CONFIG;
+import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_SECRET_ACCESS_KEY_CONFIG;
+import static io.confluent.kafka.schemaregistry.ClusterTestHarness.KAFKASTORE_TOPIC;
 import static org.assertj.core.api.Assertions.assertThat;

 @Category(IntegrationTest.class)
@@ -332,15 +333,15 @@ protected Schema getSampleStructSchema() {
         .field("myFloat32", Schema.FLOAT32_SCHEMA)
         .field("myFloat64", Schema.FLOAT64_SCHEMA)
         .field("myString", Schema.STRING_SCHEMA)
+        .field("withDefault", SchemaBuilder.bool().optional().defaultValue(true).build())
         .build();
   }

   protected Struct getSampleStructVal(Schema structSchema) {
-    Date sampleDate = new Date(1111111);
-    sampleDate.setTime(0);
     return new Struct(structSchema)
         .put("ID", (long) 1)
         .put("myBool", true)
+        .put("withDefault", null)
         .put("myInt32", 32)
         .put("myFloat32", 3.2f)
         .put("myFloat64", 64.64)
@@ -409,12 +410,15 @@ protected static void clearBucket(String bucketName) {
    * @param bucketName the name of the s3 test bucket
    * @param expectedRowsPerFile the number of rows a file should have
    * @param expectedRow the expected row data in each file
+   * @param useDefaultValues whether null fields should be compared against their schema defaults
+   *
    * @return whether every row of the files read equals the expected row
    */
   protected boolean fileContentsAsExpected(
       String bucketName,
       int expectedRowsPerFile,
-      Struct expectedRow
+      Struct expectedRow,
+      boolean useDefaultValues
   ) {
     log.info("expectedRow: {}", expectedRow);
     for (String fileName :
@@ -427,7 +431,7 @@ protected boolean fileContentsAsExpected(
       String fileExtension = getExtensionFromKey(fileName);
       List<JsonNode> downloadedFileContents = contentGetters.get(fileExtension)
           .apply(destinationPath);
-      if (!fileContentsMatchExpected(downloadedFileContents, expectedRowsPerFile, expectedRow)) {
+      if (!fileContentsMatchExpected(downloadedFileContents, expectedRowsPerFile, expectedRow, useDefaultValues)) {
         return false;
       }
       downloadedFile.delete();
@@ -481,20 +485,23 @@ protected boolean keyfileContentsAsExpected(
    * @param fileContents the file contents as a list of JsonNodes
    * @param expectedRowsPerFile the number of rows expected in the file
    * @param expectedRow the expected values of each row
+   * @param useDefaultValues use default values from struct
+   *
    * @return whether the file contents match the expected row
    */
   protected boolean fileContentsMatchExpected(
       List<JsonNode> fileContents,
       int expectedRowsPerFile,
-      Struct expectedRow
+      Struct expectedRow,
+      boolean useDefaultValues
   ) {
     if (fileContents.size() != expectedRowsPerFile) {
       log.error("Number of rows in file do not match the expected count, actual: {}, expected: {}",
           fileContents.size(), expectedRowsPerFile);
       return false;
     }
     for (JsonNode row : fileContents) {
-      if (!fileRowMatchesExpectedRow(row, expectedRow)) {
+      if (!fileRowMatchesExpectedRow(row, expectedRow, useDefaultValues)) {
         return false;
       }
     }
@@ -512,18 +519,34 @@ private List<String> getS3KeyFileList(List<S3ObjectSummary> summaries) {
   /**
    * Compare the row in the file and its values to the expected row's values.
    *
-   * @param fileRow the row read from the file as a JsonNode
-   * @param expectedRow the expected contents of the row
+   * @param fileRow          the row read from the file as a JsonNode
+   * @param expectedRow      the expected contents of the row
+   * @param useDefaultValues whether null fields are expected to hold their schema defaults
+   *
    * @return whether the file row matches the expected row
    */
-  private boolean fileRowMatchesExpectedRow(JsonNode fileRow, Struct expectedRow) {
-    log.debug("Comparing rows: file: {}, expected: {}", fileRow, expectedRow);
+  private boolean fileRowMatchesExpectedRow(JsonNode fileRow, Struct expectedRow, boolean useDefaultValues) {
+    log.info("Comparing rows: file: {}, expected: {}", fileRow, expectedRow);
     // compare the field values
     for (Field key : expectedRow.schema().fields()) {
-      String expectedValue = expectedRow.get(key).toString();
-      String rowValue = fileRow.get(key.name()).toString().replaceAll("^\"|\"$", "");
-      log.debug("Comparing values: {}, {}", expectedValue, rowValue);
-      if (!rowValue.equals(expectedValue)) {
+      String expectedValue = null;
+      if (useDefaultValues) {
+        expectedValue = expectedRow.get(key).toString();
+      } else {
+        Object withoutDefault = expectedRow.getWithoutDefault(key.name());
+        if (withoutDefault != null) {
+          expectedValue = withoutDefault.toString();
+        }
+      }
+
+      JsonNode jsonValue = fileRow.get(key.name());
+      String rowValue = null;
+      if (!(jsonValue instanceof NullNode)) {
+        rowValue = jsonValue.toString().replaceAll("^\"|\"$", "");
+      }
+
+      log.info("Comparing values: {}, {}, {}, {}", key.name(), expectedValue, rowValue, Objects.equals(rowValue, expectedValue));
+      if (!Objects.equals(rowValue, expectedValue)) {
         return false;
       }
     }
```
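The comparison rewrite above leans on two details worth spelling out: Struct.get() substitutes the schema default for a null value while Struct.getWithoutDefault() preserves the null, and Objects.equals() replaces rowValue.equals(), which would throw a NullPointerException once nulls are expected. A small sketch using the same withDefault field as the sample schema:

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

import java.util.Objects;

public class StructDefaultsSketch {
  public static void main(String[] args) {
    Schema schema = SchemaBuilder.struct()
        .field("withDefault", SchemaBuilder.bool().optional().defaultValue(true).build())
        .build();
    Struct row = new Struct(schema).put("withDefault", null);

    // get(...) falls back to the schema default for unset/null fields...
    System.out.println(row.get("withDefault"));               // true
    // ...while getWithoutDefault(...) returns the raw value.
    System.out.println(row.getWithoutDefault("withDefault")); // null

    // Null-safe equality, unlike rowValue.equals(expectedValue).
    System.out.println(Objects.equals(null, null));           // true
  }
}
```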
