Commit 16e135e

Feature/kst 8 null objects (#15)
* kst-8 - First pass at fixing the bug with handling messages that have a null value; these can be "tombstone" messages. Adds two tests: one for a blank message ("") and one for a null value.
* kst-8 - Implementing review recommendations: throw NullPointerExceptions in the lower-level classes, and log null records and values in MarkLogicSinkTask to catch them early in the process.
* Fixing a typo.
1 parent 0dd1488 commit 16e135e
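
For context: a Kafka "tombstone" is a record whose value is null. On log-compacted topics it marks its key for deletion, so a sink connector has to expect and tolerate such records. A minimal sketch of the shape involved, using the standard Kafka Connect SinkRecord constructor (the key "user-123" is purely illustrative):

```java
import org.apache.kafka.connect.sink.SinkRecord;

public class TombstoneSketch {
    // A tombstone has the same shape as any other record, except that its value
    // (and value schema) are null. "user-123" is a hypothetical key.
    static SinkRecord tombstone() {
        return new SinkRecord("test-topic", 1,
                null, "user-123", // key schema, key
                null, null,       // value schema, value: null marks the tombstone
                0);               // offset
    }
}
```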

File tree

5 files changed: +40 −8 lines changed

README.md

Lines changed: 1 addition & 1 deletion
```diff
@@ -78,7 +78,7 @@ When a document is received and written by the connector, you'll see logging like
 | ml.dmsdk.batchSize | 100 | Sets the number of documents to be written in a batch to MarkLogic. This may not have any impact depending on the connector receives data from Kafka, as the connector calls flushAsync on the DMSDK WriteBatcher after processing every collection of records. Thus, if the connector never receives at one time more than the value of this property, then the value of this property will have no impact. |
 | ml.dmsdk.threadCount | 8 | Sets the number of threads used by the Data Movement SDK for parallelizing writes to MarkLogic. Similar to the batch size property above, this may never come into play depending on how many records the connector receives at once. |
 | ml.document.collections | kafka-data | Optional - a comma-separated list of collections that each document should be written to |
-|ml.document.addTopicToCollections | false | Sets this to true so that the name of the topic that the connector reads from is added as a collection to each document inserted by the connector
+| ml.document.addTopicToCollections | false | Set this to true so that the name of the topic that the connector reads from is added as a collection to each document inserted by the connector
 | ml.document.format | JSON | Optional - specify the format of each document; either JSON, XML, BINARY, TEXT, or UNKNOWN |
 | ml.document.mimeType | (empty) | Optional - specify a mime type for each document; typically the format property above will be used instead of this |
 | ml.document.permissions | rest-reader,read,rest-writer,update | Optional - a comma-separated list of roles and capabilities that define the permissions for each document written to MarkLogic |
```
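
The ml.dmsdk.batchSize caveat above follows from the connector calling flushAsync after every put, so batches may be written before they fill. A minimal sketch of how the two ml.dmsdk properties map onto the Data Movement SDK (assuming an already-configured DatabaseClient; this is not the connector's exact wiring):

```java
import com.marklogic.client.DatabaseClient;
import com.marklogic.client.datamovement.DataMovementManager;
import com.marklogic.client.datamovement.WriteBatcher;

public class BatcherSketch {
    // Hypothetical helper: builds a WriteBatcher the way the two
    // ml.dmsdk.* properties describe.
    static WriteBatcher newBatcher(DatabaseClient client) {
        DataMovementManager dmm = client.newDataMovementManager();
        WriteBatcher batcher = dmm.newWriteBatcher()
            .withBatchSize(100)   // ml.dmsdk.batchSize
            .withThreadCount(8);  // ml.dmsdk.threadCount
        dmm.startJob(batcher);
        // Calling batcher.flushAsync() after each put() writes any partial
        // batch immediately, which is why batchSize may have no effect.
        return batcher;
    }
}
```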

src/main/java/com/marklogic/client/ext/document/DocumentWriteOperationBuilder.java

Lines changed: 4 additions & 0 deletions
```diff
@@ -17,6 +17,10 @@ public class DocumentWriteOperationBuilder {
 	private ContentIdExtractor contentIdExtractor = new DefaultContentIdExtractor();
 
 	public DocumentWriteOperation build(AbstractWriteHandle content, DocumentMetadataHandle metadata) {
+		if (content == null) {
+			throw new NullPointerException("'content' must not be null");
+		}
+
 		if (hasText(collections)) {
 			metadata.getCollections().addAll(collections.trim().split(","));
 		}
```
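
A quick way to exercise the new guard clause is a JUnit 5 test along these lines (a sketch; it assumes the builder's no-arg construction and uses an empty metadata handle):

```java
import com.marklogic.client.ext.document.DocumentWriteOperationBuilder;
import com.marklogic.client.io.DocumentMetadataHandle;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertThrows;

public class BuildGuardTest {
    @Test
    public void nullContentIsRejected() {
        DocumentWriteOperationBuilder builder = new DocumentWriteOperationBuilder();
        // Null content should now fail fast instead of producing a broken write operation.
        assertThrows(NullPointerException.class,
            () -> builder.build(null, new DocumentMetadataHandle()));
    }
}
```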

src/main/java/com/marklogic/kafka/connect/sink/DefaultSinkRecordConverter.java

Lines changed: 3 additions & 0 deletions
```diff
@@ -69,6 +69,9 @@ protected DocumentMetadataHandle addTopicToCollections (String topic, Boolean ad
 	 * @return
 	 */
 	protected AbstractWriteHandle toContent(SinkRecord record) {
+		if ((record == null) || (record.value() == null)) {
+			throw new NullPointerException("'record' must not be null, and must have a value.");
+		}
 		Object value = record.value();
 		if (value instanceof byte[]) {
 			BytesHandle content = new BytesHandle((byte[]) value);
```
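
The context lines show the dispatch idiom that the new check protects: once the record and its value are known to be non-null, a write handle is chosen by the payload's runtime type. A hedged sketch of that idiom in isolation (toHandle is a hypothetical helper, and the real toContent likely covers more types):

```java
import com.marklogic.client.io.BytesHandle;
import com.marklogic.client.io.StringHandle;
import com.marklogic.client.io.marker.AbstractWriteHandle;

public class HandleDispatchSketch {
    // Fail fast on null, then pick a handle by payload type.
    static AbstractWriteHandle toHandle(Object value) {
        if (value == null) {
            throw new NullPointerException("'value' must not be null");
        }
        if (value instanceof byte[]) {
            return new BytesHandle((byte[]) value); // binary payloads pass through as-is
        }
        return new StringHandle(value.toString()); // assumption: other values are written as text
    }
}
```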

src/main/java/com/marklogic/kafka/connect/sink/MarkLogicSinkTask.java

Lines changed: 16 additions & 4 deletions
```diff
@@ -111,13 +111,25 @@ public void put(final Collection<SinkRecord> records) {
 		}
 
 		records.forEach(record -> {
-			if (logger.isDebugEnabled()) {
-				logger.debug("Processing record value {} in topic {}", record.value(), record.topic());
+			if (record == null) {
+				logger.warn("Skipping null record object.");
+			} else {
+				if (logger.isDebugEnabled()) {
+					logger.debug("Processing record value {} in topic {}", record.value(), record.topic());
+				}
+				if (record.value() != null) {
+					writeBatcher.add(sinkRecordConverter.convert(record));
+				} else {
+					logger.warn("Skipping record with null value - possibly a 'tombstone' message.");
+				}
 			}
-			writeBatcher.add(sinkRecordConverter.convert(record));
 		});
 
-		writeBatcher.flushAsync();
+		if (writeBatcher != null) {
+			writeBatcher.flushAsync();
+		} else {
+			logger.warn("writeBatcher is null - ignore this if you are running unit tests, otherwise this is a problem.");
+		}
 	}
 
 	public String version() {
```
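
Two notes on this hunk. First, the writeBatcher null check exists because the new unit tests construct a MarkLogicSinkTask directly and call put() without start(), so no batcher is ever created. Second, the tombstones being skipped originate from producers publishing a null value; a minimal producer sketch using the standard Kafka client (broker address, topic, and key are illustrative):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TombstoneProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The null value is the tombstone; on a compacted topic it deletes "user-123".
            producer.send(new ProducerRecord<>("test-topic", "user-123", null));
        }
    }
}
```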

src/test/java/com/marklogic/kafka/connect/sink/ConvertSinkRecordTest.java

Lines changed: 16 additions & 3 deletions
```diff
@@ -8,9 +8,7 @@
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.jupiter.api.Test;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
+import java.util.*;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -21,6 +19,7 @@
 public class ConvertSinkRecordTest {
 
 	DefaultSinkRecordConverter converter;
+	MarkLogicSinkTask markLogicSinkTask = new MarkLogicSinkTask();
 
 	@Test
 	public void allPropertiesSet() {
@@ -78,6 +77,20 @@ public void binaryContent() {
 		assertEquals("hello world".getBytes().length, content.get().length);
 	}
 
+	@Test
+	public void emptyContent() {
+		final Collection<SinkRecord> records = new ArrayList<>();
+		records.add(newSinkRecord(null));
+		markLogicSinkTask.put(records);
+	}
+
+	@Test
+	public void nullContent() {
+		final Collection<SinkRecord> records = new ArrayList<>();
+		records.add(null);
+		markLogicSinkTask.put(records);
+	}
+
 	private SinkRecord newSinkRecord(Object value) {
 		return new SinkRecord("test-topic", 1, null, null, null, value, 0);
 	}
```
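
As written, emptyContent and nullContent pass simply because put() returns without throwing. A slightly more self-documenting variant (a sketch; identical behavior, just an explicit assertion) could use JUnit 5's assertDoesNotThrow:

```java
import java.util.ArrayList;
import java.util.Collection;

import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;

public class TombstoneHandlingTest {
    MarkLogicSinkTask markLogicSinkTask = new MarkLogicSinkTask();

    @Test
    public void tombstonesAreSkippedWithoutError() {
        final Collection<SinkRecord> records = new ArrayList<>();
        records.add(null);                                                       // null record object
        records.add(new SinkRecord("test-topic", 1, null, null, null, null, 0)); // null value
        // put() should log warnings and skip both rather than throw.
        assertDoesNotThrow(() -> markLogicSinkTask.put(records));
    }
}
```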
