From 33d621e6585758420b6ece9b32c77c20bb97d382 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 30 Jan 2025 10:04:14 -0800 Subject: [PATCH 1/7] Bulk load cdk: remove redundant app-test.yaml (#52658) --- .../testFixtures/resources/application-test.yaml | 13 ------------- 1 file changed, 13 deletions(-) delete mode 100644 airbyte-cdk/bulk/core/load/src/testFixtures/resources/application-test.yaml diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/resources/application-test.yaml b/airbyte-cdk/bulk/core/load/src/testFixtures/resources/application-test.yaml deleted file mode 100644 index 11c3b6bfaff4f..0000000000000 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/resources/application-test.yaml +++ /dev/null @@ -1,13 +0,0 @@ -airbyte: - destination: - core: - record-batch-size-override: 1 # 1 byte for testing; 1 record => 1 upload - file-transfer: - enabled: ${USE_FILE_TRANSFER:false} - staging-path: ${AIRBYTE_STAGING_DIRECTORY:/staging/files} - resources: - disk: - bytes: ${CONNECTOR_STORAGE_LIMIT_BYTES:5368709120} # 5GB - flush: - rate-ms: 900000 # 15 minutes - window-ms: 900000 # 15 minutes From bf8ee6d6667ae77e71af507bacf7b24be2735d4a Mon Sep 17 00:00:00 2001 From: "Ryan Br..." Date: Thu, 30 Jan 2025 10:08:29 -0800 Subject: [PATCH 2/7] pin cdk (0.296) and cut rc17. (#52642) --- airbyte-integrations/connectors/destination-s3/build.gradle | 2 +- airbyte-integrations/connectors/destination-s3/metadata.yaml | 2 +- docs/integrations/destinations/s3.md | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-s3/build.gradle b/airbyte-integrations/connectors/destination-s3/build.gradle index dc91cb3f4b368..1f26ebb2331d8 100644 --- a/airbyte-integrations/connectors/destination-s3/build.gradle +++ b/airbyte-integrations/connectors/destination-s3/build.gradle @@ -6,7 +6,7 @@ plugins { airbyteBulkConnector { core = 'load' toolkits = ['load-s3', 'load-avro', 'load-aws'] - cdk = 'local' + cdk = '0.296' } application { diff --git a/airbyte-integrations/connectors/destination-s3/metadata.yaml b/airbyte-integrations/connectors/destination-s3/metadata.yaml index 0632ee134f2f2..73b0a154acba8 100644 --- a/airbyte-integrations/connectors/destination-s3/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: file connectorType: destination definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362 - dockerImageTag: 1.5.0-rc.16 + dockerImageTag: 1.5.0-rc.17 dockerRepository: airbyte/destination-s3 githubIssueLabel: destination-s3 icon: s3.svg diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md index 88a5a64b79681..164528a40458f 100644 --- a/docs/integrations/destinations/s3.md +++ b/docs/integrations/destinations/s3.md @@ -544,6 +544,7 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou | Version | Date | Pull Request | Subject | |:------------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------| +| 1.5.0-rc.17 | 2025-01-29 | [52610](https://github.com/airbytehq/airbyte/pull/52642) | Pin CDK 0.296 | | 1.5.0-rc.16 | 2025-01-29 | [52610](https://github.com/airbytehq/airbyte/pull/52610) | Fix assume role behavior | | 1.5.0-rc.15 | 2025-01-23 | [52103](https://github.com/airbytehq/airbyte/pull/52103) | Make the connector use our non root base image. 
| | 1.5.0-rc.14 | 2025-01-24 | [51600](https://github.com/airbytehq/airbyte/pull/51600) | Internal refactor | From 7c28d4ede115041f6cb2cdf8c99ff2d35df39ae5 Mon Sep 17 00:00:00 2001 From: Marius Posta Date: Thu, 30 Jan 2025 13:41:35 -0500 Subject: [PATCH 3/7] bulk-cdk-toolkit-extract-cdc: API changes (#52040) --- .../cdk/read/cdc/CdcPartitionReader.kt | 35 +-- .../cdk/read/cdc/CdcPartitionsCreator.kt | 92 +++++--- .../read/cdc/CdcPartitionsCreatorFactory.kt | 4 + .../io/airbyte/cdk/read/cdc/Debezium.kt | 23 +- .../cdk/read/cdc/DebeziumOperations.kt | 25 ++- .../read/cdc/CdcPartitionReaderMongoTest.kt | 200 +++++++++--------- .../read/cdc/CdcPartitionReaderMySQLTest.kt | 152 +++++++------ .../cdc/CdcPartitionReaderPostgresTest.kt | 132 ++++++------ .../cdk/read/cdc/CdcPartitionsCreatorTest.kt | 50 ++--- .../cdc/AbstractCdcPartitionReaderTest.kt | 74 ++++--- 10 files changed, 402 insertions(+), 385 deletions(-) diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReader.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReader.kt index 7c0c8661fbbc8..4e3dbc1ccba83 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReader.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReader.kt @@ -5,6 +5,7 @@ package io.airbyte.cdk.read.cdc import io.airbyte.cdk.StreamIdentifier +import io.airbyte.cdk.command.OpaqueStateValue import io.airbyte.cdk.read.ConcurrencyResource import io.airbyte.cdk.read.PartitionReadCheckpoint import io.airbyte.cdk.read.PartitionReader @@ -15,7 +16,7 @@ import io.debezium.engine.ChangeEvent import io.debezium.engine.DebeziumEngine import io.debezium.engine.format.Json import io.github.oshai.kotlinlogging.KotlinLogging -import java.util.* +import java.util.Properties import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicReference import java.util.function.Consumer @@ -31,12 +32,15 @@ class CdcPartitionReader>( val streamRecordConsumers: Map, val readerOps: CdcPartitionReaderDebeziumOperations, val upperBound: T, - val input: DebeziumInput, + val debeziumProperties: Map, + val startingOffset: DebeziumOffset, + val startingSchemaHistory: DebeziumSchemaHistory?, + val isInputStateSynthetic: Boolean, ) : UnlimitedTimePartitionReader { private val log = KotlinLogging.logger {} private val acquiredThread = AtomicReference() private lateinit var stateFilesAccessor: DebeziumStateFilesAccessor - private lateinit var properties: Properties + private lateinit var decoratedProperties: Properties private lateinit var engine: DebeziumEngine> internal val closeReasonReference = AtomicReference() @@ -64,19 +68,19 @@ class CdcPartitionReader>( } override suspend fun run() { - stateFilesAccessor.writeOffset(input.state.offset) - if (input.state.schemaHistory != null) { - stateFilesAccessor.writeSchema(input.state.schemaHistory) + stateFilesAccessor.writeOffset(startingOffset) + if (startingSchemaHistory != null) { + stateFilesAccessor.writeSchema(startingSchemaHistory) } - properties = + decoratedProperties = DebeziumPropertiesBuilder() - .with(input.properties) + .with(debeziumProperties) .withOffsetFile(stateFilesAccessor.offsetFilePath) .withSchemaHistoryFile(stateFilesAccessor.schemaFilePath) .build() engine = DebeziumEngine.create(Json::class.java) - .using(properties) + .using(decoratedProperties) .using(ConnectorCallback()) .using(CompletionCallback()) 
.notifying(EventConsumer(coroutineContext)) @@ -116,15 +120,15 @@ class CdcPartitionReader>( } override fun checkpoint(): PartitionReadCheckpoint { - val offset: DebeziumOffset = stateFilesAccessor.readUpdatedOffset(input.state.offset) + val offset: DebeziumOffset = stateFilesAccessor.readUpdatedOffset(startingOffset) val schemaHistory: DebeziumSchemaHistory? = - if (DebeziumPropertiesBuilder().with(properties).expectsSchemaHistoryFile) { + if (DebeziumPropertiesBuilder().with(decoratedProperties).expectsSchemaHistoryFile) { stateFilesAccessor.readSchema() } else { null } - val output = DebeziumState(offset, schemaHistory) - return PartitionReadCheckpoint(readerOps.serialize(output), numEmittedRecords.get()) + val serializedState: OpaqueStateValue = readerOps.serializeState(offset, schemaHistory) + return PartitionReadCheckpoint(serializedState, numEmittedRecords.get()) } inner class EventConsumer( @@ -173,7 +177,7 @@ class CdcPartitionReader>( // Ignore events which can't be mapped to a stream. ?: return EventType.RECORD_DISCARDED_BY_STREAM_ID val deserializedRecord: DeserializedRecord = - readerOps.deserialize(event.key, event.value, streamRecordConsumer.stream) + readerOps.deserializeRecord(event.key, event.value, streamRecordConsumer.stream) // Ignore events which can't be deserialized into records. ?: return EventType.RECORD_DISCARDED_BY_DESERIALIZE // Emit the record at the end of the happy path. @@ -209,7 +213,7 @@ class CdcPartitionReader>( } private fun findCloseReason(event: DebeziumEvent, eventType: EventType): CloseReason? { - if (input.isSynthetic && eventType != EventType.HEARTBEAT) { + if (isInputStateSynthetic && eventType != EventType.HEARTBEAT) { // Special case where the engine started with a synthetic offset: // don't even consider closing the engine unless handling a heartbeat event. // For some databases, such as Oracle, Debezium actually needs to snapshot the @@ -298,7 +302,6 @@ class CdcPartitionReader>( } enum class CloseReason(val message: String) { - TIMEOUT("timed out"), HEARTBEAT_OR_TOMBSTONE_REACHED_TARGET_POSITION( "heartbeat or tombstone indicates that WAL consumption has reached the target position" ), diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreator.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreator.kt index 6ae589cb64990..b83785e8321ef 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreator.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreator.kt @@ -22,12 +22,11 @@ class CdcPartitionsCreator>( val readerOps: CdcPartitionReaderDebeziumOperations, val lowerBoundReference: AtomicReference, val upperBoundReference: AtomicReference, + val resetReason: AtomicReference, ) : PartitionsCreator { private val log = KotlinLogging.logger {} private val acquiredThread = AtomicReference() - class OffsetInvalidNeedsResyncIllegalStateException() : IllegalStateException() - override fun tryAcquireResources(): PartitionsCreator.TryAcquireResourcesStatus { val acquiredThread: ConcurrencyResource.AcquiredThread = concurrencyResource.tryAcquire() @@ -41,42 +40,68 @@ class CdcPartitionsCreator>( } override suspend fun run(): List { - if (CDCNeedsRestart) { + resetReason.get()?.let { reason: String -> throw TransientErrorException( - "Saved offset no longer present on the server, Airbyte is going to trigger a sync from scratch." + "Triggering reset. 
Incumbent CDC state is invalid, reason: ${reason}." ) } val activeStreams: List by lazy { feedBootstrap.feed.streams.filter { feedBootstrap.stateQuerier.current(it) != null } } - val syntheticInput: DebeziumInput by lazy { creatorOps.synthesize() } + val syntheticOffset: DebeziumOffset by lazy { creatorOps.generateColdStartOffset() } // Ensure that the WAL position upper bound has been computed for this sync. val upperBound: T = - upperBoundReference.updateAndGet { - it ?: creatorOps.position(syntheticInput.state.offset) - } + upperBoundReference.updateAndGet { it ?: creatorOps.position(syntheticOffset) } // Deserialize the incumbent state value, if it exists. - val input: DebeziumInput = - when (val incumbentOpaqueStateValue = feedBootstrap.currentState) { - null -> syntheticInput - else -> { - // validate if existing state is still valid on DB. - try { - creatorOps.deserialize(incumbentOpaqueStateValue, activeStreams) - } catch (ex: ConfigErrorException) { - log.error(ex) { "Existing state is invalid." } - throw ex - } catch (_: OffsetInvalidNeedsResyncIllegalStateException) { - // If deserialization concludes we need a re-sync we rollback stream states - // and put the creator in a Need Restart mode. - // The next round will throw a transient error to kickoff the resync - feedBootstrap.stateQuerier.resetFeedStates() - CDCNeedsRestart = true - syntheticInput - } + val warmStartState: DebeziumWarmStartState? = + feedBootstrap.currentState?.let { + try { + creatorOps.deserializeState(it) + } catch (e: Exception) { + // This catch should be redundant for well-behaved implementations + // but is included anyway for safety. + AbortDebeziumWarmStartState(e.toString()) } } - + val debeziumProperties: Map + val startingOffset: DebeziumOffset + val startingSchemaHistory: DebeziumSchemaHistory? + when (warmStartState) { + null -> { + debeziumProperties = creatorOps.generateColdStartProperties() + startingOffset = syntheticOffset + startingSchemaHistory = null + } + is ValidDebeziumWarmStartState -> { + debeziumProperties = creatorOps.generateWarmStartProperties(activeStreams) + startingOffset = warmStartState.offset + startingSchemaHistory = warmStartState.schemaHistory + } + is AbortDebeziumWarmStartState -> { + val e = + ConfigErrorException( + "Incumbent CDC state is invalid, reason: ${warmStartState.reason}" + ) + log.error(e) { "Aborting. ${e.message}." } + throw e + } + is ResetDebeziumWarmStartState -> { + // The incumbent CDC state value is invalid and the sync needs to be reset. + // Doing so is not so straightforward as throwing a TransientErrorException because + // a STATE message with a post-reset state needs to be emitted first. + // This new state is obtained by zeroing all corresponding feeds in the StateManager + // and returning a CdcPartitionReader for a cold start with a synthetic offset. + // This CdcPartitionReader will run, after which the desired STATE message will be + // emitted, and finally the next CdcPartitionsCreator will throw a + // TransientErrorException. The next sync will then snapshot the tables. + resetReason.set(warmStartState.reason) + log.info { "Resetting invalid incumbent CDC state with synthetic state." } + feedBootstrap.stateQuerier.resetFeedStates() + debeziumProperties = creatorOps.generateColdStartProperties() + startingOffset = syntheticOffset + startingSchemaHistory = null + } + } // Build and return PartitionReader instance, if applicable. 
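        // Note: the isInputStateSynthetic flag passed to the reader below is true both for
        // genuine cold starts (no incumbent state) and for the reset case handled above,
        // where the feed states were just zeroed and a synthetic offset substituted.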
val partitionReader = CdcPartitionReader( @@ -84,11 +109,14 @@ class CdcPartitionsCreator>( feedBootstrap.streamRecordConsumers(), readerOps, upperBound, - input + debeziumProperties, + startingOffset, + startingSchemaHistory, + warmStartState !is ValidDebeziumWarmStartState, ) - val lowerBound: T = creatorOps.position(input.state.offset) + val lowerBound: T = creatorOps.position(startingOffset) val lowerBoundInPreviousRound: T? = lowerBoundReference.getAndSet(lowerBound) - if (input.isSynthetic) { + if (partitionReader.isInputStateSynthetic) { // Handle synthetic offset edge-case, which always needs to run. // Debezium needs to run to generate the full state, which might include schema history. log.info { "Current offset is synthetic." } @@ -113,8 +141,4 @@ class CdcPartitionsCreator>( log.info { "Current position '$lowerBound' does not exceed target position '$upperBound'." } return listOf(partitionReader) } - - companion object { - var CDCNeedsRestart: Boolean = false - } } diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreatorFactory.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreatorFactory.kt index 6a188c02575a8..8e9357e3d8480 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreatorFactory.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreatorFactory.kt @@ -34,6 +34,9 @@ class CdcPartitionsCreatorFactory>( */ private val upperBoundReference = AtomicReference() + /** [AtomicReference] used to trigger resetting a sync when not null. */ + private val resetReason = AtomicReference(null) + override fun make(feedBootstrap: FeedBootstrap<*>): PartitionsCreator? { if (feedBootstrap !is GlobalFeedBootstrap) { // Fall through on non-Global streams. @@ -46,6 +49,7 @@ class CdcPartitionsCreatorFactory>( debeziumOps, lowerBoundReference, upperBoundReference, + resetReason, ) } } diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/Debezium.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/Debezium.kt index f73c7522e4ee5..842df22e13dea 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/Debezium.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/Debezium.kt @@ -85,18 +85,19 @@ value class DebeziumRecordValue(val wrapped: JsonNode) { } } -/** Debezium Engine input. */ -data class DebeziumInput( - val properties: Map, - val state: DebeziumState, - val isSynthetic: Boolean, -) - -/** Debezium Engine output, other than records of course. */ -data class DebeziumState( +/** Return type for [CdcPartitionsCreatorDebeziumOperations.deserializeState]. */ +sealed interface DebeziumWarmStartState + +data class ValidDebeziumWarmStartState( val offset: DebeziumOffset, - val schemaHistory: DebeziumSchemaHistory?, -) + val schemaHistory: DebeziumSchemaHistory? +) : DebeziumWarmStartState + +sealed interface InvalidDebeziumWarmStartState : DebeziumWarmStartState + +data class AbortDebeziumWarmStartState(val reason: String) : InvalidDebeziumWarmStartState + +data class ResetDebeziumWarmStartState(val reason: String) : InvalidDebeziumWarmStartState /** [DebeziumOffset] wraps the contents of a Debezium offset file. 
 */
@JvmInline value class DebeziumOffset(val wrapped: Map<JsonNode, JsonNode>)

diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/DebeziumOperations.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/DebeziumOperations.kt
index c425f5fd6d967..e789d59e674bf 100644
--- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/DebeziumOperations.kt
+++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/DebeziumOperations.kt
@@ -20,11 +20,19 @@ interface CdcPartitionsCreatorDebeziumOperations<T : Comparable<T>> {
     /** Extracts the WAL position from a [DebeziumOffset]. */
     fun position(offset: DebeziumOffset): T
 
-    /** Synthesizes a [DebeziumInput] when no incumbent [OpaqueStateValue] is available. */
-    fun synthesize(): DebeziumInput
+    /**
+     * Synthesizes a cold-start [DebeziumOffset] when no incumbent [OpaqueStateValue] is available.
+     */
+    fun generateColdStartOffset(): DebeziumOffset
+
+    /** Generates Debezium properties for use with a cold-start [DebeziumOffset]. */
+    fun generateColdStartProperties(): Map<String, String>
+
+    /** Maps an incumbent [OpaqueStateValue] into a [DebeziumWarmStartState]. */
+    fun deserializeState(opaqueStateValue: OpaqueStateValue): DebeziumWarmStartState
 
-    /** Builds a [DebeziumInput] using an incumbent [OpaqueStateValue]. */
-    fun deserialize(opaqueStateValue: OpaqueStateValue, streams: List<Stream>): DebeziumInput
+    /** Generates Debezium properties for use with a [ValidDebeziumWarmStartState]. */
+    fun generateWarmStartProperties(streams: List<Stream>): Map<String, String>
 }
 
 interface CdcPartitionReaderDebeziumOperations<T : Comparable<T>> {
@@ -34,7 +42,7 @@ interface CdcPartitionReaderDebeziumOperations<T : Comparable<T>> {
      *
      * Returning null means that the event should be treated like a heartbeat.
      */
-    fun deserialize(
+    fun deserializeRecord(
         key: DebeziumRecordKey,
         value: DebeziumRecordValue,
         stream: Stream,
@@ -52,8 +60,11 @@ interface CdcPartitionReaderDebeziumOperations<T : Comparable<T>> {
         value: DebeziumRecordValue,
     ): String?
 
-    /** Maps a [DebeziumState] to an [OpaqueStateValue]. */
-    fun serialize(debeziumState: DebeziumState): OpaqueStateValue
+    /** Maps a Debezium state to an [OpaqueStateValue]. */
+    fun serializeState(
+        offset: DebeziumOffset,
+        schemaHistory: DebeziumSchemaHistory?
+    ): OpaqueStateValue
 
     /** Tries to extract the WAL position from a [DebeziumRecordValue]. */
     fun position(recordValue: DebeziumRecordValue): T?
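
The reshaped creator-side interface above splits the old synthesize()/deserialize() pair into cold-start and warm-start halves. For orientation, here is a minimal hypothetical implementation sketch, assuming a Long-valued WAL position and an invented {"offset": {"pos": N}} state layout; the class name, queryCurrentWalPosition, isStillOnServer, and the property sets are placeholders for illustration, not CDK APIs.

import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.command.OpaqueStateValue
import io.airbyte.cdk.read.Stream
import io.airbyte.cdk.util.Jsons

class SketchCreatorOperations : CdcPartitionsCreatorDebeziumOperations<Long> {

    // Extract the WAL position from the (invented) offset layout.
    override fun position(offset: DebeziumOffset): Long =
        offset.wrapped.values.first()["pos"].asLong()

    // Cold start: no incumbent state, so synthesize an offset at the current WAL head.
    override fun generateColdStartOffset(): DebeziumOffset {
        val key: JsonNode = Jsons.objectNode()
        val value: JsonNode = Jsons.objectNode().put("pos", queryCurrentWalPosition())
        return DebeziumOffset(mapOf(key to value))
    }

    override fun generateColdStartProperties(): Map<String, String> =
        mapOf("snapshot.mode" to "recovery") // placeholder property set

    override fun generateWarmStartProperties(streams: List<Stream>): Map<String, String> =
        mapOf("snapshot.mode" to "when_needed") // placeholder property set

    // Warm start: classify the incumbent state instead of throwing, so that
    // CdcPartitionsCreator can decide whether to run, reset, or abort.
    override fun deserializeState(opaqueStateValue: OpaqueStateValue): DebeziumWarmStartState {
        val offsetNode: JsonNode =
            opaqueStateValue["offset"]
                ?: return AbortDebeziumWarmStartState("state value has no offset node")
        val offset = DebeziumOffset(mapOf(Jsons.objectNode() to offsetNode))
        return if (isStillOnServer(offset)) {
            ValidDebeziumWarmStartState(offset, schemaHistory = null)
        } else {
            // Routes into the new reset flow: feed states are zeroed, a synthetic
            // cold start runs, and the next creation round throws a transient error.
            ResetDebeziumWarmStartState("saved offset no longer present on the server")
        }
    }

    private fun queryCurrentWalPosition(): Long = TODO("query the source database")

    private fun isStillOnServer(offset: DebeziumOffset): Boolean = TODO("validate against the source")
}

Returning a sealed DebeziumWarmStartState instead of throwing moves the reset-versus-abort decision out of exception handling and into the when-block of CdcPartitionsCreator shown earlier in this series.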
diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMongoTest.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMongoTest.kt index d37acf278ab57..4dd13fc254b7a 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMongoTest.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMongoTest.kt @@ -14,7 +14,6 @@ import com.mongodb.client.MongoDatabase import com.mongodb.client.model.Aggregates import com.mongodb.client.model.Filters import com.mongodb.client.model.Updates -import io.airbyte.cdk.command.OpaqueStateValue import io.airbyte.cdk.read.Stream import io.airbyte.cdk.util.Jsons import io.debezium.connector.mongodb.MongoDbConnector @@ -81,119 +80,110 @@ class CdcPartitionReaderMongoTest : fn(it.getCollection(stream.name)) } - override fun createDebeziumOperations(): DebeziumOperations { - return object : AbstractCdcPartitionReaderDebeziumOperationsForTest(stream) { - override fun position(recordValue: DebeziumRecordValue): BsonTimestamp? { - val resumeToken: String = - recordValue.source["resume_token"]?.takeIf { it.isTextual }?.asText() - ?: return null - return ResumeTokens.getTimestamp(ResumeTokens.fromData(resumeToken)) - } + override fun createDebeziumOperations(): DebeziumOperations = + MongoTestDebeziumOperations() - override fun position(sourceRecord: SourceRecord): BsonTimestamp? { - val offset: Map = sourceRecord.sourceOffset() - val resumeTokenBase64: String = offset["resume_token"] as? String ?: return null - return ResumeTokens.getTimestamp(ResumeTokens.fromBase64(resumeTokenBase64)) - } + inner class MongoTestDebeziumOperations : AbstractDebeziumOperationsForTest() { - override fun deserialize( - opaqueStateValue: OpaqueStateValue, - streams: List - ): DebeziumInput { - return super.deserialize(opaqueStateValue, streams).let { - DebeziumInput(debeziumProperties(), it.state, it.isSynthetic) - } - } + override fun position(offset: DebeziumOffset): BsonTimestamp { + val offsetValue: ObjectNode = offset.wrapped.values.first() as ObjectNode + return BsonTimestamp(offsetValue["sec"].asInt(), offsetValue["ord"].asInt()) + } - override fun deserialize( - key: DebeziumRecordKey, - value: DebeziumRecordValue, - stream: Stream, - ): DeserializedRecord { - val id: Int = key.element("id").asInt() - val record: Record = - if (value.operation == "d") { - Delete(id) - } else { - val v: Int? = - value.after - .takeIf { it.isTextual } - ?.asText() - ?.let { Jsons.readTree(it)["v"] } - ?.asInt() - if (v == null) { - // In case a mongodb document was updated and then deleted, the update - // change - // event will not have any information ({after: null}) - // We are going to treat it as a Delete. - Delete(id) - } else if (value.operation == "u") { - Update(id, v) - } else { - Insert(id, v) - } - } - return DeserializedRecord( - data = Jsons.valueToTree(record), - changes = emptyMap(), - ) - } + override fun position(recordValue: DebeziumRecordValue): BsonTimestamp? 
{ + val resumeToken: String = + recordValue.source["resume_token"]?.takeIf { it.isTextual }?.asText() ?: return null + return ResumeTokens.getTimestamp(ResumeTokens.fromData(resumeToken)) + } - override fun position(offset: DebeziumOffset): BsonTimestamp { - val offsetValue: ObjectNode = offset.wrapped.values.first() as ObjectNode - return BsonTimestamp(offsetValue["sec"].asInt(), offsetValue["ord"].asInt()) - } + override fun position(sourceRecord: SourceRecord): BsonTimestamp? { + val offset: Map = sourceRecord.sourceOffset() + val resumeTokenBase64: String = offset["resume_token"] as? String ?: return null + return ResumeTokens.getTimestamp(ResumeTokens.fromBase64(resumeTokenBase64)) + } - override fun synthesize(): DebeziumInput { - val resumeToken: BsonDocument = currentResumeToken() - val timestamp: BsonTimestamp = ResumeTokens.getTimestamp(resumeToken) - val resumeTokenString: String = ResumeTokens.getData(resumeToken).asString().value - val key: ArrayNode = - Jsons.arrayNode().apply { - add(stream.namespace) - add(Jsons.objectNode().apply { put("server_id", stream.namespace) }) - } - val value: ObjectNode = - Jsons.objectNode().apply { - put("ord", timestamp.inc) - put("sec", timestamp.time) - put("resume_token", resumeTokenString) - } - val offset = DebeziumOffset(mapOf(key to value)) - val state = DebeziumState(offset, schemaHistory = null) - val syntheticProperties: Map = debeziumProperties() - return DebeziumInput(syntheticProperties, state, isSynthetic = true) + override fun generateColdStartOffset(): DebeziumOffset { + val resumeToken: BsonDocument = currentResumeToken() + val timestamp: BsonTimestamp = ResumeTokens.getTimestamp(resumeToken) + val resumeTokenString: String = ResumeTokens.getData(resumeToken).asString().value + val key: ArrayNode = + Jsons.arrayNode().apply { + add(stream.namespace) + add(Jsons.objectNode().apply { put("server_id", stream.namespace) }) + } + val value: ObjectNode = + Jsons.objectNode().apply { + put("ord", timestamp.inc) + put("sec", timestamp.time) + put("resume_token", resumeTokenString) + } + return DebeziumOffset(mapOf(key to value)) + } + + override fun generateColdStartProperties(): Map = + DebeziumPropertiesBuilder() + .withDefault() + .withConnector(MongoDbConnector::class.java) + .withDebeziumName(stream.namespace!!) + .withHeartbeats(heartbeat) + .with("capture.scope", "database") + .with("capture.target", stream.namespace!!) + .with("mongodb.connection.string", container.connectionString) + .with("snapshot.mode", "no_data") + .with( + "collection.include.list", + DebeziumPropertiesBuilder.joinIncludeList( + listOf(Pattern.quote("${stream.namespace!!}.${stream.name}")) + ) + ) + .with("database.include.list", stream.namespace!!) + .withOffset() + .buildMap() + + override fun generateWarmStartProperties(streams: List): Map = + generateColdStartProperties() + + fun currentResumeToken(): BsonDocument = + container.withMongoDatabase { mongoDatabase: MongoDatabase -> + val pipeline = listOf(Aggregates.match(Filters.`in`("ns.coll", stream.name))) + mongoDatabase.watch(pipeline, BsonDocument::class.java).cursor().use { + it.tryNext() + it.resumeToken!! + } } - fun currentResumeToken(): BsonDocument = - container.withMongoDatabase { mongoDatabase: MongoDatabase -> - val pipeline = - listOf(Aggregates.match(Filters.`in`("ns.coll", stream.name))) - mongoDatabase.watch(pipeline, BsonDocument::class.java).cursor().use { - it.tryNext() - it.resumeToken!! 
+ override fun deserializeRecord( + key: DebeziumRecordKey, + value: DebeziumRecordValue, + stream: Stream + ): DeserializedRecord { + val id: Int = key.element("id").asInt() + val record: Record = + if (value.operation == "d") { + Delete(id) + } else { + val v: Int? = + value.after + .takeIf { it.isTextual } + ?.asText() + ?.let { Jsons.readTree(it)["v"] } + ?.asInt() + if (v == null) { + // In case a mongodb document was updated and then deleted, the update + // change + // event will not have any information ({after: null}) + // We are going to treat it as a Delete. + Delete(id) + } else if (value.operation == "u") { + Update(id, v) + } else { + Insert(id, v) } } - - fun debeziumProperties(): Map = - DebeziumPropertiesBuilder() - .withDefault() - .withConnector(MongoDbConnector::class.java) - .withDebeziumName(stream.namespace!!) - .withHeartbeats(heartbeat) - .with("capture.scope", "database") - .with("capture.target", stream.namespace!!) - .with("mongodb.connection.string", container.connectionString) - .with("snapshot.mode", "no_data") - .with( - "collection.include.list", - DebeziumPropertiesBuilder.joinIncludeList( - listOf(Pattern.quote("${stream.namespace!!}.${stream.name}")) - ) - ) - .with("database.include.list", stream.namespace!!) - .withOffset() - .buildMap() + return DeserializedRecord( + data = Jsons.valueToTree(record), + changes = emptyMap(), + ) } } } diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMySQLTest.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMySQLTest.kt index 263f48198555f..7171e64079d54 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMySQLTest.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMySQLTest.kt @@ -6,9 +6,7 @@ package io.airbyte.cdk.read.cdc import com.fasterxml.jackson.databind.node.ArrayNode import com.fasterxml.jackson.databind.node.ObjectNode -import io.airbyte.cdk.command.OpaqueStateValue import io.airbyte.cdk.read.Stream -import io.airbyte.cdk.read.cdc.* import io.airbyte.cdk.testcontainers.TestContainerFactory import io.airbyte.cdk.util.Jsons import io.debezium.connector.mysql.MySqlConnector @@ -73,90 +71,82 @@ class CdcPartitionReaderMySQLTest : } override fun createDebeziumOperations(): DebeziumOperations = - object : AbstractCdcPartitionReaderDebeziumOperationsForTest(stream) { - override fun position(offset: DebeziumOffset): Position { - val offsetAsJson = offset.wrapped.values.first() - val retVal = Position(offsetAsJson["file"].asText(), offsetAsJson["pos"].asLong()) - return retVal - } + MySQLTestDebeziumOperations() - override fun position(recordValue: DebeziumRecordValue): Position? { - val file: String = - recordValue.source["file"]?.takeIf { it.isTextual }?.asText() ?: return null - val pos: Long = - recordValue.source["pos"]?.takeIf { it.isIntegralNumber }?.asLong() - ?: return null - return Position(file, pos) - } + inner class MySQLTestDebeziumOperations : AbstractDebeziumOperationsForTest() { - override fun position(sourceRecord: SourceRecord): Position? { - val offset: Map = sourceRecord.sourceOffset() - val file: String = offset["file"]?.toString() ?: return null - val pos: Long = offset["pos"] as? 
Long ?: return null - return Position(file, pos) - } + override fun position(offset: DebeziumOffset): Position { + val offsetAsJson = offset.wrapped.values.first() + val retVal = Position(offsetAsJson["file"].asText(), offsetAsJson["pos"].asLong()) + return retVal + } - override fun synthesize(): DebeziumInput { - val position: Position = currentPosition() - val timestamp: Instant = Instant.now() - val key: ArrayNode = - Jsons.arrayNode().apply { - add(container.databaseName) - add(Jsons.objectNode().apply { put("server", container.databaseName) }) - } - val value: ObjectNode = - Jsons.objectNode().apply { - put("ts_sec", timestamp.epochSecond) - put("file", position.file) - put("pos", position.pos) - } - val offset = DebeziumOffset(mapOf(key to value)) - val state = DebeziumState(offset, schemaHistory = null) - val syntheticProperties: Map = - DebeziumPropertiesBuilder() - .with(debeziumProperties()) - .with("snapshot.mode", "recovery") - .withStreams(listOf()) - .buildMap() - return DebeziumInput(syntheticProperties, state, isSynthetic = true) - } + override fun position(recordValue: DebeziumRecordValue): Position? { + val file: String = + recordValue.source["file"]?.takeIf { it.isTextual }?.asText() ?: return null + val pos: Long = + recordValue.source["pos"]?.takeIf { it.isIntegralNumber }?.asLong() ?: return null + return Position(file, pos) + } - override fun deserialize( - opaqueStateValue: OpaqueStateValue, - streams: List - ): DebeziumInput { - return super.deserialize(opaqueStateValue, streams).let { - DebeziumInput(debeziumProperties(), it.state, it.isSynthetic) - } - } + override fun position(sourceRecord: SourceRecord): Position? { + val offset: Map = sourceRecord.sourceOffset() + val file: String = offset["file"]?.toString() ?: return null + val pos: Long = offset["pos"] as? 
Long ?: return null + return Position(file, pos) + } - fun currentPosition(): Position = - container.withStatement { statement: Statement -> - statement.executeQuery("SHOW MASTER STATUS").use { - it.next() - Position(it.getString("File"), it.getLong("Position")) - } + override fun generateColdStartOffset(): DebeziumOffset { + val position: Position = currentPosition() + val timestamp: Instant = Instant.now() + val key: ArrayNode = + Jsons.arrayNode().apply { + add(container.databaseName) + add(Jsons.objectNode().apply { put("server", container.databaseName) }) } - - fun debeziumProperties(): Map = - DebeziumPropertiesBuilder() - .withDefault() - .withConnector(MySqlConnector::class.java) - .withDebeziumName(container.databaseName) - .withHeartbeats(heartbeat) - .with("include.schema.changes", "false") - .with("connect.keep.alive.interval.ms", "1000") - .withDatabase("hostname", container.host) - .withDatabase("port", container.firstMappedPort.toString()) - .withDatabase("user", container.username) - .withDatabase("password", container.password) - .withDatabase("dbname", container.databaseName) - .withDatabase("server.id", Random.Default.nextInt(5400..6400).toString()) - .withDatabase("include.list", container.databaseName) - .withOffset() - .withSchemaHistory() - .with("snapshot.mode", "when_needed") - .withStreams(listOf(stream)) - .buildMap() + val value: ObjectNode = + Jsons.objectNode().apply { + put("ts_sec", timestamp.epochSecond) + put("file", position.file) + put("pos", position.pos) + } + return DebeziumOffset(mapOf(key to value)) } + + override fun generateColdStartProperties(): Map = + DebeziumPropertiesBuilder() + .with(generateWarmStartProperties(emptyList())) + .with("snapshot.mode", "recovery") + .withStreams(listOf()) + .buildMap() + + override fun generateWarmStartProperties(streams: List): Map = + DebeziumPropertiesBuilder() + .withDefault() + .withConnector(MySqlConnector::class.java) + .withDebeziumName(container.databaseName) + .withHeartbeats(heartbeat) + .with("include.schema.changes", "false") + .with("connect.keep.alive.interval.ms", "1000") + .withDatabase("hostname", container.host) + .withDatabase("port", container.firstMappedPort.toString()) + .withDatabase("user", container.username) + .withDatabase("password", container.password) + .withDatabase("dbname", container.databaseName) + .withDatabase("server.id", Random.Default.nextInt(5400..6400).toString()) + .withDatabase("include.list", container.databaseName) + .withOffset() + .withSchemaHistory() + .with("snapshot.mode", "when_needed") + .withStreams(streams) + .buildMap() + + fun currentPosition(): Position = + container.withStatement { statement: Statement -> + statement.executeQuery("SHOW MASTER STATUS").use { + it.next() + Position(it.getString("File"), it.getLong("Position")) + } + } + } } diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderPostgresTest.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderPostgresTest.kt index 97a31616b6b6f..1d6571a4da697 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderPostgresTest.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderPostgresTest.kt @@ -6,7 +6,6 @@ package io.airbyte.cdk.read.cdc import com.fasterxml.jackson.databind.node.ArrayNode import com.fasterxml.jackson.databind.node.ObjectNode -import io.airbyte.cdk.command.OpaqueStateValue import 
io.airbyte.cdk.read.Stream import io.airbyte.cdk.testcontainers.TestContainerFactory import io.airbyte.cdk.util.Jsons @@ -74,81 +73,72 @@ class CdcPartitionReaderPostgresTest : connection.createStatement().use { fn(it) } } - override fun createDebeziumOperations(): DebeziumOperations { - return object : - AbstractCdcPartitionReaderDebeziumOperationsForTest(stream) { - override fun position(offset: DebeziumOffset): LogSequenceNumber { - val offsetValue: ObjectNode = offset.wrapped.values.first() as ObjectNode - return LogSequenceNumber.valueOf(offsetValue["lsn"].asLong()) - } + override fun createDebeziumOperations(): DebeziumOperations = + PostgresTestDebeziumOperations() - override fun position(recordValue: DebeziumRecordValue): LogSequenceNumber? { - val lsn: Long = - recordValue.source["lsn"]?.takeIf { it.isIntegralNumber }?.asLong() - ?: return null - return LogSequenceNumber.valueOf(lsn) - } + inner class PostgresTestDebeziumOperations : + AbstractDebeziumOperationsForTest() { + override fun position(offset: DebeziumOffset): LogSequenceNumber { + val offsetValue: ObjectNode = offset.wrapped.values.first() as ObjectNode + return LogSequenceNumber.valueOf(offsetValue["lsn"].asLong()) + } - override fun position(sourceRecord: SourceRecord): LogSequenceNumber? { - val offset: Map = sourceRecord.sourceOffset() - val lsn: Long = offset["lsn"] as? Long ?: return null - return LogSequenceNumber.valueOf(lsn) - } + override fun position(recordValue: DebeziumRecordValue): LogSequenceNumber? { + val lsn: Long = + recordValue.source["lsn"]?.takeIf { it.isIntegralNumber }?.asLong() ?: return null + return LogSequenceNumber.valueOf(lsn) + } - override fun deserialize( - opaqueStateValue: OpaqueStateValue, - streams: List - ): DebeziumInput { - return super.deserialize(opaqueStateValue, streams).let { - DebeziumInput(debeziumProperties(), it.state, it.isSynthetic) - } - } + override fun position(sourceRecord: SourceRecord): LogSequenceNumber? { + val offset: Map = sourceRecord.sourceOffset() + val lsn: Long = offset["lsn"] as? 
Long ?: return null + return LogSequenceNumber.valueOf(lsn) + } - override fun synthesize(): DebeziumInput { - val (position: LogSequenceNumber, txID: Long) = - container.withStatement { statement: Statement -> - statement.executeQuery("SELECT pg_current_wal_lsn(), txid_current()").use { - it.next() - LogSequenceNumber.valueOf(it.getString(1)) to it.getLong(2) - } + override fun generateWarmStartProperties(streams: List): Map = + DebeziumPropertiesBuilder() + .withDefault() + .withConnector(PostgresConnector::class.java) + .withDebeziumName(container.databaseName) + .withHeartbeats(heartbeat) + .with("plugin.name", "pgoutput") + .with("slot.name", SLOT_NAME) + .with("publication.name", PUBLICATION_NAME) + .with("publication.autocreate.mode", "disabled") + .with("flush.lsn.source", "false") + .withDatabase("hostname", container.host) + .withDatabase("port", container.firstMappedPort.toString()) + .withDatabase("user", container.username) + .withDatabase("password", container.password) + .withDatabase("dbname", container.databaseName) + .withOffset() + .withStreams(streams) + .buildMap() + + override fun generateColdStartProperties(): Map = + generateWarmStartProperties(emptyList()) + + override fun generateColdStartOffset(): DebeziumOffset { + val (position: LogSequenceNumber, txID: Long) = + container.withStatement { statement: Statement -> + statement.executeQuery("SELECT pg_current_wal_lsn(), txid_current()").use { + it.next() + LogSequenceNumber.valueOf(it.getString(1)) to it.getLong(2) } - val timestamp: Instant = Instant.now() - val key: ArrayNode = - Jsons.arrayNode().apply { - add(container.databaseName) - add(Jsons.objectNode().apply { put("server", container.databaseName) }) - } - val value: ObjectNode = - Jsons.objectNode().apply { - put("ts_usec", timestamp.toEpochMilli() * 1000L) - put("lsn", position.asLong()) - put("txId", txID) - } - val offset = DebeziumOffset(mapOf(key to value)) - val state = DebeziumState(offset, schemaHistory = null) - val syntheticProperties: Map = debeziumProperties() - return DebeziumInput(syntheticProperties, state, isSynthetic = true) - } - - fun debeziumProperties(): Map = - DebeziumPropertiesBuilder() - .withDefault() - .withConnector(PostgresConnector::class.java) - .withDebeziumName(container.databaseName) - .withHeartbeats(heartbeat) - .with("plugin.name", "pgoutput") - .with("slot.name", SLOT_NAME) - .with("publication.name", PUBLICATION_NAME) - .with("publication.autocreate.mode", "disabled") - .with("flush.lsn.source", "false") - .withDatabase("hostname", container.host) - .withDatabase("port", container.firstMappedPort.toString()) - .withDatabase("user", container.username) - .withDatabase("password", container.password) - .withDatabase("dbname", container.databaseName) - .withOffset() - .withStreams(listOf(stream)) - .buildMap() + } + val timestamp: Instant = Instant.now() + val key: ArrayNode = + Jsons.arrayNode().apply { + add(container.databaseName) + add(Jsons.objectNode().apply { put("server", container.databaseName) }) + } + val value: ObjectNode = + Jsons.objectNode().apply { + put("ts_usec", timestamp.toEpochMilli() * 1000L) + put("lsn", position.asLong()) + put("txId", txID) + } + return DebeziumOffset(mapOf(key to value)) } } } diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreatorTest.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreatorTest.kt index d778aafc5a7d0..1e62bb66aca76 100644 --- 
a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreatorTest.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreatorTest.kt @@ -58,6 +58,7 @@ class CdcPartitionsCreatorTest { val lowerBoundReference = AtomicReference(null) val upperBoundReference = AtomicReference(null) + val reset = AtomicReference(null) val creator: CdcPartitionsCreator get() = @@ -68,16 +69,11 @@ class CdcPartitionsCreatorTest { readerOps, lowerBoundReference, upperBoundReference, + reset, ) val syntheticOffset = DebeziumOffset(mapOf(Jsons.objectNode() to Jsons.objectNode())) val incumbentOffset = DebeziumOffset(mapOf(Jsons.objectNode() to Jsons.objectNode())) - val syntheticInput = - DebeziumInput( - properties = emptyMap(), - state = DebeziumState(offset = syntheticOffset, schemaHistory = null), - isSynthetic = true, - ) @BeforeEach fun setup() { @@ -87,7 +83,9 @@ class CdcPartitionsCreatorTest { every { stateQuerier.feeds } returns listOf(global, stream) every { creatorOps.position(syntheticOffset) } returns 123L every { creatorOps.position(incumbentOffset) } returns 123L - every { creatorOps.synthesize() } returns syntheticInput + every { creatorOps.generateColdStartOffset() } returns syntheticOffset + every { creatorOps.generateColdStartProperties() } returns emptyMap() + every { creatorOps.generateWarmStartProperties(listOf(stream)) } returns emptyMap() } @Test @@ -96,53 +94,37 @@ class CdcPartitionsCreatorTest { every { stateQuerier.current(stream) } returns null val syntheticOffset = DebeziumOffset(mapOf(Jsons.nullNode() to Jsons.nullNode())) every { creatorOps.position(syntheticOffset) } returns 123L - val syntheticInput = - DebeziumInput( - properties = emptyMap(), - state = DebeziumState(offset = syntheticOffset, schemaHistory = null), - isSynthetic = true, - ) - every { creatorOps.synthesize() } returns syntheticInput + every { creatorOps.generateColdStartOffset() } returns syntheticOffset upperBoundReference.set(null) val readers: List = runBlocking { creator.run() } Assertions.assertEquals(1, readers.size) val reader = readers.first() as CdcPartitionReader<*> Assertions.assertEquals(123L, reader.upperBound) - Assertions.assertEquals(syntheticInput, reader.input) + Assertions.assertEquals(syntheticOffset, reader.startingOffset) } @Test fun testCreateWithDeserializedOffset() { every { globalFeedBootstrap.currentState } returns Jsons.objectNode() every { stateQuerier.current(stream) } returns Jsons.objectNode() - val deserializedInput = - DebeziumInput( - properties = emptyMap(), - state = DebeziumState(offset = incumbentOffset, schemaHistory = null), - isSynthetic = false, - ) - every { creatorOps.deserialize(Jsons.objectNode(), listOf(stream)) } returns - deserializedInput + val deserializedState = + ValidDebeziumWarmStartState(offset = incumbentOffset, schemaHistory = null) + every { creatorOps.deserializeState(Jsons.objectNode()) } returns deserializedState upperBoundReference.set(1_000_000L) val readers: List = runBlocking { creator.run() } Assertions.assertEquals(1, readers.size) val reader = readers.first() as CdcPartitionReader<*> Assertions.assertEquals(1_000_000L, reader.upperBound) - Assertions.assertEquals(deserializedInput, reader.input) + Assertions.assertEquals(deserializedState.offset, reader.startingOffset) } @Test fun testCreateNothing() { every { globalFeedBootstrap.currentState } returns Jsons.objectNode() every { stateQuerier.current(stream) } returns Jsons.objectNode() - val deserializedInput 
= - DebeziumInput( - properties = emptyMap(), - state = DebeziumState(offset = incumbentOffset, schemaHistory = null), - isSynthetic = false, - ) - every { creatorOps.deserialize(Jsons.objectNode(), listOf(stream)) } returns - deserializedInput + val deserializedState = + ValidDebeziumWarmStartState(offset = incumbentOffset, schemaHistory = null) + every { creatorOps.deserializeState(Jsons.objectNode()) } returns deserializedState upperBoundReference.set(1L) val readers: List = runBlocking { creator.run() } Assertions.assertEquals(emptyList(), readers) @@ -152,8 +134,8 @@ class CdcPartitionsCreatorTest { fun testCreateWithFailedValidation() { every { globalFeedBootstrap.currentState } returns Jsons.objectNode() every { stateQuerier.current(stream) } returns Jsons.objectNode() - every { creatorOps.deserialize(Jsons.objectNode(), listOf(stream)) } throws - ConfigErrorException("invalid state value") + every { creatorOps.deserializeState(Jsons.objectNode()) } returns + AbortDebeziumWarmStartState("boom") assertThrows(ConfigErrorException::class.java) { runBlocking { creator.run() } } } } diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/testFixtures/kotlin/io/airbyte/cdk/read/cdc/AbstractCdcPartitionReaderTest.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/testFixtures/kotlin/io/airbyte/cdk/read/cdc/AbstractCdcPartitionReaderTest.kt index 01bea3ef4f8ef..cbf5ae53c31d2 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/testFixtures/kotlin/io/airbyte/cdk/read/cdc/AbstractCdcPartitionReaderTest.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/testFixtures/kotlin/io/airbyte/cdk/read/cdc/AbstractCdcPartitionReaderTest.kt @@ -81,8 +81,15 @@ abstract class AbstractCdcPartitionReaderTest, C : AutoCloseab */ fun integrationTest() { container.createStream() - val p0: T = debeziumOperations.position(debeziumOperations.synthesize().state.offset) - val r0: ReadResult = read(debeziumOperations.synthesize(), p0) + val i0 = + ReadInput( + debeziumOperations.generateColdStartProperties(), + debeziumOperations.generateColdStartOffset(), + schemaHistory = null, + isSynthetic = true, + ) + val p0: T = debeziumOperations.position(i0.offset) + val r0: ReadResult = read(i0, p0) Assertions.assertEquals(emptyList(), r0.records) Assertions.assertNotEquals( CdcPartitionReader.CloseReason.RECORD_REACHED_TARGET_POSITION, @@ -105,22 +112,28 @@ abstract class AbstractCdcPartitionReaderTest, C : AutoCloseab Update(3, 7), Update(5, 8), ) - val p1: T = debeziumOperations.position(debeziumOperations.synthesize().state.offset) + val p1: T = debeziumOperations.position(debeziumOperations.generateColdStartOffset()) container.delete24() val delete = listOf( Delete(2), Delete(4), ) - val p2: T = debeziumOperations.position(debeziumOperations.synthesize().state.offset) + val p2: T = debeziumOperations.position(debeziumOperations.generateColdStartOffset()) - val input: DebeziumInput = - debeziumOperations.deserialize(debeziumOperations.serialize(r0.state), listOf(stream)) - val r1: ReadResult = read(input, p1) + Assertions.assertTrue(r0.state is ValidDebeziumWarmStartState) + val i1 = + ReadInput( + debeziumOperations.generateWarmStartProperties(listOf(stream)), + (r0.state as ValidDebeziumWarmStartState).offset, + r0.state.schemaHistory, + isSynthetic = false, + ) + val r1: ReadResult = read(i1, p1) Assertions.assertEquals(insert + update, r1.records.take(insert.size + update.size)) Assertions.assertNotNull(r1.closeReason) - val r2: ReadResult = read(input, p2) + val r2: ReadResult = read(i1, p2) Assertions.assertEquals( 
insert + update + delete, r2.records.take(insert.size + update.size + delete.size), @@ -133,7 +146,7 @@ abstract class AbstractCdcPartitionReaderTest, C : AutoCloseab } private fun read( - input: DebeziumInput, + input: ReadInput, upperBound: T, ): ReadResult { val outputConsumer = BufferingOutputConsumer(ClockFactory().fixed()) @@ -162,7 +175,10 @@ abstract class AbstractCdcPartitionReaderTest, C : AutoCloseab streamRecordConsumers, debeziumOperations, upperBound, - input, + input.properties, + input.offset, + input.schemaHistory, + input.isSynthetic, ) Assertions.assertEquals( PartitionReader.TryAcquireResourcesStatus.READY_TO_RUN, @@ -196,14 +212,21 @@ abstract class AbstractCdcPartitionReaderTest, C : AutoCloseab Assertions.assertEquals(0, reader.numEventValuesWithoutPosition.get()) return ReadResult( outputConsumer.records().map { Jsons.treeToValue(it.data, Record::class.java) }, - debeziumOperations.deserialize(checkpoint.opaqueStateValue, listOf(stream)).state, + debeziumOperations.deserializeState(checkpoint.opaqueStateValue), reader.closeReasonReference.get(), ) } + data class ReadInput( + val properties: Map, + val offset: DebeziumOffset, + val schemaHistory: DebeziumSchemaHistory?, + val isSynthetic: Boolean, + ) + data class ReadResult( val records: List, - val state: DebeziumState, + val state: DebeziumWarmStartState, val closeReason: CdcPartitionReader.CloseReason?, ) @@ -220,13 +243,13 @@ abstract class AbstractCdcPartitionReaderTest, C : AutoCloseab data class Update(override val id: Int, val v: Int) : Record data class Delete(override val id: Int) : Record - abstract inner class AbstractCdcPartitionReaderDebeziumOperationsForTest>( - val stream: Stream - ) : DebeziumOperations { - override fun deserialize( + abstract inner class AbstractDebeziumOperationsForTest> : + DebeziumOperations { + + override fun deserializeRecord( key: DebeziumRecordKey, value: DebeziumRecordValue, - stream: Stream, + stream: Stream ): DeserializedRecord { val id: Int = key.element("id").asInt() val after: Int? = value.after["v"]?.asInt() @@ -252,27 +275,27 @@ abstract class AbstractCdcPartitionReaderTest, C : AutoCloseab override fun findStreamName(key: DebeziumRecordKey, value: DebeziumRecordValue): String? = stream.id.name - override fun serialize(debeziumState: DebeziumState): OpaqueStateValue = + override fun serializeState( + offset: DebeziumOffset, + schemaHistory: DebeziumSchemaHistory? 
+        ): OpaqueStateValue =
             Jsons.valueToTree(
                 mapOf(
                     "offset" to
-                        debeziumState.offset.wrapped
+                        offset.wrapped
                             .map {
                                 Jsons.writeValueAsString(it.key) to
                                     Jsons.writeValueAsString(it.value)
                             }
                             .toMap(),
                     "schemaHistory" to
-                        debeziumState.schemaHistory?.wrapped?.map {
+                        schemaHistory?.wrapped?.map {
                             DocumentWriter.defaultWriter().write(it.document())
                         },
                 ),
             )
 
-        override fun deserialize(
-            opaqueStateValue: OpaqueStateValue,
-            streams: List<Stream>
-        ): DebeziumInput {
+        override fun deserializeState(opaqueStateValue: OpaqueStateValue): DebeziumWarmStartState {
             val offsetNode: ObjectNode = opaqueStateValue["offset"] as ObjectNode
             val offset =
                 DebeziumOffset(
@@ -293,8 +316,7 @@ abstract class AbstractCdcPartitionReaderTest<T : Comparable<T>, C : AutoCloseable>(
             } else {
                 null
             }
-            val deserializedStateValue = DebeziumState(offset, schemaHistory)
-            return DebeziumInput(emptyMap(), deserializedStateValue, false)
+            return ValidDebeziumWarmStartState(offset, schemaHistory)
         }
     }
 }

From 14c6c9db8c81944485c12d3f0b46512b2e8ba7d4 Mon Sep 17 00:00:00 2001
From: Marius Posta
Date: Thu, 30 Jan 2025 14:02:38 -0500
Subject: [PATCH 4/7] bulk-cdk-extract*: remove StateQuerier (#52051)

---
 .../io/airbyte/cdk/read/FeedBootstrap.kt      | 43 ++++++++----
 .../kotlin/io/airbyte/cdk/read/Partitions.kt  |  8 +--
 .../io/airbyte/cdk/read/StateManager.kt       | 34 ++++------
 .../io/airbyte/cdk/read/FeedBootstrapTest.kt  | 68 +++++++++++--------
 .../cdk/read/cdc/CdcPartitionsCreator.kt      |  4 +-
 .../cdk/read/cdc/CdcPartitionsCreatorTest.kt  | 14 ++--
 .../io/airbyte/cdk/read/TestFixtures.kt       | 17 +----
 7 files changed, 94 insertions(+), 94 deletions(-)

diff --git a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/FeedBootstrap.kt b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/FeedBootstrap.kt
index a564c33306bff..5994b6a854e53 100644
--- a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/FeedBootstrap.kt
+++ b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/FeedBootstrap.kt
@@ -21,7 +21,7 @@ import java.time.ZoneOffset
 /**
  * [FeedBootstrap] is the input to a [PartitionsCreatorFactory].
  *
- * This object conveniently packages the [StateQuerier] singleton with the [feed] for which the
+ * This object conveniently packages the [StateManager] singleton with the [feed] for which the
  * [PartitionsCreatorFactory] is to operate on, eventually causing the emission of Airbyte RECORD
  * messages for the [Stream]s in the [feed]. For this purpose, [FeedBootstrap] provides
  * [StreamRecordConsumer] instances which essentially provide a layer of caching over
@@ -34,15 +34,30 @@ sealed class FeedBootstrap(
      * The [MetaFieldDecorator] instance which [StreamRecordConsumer] will use to decorate records.
      */
     val metaFieldDecorator: MetaFieldDecorator,
-    /** [StateQuerier] singleton for use by [PartitionsCreatorFactory]. */
-    val stateQuerier: StateQuerier,
+    /** [StateManager] singleton which is encapsulated by this [FeedBootstrap]. */
+    private val stateManager: StateManager,
     /** [Feed] to emit records for. */
     val feed: T
 ) {
-    /** Convenience getter for the current state value for the [feed]. */
+    /** Delegates to [StateManager.feeds]. */
+    val feeds: List<Feed>
+        get() = stateManager.feeds
+
+    /** Delegates to [StateManager] to return the current state value for any [Feed]. */
+    fun currentState(feed: Feed): OpaqueStateValue? = stateManager.scoped(feed).current()
+
+    /** Convenience getter for the current state value for this [feed]. */
     val currentState: OpaqueStateValue?
- get() = stateQuerier.current(feed) + get() = currentState(feed) + + /** Resets the state value of this feed and the streams in it to zero. */ + fun resetAll() { + stateManager.scoped(feed).reset() + for (stream in feed.streams) { + stateManager.scoped(stream).reset() + } + } /** A map of all [StreamRecordConsumer] for this [feed]. */ fun streamRecordConsumers(): Map = @@ -98,7 +113,7 @@ sealed class FeedBootstrap( } private val precedingGlobalFeed: Global? = - stateQuerier.feeds + stateManager.feeds .filterIsInstance() .filter { it.streams.contains(stream) } .firstOrNull() @@ -109,7 +124,7 @@ sealed class FeedBootstrap( if (feed is Stream && precedingGlobalFeed != null) { metaFieldDecorator.decorateRecordData( timestamp = outputConsumer.recordEmittedAt.atOffset(ZoneOffset.UTC), - globalStateValue = stateQuerier.current(precedingGlobalFeed), + globalStateValue = stateManager.scoped(precedingGlobalFeed).current(), stream, recordData, ) @@ -192,14 +207,14 @@ sealed class FeedBootstrap( fun create( outputConsumer: OutputConsumer, metaFieldDecorator: MetaFieldDecorator, - stateQuerier: StateQuerier, + stateManager: StateManager, feed: Feed, ): FeedBootstrap<*> = when (feed) { is Global -> - GlobalFeedBootstrap(outputConsumer, metaFieldDecorator, stateQuerier, feed) + GlobalFeedBootstrap(outputConsumer, metaFieldDecorator, stateManager, feed) is Stream -> - StreamFeedBootstrap(outputConsumer, metaFieldDecorator, stateQuerier, feed) + StreamFeedBootstrap(outputConsumer, metaFieldDecorator, stateManager, feed) } } } @@ -241,17 +256,17 @@ enum class FieldValueChange { class GlobalFeedBootstrap( outputConsumer: OutputConsumer, metaFieldDecorator: MetaFieldDecorator, - stateQuerier: StateQuerier, + stateManager: StateManager, global: Global, -) : FeedBootstrap(outputConsumer, metaFieldDecorator, stateQuerier, global) +) : FeedBootstrap(outputConsumer, metaFieldDecorator, stateManager, global) /** [FeedBootstrap] implementation for [Stream] feeds. */ class StreamFeedBootstrap( outputConsumer: OutputConsumer, metaFieldDecorator: MetaFieldDecorator, - stateQuerier: StateQuerier, + stateManager: StateManager, stream: Stream, -) : FeedBootstrap(outputConsumer, metaFieldDecorator, stateQuerier, stream) { +) : FeedBootstrap(outputConsumer, metaFieldDecorator, stateManager, stream) { /** A [StreamRecordConsumer] instance for this [Stream]. */ fun streamRecordConsumer(): StreamRecordConsumer = streamRecordConsumers()[feed.id]!! diff --git a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/Partitions.kt b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/Partitions.kt index 10cc5c09ed109..98b2fba931f4e 100644 --- a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/Partitions.kt +++ b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/Partitions.kt @@ -12,10 +12,10 @@ import io.airbyte.cdk.read.PartitionsCreator.TryAcquireResourcesStatus interface PartitionsCreatorFactory { /** * Returns a [PartitionsCreator] which will cause the READ to advance for the [Feed] for which - * the [FeedBootstrap] argument is associated to. The latter exposes a [StateQuerier] to obtain - * the current [OpaqueStateValue] for this [feed] but may also be used to peek at the state of - * other [Feed]s. This may be useful for synchronizing the READ for this [feed] by waiting for - * other [Feed]s to reach a desired state before proceeding; the waiting may be triggered by + * the [FeedBootstrap] argument is associated to. 
The latter exposes methods to obtain the + * current [OpaqueStateValue] for this [feed] but also to peek at the state of other [Feed]s. + * This may be useful for synchronizing the READ for this [feed] by waiting for other [Feed]s to + * reach a desired state before proceeding; the waiting may be triggered by * [PartitionsCreator.tryAcquireResources] or [PartitionReader.tryAcquireResources]. * * Returns null when the factory is unable to generate a [PartitionsCreator]. This causes diff --git a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/StateManager.kt b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/StateManager.kt index f054c0cec8ff6..08fa27ef45bf4 100644 --- a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/StateManager.kt +++ b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/StateManager.kt @@ -10,24 +10,12 @@ import io.airbyte.protocol.models.v0.AirbyteStateMessage import io.airbyte.protocol.models.v0.AirbyteStateStats import io.airbyte.protocol.models.v0.AirbyteStreamState -/** A [StateQuerier] is like a read-only [StateManager]. */ -interface StateQuerier { - /** [feeds] is all the [Feed]s in the configured catalog passed via the CLI. */ - val feeds: List - - /** Returns the current state value for the given [feed]. */ - fun current(feed: Feed): OpaqueStateValue? - - /** Rolls back each feed state. This is required when resyncing CDC from scratch */ - fun resetFeedStates() -} - /** Singleton object which tracks the state of an ongoing READ operation. */ class StateManager( global: Global? = null, initialGlobalState: OpaqueStateValue? = null, initialStreamStates: Map = mapOf(), -) : StateQuerier { +) { private val global: GlobalStateManager? private val nonGlobal: Map @@ -52,16 +40,14 @@ class StateManager( } } - override val feeds: List = + /** [feeds] is all the [Feed]s in the configured catalog passed via the CLI. */ + val feeds: List = listOfNotNull(this.global?.feed) + (this.global?.streamStateManagers?.values?.map { it.feed } ?: listOf()) + nonGlobal.values.map { it.feed } - override fun current(feed: Feed): OpaqueStateValue? = scoped(feed).current() - - override fun resetFeedStates() { - feeds.forEach { f -> scoped(f).set(Jsons.objectNode(), 0) } - } + /** Returns the current state value for the given [feed]. */ + fun current(feed: Feed): OpaqueStateValue? = scoped(feed).current() /** Returns a [StateManagerScopedToFeed] instance scoped to this [feed]. */ fun scoped(feed: Feed): StateManagerScopedToFeed = @@ -86,6 +72,9 @@ class StateManager( state: OpaqueStateValue, numRecords: Long, ) + + /** Resets the current state value in the [StateManager] for this [feed] to zero. */ + fun reset() } /** @@ -119,6 +108,13 @@ class StateManager( pendingNumRecords += numRecords } + @Synchronized + override fun reset() { + currentStateValue = null + pendingStateValue = null + pendingNumRecords = 0L + } + /** * Called by [StateManager.checkpoint] to generate the Airbyte STATE messages for the * checkpoint. 
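For downstream connectors, the StateQuerier removal is a mechanical call-site migration. A minimal sketch, assuming a FeedBootstrap and a Stream in scope, using only the methods introduced by the diffs above:

    // Before this patch (StateQuerier):
    //   val state = feedBootstrap.stateQuerier.current(stream)
    //   feedBootstrap.stateQuerier.resetFeedStates()

    // After this patch (FeedBootstrap delegates to its encapsulated StateManager):
    val state: OpaqueStateValue? = feedBootstrap.currentState(stream)
    feedBootstrap.resetAll() // resets this feed's state and that of all its streams
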
diff --git a/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/read/FeedBootstrapTest.kt b/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/read/FeedBootstrapTest.kt index 51136f67dfee2..8f9acb5d8bfac 100644 --- a/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/read/FeedBootstrapTest.kt +++ b/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/read/FeedBootstrapTest.kt @@ -48,26 +48,13 @@ class FeedBootstrapTest { val global = Global(listOf(stream)) - fun stateQuerier( + fun stateManager( globalStateValue: OpaqueStateValue? = null, streamStateValue: OpaqueStateValue? = null - ): StateQuerier = - object : StateQuerier { - override val feeds: List = listOf(global, stream) + ): StateManager = StateManager(global, globalStateValue, mapOf(stream to streamStateValue)) - override fun current(feed: Feed): OpaqueStateValue? = - when (feed) { - is Global -> globalStateValue - is Stream -> streamStateValue - } - - override fun resetFeedStates() { - // no-op - } - } - - fun Feed.bootstrap(stateQuerier: StateQuerier): FeedBootstrap<*> = - FeedBootstrap.create(outputConsumer, metaFieldDecorator, stateQuerier, this) + fun Feed.bootstrap(stateManager: StateManager): FeedBootstrap<*> = + FeedBootstrap.create(outputConsumer, metaFieldDecorator, stateManager, this) fun expected(vararg data: String): List { val ts = outputConsumer.recordEmittedAt.toEpochMilli() @@ -76,7 +63,7 @@ class FeedBootstrapTest { @Test fun testGlobalColdStart() { - val globalBootstrap: FeedBootstrap<*> = global.bootstrap(stateQuerier()) + val globalBootstrap: FeedBootstrap<*> = global.bootstrap(stateManager()) Assertions.assertNull(globalBootstrap.currentState) Assertions.assertEquals(1, globalBootstrap.streamRecordConsumers().size) val (actualStreamID, consumer) = globalBootstrap.streamRecordConsumers().toList().first() @@ -91,7 +78,7 @@ class FeedBootstrapTest { @Test fun testGlobalWarmStart() { val globalBootstrap: FeedBootstrap<*> = - global.bootstrap(stateQuerier(globalStateValue = Jsons.objectNode())) + global.bootstrap(stateManager(globalStateValue = Jsons.objectNode())) Assertions.assertEquals(Jsons.objectNode(), globalBootstrap.currentState) Assertions.assertEquals(1, globalBootstrap.streamRecordConsumers().size) val (actualStreamID, consumer) = globalBootstrap.streamRecordConsumers().toList().first() @@ -103,10 +90,36 @@ class FeedBootstrapTest { ) } + @Test + fun testGlobalReset() { + val stateManager: StateManager = + stateManager( + streamStateValue = Jsons.objectNode(), + globalStateValue = Jsons.objectNode() + ) + val globalBootstrap: FeedBootstrap<*> = global.bootstrap(stateManager) + Assertions.assertEquals(Jsons.objectNode(), globalBootstrap.currentState) + Assertions.assertEquals(Jsons.objectNode(), globalBootstrap.currentState(stream)) + // Reset. + globalBootstrap.resetAll() + Assertions.assertNull(globalBootstrap.currentState) + Assertions.assertNull(globalBootstrap.currentState(stream)) + // Set new global state and checkpoint + stateManager.scoped(global).set(Jsons.arrayNode(), 0L) + stateManager.checkpoint().forEach { outputConsumer.accept(it) } + // Check that everything is as it should be. 
+ Assertions.assertEquals(Jsons.arrayNode(), globalBootstrap.currentState) + Assertions.assertNull(globalBootstrap.currentState(stream)) + Assertions.assertEquals( + listOf(RESET_STATE), + outputConsumer.states().map(Jsons::writeValueAsString) + ) + } + @Test fun testStreamColdStart() { val streamBootstrap: FeedBootstrap<*> = - stream.bootstrap(stateQuerier(globalStateValue = Jsons.objectNode())) + stream.bootstrap(stateManager(globalStateValue = Jsons.objectNode())) Assertions.assertNull(streamBootstrap.currentState) Assertions.assertEquals(1, streamBootstrap.streamRecordConsumers().size) val (actualStreamID, consumer) = streamBootstrap.streamRecordConsumers().toList().first() @@ -122,7 +135,7 @@ class FeedBootstrapTest { fun testStreamWarmStart() { val streamBootstrap: FeedBootstrap<*> = stream.bootstrap( - stateQuerier( + stateManager( globalStateValue = Jsons.objectNode(), streamStateValue = Jsons.arrayNode(), ) @@ -140,15 +153,8 @@ class FeedBootstrapTest { @Test fun testChanges() { - val stateQuerier = - object : StateQuerier { - override val feeds: List = listOf(stream) - override fun current(feed: Feed): OpaqueStateValue? = null - override fun resetFeedStates() { - // no-op - } - } - val streamBootstrap = stream.bootstrap(stateQuerier) as StreamFeedBootstrap + val stateManager = StateManager(initialStreamStates = mapOf(stream to null)) + val streamBootstrap = stream.bootstrap(stateManager) as StreamFeedBootstrap val consumer: StreamRecordConsumer = streamBootstrap.streamRecordConsumer() val changes = mapOf( @@ -184,5 +190,7 @@ class FeedBootstrapTest { const val STREAM_RECORD_INPUT_DATA = """{"k":2,"v":"bar"}""" const val STREAM_RECORD_OUTPUT_DATA = """{"k":2,"v":"bar","_ab_cdc_lsn":{},"_ab_cdc_updated_at":"2069-04-20T00:00:00.000000Z","_ab_cdc_deleted_at":null}""" + const val RESET_STATE = + """{"type":"GLOBAL","global":{"shared_state":[],"stream_states":[{"stream_descriptor":{"name":"tbl","namespace":"ns"},"stream_state":{}}]},"sourceStats":{"recordCount":0.0}}""" } } diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreator.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreator.kt index b83785e8321ef..a5e2ff0e40c65 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreator.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreator.kt @@ -46,7 +46,7 @@ class CdcPartitionsCreator>( ) } val activeStreams: List by lazy { - feedBootstrap.feed.streams.filter { feedBootstrap.stateQuerier.current(it) != null } + feedBootstrap.feed.streams.filter { feedBootstrap.currentState(it) != null } } val syntheticOffset: DebeziumOffset by lazy { creatorOps.generateColdStartOffset() } // Ensure that the WAL position upper bound has been computed for this sync. @@ -96,7 +96,7 @@ class CdcPartitionsCreator>( // TransientErrorException. The next sync will then snapshot the tables. resetReason.set(warmStartState.reason) log.info { "Resetting invalid incumbent CDC state with synthetic state." 
} - feedBootstrap.stateQuerier.resetFeedStates() + feedBootstrap.resetAll() debeziumProperties = creatorOps.generateColdStartProperties() startingOffset = syntheticOffset startingSchemaHistory = null diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreatorTest.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreatorTest.kt index 1e62bb66aca76..ce41e48f47951 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreatorTest.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreatorTest.kt @@ -14,7 +14,6 @@ import io.airbyte.cdk.read.ConfiguredSyncMode import io.airbyte.cdk.read.Global import io.airbyte.cdk.read.GlobalFeedBootstrap import io.airbyte.cdk.read.PartitionReader -import io.airbyte.cdk.read.StateQuerier import io.airbyte.cdk.read.Stream import io.airbyte.cdk.util.Jsons import io.airbyte.protocol.models.v0.StreamDescriptor @@ -41,8 +40,6 @@ class CdcPartitionsCreatorTest { @MockK lateinit var readerOps: CdcPartitionReaderDebeziumOperations - @MockK lateinit var stateQuerier: StateQuerier - @MockK lateinit var globalFeedBootstrap: GlobalFeedBootstrap val stream = @@ -78,9 +75,8 @@ class CdcPartitionsCreatorTest { @BeforeEach fun setup() { every { globalFeedBootstrap.feed } returns global - every { globalFeedBootstrap.stateQuerier } returns stateQuerier + every { globalFeedBootstrap.feeds } returns listOf(global, stream) every { globalFeedBootstrap.streamRecordConsumers() } returns emptyMap() - every { stateQuerier.feeds } returns listOf(global, stream) every { creatorOps.position(syntheticOffset) } returns 123L every { creatorOps.position(incumbentOffset) } returns 123L every { creatorOps.generateColdStartOffset() } returns syntheticOffset @@ -91,7 +87,7 @@ class CdcPartitionsCreatorTest { @Test fun testCreateWithSyntheticOffset() { every { globalFeedBootstrap.currentState } returns null - every { stateQuerier.current(stream) } returns null + every { globalFeedBootstrap.currentState(stream) } returns null val syntheticOffset = DebeziumOffset(mapOf(Jsons.nullNode() to Jsons.nullNode())) every { creatorOps.position(syntheticOffset) } returns 123L every { creatorOps.generateColdStartOffset() } returns syntheticOffset @@ -106,7 +102,7 @@ class CdcPartitionsCreatorTest { @Test fun testCreateWithDeserializedOffset() { every { globalFeedBootstrap.currentState } returns Jsons.objectNode() - every { stateQuerier.current(stream) } returns Jsons.objectNode() + every { globalFeedBootstrap.currentState(stream) } returns Jsons.objectNode() val deserializedState = ValidDebeziumWarmStartState(offset = incumbentOffset, schemaHistory = null) every { creatorOps.deserializeState(Jsons.objectNode()) } returns deserializedState @@ -121,7 +117,7 @@ class CdcPartitionsCreatorTest { @Test fun testCreateNothing() { every { globalFeedBootstrap.currentState } returns Jsons.objectNode() - every { stateQuerier.current(stream) } returns Jsons.objectNode() + every { globalFeedBootstrap.currentState(stream) } returns Jsons.objectNode() val deserializedState = ValidDebeziumWarmStartState(offset = incumbentOffset, schemaHistory = null) every { creatorOps.deserializeState(Jsons.objectNode()) } returns deserializedState @@ -133,7 +129,7 @@ class CdcPartitionsCreatorTest { @Test fun testCreateWithFailedValidation() { every { globalFeedBootstrap.currentState } returns Jsons.objectNode() - every { stateQuerier.current(stream) 
} returns Jsons.objectNode() + every { globalFeedBootstrap.currentState(stream) } returns Jsons.objectNode() every { creatorOps.deserializeState(Jsons.objectNode()) } returns AbortDebeziumWarmStartState("boom") assertThrows(ConfigErrorException::class.java) { runBlocking { creator.run() } } diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/read/TestFixtures.kt b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/read/TestFixtures.kt index 6db0359837509..7472e7ae32e44 100644 --- a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/read/TestFixtures.kt +++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/read/TestFixtures.kt @@ -190,14 +190,6 @@ object TestFixtures { SelectQuery(ast.toString(), listOf(), listOf()) } - object MockStateQuerier : StateQuerier { - override val feeds: List = listOf() - override fun current(feed: Feed): OpaqueStateValue? = null - override fun resetFeedStates() { - // no-op - } - } - object MockMetaFieldDecorator : MetaFieldDecorator { override val globalCursor: MetaField? = null override val globalMetaFields: Set = emptySet() @@ -214,14 +206,7 @@ object TestFixtures { StreamFeedBootstrap( outputConsumer = BufferingOutputConsumer(ClockFactory().fixed()), metaFieldDecorator = MockMetaFieldDecorator, - stateQuerier = - object : StateQuerier { - override val feeds: List = listOf(this@bootstrap) - override fun current(feed: Feed): OpaqueStateValue? = opaqueStateValue - override fun resetFeedStates() { - // no-op - } - }, + stateManager = StateManager(initialStreamStates = mapOf(this to opaqueStateValue)), stream = this ) } From 56383d30cdc2d6f736cbc958e4f8fb1e97f06827 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20D=C4=85bek?= <373530+szemek@users.noreply.github.com> Date: Thu, 30 Jan 2025 20:08:26 +0100 Subject: [PATCH 5/7] Fix reference to test_report.html.j2 template (#52660) --- .../connectors/test/steps/templates/test_report.html.j2 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/templates/test_report.html.j2 b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/templates/test_report.html.j2 index 409d5c4897359..ea732afba2eb1 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/templates/test_report.html.j2 +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/templates/test_report.html.j2 @@ -196,6 +196,6 @@ function copyToClipBoard(htmlElement) { {% endfor %} -
These reports are generated from this code, please reach out to the Connector Operations team for support.
+These reports are generated from this code, please reach out to the Connector Operations team for support.
From e2edfd7419d9d2bc50f23d1e0b7ac85218bd87a4 Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Thu, 30 Jan 2025 11:11:02 -0800 Subject: [PATCH 6/7] Destination-S3: Missing nested IT cases (#52584) --- .../destination/s3_v2/S3V2WriteTest.kt | 253 +++++++++++++++++- 1 file changed, 242 insertions(+), 11 deletions(-) diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt b/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt index ad9e00a6684cb..86b0c029d5274 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt @@ -4,11 +4,17 @@ package io.airbyte.integrations.destination.s3_v2 +import io.airbyte.cdk.load.command.Append +import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.command.aws.asMicronautProperties +import io.airbyte.cdk.load.data.* import io.airbyte.cdk.load.data.avro.AvroExpectedRecordMapper +import io.airbyte.cdk.load.message.InputRecord import io.airbyte.cdk.load.test.util.ExpectedRecordMapper import io.airbyte.cdk.load.test.util.NoopDestinationCleaner +import io.airbyte.cdk.load.test.util.OutputRecord import io.airbyte.cdk.load.test.util.UncoercedExpectedRecordMapper +import io.airbyte.cdk.load.test.util.destination_process.DestinationUncleanExitException import io.airbyte.cdk.load.write.AllTypesBehavior import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest import io.airbyte.cdk.load.write.SchematizedNestedValueBehavior @@ -16,9 +22,11 @@ import io.airbyte.cdk.load.write.StronglyTyped import io.airbyte.cdk.load.write.UnionBehavior import io.airbyte.cdk.load.write.Untyped import java.util.concurrent.TimeUnit +import org.junit.jupiter.api.Assumptions import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test import org.junit.jupiter.api.Timeout +import org.junit.jupiter.api.assertThrows @Timeout(60, unit = TimeUnit.MINUTES) abstract class S3V2WriteTest( @@ -34,6 +42,8 @@ abstract class S3V2WriteTest( allTypesBehavior: AllTypesBehavior, nullEqualsUnset: Boolean = false, nullUnknownTypes: Boolean = false, + envVars: Map = emptyMap(), + private val mergesUnions: Boolean = false ) : BasicFunctionalityIntegrationTest( S3V2TestUtils.getConfig(path), @@ -67,6 +77,233 @@ abstract class S3V2WriteTest( override fun testBasicWriteFile() { super.testBasicWriteFile() } + + @Test + fun testMergeUnions() { + Assumptions.assumeTrue(mergesUnions) + // Avro and parquet merges unions, merging schemas. Validate the behavior by ensuring + // that fields not matching the schema are dropped. 
+ val streamName = "stream" + val stream = + DestinationStream( + descriptor = DestinationStream.Descriptor(randomizedNamespace, streamName), + importType = Append, + generationId = 1L, + minimumGenerationId = 0L, + syncId = 101L, + schema = + ObjectType( + linkedMapOf( + "id" to FieldType(IntegerType, nullable = true), + "union_of_objects" to + FieldType( + type = + UnionType.of( + ObjectType( + linkedMapOf( + "field1" to FieldType(StringType, true), + "field2" to FieldType(IntegerType, true), + "field4" to FieldType(StringType, true) + ) + ), + ObjectType( + linkedMapOf( + "field1" to + FieldType( + StringType, + true + ), // merges to String + "field3" to FieldType(NumberType, true), + "field4" to + FieldType( + BooleanType, + true + ) // merges to String|Boolean + ) + ) + ), + nullable = true + ) + ) + ) + ) + runSync( + configContents, + stream, + listOf( + """{"id": 1, "union_of_objects": {"field1": "a", "field2": 1, "field3": 3.14, "field4": "boo", "field5": "extra"}}""", + """{"id": 2, "union_of_objects": {"field1": "b", "field2": 2, "field3": 2.71, "field4": true, "field5": "extra"}}""" + ) + .map { InputRecord(randomizedNamespace, streamName, it, 1L) } + ) + val field4a: Any = + if (unionBehavior == UnionBehavior.PROMOTE_TO_OBJECT) { + mapOf("type" to "string", "string" to "boo") + } else { + "boo" + } + val field4b: Any = + if (unionBehavior == UnionBehavior.PROMOTE_TO_OBJECT) { + mapOf("type" to "boolean", "boolean" to true) + } else { + true + } + dumpAndDiffRecords( + config = parsedConfig, + canonicalExpectedRecords = + listOf( + mapOf( + "id" to 1, + "union_of_objects" to + mutableMapOf( + "field1" to "a", + "field2" to 1, + "field3" to 3.14, + "field4" to field4a + ) + ), + mapOf( + "id" to 2, + "union_of_objects" to + mapOf( + "field1" to "b", + "field2" to 2, + "field3" to 2.71, + "field4" to field4b + ) + ) + ) + .map { + OutputRecord( + extractedAt = 1L, + generationId = 1L, + data = it, + airbyteMeta = OutputRecord.Meta(syncId = 101L) + ) + }, + stream, + primaryKey = listOf(listOf("id")), + cursor = listOf("id") + ) + } + + @Test + fun conflictingTypesInMappedUnions() { + Assumptions.assumeTrue(unionBehavior == UnionBehavior.PROMOTE_TO_OBJECT) + val stream = + DestinationStream( + descriptor = DestinationStream.Descriptor(randomizedNamespace, "stream"), + importType = Append, + generationId = 1L, + minimumGenerationId = 0L, + syncId = 101L, + schema = + ObjectType( + linkedMapOf( + "id" to FieldType(IntegerType, nullable = true), + "union_of_objects" to + FieldType( + type = + UnionType.of( + ObjectType( + linkedMapOf( + "field1" to FieldType(StringType, true), + ) + ), + ObjectTypeWithoutSchema + ), + nullable = true + ) + ) + ) + ) + + assertThrows { + runSync( + configContents, + stream, + listOf( + """{"id": 1, "union_of_objects": {"field1": "a"}}""", + """{"id": 2, "union_of_objects": {"undeclared": "field"}}""" + ) + .map { InputRecord(randomizedNamespace, "stream", it, 1L) } + ) + } + } + + @Test + fun testMappableTypesNestedInUnions() { + // Avro and parquet both merge unions and map complex types to other types. Validate + // that the behavior still works as expected when nested within a union. 
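+        // The schemaless object is expected to be serialized to a JSON string, and the
+        // nested string/boolean union either promoted to a typed object or passed
+        // through unchanged, depending on the connector's UnionBehavior.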
+ Assumptions.assumeTrue(mergesUnions) + val stream = + DestinationStream( + descriptor = DestinationStream.Descriptor(randomizedNamespace, "stream"), + importType = Append, + generationId = 1L, + minimumGenerationId = 0L, + syncId = 101L, + schema = + ObjectType( + linkedMapOf( + "id" to FieldType(IntegerType, nullable = true), + "union_of_objects" to + FieldType( + type = + UnionType.of( + ObjectType( + linkedMapOf( + "field1" to FieldType(StringType, true), + "field2" to + FieldType( + ObjectType( + linkedMapOf( + "nested_schemaless" to + FieldType( + ObjectTypeWithoutSchema, + true + ), + "nested_union" to + FieldType( + UnionType.of( + StringType, + BooleanType + ), + true + ) + ) + ), + true + ) + ) + ), + StringType + ), + nullable = true + ), + ) + ) + ) + + val expectedRecords = + if (unionBehavior == UnionBehavior.PROMOTE_TO_OBJECT) { + listOf( + """{"id": 1, "union_of_objects": {"field1": "a", "field2": {"nested_schemaless": "{\"field\": \"value\"}", "nested_union": {"type": "string", "string": "string"}}}}""", + """{"id": 2, "union_of_objects": {"field1": "b", "field2": {"nested_schemaless": "{\"field\": \"value\"}", "nested_union": {"type": "boolean", "boolean": true}}}}""" + ) + } else { + listOf( + """{"id": 1, "union_of_objects": {"field1": "a", "field2": {"nested_schemaless": "{\"field\": \"value\"}", "nested_union": "string"}}}""", + """{"id": 2, "union_of_objects": {"field1": "b", "field2": {"nested_schemaless": "{\"field\": \"value\"}", "nested_union": true}}}""" + ) + } + + runSync( + configContents, + stream, + expectedRecords.map { InputRecord(randomizedNamespace, "stream", it, 1L) } + ) + } } class S3V2WriteTestJsonUncompressed : @@ -160,17 +397,8 @@ class S3V2WriteTestAvroUncompressed : allTypesBehavior = StronglyTyped(integerCanBeLarge = false), nullEqualsUnset = true, nullUnknownTypes = true, - ) { - @Test - override fun testUnknownTypes() { - super.testUnknownTypes() - } - - @Test - override fun testFunkyCharacters() { - super.testFunkyCharacters() - } -} + mergesUnions = true + ) class S3V2WriteTestAvroBzip2 : S3V2WriteTest( @@ -184,6 +412,7 @@ class S3V2WriteTestAvroBzip2 : allTypesBehavior = StronglyTyped(integerCanBeLarge = false), nullEqualsUnset = true, nullUnknownTypes = true, + mergesUnions = true ) class S3V2WriteTestParquetUncompressed : @@ -198,6 +427,7 @@ class S3V2WriteTestParquetUncompressed : allTypesBehavior = StronglyTyped(integerCanBeLarge = false), nullEqualsUnset = true, nullUnknownTypes = true, + mergesUnions = true ) class S3V2WriteTestParquetSnappy : @@ -212,6 +442,7 @@ class S3V2WriteTestParquetSnappy : allTypesBehavior = StronglyTyped(integerCanBeLarge = false), nullEqualsUnset = true, nullUnknownTypes = true, + mergesUnions = true ) class S3V2WriteTestEndpointURL : From 2c3e6a1b117f0b77241b7df1a2da9e5edaa1c1a3 Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Thu, 30 Jan 2025 11:23:38 -0800 Subject: [PATCH 7/7] chore: add destination performance testing built-in (#52641) Co-authored-by: Octavia Squidington III --- .../cdk/load/test/util/IntegrationTest.kt | 2 +- .../cdk/load/write/BasicPerformanceTest.kt | 346 ++++++++++++++++++ .../load/write/PerformanceTestScenarios.kt | 188 ++++++++++ .../dev_null/DevNullPerformanceTest.kt | 20 + 4 files changed, 555 insertions(+), 1 deletion(-) create mode 100644 airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicPerformanceTest.kt create mode 100644 airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/PerformanceTestScenarios.kt create mode 100644 
airbyte-integrations/connectors/destination-dev-null/src/test-integration/kotlin/io/airbyte/integrations/destination/dev_null/DevNullPerformanceTest.kt diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt index c8acbb912addf..6b7dfa0e83037 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt @@ -310,7 +310,7 @@ abstract class IntegrationTest( * You probably don't want to actually interact with this. This is generally intended to * support a specific legacy behavior. Prefer using micronaut properties when possible. */ - @SystemStub private lateinit var nonDockerMockEnvVars: EnvironmentVariables + @SystemStub internal lateinit var nonDockerMockEnvVars: EnvironmentVariables @JvmStatic @BeforeAll diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicPerformanceTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicPerformanceTest.kt new file mode 100644 index 0000000000000..e398e7e565c92 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicPerformanceTest.kt @@ -0,0 +1,346 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.write + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings +import io.airbyte.cdk.command.ConfigurationSpecification +import io.airbyte.cdk.command.ValidatedJsonUtils +import io.airbyte.cdk.load.command.DestinationCatalog +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.data.AirbyteType +import io.airbyte.cdk.load.data.IntegerType +import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema +import io.airbyte.cdk.load.data.StringType +import io.airbyte.cdk.load.data.TimeTypeWithTimezone +import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone +import io.airbyte.cdk.load.data.TimestampTypeWithTimezone +import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone +import io.airbyte.cdk.load.message.DestinationRecordStreamComplete +import io.airbyte.cdk.load.test.util.ConfigurationUpdater +import io.airbyte.cdk.load.test.util.FakeConfigurationUpdater +import io.airbyte.cdk.load.test.util.IntegrationTest +import io.airbyte.cdk.load.test.util.destination_process.DestinationProcess +import io.airbyte.cdk.load.test.util.destination_process.DestinationProcessFactory +import io.airbyte.protocol.models.Jsons +import io.github.oshai.kotlinlogging.KotlinLogging +import java.time.Instant +import java.time.LocalDateTime +import java.time.LocalTime +import java.time.OffsetDateTime +import java.time.ZoneOffset +import java.time.format.DateTimeFormatter +import kotlin.time.measureTime +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.apache.commons.lang3.RandomStringUtils +import org.junit.jupiter.api.BeforeAll +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.TestInfo +import org.junit.jupiter.api.extension.ExtendWith +import uk.org.webcompere.systemstubs.environment.EnvironmentVariables +import uk.org.webcompere.systemstubs.jupiter.SystemStub +import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension + +private val log = 
KotlinLogging.logger {} + +data class NamedField(val name: String, val type: AirbyteType, val sample: Any) + +/** Defines a performance test scenario. */ +interface PerformanceTestScenario { + data class Summary( + val records: Long, + val size: Long, + val expectedRecordsCount: Long, + ) + + /** The catalog used for the performance test. */ + val catalog: DestinationCatalog + + /** + * The main method from the performance scenario. + * + * This would be where records are emitted to the destination. How, is up to the scenario to + * define. + */ + fun send(destination: DestinationProcess) + + /** + * Returns the expectations from the test scenario: how many records were emitted, how many + * records are expected to be written in the final table (in the case of duplicates, this should + * be the number of distinct records) and the volume of data emitted. + */ + fun getSummary(): Summary +} + +/** Interface to implement for destination that support data validation. */ +interface DataValidator { + /** + * Returns the count of how many records are present for the stream in the final table. null if + * not found. + */ + fun count(spec: ConfigurationSpecification, stream: DestinationStream): Long? +} + +data class PerformanceTestSummary( + val namespace: String?, + val streamName: String, + val recordCount: Long?, + val emittedRecordCount: Long, + val recordPerSeconds: Double, + val megabytePerSeconds: Double, +) + +typealias ValidationFunction = (List) -> Unit + +@Suppress("SameParameterValue") +@SuppressFBWarnings("NP_NONNULL_RETURN_VIOLATION", justification = "Micronaut DI") +@ExtendWith(SystemStubsExtension::class) +abstract class BasicPerformanceTest( + val defaultRecordsToInsert: Long, + val configContents: String, + val configSpecClass: Class, + val configUpdater: ConfigurationUpdater = FakeConfigurationUpdater, + val dataValidator: DataValidator? = null, +) { + + protected val destinationProcessFactory = DestinationProcessFactory.get(emptyList()) + + private lateinit var testInfo: TestInfo + private lateinit var testPrettyName: String + + val randomizedNamespace = run { + val randomSuffix = RandomStringUtils.secure().nextAlphabetic(4) + val randomizedNamespaceDateFormatter = DateTimeFormatter.ofPattern("yyyyMMdd") + val timestampString = + LocalDateTime.ofInstant(Instant.now(), ZoneOffset.UTC) + .format(randomizedNamespaceDateFormatter) + "test$timestampString$randomSuffix" + } + + val idColumn = NamedField("id", IntegerType, 1337) + val twoStringColumns = + listOf( + NamedField("column1", StringType, "1".repeat(100)), + NamedField("column2", StringType, "2".repeat(100)), + ) + + @Test + open fun testInsertRecords() { + testInsertRecords(null) + } + + protected fun testInsertRecords(validation: ValidationFunction?) { + runSync( + testScenario = + SingleStreamInsert( + idColumn = idColumn, + columns = twoStringColumns, + recordsToInsert = defaultRecordsToInsert, + randomizedNamespace = randomizedNamespace, + streamName = testInfo.testMethod.get().name, + ), + validation = validation, + ) + } + + @Test + open fun testInsertRecordsComplexTypes() { + testInsertRecordsComplexTypes(null) + } + + protected fun testInsertRecordsComplexTypes(validation: ValidationFunction?) 
{ + runSync( + testScenario = + SingleStreamInsert( + idColumn = idColumn, + columns = + listOf( + NamedField( + "tWithTz", + TimeTypeWithTimezone, + LocalTime.now().atOffset(ZoneOffset.UTC).toString() + ), + NamedField("t", TimeTypeWithoutTimezone, LocalTime.now().toString()), + NamedField( + "tsWithTz", + TimestampTypeWithTimezone, + OffsetDateTime.now().toString() + ), + NamedField( + "ts", + TimestampTypeWithoutTimezone, + OffsetDateTime.now().toLocalDateTime().toString() + ), + NamedField( + "object", + ObjectTypeWithoutSchema, + Jsons.serialize(mapOf("object" to "value")) + ) + ), + recordsToInsert = defaultRecordsToInsert, + randomizedNamespace = randomizedNamespace, + streamName = testInfo.testMethod.get().name, + ), + validation = validation, + ) + } + + @Test + open fun testInsertRecordsWithDedup() { + testInsertRecordsWithDedup(null) + } + + protected fun testInsertRecordsWithDedup(validation: ValidationFunction?) { + runSync( + testScenario = + SingleStreamInsert( + idColumn = idColumn, + columns = twoStringColumns, + dedup = true, + duplicateChance = 0.25, + recordsToInsert = defaultRecordsToInsert, + randomizedNamespace = randomizedNamespace, + streamName = testInfo.testMethod.get().name, + ), + validation = validation, + ) + } + + @Test + open fun testInsertRecordsWithManyColumns() { + testInsertRecordsWithManyColumns(null) + } + + protected fun testInsertRecordsWithManyColumns(validation: ValidationFunction?) { + runSync( + testScenario = + SingleStreamInsert( + idColumn = idColumn, + columns = (1..100).map { NamedField("column$it", StringType, "1".repeat(50)) }, + recordsToInsert = defaultRecordsToInsert, + randomizedNamespace = randomizedNamespace, + streamName = testInfo.testMethod.get().name, + ), + validation = validation, + ) + } + + @Test + open fun testAppendRecordsWithDuplicates() { + testAppendRecordsWithDuplicates(null) + } + + protected fun testAppendRecordsWithDuplicates(validation: ValidationFunction?) { + runSync( + testScenario = + SingleStreamInsert( + idColumn = idColumn, + columns = twoStringColumns, + dedup = false, + duplicateChance = 0.25, + recordsToInsert = defaultRecordsToInsert, + randomizedNamespace = randomizedNamespace, + streamName = testInfo.testMethod.get().name, + ), + validation = validation, + ) + } + + companion object { + // Connectors are calling System.getenv rather than using micronaut-y properties, + // so we have to mock it out, instead of just setting more properties + // inside NonDockerizedDestination. + // This field has no effect on DockerizedDestination, which explicitly + // sets env vars when invoking `docker run`. + @SystemStub lateinit var nonDockerMockEnvVars: EnvironmentVariables + + @JvmStatic + @BeforeAll + fun beforeAll() { + // NonDockerizedDestinations are hardcoded on IntegrationTest, not fixing for now. + IntegrationTest.nonDockerMockEnvVars = nonDockerMockEnvVars + IntegrationTest.nonDockerMockEnvVars.set("WORKER_JOB_ID", "0") + } + } + + @BeforeEach + fun getTestInfo(testInfo: TestInfo) { + this.testInfo = testInfo + testPrettyName = "${testInfo.testClass.get().simpleName}.${testInfo.displayName}" + destinationProcessFactory.testName = testPrettyName + } + + protected fun runSync( + testScenario: PerformanceTestScenario, + useFileTransfer: Boolean = false, + validation: ValidationFunction? 
= null, + ): List { + val testConfig = configUpdater.update(configContents) + val destination = + destinationProcessFactory.createDestinationProcess( + "write", + testConfig, + testScenario.catalog.asProtocolObject(), + useFileTransfer = useFileTransfer, + ) + + val duration = + runBlocking(Dispatchers.IO) { + launch { destination.run() } + + measureTime { + testScenario.send(destination) + testScenario.catalog.streams.forEach { + destination.sendMessage( + DestinationRecordStreamComplete( + it.descriptor, + System.currentTimeMillis() + ) + .asProtocolMessage() + ) + } + destination.shutdown() + } + } + + val summary = testScenario.getSummary() + val recordPerSeconds = summary.records.toDouble() / duration.inWholeMilliseconds * 1000 + val megabytePerSeconds = + summary.size.toDouble() / 1000000 / duration.inWholeMilliseconds * 1000 + log.info { "$testPrettyName: loaded ${summary.records} records in $duration" } + log.info { "$testPrettyName: loaded ${"%.2f".format(recordPerSeconds)} rps" } + log.info { "$testPrettyName: loaded ${"%.2f".format(megabytePerSeconds)} MBps" } + + val recordCount = + dataValidator?.let { validator -> + val parsedConfig = ValidatedJsonUtils.parseOne(configSpecClass, testConfig) + val recordCount = validator.count(parsedConfig, testScenario.catalog.streams[0]) + + recordCount?.also { + log.info { + "$testPrettyName: table contains ${it} records" + + " (expected ${summary.expectedRecordsCount} records, " + + "emitted ${summary.records} records)" + } + } + } + + val performanceTestSummary = + listOf( + PerformanceTestSummary( + namespace = testScenario.catalog.streams[0].descriptor.namespace, + streamName = testScenario.catalog.streams[0].descriptor.name, + recordCount = recordCount, + emittedRecordCount = summary.records, + recordPerSeconds = recordPerSeconds, + megabytePerSeconds = megabytePerSeconds, + ) + ) + validation?.let { it(performanceTestSummary) } + return performanceTestSummary + } +} diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/PerformanceTestScenarios.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/PerformanceTestScenarios.kt new file mode 100644 index 0000000000000..3306213b1204b --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/PerformanceTestScenarios.kt @@ -0,0 +1,188 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.write + +import io.airbyte.cdk.load.command.Append +import io.airbyte.cdk.load.command.Dedupe +import io.airbyte.cdk.load.command.DestinationCatalog +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.data.FieldType +import io.airbyte.cdk.load.data.ObjectType +import io.airbyte.cdk.load.message.InputRecord +import io.airbyte.cdk.load.test.util.destination_process.DestinationProcess +import io.airbyte.protocol.models.Jsons +import java.security.SecureRandom + +/** + * Single stream performance test. + * + * This performance scenario will insert [recordsToInsert] records are generated from the [idColumn] + * and [columns] parameters. Records are the same except for the id which will be automatically + * incremented. [dedup] controls whether the insert mode is `Append` or `Dedupe`. [duplicateChance] + * if non 0 will insert random duplicates of records. 
+ */ +class SingleStreamInsert( + private val idColumn: NamedField, + private val columns: List, + private val recordsToInsert: Long, + private val dedup: Boolean = false, + duplicateChance: Double = 0.0, + randomizedNamespace: String, + streamName: String, +) : PerformanceTestScenario { + + init { + assert(duplicateChance in 0.0..1.0) + } + + private val stream = run { + val importType = + if (!dedup) Append + else + Dedupe( + primaryKey = listOf(listOf(idColumn.name)), + cursor = listOf(idColumn.name), + ) + val schema = + (listOf(idColumn) + columns).map { + Pair(it.name, FieldType(type = it.type, nullable = true)) + } + + DestinationStream( + descriptor = DestinationStream.Descriptor(randomizedNamespace, streamName), + importType = importType, + schema = ObjectType(linkedMapOf(*schema.toTypedArray())), + generationId = 0, + minimumGenerationId = 0, + syncId = 1, + ) + } + + private val random = SecureRandom() + private val randomThreshold: Int = + if (duplicateChance > 0.0) ((duplicateChance % 1.0) * 100).toInt() else 0 + + private var recordCount: Long = 0 + private var byteCount: Long = 0 + + override val catalog = DestinationCatalog(listOf(stream)) + + class RecordWriter( + indexColumn: NamedField, + columns: List, + stream: DestinationStream, + private val destination: DestinationProcess, + private val recordBufferSize: Long = 1, + ) : AutoCloseable { + private val baseRecord = run { + val data = (listOf(indexColumn) + columns).associate { Pair(it.name, it.sample) } + InputRecord( + namespace = stream.descriptor.namespace, + name = stream.descriptor.name, + data = Jsons.serialize(data), + emittedAtMs = System.currentTimeMillis(), + ) + } + private val messageParts = + Jsons.serialize(baseRecord.asProtocolMessage()).split(indexColumn.sample.toString()) + private val baseMessageSize = messageParts.sumOf { it.length } + + private val sb = StringBuilder() + + var recordWritten: Long = 0 + var bytesWritten: Long = 0 + + fun write(id: Long) { + sb.append(messageParts[0]) + sb.append(id) + sb.append(messageParts[1]) + sb.appendLine() + + if (recordWritten % recordBufferSize == 0L) { + flush() + } + + recordWritten += 1 + bytesWritten += baseMessageSize + id.length() + } + + private fun flush() { + if (sb.isNotEmpty()) { + destination.sendMessage(sb.toString()) + sb.clear() + } + } + + override fun close() { + flush() + } + } + + override fun send(destination: DestinationProcess) { + RecordWriter( + indexColumn = idColumn, + columns = columns, + stream = stream, + destination = destination, + recordBufferSize = 10, + ) + .use { writer -> + (1..recordsToInsert).forEach { + writer.write(it) + if (randomThreshold > 0 && random.nextInt(0, 100) <= randomThreshold) { + writer.write(it) + } + } + recordCount = writer.recordWritten + byteCount = writer.bytesWritten + } + } + + override fun getSummary() = + PerformanceTestScenario.Summary( + recordCount, + byteCount, + expectedRecordsCount = if (dedup) recordsToInsert else recordCount, + ) +} + +private fun Long.length(): Long = + if (this <= 99999) { + if (this <= 99) { + if (this <= 9) { + 1 + } else { + 2 + } + } else { + if (this <= 999) { + 3 + } else { + if (this <= 9999) { + 4 + } else { + 5 + } + } + } + } else { + if (this <= 9999999) { + if (this <= 999999) { + 6 + } else { + 7 + } + } else { + if (this <= 99999999) { + 8 + } else { + if (this <= 999999999) { + 9 + } else { + 10 + } + } + } + } diff --git 
a/airbyte-integrations/connectors/destination-dev-null/src/test-integration/kotlin/io/airbyte/integrations/destination/dev_null/DevNullPerformanceTest.kt b/airbyte-integrations/connectors/destination-dev-null/src/test-integration/kotlin/io/airbyte/integrations/destination/dev_null/DevNullPerformanceTest.kt new file mode 100644 index 0000000000000..ad9481f02001b --- /dev/null +++ b/airbyte-integrations/connectors/destination-dev-null/src/test-integration/kotlin/io/airbyte/integrations/destination/dev_null/DevNullPerformanceTest.kt @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.dev_null + +import io.airbyte.cdk.load.write.BasicPerformanceTest +import org.junit.jupiter.api.Test + +class DevNullPerformanceTest : + BasicPerformanceTest( + configContents = DevNullTestUtils.loggingConfigContents, + configSpecClass = DevNullSpecification::class.java, + defaultRecordsToInsert = 1000000, + ) { + @Test + override fun testInsertRecords() { + testInsertRecords { summary -> assert(summary[0].recordPerSeconds > 1000) } + } +}
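
For a destination that can query its final tables, the same harness can also validate loaded row counts by supplying a DataValidator. A minimal sketch, assuming a hypothetical MyDestinationClient with a countRows method (not part of these patches):

    class MyDestinationDataValidator : DataValidator {
        override fun count(spec: ConfigurationSpecification, stream: DestinationStream): Long? =
            // Hypothetical client: query the final table for this stream's descriptor,
            // returning null when the table does not exist yet.
            MyDestinationClient(spec).countRows(stream.descriptor.namespace, stream.descriptor.name)
    }

Wired into a BasicPerformanceTest subclass via the dataValidator constructor parameter, runSync then logs the table's row count next to the expected and emitted counts and exposes it as PerformanceTestSummary.recordCount, which a validation lambda (as in DevNullPerformanceTest above) can assert on.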