Commit 77a2c10

Merge pull request #154 from marklogic/feature/dhf-confluent

DHF integration in sink connector now works with Confluent

2 parents: 3f7a5e7 + 1be637e

5 files changed: +21 lines, -16 lines

CONTRIBUTING.md

Lines changed: 3 additions & 2 deletions

@@ -80,10 +80,11 @@ does not already exist) to point to where you extracted the Confluent Platform d
 
 Then build and copy the connector to the Confluent Platform directory that you configured above:
 
-    ./gradlew copyConnectorToConfluent
+    ./gradlew clean copyConnectorToConfluent
 
 Note that any time you modify the MarkLogic Kafka connector code, you'll need to repeat the
-`./gradlew copyConnectorToConfluent` step.
+`./gradlew clean copyConnectorToConfluent` step. Note that `clean` is included to ensure that in case you've changed
+any connector dependencies, old dependencies will not be included in the connector archive.
 
 Next, start Confluent:
 
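For orientation, `copyConnectorToConfluent` behaves like a standard Gradle `Copy` task. Below is a minimal sketch of what such a task could look like; the `confluentHome` property and the `connectorArchive` upstream task are assumptions for illustration, not the repository's actual definition:

    // Minimal sketch only -- not this repository's actual task definition.
    // Assumes "confluentHome" points at the Confluent Platform install directory
    // (configured via gradle.properties as described above) and that a task named
    // "connectorArchive" assembles the connector archive under build/.
    task copyConnectorToConfluent(type: Copy) {
        dependsOn "connectorArchive"
        from "${buildDir}/connectorArchive"
        into "${confluentHome}/share/confluent-hub-components"
    }

Prefixing the build with `clean` deletes the `build/` directory first, which is what guarantees that jars from dependencies you have since removed or upgraded cannot be swept into the archive.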

README.md

Lines changed: 0 additions & 5 deletions

@@ -560,11 +560,6 @@ ingestion step depends on reading data from a filesystem. You can however run an
 transform as described above. Please see the DHF documentation for information on how to configure the DHF REST transform
 for running an ingestion step.
 
-**Warning** - as of the MarkLogic Kafka 1.7.0 release, this feature may not work on certain versions of Confluent
-Platform due to dependency conflicts. Testing has verified that this feature works on the latest Apache Kafka 2.8.x
-release and the latest Apache Kafka 3.3.x release.
-
-
 ### Writing data via custom code (Bulk Data Services)
 
 MarkLogic's [Bulk Data Services](https://github.com/marklogic/java-client-api/wiki/Bulk-Data-Services) feature is
build.gradle

Lines changed: 13 additions & 4 deletions

@@ -66,15 +66,14 @@ dependencies {
         exclude module: "logback-classic"
     }
 
-    // 1.2.1 will be published soon, and then we'll switch to that
-    testImplementation 'com.marklogic:marklogic-junit5:1.2.1'
+    testImplementation 'com.marklogic:marklogic-junit5:1.3.0'
 
     testImplementation "org.apache.kafka:connect-api:${kafkaVersion}"
     testImplementation "org.apache.kafka:connect-json:${kafkaVersion}"
     testImplementation 'net.mguenther.kafka:kafka-junit:3.2.2'
 
     // Forcing logback to be used for test logging
-    testImplementation "ch.qos.logback:logback-classic:1.2.11"
+    testImplementation "ch.qos.logback:logback-classic:1.3.5"
     testImplementation "org.slf4j:jcl-over-slf4j:1.7.36"
 
     documentation files('LICENSE.txt')
@@ -184,7 +183,17 @@ task connectorArchive_CopyDocumentationToBuildDirectory(type: Copy, group: confl
 task connectorArchive_CopyDependenciesToBuildDirectory(type: Copy, group: confluentArchiveGroup, dependsOn: jar) {
     description = "Copy the dependency jars into the lib folder"
     from jar
-    from configurations.runtimeClasspath.findAll { it.name.endsWith('jar') }
+    // Confluent already includes the Jackson dependencies that this connector depends on. If the connector includes any
+    // itself, and the DHF integration is used with the sink connector, then the following error will occur when DHF
+    // tries to connect to the Manage API of MarkLogic:
+    // java.lang.ClassCastException: com.fasterxml.jackson.datatype.jdk8.Jdk8Module cannot be cast to com.fasterxml.jackson.databind.Module
+    // at org.springframework.http.converter.json.Jackson2ObjectMapperBuilder.registerWellKnownModulesIfAvailable(Jackson2ObjectMapperBuilder.java:849)
+    // stackoverflow indicates this may be due to multiple copies of Jackson being on the classpath, as Jdk8Module
+    // otherwise should be castable to Module.
+    // Testing has verified that excluding all "jackson-" jars still results in the connector working properly with
+    // Confluent 7.3.1. This has no impact on using the connector with plain Apache Kafka which does not rely on
+    // constructing this connector archive.
+    from configurations.runtimeClasspath.findAll { it.name.endsWith('jar') && !it.name.startsWith("jackson-") }
     into "${baseArchiveBuildDir}/${baseArchiveName}/lib"
 }
 
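To guard against regressions here, a throwaway verification task could assert that no Jackson jars land in the archive's lib folder. The following is a hypothetical sketch (not part of this commit) that reuses `baseArchiveBuildDir`, `baseArchiveName`, and the copy task from the diff above:

    // Hypothetical guard task, not part of this commit: fails the build if any
    // jackson-* jar slips through the filter into the archive's lib folder.
    task verifyNoJacksonJarsInArchive(dependsOn: connectorArchive_CopyDependenciesToBuildDirectory) {
        doLast {
            File libDir = file("${baseArchiveBuildDir}/${baseArchiveName}/lib")
            def offenders = libDir.listFiles().findAll { it.name.startsWith('jackson-') }
            if (offenders) {
                throw new GradleException("Jackson jars found in archive lib folder: ${offenders*.name}")
            }
        }
    }

Excluding the Jackson jars only from the archive, rather than from `runtimeClasspath` itself, keeps them available for plain Apache Kafka deployments, which do not use this archive.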

config/marklogic-sink.properties

Lines changed: 1 addition & 1 deletion

@@ -13,7 +13,7 @@ tasks.max=1
 topics=marklogic
 
 
-# Properties defined by the MarkLogic Kafka connector
+# Properties defined by the MarkLogic Kafka connector; change these as needed to fit your environment.
 
 # Required; a MarkLogic host to connect to. By default, the connector uses the Data Movement SDK, and thus it will
 # connect to each of the hosts in a cluster.
config/marklogic-source.properties

Lines changed: 4 additions & 4 deletions

@@ -7,23 +7,23 @@ name=marklogic-source
 connector.class=com.marklogic.kafka.connect.source.MarkLogicSourceConnector
 
 
-# Properties defined by the MarkLogic Kafka connector
+# Properties defined by the MarkLogic Kafka connector; change these as needed to fit your environment.
 
 # Required; a MarkLogic host to connect to. By default, the connector uses the Data Movement SDK, and thus it will
 # connect to each of the hosts in a cluster.
 ml.connection.host=localhost
 
 # Required; the port of a REST API app server to connect to; if using Bulk Data Services, can be a plain HTTP app server
-ml.connection.port=8000
+ml.connection.port=8018
 
 # Required; the authentication scheme used by the server defined by ml.connection.port; either 'DIGEST', 'BASIC', 'CERTIFICATE', 'KERBEROS', or 'NONE'
 ml.connection.securityContextType=DIGEST
 
 # MarkLogic username for 'DIGEST' and 'BASIC' authentication
-ml.connection.username=
+ml.connection.username=kafka-test-user
 
 # MarkLogic password for 'DIGEST' and 'BASIC' authentication
-ml.connection.password=
+ml.connection.password=kafkatest
 
 # Path to PKCS12 file for 'CERTIFICATE' authentication
 # ml.connection.certFile=
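For reference, with `DIGEST` authentication the connection properties above amount to constructing a MarkLogic Java Client `DatabaseClient` roughly as in this simplified sketch (the connector's real code path goes through the Data Movement SDK; the values mirror the settings above):

    // Simplified sketch of what the connection properties translate to; the
    // connector actually connects through the Data Movement SDK.
    import com.marklogic.client.DatabaseClient
    import com.marklogic.client.DatabaseClientFactory
    import com.marklogic.client.DatabaseClientFactory.DigestAuthContext

    DatabaseClient client = DatabaseClientFactory.newClient(
        'localhost', 8018,
        new DigestAuthContext('kafka-test-user', 'kafkatest'))

As the updated comment notes, these defaults (which appear to match the project's test setup) should be changed to fit your environment.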
