Skip to content

Commit

Permalink
source-mysql: adopt bulk-cdk-toolkit-extract-cdc API changes (#52039)
Browse files Browse the repository at this point in the history
  • Loading branch information
Marius Posta authored Jan 30, 2025
1 parent a722593 commit 8c2ed63
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 120 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ application {
airbyteBulkConnector {
core = 'extract'
toolkits = ['extract-jdbc', 'extract-cdc']
cdk = '0.277'
cdk = '0.300'
}

dependencies {
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.11.0
dockerImageTag: 3.11.1
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,18 @@ import io.airbyte.cdk.jdbc.JdbcConnectionFactory
import io.airbyte.cdk.jdbc.LongFieldType
import io.airbyte.cdk.jdbc.StringFieldType
import io.airbyte.cdk.read.Stream
import io.airbyte.cdk.read.cdc.CdcPartitionsCreator.OffsetInvalidNeedsResyncIllegalStateException
import io.airbyte.cdk.read.cdc.DebeziumInput
import io.airbyte.cdk.read.cdc.AbortDebeziumWarmStartState
import io.airbyte.cdk.read.cdc.DebeziumOffset
import io.airbyte.cdk.read.cdc.DebeziumOperations
import io.airbyte.cdk.read.cdc.DebeziumPropertiesBuilder
import io.airbyte.cdk.read.cdc.DebeziumRecordKey
import io.airbyte.cdk.read.cdc.DebeziumRecordValue
import io.airbyte.cdk.read.cdc.DebeziumSchemaHistory
import io.airbyte.cdk.read.cdc.DebeziumState
import io.airbyte.cdk.read.cdc.DebeziumWarmStartState
import io.airbyte.cdk.read.cdc.DeserializedRecord
import io.airbyte.cdk.read.cdc.InvalidDebeziumWarmStartState
import io.airbyte.cdk.read.cdc.ResetDebeziumWarmStartState
import io.airbyte.cdk.read.cdc.ValidDebeziumWarmStartState
import io.airbyte.cdk.ssh.TunnelSession
import io.airbyte.cdk.util.Jsons
import io.debezium.connector.mysql.MySqlConnector
Expand Down Expand Up @@ -63,8 +65,11 @@ class MySqlSourceDebeziumOperations(
random: Random = Random.Default,
) : DebeziumOperations<MySqlSourceCdcPosition> {
private val log = KotlinLogging.logger {}
// CDC-specific view of the connector's incremental configuration, resolved on first access.
// NOTE(review): the cast presumes this class is only used with a CDC configuration — confirm.
private val cdcIncrementalConfiguration: CdcIncrementalConfiguration by lazy {
    val incremental = configuration.incrementalConfiguration
    incremental as CdcIncrementalConfiguration
}

override fun deserialize(
override fun deserializeRecord(
key: DebeziumRecordKey,
value: DebeziumRecordValue,
stream: Stream,
Expand Down Expand Up @@ -130,29 +135,42 @@ class MySqlSourceDebeziumOperations(
/** Returns the "table" entry of the record's source metadata as text, or null when absent. */
override fun findStreamName(key: DebeziumRecordKey, value: DebeziumRecordValue): String? {
    val tableNode = value.source["table"]
    return tableNode?.asText()
}

/**
 * Turns a previously saved state value into a warm-start state.
 *
 * The value is first deserialized without validation. If that throws, the error is logged and
 * an [AbortDebeziumWarmStartState] carrying the exception message is returned. Otherwise the
 * deserialized state is handed to [validate] and its result is returned as-is.
 */
override fun deserializeState(
    opaqueStateValue: OpaqueStateValue,
): DebeziumWarmStartState {
    val unvalidated: UnvalidatedDeserializedState =
        try {
            deserializeStateUnvalidated(opaqueStateValue)
        } catch (failure: Exception) {
            log.error(failure) { "Error deserializing incumbent state value." }
            val abortReason = "Error deserializing incumbent state value: ${failure.message}"
            return AbortDebeziumWarmStartState(abortReason)
        }
    // Validation runs outside the try block, so exceptions it throws are not converted to aborts.
    return validate(unvalidated)
}

/**
* Checks whether the GTIDs from the previously saved state (debeziumState) are still valid on
* the DB, and also checks whether the saved binlog file still exists.
*
* Validation is not supposed to be performed on synthetic state.
*/
private fun validate(debeziumState: DebeziumState): CdcStateValidateResult {
private fun validate(debeziumState: UnvalidatedDeserializedState): DebeziumWarmStartState {
val savedStateOffset: SavedOffset = parseSavedOffset(debeziumState)
val (_: MySqlSourceCdcPosition, gtidSet: String?) = queryPositionAndGtids()
if (gtidSet.isNullOrEmpty() && !savedStateOffset.gtidSet.isNullOrEmpty()) {
log.info {
return abortCdcSync(
"Connector used GTIDs previously, but MySQL server does not know of any GTIDs or they are not enabled"
}
return abortCdcSync()
)
}

val savedGtidSet = MySqlGtidSet(savedStateOffset.gtidSet)
val availableGtidSet = MySqlGtidSet(gtidSet)
if (!savedGtidSet.isContainedWithin(availableGtidSet)) {
log.info {
return abortCdcSync(
"Connector last known GTIDs are $savedGtidSet, but MySQL server only has $availableGtidSet"
}
return abortCdcSync()
)
}

// newGtidSet is gtids from server that hasn't been seen by this connector yet. If the set
Expand All @@ -161,49 +179,42 @@ class MySqlSourceDebeziumOperations(
if (!newGtidSet.isEmpty) {
val purgedGtidSet = queryPurgedIds()
if (!purgedGtidSet.isEmpty && !newGtidSet.subtract(purgedGtidSet).equals(newGtidSet)) {
log.info {
return abortCdcSync(
"Connector has not seen GTIDs $newGtidSet, but MySQL server has purged $purgedGtidSet"
}
return abortCdcSync()
)
}
}
if (!savedGtidSet.isEmpty) {
// If the connector has saved GTID set, we will use that to validate and skip
// binlog validation. GTID and binlog works in an independent way to ensure data
// integrity where GTID is for storing transactions and binlog is for storing changes
// in DB.
return CdcStateValidateResult.VALID
}
val existingLogFiles: List<String> = getBinaryLogFileNames()
val found = existingLogFiles.contains(savedStateOffset.position.fileName)
if (!found) {
log.info {
"Connector last known binlog file ${savedStateOffset.position.fileName} is " +
"not found in the server. Server has $existingLogFiles"
// If the connector has a saved GTID set, we use that for validation and skip
// binlog validation. GTIDs and the binlog work independently to ensure data
// integrity: GTIDs are for storing transactions and the binlog is for storing changes
// in the DB.
if (savedGtidSet.isEmpty) {
val existingLogFiles: List<String> = getBinaryLogFileNames()
val found = existingLogFiles.contains(savedStateOffset.position.fileName)
if (!found) {
return abortCdcSync(
"Connector last known binlog file ${savedStateOffset.position.fileName} is not found in the server. Server has $existingLogFiles"
)
}
return abortCdcSync()
}
return CdcStateValidateResult.VALID
return ValidDebeziumWarmStartState(debeziumState.offset, debeziumState.schemaHistory)
}

private fun abortCdcSync(): CdcStateValidateResult {
val cdcIncrementalConfiguration: CdcIncrementalConfiguration =
configuration.incrementalConfiguration as CdcIncrementalConfiguration
return when (cdcIncrementalConfiguration.invalidCdcCursorPositionBehavior) {
InvalidCdcCursorPositionBehavior.FAIL_SYNC -> {
log.warn { "Saved offset no longer present on the server. aborting sync." }
CdcStateValidateResult.INVALID_ABORT
}
InvalidCdcCursorPositionBehavior.RESET_SYNC -> {
log.warn {
"Saved offset no longer present on the server, Airbyte is going to trigger a sync from scratch."
}
CdcStateValidateResult.INVALID_RESET
}
/**
 * Builds the invalid warm-start state for a saved offset that failed validation.
 *
 * The configured [InvalidCdcCursorPositionBehavior] decides the outcome: FAIL_SYNC yields an
 * [AbortDebeziumWarmStartState], RESET_SYNC yields a [ResetDebeziumWarmStartState]. In both
 * cases [reason] is appended to the message.
 */
private fun abortCdcSync(reason: String): InvalidDebeziumWarmStartState {
    val behavior = cdcIncrementalConfiguration.invalidCdcCursorPositionBehavior
    return when (behavior) {
        InvalidCdcCursorPositionBehavior.FAIL_SYNC -> {
            val message =
                "Saved offset no longer present on the server, please reset the connection, " +
                    "and then increase binlog retention and/or increase sync frequency. " +
                    "$reason."
            AbortDebeziumWarmStartState(message)
        }
        InvalidCdcCursorPositionBehavior.RESET_SYNC -> {
            val message = "Saved offset no longer present on the server. $reason."
            ResetDebeziumWarmStartState(message)
        }
    }
}
}

private fun parseSavedOffset(debeziumState: DebeziumState): SavedOffset {
private fun parseSavedOffset(debeziumState: UnvalidatedDeserializedState): SavedOffset {
val position: MySqlSourceCdcPosition = position(debeziumState.offset)
val gtidSet: String? = debeziumState.offset.wrapped.values.first()["gtids"]?.asText()
return SavedOffset(position, gtidSet)
Expand Down Expand Up @@ -233,7 +244,7 @@ class MySqlSourceDebeziumOperations(
return MySqlSourceCdcPosition(file.toString(), pos)
}

override fun synthesize(): DebeziumInput {
override fun generateColdStartOffset(): DebeziumOffset {
val (mySqlSourceCdcPosition: MySqlSourceCdcPosition, gtidSet: String?) =
queryPositionAndGtids()
val topicPrefixName: String = DebeziumPropertiesBuilder.sanitizeTopicPrefix(databaseName)
Expand All @@ -254,8 +265,7 @@ class MySqlSourceDebeziumOperations(
}
val offset = DebeziumOffset(mapOf(key to value))
log.info { "Constructed synthetic $offset." }
val state = DebeziumState(offset, schemaHistory = null)
return DebeziumInput(syntheticProperties, state, isSynthetic = true)
return offset
}

private fun queryPositionAndGtids(): Pair<MySqlSourceCdcPosition, String?> {
Expand Down Expand Up @@ -319,49 +329,37 @@ class MySqlSourceDebeziumOperations(
}
}

override fun deserialize(
opaqueStateValue: OpaqueStateValue,
streams: List<Stream>
): DebeziumInput {
val debeziumState: DebeziumState =
try {
deserializeDebeziumState(opaqueStateValue)
} catch (e: Exception) {
throw ConfigErrorException("Error deserializing $opaqueStateValue", e)
}
val cdcValidationResult = validate(debeziumState)
if (cdcValidationResult != CdcStateValidateResult.VALID) {
if (cdcValidationResult == CdcStateValidateResult.INVALID_ABORT) {
throw ConfigErrorException(
"Saved offset no longer present on the server. Please reset the connection, and then increase binlog retention and/or increase sync frequency."
)
}
if (cdcValidationResult == CdcStateValidateResult.INVALID_RESET) {
throw OffsetInvalidNeedsResyncIllegalStateException()
}
return synthesize()
}
/**
 * Builds the Debezium properties used for a cold start: the common properties plus
 * `snapshot.mode=recovery`, with an empty stream list.
 */
override fun generateColdStartProperties(): Map<String, String> {
    val withCommon = DebeziumPropertiesBuilder().with(commonProperties)
    // https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-snapshot-mode
    // The "recovery" snapshot mode instructs Debezium to construct the db schema history.
    // The schema_only_recovery mode used previously has been deprecated.
    val withSnapshotMode = withCommon.with("snapshot.mode", "recovery")
    return withSnapshotMode.withStreams(emptyList()).buildMap()
}

val properties: Map<String, String> =
DebeziumPropertiesBuilder().with(commonProperties).withStreams(streams).buildMap()
return DebeziumInput(properties, debeziumState, isSynthetic = false)
}
/**
 * Builds the Debezium properties used for a warm start: the common properties combined with
 * the given [streams] via `withStreams`.
 */
override fun generateWarmStartProperties(streams: List<Stream>): Map<String, String> {
    val builder = DebeziumPropertiesBuilder().with(commonProperties)
    return builder.withStreams(streams).buildMap()
}

override fun serialize(debeziumState: DebeziumState): OpaqueStateValue {
override fun serializeState(
offset: DebeziumOffset,
schemaHistory: DebeziumSchemaHistory?
): OpaqueStateValue {
val stateNode: ObjectNode = Jsons.objectNode()
// Serialize offset.
val offsetNode: JsonNode =
Jsons.objectNode().apply {
for ((k, v) in debeziumState.offset.wrapped) {
for ((k, v) in offset.wrapped) {
put(Jsons.writeValueAsString(k), Jsons.writeValueAsString(v))
}
}
stateNode.set<JsonNode>(MYSQL_CDC_OFFSET, offsetNode)
// Serialize schema history.
val schemaHistory: List<HistoryRecord>? = debeziumState.schemaHistory?.wrapped
if (schemaHistory != null) {
val uncompressedString: String =
schemaHistory.joinToString(separator = "\n") {
schemaHistory.wrapped.joinToString(separator = "\n") {
DocumentWriter.defaultWriter().write(it.document())
}
if (uncompressedString.length <= MAX_UNCOMPRESSED_LENGTH) {
Expand Down Expand Up @@ -427,24 +425,11 @@ class MySqlSourceDebeziumOperations(
MySqlSourceCdcTemporalConverter::class
)

val serverTimezone: String? =
(configuration.incrementalConfiguration as CdcIncrementalConfiguration).serverTimezone
if (!serverTimezone.isNullOrBlank()) {
dbzPropertiesBuilder.with("database.connectionTimezone", serverTimezone)
}
dbzPropertiesBuilder.buildMap()
}
cdcIncrementalConfiguration.serverTimezone
?.takeUnless { it.isBlank() }
?.let { dbzPropertiesBuilder.withDatabase("connectionTimezone", it) }

val syntheticProperties: Map<String, String> by lazy {
DebeziumPropertiesBuilder()
.with(commonProperties)
// https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-snapshot-mode
// We use the recovery property cause using this mode will instruct Debezium to
// construct the db schema history. Note that we used to use schema_only_recovery mode
// instead, but this mode has been deprecated.
.with("snapshot.mode", "recovery")
.withStreams(listOf())
.buildMap()
dbzPropertiesBuilder.buildMap()
}

companion object {
Expand All @@ -470,7 +455,9 @@ class MySqlSourceDebeziumOperations(
val INTERNAL_CONVERTER_CONFIG: Map<String, String> =
java.util.Map.of(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false.toString())

internal fun deserializeDebeziumState(opaqueStateValue: OpaqueStateValue): DebeziumState {
internal fun deserializeStateUnvalidated(
opaqueStateValue: OpaqueStateValue
): UnvalidatedDeserializedState {
val stateNode: ObjectNode = opaqueStateValue[STATE] as ObjectNode
// Deserialize offset.
val offsetNode: ObjectNode = stateNode[MYSQL_CDC_OFFSET] as ObjectNode
Expand All @@ -486,15 +473,14 @@ class MySqlSourceDebeziumOperations(
val offset = DebeziumOffset(offsetMap)
// Deserialize schema history.
val schemaNode: JsonNode =
stateNode[MYSQL_DB_HISTORY] ?: return DebeziumState(offset, schemaHistory = null)
stateNode[MYSQL_DB_HISTORY] ?: return UnvalidatedDeserializedState(offset)
val isCompressed: Boolean = stateNode[IS_COMPRESSED]?.asBoolean() ?: false
val uncompressedString: String =
if (isCompressed) {
val textValue: String = schemaNode.textValue()
val compressedBytes: ByteArray =
textValue.substring(1, textValue.length - 1).toByteArray(Charsets.UTF_8)
val decoded = Base64.decodeBase64(compressedBytes)

GZIPInputStream(ByteArrayInputStream(decoded)).reader(Charsets.UTF_8).readText()
} else {
schemaNode.textValue()
Expand All @@ -504,9 +490,14 @@ class MySqlSourceDebeziumOperations(
.lines()
.filter { it.isNotBlank() }
.map { HistoryRecord(DocumentReader.defaultReader().read(it)) }
return DebeziumState(offset, DebeziumSchemaHistory(schemaHistoryList))
return UnvalidatedDeserializedState(offset, DebeziumSchemaHistory(schemaHistoryList))
}

/**
 * Offset and optional schema history as read from a saved state value by
 * [deserializeStateUnvalidated], before any validation has been applied.
 *
 * [schemaHistory] is null when the saved state carries no schema history entry.
 */
data class UnvalidatedDeserializedState(
val offset: DebeziumOffset,
val schemaHistory: DebeziumSchemaHistory? = null,
)

internal fun position(offset: DebeziumOffset): MySqlSourceCdcPosition {
if (offset.wrapped.size != 1) {
throw ConfigErrorException("Expected exactly 1 key in $offset")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ import io.airbyte.cdk.read.Where
import io.airbyte.cdk.read.WhereClauseLeafNode
import io.airbyte.cdk.read.WhereClauseNode
import io.airbyte.cdk.read.WhereNode
import io.airbyte.cdk.read.cdc.DebeziumState
import io.airbyte.cdk.read.cdc.DebeziumOffset
import io.airbyte.cdk.util.Jsons
import io.micronaut.context.annotation.Primary
import jakarta.inject.Singleton
Expand Down Expand Up @@ -102,10 +102,9 @@ class MySqlSourceOperations :
if (globalStateValue == null) {
return
}
val debeziumState: DebeziumState =
MySqlSourceDebeziumOperations.deserializeDebeziumState(globalStateValue)
val position: MySqlSourceCdcPosition =
MySqlSourceDebeziumOperations.position(debeziumState.offset)
val offset: DebeziumOffset =
MySqlSourceDebeziumOperations.deserializeStateUnvalidated(globalStateValue).offset
val position: MySqlSourceCdcPosition = MySqlSourceDebeziumOperations.position(offset)
recordData.set<JsonNode>(
MySqlSourceCdcMetaFields.CDC_LOG_FILE.id,
CdcStringMetaFieldType.jsonEncoder.encode(position.fileName),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ import io.airbyte.cdk.output.BufferingOutputConsumer
import io.airbyte.cdk.read.ConcurrencyResource
import io.airbyte.cdk.read.ConfiguredSyncMode
import io.airbyte.cdk.read.DefaultJdbcSharedState
import io.airbyte.cdk.read.Feed
import io.airbyte.cdk.read.SelectQuerier
import io.airbyte.cdk.read.StateQuerier
import io.airbyte.cdk.read.StateManager
import io.airbyte.cdk.read.Stream
import io.airbyte.cdk.read.StreamFeedBootstrap
import io.airbyte.cdk.util.Jsons
Expand Down Expand Up @@ -152,15 +151,8 @@ class MySqlSourceJdbcPartitionFactoryTest {
recordData: ObjectNode
) {}
},
stateQuerier =
object : StateQuerier {
override val feeds: List<Feed> = listOf(stream)
override fun current(feed: Feed): OpaqueStateValue? =
if (feed == stream) incumbentStateValue else null
override fun resetFeedStates() {
/* no-op */
}
},
stateManager =
StateManager(initialStreamStates = mapOf(stream to incumbentStateValue)),
stream,
)
}
Expand Down
Loading

0 comments on commit 8c2ed63

Please sign in to comment.