
Commit 508e0fe

Allow null values to be written as null for JsonFormat

Description: When JsonFormat is used to write records from Debezium to Kafka, and the S3 sink connector then reads them from Kafka and saves them to S3, null values are always stored as their schema default values. This change adds a new config property (defaulting to the old behavior for backwards compatibility) so that the value converter inside the S3 sink connector can be configured to preserve nulls. Tests for the configuration and an integration test have been added as well. This addresses #716, but for JSON instead of Avro.
1 parent b1efca8 commit 508e0fe
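
For reference, a minimal sink configuration that opts out of default substitution might look like the sketch below; the connector name, topic, and bucket are illustrative, while the property names come from this change:

name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
topics=debezium-events
s3.bucket.name=example-bucket
format.class=io.confluent.connect.s3.format.json.JsonFormat
# New in this commit; defaults to true for backwards compatibility.
json.replace.null.with.default=false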

6 files changed: +177 -108 lines

kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java

Lines changed: 22 additions & 3 deletions
@@ -24,7 +24,6 @@
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.s3.model.CannedAccessControlList;
 import com.amazonaws.services.s3.model.SSEAlgorithm;
-import io.confluent.connect.storage.common.util.StringUtils;
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
@@ -67,6 +66,7 @@
 import io.confluent.connect.storage.common.GenericRecommender;
 import io.confluent.connect.storage.common.ParentValueRecommender;
 import io.confluent.connect.storage.common.StorageCommonConfig;
+import io.confluent.connect.storage.common.util.StringUtils;
 import io.confluent.connect.storage.format.Format;
 import io.confluent.connect.storage.partitioner.DailyPartitioner;
 import io.confluent.connect.storage.partitioner.DefaultPartitioner;
@@ -184,6 +184,13 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig {
       + " This value is case insensitive and can be either 'BASE64' (default) or 'NUMERIC'";
   private static final String DECIMAL_FORMAT_DISPLAY = "Decimal Format";

+  public static final String REPLACE_NULL_WITH_DEFAULT_CONFIG = "json.replace.null.with.default";
+  public static final boolean REPLACE_NULL_WITH_DEFAULT_DEFAULT = true;
+  private static final String REPLACE_NULL_WITH_DEFAULT_DOC = "Whether to replace fields that"
+      + " have a default value and are null with that default value."
+      + " When set to true, the default value is used; otherwise, null is used.";
+  private static final String REPLACE_NULL_WITH_DEFAULT_DISPLAY = "Replace null with default";
+
   public static final String STORE_KAFKA_KEYS_CONFIG = "store.kafka.keys";
   public static final String STORE_KAFKA_HEADERS_CONFIG = "store.kafka.headers";
   public static final String KEYS_FORMAT_CLASS_CONFIG = "keys.format.class";
@@ -300,11 +307,11 @@ public static ConfigDef newConfigDef() {
     configDef.define(
         S3_BUCKET_CONFIG,
         Type.STRING,
-        Importance.HIGH,
+        ConfigDef.Importance.HIGH,
         "The S3 Bucket.",
         group,
         ++orderInGroup,
-        Width.LONG,
+        ConfigDef.Width.LONG,
         "S3 Bucket"
     );

@@ -537,6 +544,18 @@ public static ConfigDef newConfigDef() {
         DECIMAL_FORMAT_DISPLAY
     );

+    configDef.define(
+        REPLACE_NULL_WITH_DEFAULT_CONFIG,
+        Type.BOOLEAN,
+        REPLACE_NULL_WITH_DEFAULT_DEFAULT,
+        Importance.LOW,
+        REPLACE_NULL_WITH_DEFAULT_DOC,
+        group,
+        ++orderInGroup,
+        Width.SHORT,
+        REPLACE_NULL_WITH_DEFAULT_DISPLAY
+    );
+
     configDef.define(
         S3_PART_RETRIES_CONFIG,
         Type.INT,
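
As a side note on the mechanics, here is a self-contained sketch of how a boolean key defined this way resolves through AbstractConfig; the demo class is hypothetical, not part of the connector:

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import java.util.Collections;
import java.util.Map;

public class ReplaceNullConfigDemo extends AbstractConfig {
  // Mirrors the define(...) call above: boolean type, default true, low importance.
  private static final ConfigDef CONFIG_DEF = new ConfigDef().define(
      "json.replace.null.with.default",
      ConfigDef.Type.BOOLEAN,
      true,
      ConfigDef.Importance.LOW,
      "Whether to replace null fields with their schema default value.");

  public ReplaceNullConfigDemo(Map<String, ?> props) {
    super(CONFIG_DEF, props);
  }

  public static void main(String[] args) {
    // Unset -> the default (true) applies; an explicit "false" string parses to false.
    System.out.println(new ReplaceNullConfigDemo(Collections.emptyMap())
        .getBoolean("json.replace.null.with.default")); // true
    System.out.println(new ReplaceNullConfigDemo(
        Collections.singletonMap("json.replace.null.with.default", "false"))
        .getBoolean("json.replace.null.with.default")); // false
  }
}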

kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/json/JsonFormat.java

Lines changed: 7 additions & 2 deletions
@@ -16,11 +16,11 @@
 package io.confluent.connect.s3.format.json;

 import org.apache.kafka.connect.json.JsonConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.util.HashMap;
 import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 import io.confluent.connect.s3.S3SinkConnectorConfig;
 import io.confluent.connect.s3.storage.S3Storage;
@@ -46,6 +46,11 @@ public JsonFormat(S3Storage storage) {
         "decimal.format",
         String.valueOf(storage.conf().getJsonDecimalFormat())
     );
+    converterConfig.put(
+        "replace.null.with.default",
+        storage.conf().getBoolean(S3SinkConnectorConfig.REPLACE_NULL_WITH_DEFAULT_CONFIG)
+    );
+
     this.converter.configure(converterConfig, false);
   }
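
To illustrate what this wiring changes, here is a minimal standalone sketch of the converter behavior, assuming a Kafka version whose JsonConverter understands the "replace.null.with.default" key that the commit passes through; the class, topic, and field names are illustrative:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class JsonNullDemo {
  public static void main(String[] args) {
    Map<String, Object> converterConfig = new HashMap<>();
    converterConfig.put("schemas.enable", false);
    // What json.replace.null.with.default=false wires into the converter.
    converterConfig.put("replace.null.with.default", false);

    JsonConverter converter = new JsonConverter();
    converter.configure(converterConfig, false); // false = value converter, as in JsonFormat

    Schema schema = SchemaBuilder.struct()
        .field("withDefault", SchemaBuilder.bool().optional().defaultValue(true).build())
        .build();
    Struct value = new Struct(schema).put("withDefault", null);

    // Prints {"withDefault":null}; with the flag left at true it would print
    // {"withDefault":true}, i.e. the schema default.
    byte[] json = converter.fromConnectData("example-topic", schema, value);
    System.out.println(new String(json, StandardCharsets.UTF_8));
  }
}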

kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java

Lines changed: 25 additions & 21 deletions
@@ -17,16 +17,27 @@

 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.auth.AWSCredentialsProvider;
-
+import io.confluent.connect.avro.AvroDataConfig;
+import io.confluent.connect.s3.auth.AwsAssumeRoleCredentialsProvider;
+import io.confluent.connect.s3.format.avro.AvroFormat;
 import io.confluent.connect.s3.format.bytearray.ByteArrayFormat;
+import io.confluent.connect.s3.format.json.JsonFormat;
 import io.confluent.connect.s3.format.parquet.ParquetFormat;
-
+import io.confluent.connect.s3.storage.S3Storage;
+import io.confluent.connect.storage.common.StorageCommonConfig;
+import io.confluent.connect.storage.partitioner.DailyPartitioner;
+import io.confluent.connect.storage.partitioner.DefaultPartitioner;
+import io.confluent.connect.storage.partitioner.FieldPartitioner;
+import io.confluent.connect.storage.partitioner.HourlyPartitioner;
+import io.confluent.connect.storage.partitioner.Partitioner;
+import io.confluent.connect.storage.partitioner.PartitionerConfig;
+import io.confluent.connect.storage.partitioner.TimeBasedPartitioner;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.connect.json.DecimalFormat;
 import org.apache.kafka.connect.sink.SinkRecord;
-import org.junit.After;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

@@ -38,32 +49,15 @@
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;

-import io.confluent.connect.s3.auth.AwsAssumeRoleCredentialsProvider;
-import io.confluent.connect.s3.format.avro.AvroFormat;
-import io.confluent.connect.s3.format.json.JsonFormat;
-import io.confluent.connect.s3.storage.S3Storage;
-import io.confluent.connect.storage.common.StorageCommonConfig;
-import io.confluent.connect.storage.partitioner.DailyPartitioner;
-import io.confluent.connect.storage.partitioner.DefaultPartitioner;
-import io.confluent.connect.storage.partitioner.FieldPartitioner;
-import io.confluent.connect.storage.partitioner.HourlyPartitioner;
-import io.confluent.connect.storage.partitioner.Partitioner;
-import io.confluent.connect.storage.partitioner.PartitionerConfig;
-import io.confluent.connect.storage.partitioner.TimeBasedPartitioner;
-import io.confluent.connect.avro.AvroDataConfig;
-
 import static io.confluent.connect.s3.S3SinkConnectorConfig.AffixType;
-import static io.confluent.connect.s3.S3SinkConnectorConfig.DECIMAL_FORMAT_CONFIG;
-import static io.confluent.connect.s3.S3SinkConnectorConfig.DECIMAL_FORMAT_DEFAULT;
 import static io.confluent.connect.s3.S3SinkConnectorConfig.HEADERS_FORMAT_CLASS_CONFIG;
 import static io.confluent.connect.s3.S3SinkConnectorConfig.KEYS_FORMAT_CLASS_CONFIG;
 import static io.confluent.connect.s3.S3SinkConnectorConfig.SCHEMA_PARTITION_AFFIX_TYPE_CONFIG;
-
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;

 public class S3SinkConnectorConfigTest extends S3SinkConnectorTestBase {

@@ -458,6 +452,16 @@ public void testJsonDecimalFormat() {
     assertEquals(DecimalFormat.NUMERIC.name(), connectorConfig.getJsonDecimalFormat());
   }

+  @Test
+  public void testJsonReplaceNullWithDefault() {
+    connectorConfig = new S3SinkConnectorConfig(properties);
+    assertTrue(connectorConfig.getBoolean("json.replace.null.with.default"));
+
+    properties.put(S3SinkConnectorConfig.REPLACE_NULL_WITH_DEFAULT_CONFIG, "false");
+    connectorConfig = new S3SinkConnectorConfig(properties);
+    assertFalse(connectorConfig.getBoolean("json.replace.null.with.default"));
+  }
+
   @Test
   public void testValidCompressionLevels() {
     IntStream.range(-1, 9).boxed().forEach(i -> {

kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/BaseConnectorIT.java

Lines changed: 57 additions & 35 deletions
@@ -15,10 +15,6 @@

 package io.confluent.connect.s3.integration;

-import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_ACCESS_KEY_ID_CONFIG;
-import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_SECRET_ACCESS_KEY_CONFIG;
-import static io.confluent.kafka.schemaregistry.ClusterTestHarness.KAFKASTORE_TOPIC;
-
 import com.amazonaws.auth.AWSStaticCredentialsProvider;
 import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.services.s3.AmazonS3;
@@ -29,27 +25,12 @@
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.NullNode;
 import com.google.common.collect.ImmutableMap;
 import io.confluent.common.utils.IntegrationTest;
+import io.confluent.connect.s3.util.S3Utils;
 import io.confluent.kafka.schemaregistry.CompatibilityLevel;
 import io.confluent.kafka.schemaregistry.RestApp;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import io.confluent.connect.s3.util.S3Utils;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
@@ -79,6 +60,25 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_ACCESS_KEY_ID_CONFIG;
+import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_SECRET_ACCESS_KEY_CONFIG;
+import static io.confluent.kafka.schemaregistry.ClusterTestHarness.KAFKASTORE_TOPIC;
 import static org.assertj.core.api.Assertions.assertThat;

 @Category(IntegrationTest.class)
@@ -332,15 +332,15 @@ protected Schema getSampleStructSchema() {
         .field("myFloat32", Schema.FLOAT32_SCHEMA)
         .field("myFloat64", Schema.FLOAT64_SCHEMA)
         .field("myString", Schema.STRING_SCHEMA)
+        .field("withDefault", SchemaBuilder.bool().optional().defaultValue(true).build())
         .build();
   }

   protected Struct getSampleStructVal(Schema structSchema) {
-    Date sampleDate = new Date(1111111);
-    sampleDate.setTime(0);
     return new Struct(structSchema)
         .put("ID", (long) 1)
         .put("myBool", true)
+        .put("withDefault", null)
         .put("myInt32", 32)
         .put("myFloat32", 3.2f)
         .put("myFloat64", 64.64)
@@ -409,12 +409,15 @@ protected static void clearBucket(String bucketName) {
    * @param bucketName the name of the s3 test bucket
    * @param expectedRowsPerFile the number of rows a file should have
    * @param expectedRow the expected row data in each file
+   * @param useDefaultValues whether null fields should be compared against their schema defaults
+   *
    * @return whether every row of the files read equals the expected row
    */
   protected boolean fileContentsAsExpected(
       String bucketName,
       int expectedRowsPerFile,
-      Struct expectedRow
+      Struct expectedRow,
+      boolean useDefaultValues
   ) {
     log.info("expectedRow: {}", expectedRow);
     for (String fileName :
@@ -427,7 +430,7 @@ protected boolean fileContentsAsExpected(
       String fileExtension = getExtensionFromKey(fileName);
       List<JsonNode> downloadedFileContents = contentGetters.get(fileExtension)
           .apply(destinationPath);
-      if (!fileContentsMatchExpected(downloadedFileContents, expectedRowsPerFile, expectedRow)) {
+      if (!fileContentsMatchExpected(downloadedFileContents, expectedRowsPerFile, expectedRow, useDefaultValues)) {
         return false;
       }
       downloadedFile.delete();
@@ -481,20 +484,23 @@ protected boolean keyfileContentsAsExpected(
    * @param fileContents the file contents as a list of JsonNodes
    * @param expectedRowsPerFile the number of rows expected in the file
    * @param expectedRow the expected values of each row
+   * @param useDefaultValues use default values from struct
+   *
    * @return whether the file contents match the expected row
    */
   protected boolean fileContentsMatchExpected(
       List<JsonNode> fileContents,
       int expectedRowsPerFile,
-      Struct expectedRow
+      Struct expectedRow,
+      boolean useDefaultValues
   ) {
     if (fileContents.size() != expectedRowsPerFile) {
       log.error("Number of rows in file do not match the expected count, actual: {}, expected: {}",
           fileContents.size(), expectedRowsPerFile);
       return false;
     }
     for (JsonNode row : fileContents) {
-      if (!fileRowMatchesExpectedRow(row, expectedRow)) {
+      if (!fileRowMatchesExpectedRow(row, expectedRow, useDefaultValues)) {
         return false;
       }
     }
@@ -512,18 +518,34 @@ private List<String> getS3KeyFileList(List<S3ObjectSummary> summaries) {
   /**
    * Compare the row in the file and its values to the expected row's values.
    *
-   * @param fileRow the row read from the file as a JsonNode
-   * @param expectedRow the expected contents of the row
+   * @param fileRow          the row read from the file as a JsonNode
+   * @param expectedRow      the expected contents of the row
+   * @param useDefaultValues use default values from struct
+   *
    * @return whether the file row matches the expected row
    */
-  private boolean fileRowMatchesExpectedRow(JsonNode fileRow, Struct expectedRow) {
-    log.debug("Comparing rows: file: {}, expected: {}", fileRow, expectedRow);
+  private boolean fileRowMatchesExpectedRow(JsonNode fileRow, Struct expectedRow, boolean useDefaultValues) {
+    log.info("Comparing rows: file: {}, expected: {}", fileRow, expectedRow);
     // compare the field values
     for (Field key : expectedRow.schema().fields()) {
-      String expectedValue = expectedRow.get(key).toString();
-      String rowValue = fileRow.get(key.name()).toString().replaceAll("^\"|\"$", "");
-      log.debug("Comparing values: {}, {}", expectedValue, rowValue);
-      if (!rowValue.equals(expectedValue)) {
+      String expectedValue = null;
+      if (useDefaultValues) {
+        expectedValue = expectedRow.get(key).toString();
+      } else {
+        Object withoutDefault = expectedRow.getWithoutDefault(key.name());
+        if (withoutDefault != null) {
+          expectedValue = withoutDefault.toString();
+        }
+      }
+
+      JsonNode jsonValue = fileRow.get(key.name());
+      String rowValue = null;
+      if (!(jsonValue instanceof NullNode)) {
+        rowValue = jsonValue.toString().replaceAll("^\"|\"$", "");
+      }
+
+      log.info("Comparing values: {}, {}, {}, {}", key.name(), expectedValue, rowValue, Objects.equals(rowValue, expectedValue));
+      if (!Objects.equals(rowValue, expectedValue)) {
         return false;
       }
     }
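
The comparison above hinges on a Connect Struct detail: get() substitutes the schema default when a stored field value is null, while getWithoutDefault() returns the raw stored value. A minimal sketch (field name illustrative):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class StructDefaultDemo {
  public static void main(String[] args) {
    Schema schema = SchemaBuilder.struct()
        .field("withDefault", SchemaBuilder.bool().optional().defaultValue(true).build())
        .build();
    Struct row = new Struct(schema).put("withDefault", null);

    System.out.println(row.get("withDefault"));               // true  (default substituted)
    System.out.println(row.getWithoutDefault("withDefault")); // null  (raw stored value)
  }
}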
