
Commit 4a29ebe

Merge pull request #194 from marklogic/develop
Merge develop into master for the 1.10.0 release
2 parents: d29a1a0 + e1a3f3b

12 files changed: 274 additions and 171 deletions

CONTRIBUTING.md

Lines changed: 7 additions & 3 deletions
@@ -4,8 +4,8 @@ distribution.
 
 ### Requirements:
 
 * MarkLogic Server 11+
-* Java (either version 8, 11, or 17). It is recommended to use 11 or 17, as Confluent has deprecated Java 8 support in
-  Confluent 7.x and is removing it in Confluent 8.x. Additionally, Sonar requires the use of Java 11 or 17.
+* Java, either version 11 or 17, is required to use the Gradle tools.
+  Additionally, SonarQube requires the use of Java 17.
 
 See [the Confluent compatibility matrix](https://docs.confluent.io/platform/current/installation/versions-interoperability.html#java)
 for more information. After installing your desired version of Java, ensure that the `JAVA_HOME` environment variable

@@ -76,7 +76,7 @@ application must be deployed. From the "test-app" directory, follow these steps:
 
 ## Automated Testing
 Now that your MarkLogic server is configured and the test-app is deployed, you can run the tests via from the root
-directory:
+directory. Note that you must be using Java 11 or Java 17 for this command due to the latest version of Gradle.
 ```
 ./gradlew test
 ```

@@ -318,3 +318,7 @@ project. You must have Ruby installed. Additionally, there seems to be a bug wit
 The server needs to be run with Ruby 3.2.3, so you will need to run `chruby ruby-3.2.3` before starting the jekyll
 server. To start the jekyll server, cd into the /docs directory and run the command `bundle exec jekyll server`.
 This will start the server and the user documents will be available at http://127.0.0.1:4000/.
+
+# Publishing the Connector to Confluent
+
+Please refer to the internal Wiki page for information regarding the process for releasing the connector to the Confluent Hub.

build.gradle

Lines changed: 13 additions & 16 deletions
@@ -1,13 +1,12 @@
 plugins {
     id 'java'
     id 'net.saliman.properties' version '1.5.2'
-    id 'com.github.johnrengelman.shadow' version '8.1.1'
-    id "com.github.jk1.dependency-license-report" version "1.19"
+    id 'com.gradleup.shadow' version '8.3.4'
 
     // Only used for testing
-    id 'com.marklogic.ml-gradle' version '4.8.0'
+    id 'com.marklogic.ml-gradle' version '5.0.0'
     id 'jacoco'
-    id "org.sonarqube" version "4.4.1.3373"
+    id "org.sonarqube" version "5.1.0.4882"
 
     // Used to generate Avro classes. This will write classes to build/generated-test-avro-java and also add that folder
     // as a source root. Since this is commented out by default, the generated Avro test class has been added to

@@ -33,24 +32,21 @@ configurations {
 ext {
     // Even though Kafka Connect 3.7.0 is out, we're staying with 3.6.1 in order to continue
     // using the third-party Kafka JUnit tool. See https://github.com/mguenther/kafka-junit?tab=readme-ov-file
-    kafkaVersion = "3.6.1"
+    kafkaVersion = "3.8.1"
 }
 
 dependencies {
-    compileOnly "org.apache.kafka:connect-api:${kafkaVersion}"
-    compileOnly "org.apache.kafka:connect-json:${kafkaVersion}"
     compileOnly "org.apache.kafka:connect-runtime:${kafkaVersion}"
-    compileOnly "org.slf4j:slf4j-api:2.0.13"
+    compileOnly "org.slf4j:slf4j-api:1.7.36"
 
-    implementation 'com.marklogic:ml-javaclient-util:4.8.0'
     // Force DHF to use the latest version of ml-app-deployer, which minimizes security vulnerabilities
-    implementation "com.marklogic:ml-app-deployer:4.8.0"
+    implementation "com.marklogic:ml-app-deployer:5.0.0"
 
-    implementation "com.fasterxml.jackson.dataformat:jackson-dataformat-csv:2.15.3"
+    implementation "com.fasterxml.jackson.dataformat:jackson-dataformat-csv:2.17.2"
 
     // Note that in general, the version of the DHF jar must match that of the deployed DHF instance. Different versions
     // may work together, but that behavior is not guaranteed.
-    implementation("com.marklogic:marklogic-data-hub:6.0.0") {
+    implementation("com.marklogic:marklogic-data-hub:6.1.1") {
         exclude module: "marklogic-client-api"
         exclude module: "ml-javaclient-util"
         exclude module: "ml-app-deployer"

@@ -61,17 +57,18 @@ dependencies {
         exclude module: "logback-classic"
     }
 
-    testImplementation 'com.marklogic:marklogic-junit5:1.4.0'
+    testImplementation 'com.marklogic:marklogic-junit5:1.5.0'
 
-    testImplementation "org.apache.kafka:connect-api:${kafkaVersion}"
     testImplementation "org.apache.kafka:connect-json:${kafkaVersion}"
+
+    // Can be deleted when the disabled kafka-junit tests are deleted.
     testImplementation 'net.mguenther.kafka:kafka-junit:3.6.0'
 
-    testImplementation "org.apache.avro:avro-compiler:1.11.3"
+    testImplementation "org.apache.avro:avro-compiler:1.12.0"
 
     // Forcing logback to be used for test logging
     testImplementation "ch.qos.logback:logback-classic:1.3.14"
-    testImplementation "org.slf4j:jcl-over-slf4j:2.0.13"
+    testImplementation "org.slf4j:jcl-over-slf4j:2.0.16"
 
     documentation files('LICENSE.txt')
     documentation files('NOTICE.txt')

docs/writing-data.md

Lines changed: 4 additions & 1 deletion
@@ -298,13 +298,16 @@ sent to the DLQ.
 MarkLogic then each of the records in the batch will be sent to the DLQ. The entire batch must be sent to the DLQ since
 the connector is unable to determine the cause of the failure.
 
-When a record is sent to the DLQ, the connector first adds headers to the record providing information about the cause
+When a record is sent to the DLQ, the connector first adds headers to the record, providing information about the cause
 of the failure in order to assist with troubleshooting and potential routing.
 - "marklogic-failure-type" : Either "Write failure" or "Record conversion"
 - "marklogic-exception-message" : Information from MarkLogic when there is a write failure
 - "marklogic-original-topic" : The name of the topic that this record came from
 - "marklogic-target-uri" : For write failures, this contains the target URI for the document
 
+For those headers to be populated properly, the version of this connector must be compatible with the version of Kafka
+that is being used. The 1.8.0 and 1.9.0 versions of the connector work with Kafka versions before 3.8. Starting with
+the connector version 1.10.0, the Kafka version must be 3.8 or later.
 
 ## Sink connector error handling
 

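For illustration, here is a minimal sketch (not part of this commit) of a downstream consumer that inspects the failure headers described in the writing-data.md change above. The topic name "marklogic-dlq", the bootstrap server, and the group id are placeholder assumptions; the header names are the documented ones, and the DLQ topic should match whatever errors.deadletterqueue.topic.name is configured to.

```
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class DlqHeaderInspector {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumption: local broker
        props.put("group.id", "marklogic-dlq-inspector");   // assumption: any unused group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // "marklogic-dlq" is a hypothetical DLQ topic name.
            consumer.subscribe(Collections.singletonList("marklogic-dlq"));
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<byte[], byte[]> rec : records) {
                System.out.println("Failure type      : " + headerValue(rec, "marklogic-failure-type"));
                System.out.println("Exception message : " + headerValue(rec, "marklogic-exception-message"));
                System.out.println("Original topic    : " + headerValue(rec, "marklogic-original-topic"));
                System.out.println("Target URI        : " + headerValue(rec, "marklogic-target-uri"));
            }
        }
    }

    // Reads the last header with the given name, returning a placeholder when the header is absent.
    private static String headerValue(ConsumerRecord<byte[], byte[]> rec, String name) {
        Header header = rec.headers().lastHeader(name);
        return (header == null || header.value() == null)
            ? "(not set)"
            : new String(header.value(), StandardCharsets.UTF_8);
    }
}
```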
gradle.properties

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 group=com.marklogic
-version=1.9.0
+version=1.10.0
 
 # For the Confluent Connector Archive
 componentOwner=marklogic

src/main/java/com/marklogic/kafka/connect/sink/WriteBatcherSinkTask.java

Lines changed: 31 additions & 12 deletions
@@ -35,6 +35,8 @@
 import org.apache.kafka.connect.runtime.InternalSinkRecord;
 import org.apache.kafka.connect.sink.ErrantRecordReporter;
 import org.apache.kafka.connect.sink.SinkRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.util.StringUtils;
 
 import java.io.IOException;

@@ -53,6 +55,8 @@
  */
 public class WriteBatcherSinkTask extends AbstractSinkTask {
 
+    protected static final Logger classLogger = LoggerFactory.getLogger(WriteBatcherSinkTask.class);
+
     private DatabaseClient databaseClient;
     private DataMovementManager dataMovementManager;
     private WriteBatcher writeBatcher;

@@ -101,20 +105,35 @@ protected void writeSinkRecord(SinkRecord sinkRecord) {
 
     static void addFailureHeaders(SinkRecord sinkRecord, Throwable e, String failureHeaderValue, WriteEvent writeEvent) {
         if (sinkRecord instanceof InternalSinkRecord) {
-            ConsumerRecord<byte[], byte[]> originalRecord = ((InternalSinkRecord) sinkRecord).originalRecord();
-            originalRecord.headers().add(MARKLOGIC_MESSAGE_FAILURE_HEADER, getBytesHandleNull(failureHeaderValue));
-            originalRecord.headers().add(MARKLOGIC_MESSAGE_EXCEPTION_MESSAGE, getBytesHandleNull(e.getMessage()));
-            originalRecord.headers().add(MARKLOGIC_ORIGINAL_TOPIC, getBytesHandleNull(sinkRecord.topic()));
-            if (writeEvent != null) {
-                originalRecord.headers().add(MARKLOGIC_TARGET_URI, writeEvent.getTargetUri().getBytes(StandardCharsets.UTF_8));
+            try {
+                ConsumerRecord<byte[], byte[]> originalRecord = ((InternalSinkRecord) sinkRecord).context().original();
+                addFailureHeadersToOriginalSinkRecord(originalRecord, e, failureHeaderValue, writeEvent);
+            } catch (NoSuchMethodError methodException) {
+                classLogger.warn("This version of the MarkLogic Kafka Connector requires Kafka version 3.8.0 or" +
+                    " higher in order to store failure information on the original sink record. Instead, the failure" +
+                    " information will be on the wrapper sink record.");
+                addFailureHeadersToNonInternalSinkRecord(sinkRecord, e, failureHeaderValue, writeEvent);
             }
         } else {
-            sinkRecord.headers().addString(MARKLOGIC_MESSAGE_FAILURE_HEADER, failureHeaderValue);
-            sinkRecord.headers().addString(MARKLOGIC_MESSAGE_EXCEPTION_MESSAGE, e.getMessage());
-            sinkRecord.headers().addString(MARKLOGIC_ORIGINAL_TOPIC, sinkRecord.topic());
-            if (writeEvent != null) {
-                sinkRecord.headers().addString(MARKLOGIC_TARGET_URI, writeEvent.getTargetUri());
-            }
+            addFailureHeadersToNonInternalSinkRecord(sinkRecord, e, failureHeaderValue, writeEvent);
+        }
+    }
+
+    static void addFailureHeadersToNonInternalSinkRecord(SinkRecord sinkRecord, Throwable e, String failureHeaderValue, WriteEvent writeEvent) {
+        sinkRecord.headers().addString(MARKLOGIC_MESSAGE_FAILURE_HEADER, failureHeaderValue);
+        sinkRecord.headers().addString(MARKLOGIC_MESSAGE_EXCEPTION_MESSAGE, e.getMessage());
+        sinkRecord.headers().addString(MARKLOGIC_ORIGINAL_TOPIC, sinkRecord.topic());
+        if (writeEvent != null) {
+            sinkRecord.headers().addString(MARKLOGIC_TARGET_URI, writeEvent.getTargetUri());
+        }
+    }
+
+    static void addFailureHeadersToOriginalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, Throwable e, String failureHeaderValue, WriteEvent writeEvent) {
+        originalRecord.headers().add(MARKLOGIC_MESSAGE_FAILURE_HEADER, getBytesHandleNull(failureHeaderValue));
+        originalRecord.headers().add(MARKLOGIC_MESSAGE_EXCEPTION_MESSAGE, getBytesHandleNull(e.getMessage()));
+        originalRecord.headers().add(MARKLOGIC_ORIGINAL_TOPIC, getBytesHandleNull(originalRecord.topic()));
+        if (writeEvent != null) {
+            originalRecord.headers().add(MARKLOGIC_TARGET_URI, writeEvent.getTargetUri().getBytes(StandardCharsets.UTF_8));
         }
     }

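As a rough sketch only (not part of this commit), the non-internal fallback path introduced above could be exercised with a plain JUnit test in the same package, since addFailureHeadersToNonInternalSinkRecord is package-private. The assertions assume the header-name constants resolve to the names documented in docs/writing-data.md, such as "marklogic-failure-type" and "marklogic-original-topic".

```
package com.marklogic.kafka.connect.sink;

import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

class AddFailureHeadersSketchTest {

    @Test
    void addsDocumentedHeadersToPlainSinkRecord() {
        // A plain SinkRecord (not an InternalSinkRecord), so the non-internal path applies.
        SinkRecord sinkRecord = new SinkRecord("my-topic", 0, null, null, null, "{}", 0);

        WriteBatcherSinkTask.addFailureHeadersToNonInternalSinkRecord(
            sinkRecord, new RuntimeException("boom"), "Write failure", null);

        // Assumes the header-name constants map to the documented header names.
        assertEquals("Write failure", sinkRecord.headers().lastWithName("marklogic-failure-type").value());
        assertEquals("boom", sinkRecord.headers().lastWithName("marklogic-exception-message").value());
        assertEquals("my-topic", sinkRecord.headers().lastWithName("marklogic-original-topic").value());
        // No WriteEvent was supplied, so no target URI header should be present.
        assertNull(sinkRecord.headers().lastWithName("marklogic-target-uri"));
    }
}
```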
src/test/java/com/marklogic/kafka/connect/sink/SendWriteFailureRecordsToDlqKafkaTest.java

Lines changed: 2 additions & 4 deletions
@@ -22,10 +22,7 @@
 import net.mguenther.kafka.junit.ReadKeyValues;
 import net.mguenther.kafka.junit.SendKeyValues;
 import org.apache.kafka.common.header.Headers;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.*;
 import org.springframework.beans.factory.annotation.Autowired;
 
 import java.util.ArrayList;

@@ -68,6 +65,7 @@ void tearDownKafka() {
     }
 
     @Test
+    @Disabled("This test is disabled because kafka-junit is not compatible with kafka > 3.6.0")
     void failedBatchesShouldGoToTheDlq() throws InterruptedException {
         sendSomeJsonMessages(NUM_RECORDS);
 

src/test/java/com/marklogic/kafka/connect/sink/WriteAvroDataTest.java

Lines changed: 2 additions & 1 deletion
@@ -2,6 +2,7 @@
 
 import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.avro.Schema;
+import org.apache.avro.CanonicalSchemaFormatterFactory;
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.Encoder;

@@ -83,7 +84,7 @@ void writeSchema() throws IOException {
             .endRecord();
 
         FileCopyUtils.copy(
-            mySchema.toString(true).getBytes(),
+            new CanonicalSchemaFormatterFactory().getDefaultFormatter().format(mySchema).getBytes(),
             new File(Paths.get("src", "test", "avro").toFile(), "avroTestClass-schema.avsc")
         );
     }

src/test/java/com/marklogic/kafka/connect/sink/WriteFromKafkaTest.java

Lines changed: 2 additions & 0 deletions
@@ -24,6 +24,7 @@
 import net.mguenther.kafka.junit.SendKeyValues;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;

@@ -56,6 +57,7 @@ void tearDownKafka() {
     }
 
     @Test
+    @Disabled("This test is disabled because kafka-junit is not compatible with kafka > 3.6.0")
     void shouldWaitForKeyedRecordsToBePublished() throws InterruptedException {
         Integer NUM_RECORDS = 2;
         sendSomeJsonMessages(NUM_RECORDS);

src/test/java/com/marklogic/kafka/connect/sink/WriteTransformDocumentTest.java

Lines changed: 2 additions & 0 deletions
@@ -24,6 +24,7 @@
 import net.mguenther.kafka.junit.SendKeyValues;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;

@@ -56,6 +57,7 @@ void tearDownKafka() {
     }
 
     @Test
+    @Disabled("This test is disabled because kafka-junit is not compatible with kafka > 3.6.0")
     void shouldWaitForKeyedRecordsToBePublished() throws InterruptedException {
         Integer NUM_RECORDS = 2;
         sendSomeJsonMessages(NUM_RECORDS);

src/test/java/com/marklogic/kafka/connect/source/ReadRowsViaOpticDslKafkaTest.java

Lines changed: 2 additions & 0 deletions
@@ -20,6 +20,7 @@
 import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import java.util.Properties;

@@ -48,6 +49,7 @@ void tearDownKafka() {
 
     @SuppressWarnings("java:S2699") // The assertion happens via kafka.observe
     @Test
+    @Disabled("This test is disabled because kafka-junit is not compatible with kafka > 3.6.0")
     void shouldWaitForKeyedRecordsToBePublished() throws InterruptedException {
         kafka.observe(on(AUTHORS_TOPIC, 15));
     }
