diff --git a/src/main/java/com/mongodb/spark/sql/connector/config/ReadConfig.java b/src/main/java/com/mongodb/spark/sql/connector/config/ReadConfig.java
index 2c7d6a01..86e47ee9 100644
--- a/src/main/java/com/mongodb/spark/sql/connector/config/ReadConfig.java
+++ b/src/main/java/com/mongodb/spark/sql/connector/config/ReadConfig.java
@@ -23,6 +23,7 @@ import static java.util.Collections.unmodifiableList;
 
 import com.mongodb.client.model.changestream.FullDocument;
+import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
 import com.mongodb.spark.sql.connector.exceptions.ConfigException;
 import com.mongodb.spark.sql.connector.read.partitioner.Partitioner;
 import java.util.HashMap;
@@ -288,6 +289,34 @@ static ParseMode fromString(final String userParseMode) {
   private static final String STREAM_LOOKUP_FULL_DOCUMENT_DEFAULT =
       FullDocument.DEFAULT.getValue();
 
+  /**
+   * Streaming full document before change configuration.
+   *
+   * <p>Determines what to return as the pre-image of the document during replace, update, or
+   * delete operations when using a MongoDB Change Stream.
+   *
+   * <p>Only applies if the MongoDB server is configured to capture pre-images.
+   * See: Change streams lookup full document before change for further details.
+   *
+   * <p>Possible values:
+   *
+   * <ul>
+   *   <li>"default": uses the server's default behavior for the fullDocumentBeforeChange field.
+   *   <li>"off": does not return a pre-image.
+   *   <li>"whenAvailable": returns the pre-image if it is available, otherwise null.
+   *   <li>"required": returns the pre-image; raises an error if it is not available.
+   * </ul>
+   *
+   * <p>Configuration: {@value}
+   *
+   * <p>Default: "default" – the server's default behavior for the fullDocumentBeforeChange field.
+   */
+  public static final String STREAM_LOOKUP_FULL_DOCUMENT_BEFORE_CHANGE_CONFIG =
+      "change.stream.lookup.full.document.before.change";
+
+  private static final String STREAM_LOOKUP_FULL_DOCUMENT_BEFORE_CHANGE_DEFAULT =
+      FullDocumentBeforeChange.DEFAULT.getValue();
+
   enum StreamingStartupMode {
     LATEST,
     TIMESTAMP;
@@ -492,6 +521,16 @@ public FullDocument getStreamFullDocument() {
     }
   }
 
+  /** @return the stream full document before change configuration or 'default' if not set. */
+  public FullDocumentBeforeChange getStreamFullDocumentBeforeChange() {
+    try {
+      return FullDocumentBeforeChange.fromString(
+          getOrDefault(STREAM_LOOKUP_FULL_DOCUMENT_BEFORE_CHANGE_CONFIG, STREAM_LOOKUP_FULL_DOCUMENT_BEFORE_CHANGE_DEFAULT));
+    } catch (IllegalArgumentException e) {
+      throw new ConfigException(e);
+    }
+  }
+
   /** @return true if should drop any malformed rows */
   public boolean dropMalformed() {
     return parseMode == ParseMode.DROPMALFORMED;
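For orientation, here is a minimal sketch of how the new read option could be used from a Spark structured streaming job. It is not part of the change set: the connection URI, database, collection, and schema are placeholders, and it assumes the connector's usual unprefixed option keys are accepted directly on the stream reader.

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.streaming.StreamingQueryException;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;
    import java.util.concurrent.TimeoutException;

    public final class PreImageStreamSketch {
      public static void main(final String[] args) throws TimeoutException, StreamingQueryException {
        SparkSession spark =
            SparkSession.builder().master("local").appName("pre-image-stream").getOrCreate();

        // Streaming reads need an explicit schema; only the change event fields of interest are listed.
        StructType changeEventSchema = new StructType()
            .add("operationType", DataTypes.StringType)
            .add("fullDocument", DataTypes.StringType)
            .add("fullDocumentBeforeChange", DataTypes.StringType);

        Dataset<Row> changes = spark.readStream()
            .format("mongodb")
            .schema(changeEventSchema)
            .option("connection.uri", "mongodb://localhost:27017") // placeholder deployment
            .option("database", "test") // placeholder database
            .option("collection", "coll") // placeholder collection
            // The option added by this change: request pre-images when the server has them.
            .option("change.stream.lookup.full.document.before.change", "whenAvailable")
            .load();

        // Print raw change events, including fullDocumentBeforeChange, to the console.
        changes.writeStream().format("console").start().awaitTermination();
      }
    }

With "whenAvailable" the fullDocumentBeforeChange column should be null for inserts and for any event where the server captured no pre-image.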

diff --git a/src/main/java/com/mongodb/spark/sql/connector/read/MongoContinuousPartitionReader.java b/src/main/java/com/mongodb/spark/sql/connector/read/MongoContinuousPartitionReader.java
index 0ac677ae..9381fc1c 100644
--- a/src/main/java/com/mongodb/spark/sql/connector/read/MongoContinuousPartitionReader.java
+++ b/src/main/java/com/mongodb/spark/sql/connector/read/MongoContinuousPartitionReader.java
@@ -196,6 +196,7 @@ private MongoChangeStreamCursor getCursor() {
     }
     changeStreamIterable
         .fullDocument(readConfig.getStreamFullDocument())
+        .fullDocumentBeforeChange(readConfig.getStreamFullDocumentBeforeChange())
         .comment(readConfig.getComment());
     changeStreamIterable = lastOffset.applyToChangeStreamIterable(changeStreamIterable);
diff --git a/src/main/java/com/mongodb/spark/sql/connector/read/MongoMicroBatchPartitionReader.java b/src/main/java/com/mongodb/spark/sql/connector/read/MongoMicroBatchPartitionReader.java
index e7f7467d..1681fc95 100644
--- a/src/main/java/com/mongodb/spark/sql/connector/read/MongoMicroBatchPartitionReader.java
+++ b/src/main/java/com/mongodb/spark/sql/connector/read/MongoMicroBatchPartitionReader.java
@@ -185,6 +185,7 @@ private MongoChangeStreamCursor getCursor() {
     }
     changeStreamIterable
         .fullDocument(readConfig.getStreamFullDocument())
+        .fullDocumentBeforeChange(readConfig.getStreamFullDocumentBeforeChange())
         .comment(readConfig.getComment());
     if (partition.getStartOffsetTimestamp().getTime() >= 0) {
       changeStreamIterable.startAtOperationTime(partition.getStartOffsetTimestamp());
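Both partition readers above forward the configured value to the change stream cursor unchanged. For reference, a plain Java driver sketch of what that setting controls (this is not connector code; the URI and namespace are placeholders, and the loop simply prints events until interrupted):

    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;
    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.model.changestream.ChangeStreamDocument;
    import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
    import org.bson.Document;

    public final class PreImageWatchSketch {
      public static void main(final String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
          MongoCollection<Document> coll = client.getDatabase("test").getCollection("coll");

          // Ask the server to attach the pre-image to replace, update and delete events when available.
          for (ChangeStreamDocument<Document> event :
              coll.watch().fullDocumentBeforeChange(FullDocumentBeforeChange.WHEN_AVAILABLE)) {
            // Null when no pre-image was captured, e.g. for inserts or when pre-images are disabled.
            Document before = event.getFullDocumentBeforeChange();
            System.out.println(event.getOperationType() + " pre-image: " + before);
          }
        }
      }
    }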
diff --git a/src/test/java/com/mongodb/spark/sql/connector/config/MongoConfigTest.java b/src/test/java/com/mongodb/spark/sql/connector/config/MongoConfigTest.java
index 22e8ed20..937b9524 100644
--- a/src/test/java/com/mongodb/spark/sql/connector/config/MongoConfigTest.java
+++ b/src/test/java/com/mongodb/spark/sql/connector/config/MongoConfigTest.java
@@ -28,6 +28,7 @@
 import com.mongodb.WriteConcern;
 import com.mongodb.client.model.changestream.FullDocument;
+import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
 import com.mongodb.spark.sql.connector.exceptions.ConfigException;
 import java.util.HashMap;
 import java.util.Map;
@@ -309,6 +310,29 @@ void testReadConfigStreamFullDocument() {
     assertEquals(readConfig.getStreamFullDocument(), FullDocument.UPDATE_LOOKUP);
   }
 
+  @Test
+  void testReadConfigStreamFullDocumentBeforeChange() {
+    ReadConfig readConfig = MongoConfig.readConfig(CONFIG_MAP);
+    assertEquals(readConfig.getStreamFullDocumentBeforeChange(), FullDocumentBeforeChange.DEFAULT);
+
+    readConfig =
+        readConfig.withOption(ReadConfig.STREAM_LOOKUP_FULL_DOCUMENT_BEFORE_CHANGE_CONFIG, "off");
+    assertEquals(readConfig.getStreamFullDocumentBeforeChange(), FullDocumentBeforeChange.OFF);
+
+    readConfig = readConfig.withOption(
+        ReadConfig.STREAM_LOOKUP_FULL_DOCUMENT_BEFORE_CHANGE_CONFIG, "whenAvailable");
+    assertEquals(
+        readConfig.getStreamFullDocumentBeforeChange(), FullDocumentBeforeChange.WHEN_AVAILABLE);
+
+    readConfig = readConfig.withOption(
+        ReadConfig.STREAM_LOOKUP_FULL_DOCUMENT_BEFORE_CHANGE_CONFIG, "required");
+    assertEquals(readConfig.getStreamFullDocumentBeforeChange(), FullDocumentBeforeChange.REQUIRED);
+
+    readConfig = readConfig.withOption(
+        ReadConfig.STREAM_LOOKUP_FULL_DOCUMENT_BEFORE_CHANGE_CONFIG, "INVALID");
+    assertThrows(ConfigException.class, readConfig::getStreamFullDocumentBeforeChange);
+  }
+
   @Test
   void testReadConfigSchemaHints() {
     ReadConfig readConfig =
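One practical note for exercising the new values end to end: as the ReadConfig javadoc states, the server must capture pre-images for the source collection, otherwise "whenAvailable" yields null and "required" makes the change stream fail. A sketch of enabling pre- and post-images with the Java driver, assuming MongoDB 6.0 or later and placeholder database and collection names:

    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;
    import org.bson.Document;

    public final class EnablePreImagesSketch {
      public static void main(final String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
          // collMod with changeStreamPreAndPostImages enables pre-image capture on an existing collection.
          Document result = client
              .getDatabase("test")
              .runCommand(new Document("collMod", "coll")
                  .append("changeStreamPreAndPostImages", new Document("enabled", true)));
          System.out.println(result.toJson());
        }
      }
    }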