Skip to content

Commit

Permalink
[source-mssql]: Enhance OC logic to detect composite cluster keys and…
Browse files Browse the repository at this point in the history
… default to PK (#53217)
  • Loading branch information
yardencarmeli authored Feb 21, 2025
1 parent a98e3a2 commit 8bd3607
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 31 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 4.1.21
dockerImageTag: 4.1.22
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -83,9 +82,9 @@ private static String getCatalog(final SqlDatabase database) {
return (database.getSourceConfig().has(JdbcUtils.DATABASE_KEY) ? database.getSourceConfig().get(JdbcUtils.DATABASE_KEY).asText() : null);
}

public static String discoverClusteredIndexForStream(final JdbcDatabase database,
final AirbyteStream stream) {
Map<String, String> clusteredIndexes = new HashMap<>();
public static Map<String, List<String>> discoverClusteredIndexForStream(final JdbcDatabase database,
final AirbyteStream stream) {
Map<String, List<String>> clusteredIndexes = new HashMap<>();
try {
// Get all clustered index names without specifying a table name
clusteredIndexes = aggregateClusteredIndexes(database.bufferedResultSetQuery(
Expand All @@ -104,12 +103,10 @@ public static String discoverClusteredIndexForStream(final JdbcDatabase database
} catch (final SQLException e) {
LOGGER.debug(String.format("Could not retrieve clustered indexes without a table name (%s), not blocking, fall back to use pk.", e));
}
LOGGER.debug("clusteredIndexes: {}", StringUtils.join(clusteredIndexes));
final String streamName = stream.getName();
final String namespace = stream.getNamespace();

return clusteredIndexes.getOrDefault(
getFullyQualifiedTableName(namespace, streamName), null);
LOGGER.debug("Clustered Indexes: {}", clusteredIndexes);

return clusteredIndexes.isEmpty() ? null : clusteredIndexes;
}

@VisibleForTesting
Expand All @@ -123,16 +120,18 @@ public record ClusteredIndexAttributesFromDb(String streamName,
* multiple columns, we always use the first column.
*/
@VisibleForTesting
static Map<String, String> aggregateClusteredIndexes(final List<ClusteredIndexAttributesFromDb> entries) {
final Map<String, String> result = new HashMap<>();
static Map<String, List<String>> aggregateClusteredIndexes(final List<ClusteredIndexAttributesFromDb> entries) {
final Map<String, List<String>> result = new HashMap<>();

entries.forEach(entry -> {
if (entry == null) {
return;
}
if (result.containsKey(entry.streamName())) {
return;
if (!result.containsKey(entry.streamName())) {
result.put(entry.streamName(), new ArrayList<>());
}
result.put(entry.streamName, entry.columnName());
// Store the column name in a list to support composite clustered indexes.
result.get(entry.streamName()).add(entry.columnName());
});
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@
import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcCursorInvalidMessage;
import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcResyncMessage;
import static io.airbyte.cdk.db.DbAnalyticsUtils.wassOccurrenceMessage;
import static io.airbyte.cdk.db.jdbc.JdbcUtils.getFullyQualifiedTableName;
import static io.airbyte.integrations.source.mssql.MsSqlSpecConstants.FAIL_SYNC_OPTION;
import static io.airbyte.integrations.source.mssql.MsSqlSpecConstants.INVALID_CDC_CURSOR_POSITION_PROPERTY;
import static io.airbyte.integrations.source.mssql.MsSqlSpecConstants.RESYNC_DATA_OPTION;
import static io.airbyte.integrations.source.mssql.MssqlCdcHelper.getDebeziumProperties;
import static io.airbyte.integrations.source.mssql.MssqlQueryUtils.getTableSizeInfoForStreams;
import static io.airbyte.integrations.source.mssql.cdc.MssqlCdcStateConstants.MSSQL_CDC_OFFSET;
import static io.airbyte.integrations.source.mssql.initialsync.MssqlInitialLoadHandler.discoverClusteredIndexForStream;
import static io.airbyte.integrations.source.mssql.initialsync.MssqlInitialLoadStateManager.ORDERED_COL_STATE_TYPE;
import static io.airbyte.integrations.source.mssql.initialsync.MssqlInitialLoadStateManager.STATE_TYPE_KEY;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
Expand Down Expand Up @@ -420,18 +421,14 @@ static Optional<OrderedColumnInfo> getOrderedColumnInfo(final JdbcDatabase datab
// if (stream.getStream().getSourceDefinedPrimaryKey().size() > 1) {
// LOGGER.info("Composite primary key detected for {namespace, stream} : {}, {}",
// stream.getStream().getNamespace(), stream.getStream().getName());
// } // TODO: validate the seleted column rather than primary key
final String clusterdIndexField = discoverClusteredIndexForStream(database, stream.getStream());
final String ocFieldName;
if (clusterdIndexField != null) {
ocFieldName = clusterdIndexField;
} else {
if (stream.getStream().getSourceDefinedPrimaryKey().isEmpty()) {
return Optional.empty();
}
ocFieldName = stream.getStream().getSourceDefinedPrimaryKey().getFirst().getFirst();
// }
Optional<String> ocFieldNameOpt = selectOcFieldName(database, stream);
if (ocFieldNameOpt.isEmpty()) {
LOGGER.info("No primary key or clustered index found for stream: " + stream.getStream().getName());
return Optional.empty();
}

String ocFieldName = ocFieldNameOpt.get();
LOGGER.info("selected ordered column field name: " + ocFieldName);
final JDBCType ocFieldType = table.getFields().stream()
.filter(field -> field.getName().equals(ocFieldName))
Expand All @@ -440,6 +437,31 @@ static Optional<OrderedColumnInfo> getOrderedColumnInfo(final JdbcDatabase datab
return Optional.of(new OrderedColumnInfo(ocFieldName, ocFieldType, ocMaxValue));
}

@VisibleForTesting
public static Optional<String> selectOcFieldName(final JdbcDatabase database,
final ConfiguredAirbyteStream stream) {

final Map<String, List<String>> clusterdIndexField = MssqlInitialLoadHandler.discoverClusteredIndexForStream(database, stream.getStream());
final String streamName = getFullyQualifiedTableName(stream.getStream().getNamespace(), stream.getStream().getName());
final List<List<String>> primaryKey = stream.getStream().getSourceDefinedPrimaryKey();
final String ocFieldName;

final List<String> clusterColumns = Optional.ofNullable(clusterdIndexField)
.map(map -> map.get(streamName))
.orElse(new ArrayList<>());

// Use the clustered index unless it is composite. Otherwise, default to the primary key.
if (clusterColumns.size() == 1) {
ocFieldName = clusterColumns.getFirst();
} else if (!primaryKey.isEmpty()) {
LOGGER.info("Clustered index is empty or composite. Defaulting to primary key.");
ocFieldName = primaryKey.getFirst().getFirst();
} else {
return Optional.empty();
}
return Optional.of(ocFieldName);
}

public static List<ConfiguredAirbyteStream> identifyStreamsToSnapshot(final ConfiguredAirbyteCatalog catalog,
final Set<AirbyteStreamNameNamespacePair> alreadySyncedStreams) {
final Set<AirbyteStreamNameNamespacePair> allStreams = AirbyteStreamNameNamespacePair.fromConfiguredCatalog(catalog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class MSSqlSourceExceptionHandler : ConnectorExceptionHandler() {
// adding connector specific error profiles
add(
ConnectorErrorProfile(
errorClass = "MS SQL Exception", // which should we use?
errorClass = "MS SQL Exception",
regexMatchingPattern =
".*returned an incomplete response. The connection has been closed.*",
failureType = FailureType.TRANSIENT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.BaseImage;
import io.airbyte.integrations.source.mssql.initialsync.MssqlInitialLoadHandler;
import io.airbyte.integrations.source.mssql.initialsync.MssqlInitialReadUtil;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.*;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.junit.jupiter.api.*;

class MssqlSourceTest {
Expand Down Expand Up @@ -124,10 +127,14 @@ void testDiscoverWithNonClusteredPk() throws SQLException {
final AirbyteCatalog actual = source().discover(getConfig());
assertEquals(CATALOG, actual);
final var db = source().createDatabase(getConfig());
final String oc = MssqlInitialLoadHandler.discoverClusteredIndexForStream(db,
final Map<String, List<String>> oc = MssqlInitialLoadHandler.discoverClusteredIndexForStream(db,
new AirbyteStream().withName(
actual.getStreams().get(0).getName()).withNamespace(actual.getStreams().get(0).getNamespace()));
assertEquals(oc, "name");

String firstOcKey = oc.entrySet().iterator().next().getKey();
List<String> ocValues = oc.get(firstOcKey);
assertEquals(1, ocValues.size());
assertEquals("name", ocValues.get(0));
}

@Test
Expand All @@ -139,10 +146,114 @@ void testDiscoverWithNoClusteredIndex() throws SQLException {
final AirbyteCatalog actual = source().discover(getConfig());
assertEquals(CATALOG, actual);
final var db = source().createDatabase(getConfig());
final String oc = MssqlInitialLoadHandler.discoverClusteredIndexForStream(db,
final Map<String, List<String>> oc = MssqlInitialLoadHandler.discoverClusteredIndexForStream(db,
new AirbyteStream().withName(
actual.getStreams().get(0).getName()).withNamespace(actual.getStreams().get(0).getNamespace()));

assertNull(oc);
}

@Test
void testDiscoverWithClusteredCompositeIndex() throws SQLException {
testdb
.with("ALTER TABLE id_and_name ADD CONSTRAINT i3pk PRIMARY KEY NONCLUSTERED (id);")
.with("CREATE INDEX i1 ON id_and_name (id);")
.with("CREATE CLUSTERED INDEX n1 ON id_and_name (id, name)");
final AirbyteCatalog actual = source().discover(getConfig());
assertEquals(CATALOG, actual);
final var db = source().createDatabase(getConfig());

AirbyteStream stream = new AirbyteStream().withName(
actual.getStreams().get(0).getName()).withNamespace(actual.getStreams().get(0).getNamespace())
.withSourceDefinedPrimaryKey(actual.getStreams().get(0).getSourceDefinedPrimaryKey());

Map<String, List<String>> oc = MssqlInitialLoadHandler.discoverClusteredIndexForStream(db, stream);

String firstOcKey = oc.entrySet().iterator().next().getKey();
List<String> ocValues = oc.get(firstOcKey);
assertEquals(2, ocValues.size());

}

@Test
void testUsingPkWhenClusteredCompositeIndex() throws SQLException {
testdb
.with("ALTER TABLE id_and_name ADD CONSTRAINT i3pk PRIMARY KEY NONCLUSTERED (id);")
.with("CREATE INDEX i1 ON id_and_name (id);")
.with("CREATE CLUSTERED INDEX n1 ON id_and_name (id, name)");
final AirbyteCatalog actual = source().discover(getConfig());
assertEquals(CATALOG, actual);
final var db = source().createDatabase(getConfig());

AirbyteStream stream = new AirbyteStream().withName(
actual.getStreams().getFirst().getName()).withNamespace(actual.getStreams().getFirst().getNamespace())
.withSourceDefinedPrimaryKey(actual.getStreams().getFirst().getSourceDefinedPrimaryKey());

ConfiguredAirbyteStream configuredAirbyteStream = new ConfiguredAirbyteStream().withSyncMode(
SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withSyncMode(SyncMode.INCREMENTAL)
.withStream(stream);

final List<List<String>> primaryKey = configuredAirbyteStream.getStream().getSourceDefinedPrimaryKey();
Optional<String> oc = MssqlInitialReadUtil.selectOcFieldName(db, configuredAirbyteStream);

assertEquals(primaryKey.getFirst().getFirst(), oc.orElse("No oc"));

}

@Test
void testNonClusteredIndex() throws SQLException {
testdb
.with("ALTER TABLE id_and_name ADD CONSTRAINT i3pk PRIMARY KEY NONCLUSTERED (id);")
.with("CREATE INDEX i1 ON id_and_name (id);");
final AirbyteCatalog actual = source().discover(getConfig());
assertEquals(CATALOG, actual);
final var db = source().createDatabase(getConfig());

AirbyteStream stream = new AirbyteStream().withName(
actual.getStreams().getFirst().getName()).withNamespace(actual.getStreams().getFirst().getNamespace())
.withSourceDefinedPrimaryKey(actual.getStreams().getFirst().getSourceDefinedPrimaryKey());

ConfiguredAirbyteStream configuredAirbyteStream = new ConfiguredAirbyteStream().withSyncMode(
SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withSyncMode(SyncMode.INCREMENTAL)
.withStream(stream);

Optional<String> oc = MssqlInitialReadUtil.selectOcFieldName(db, configuredAirbyteStream);
final List<List<String>> primaryKey = configuredAirbyteStream.getStream().getSourceDefinedPrimaryKey();

assertEquals(primaryKey.getFirst().getFirst(), oc.orElse("No oc"));

}

@Test
void testNonClusteredIndexNoPK() throws SQLException {
testdb
.with("ALTER TABLE id_and_name ADD CONSTRAINT i3pk PRIMARY KEY NONCLUSTERED (id);")
.with("CREATE INDEX i1 ON id_and_name (id);")
.with("CREATE NONCLUSTERED INDEX n1 ON id_and_name (name)");
final AirbyteCatalog actual = source().discover(getConfig());
assertEquals(CATALOG, actual);
final var db = source().createDatabase(getConfig());

AirbyteStream stream = new AirbyteStream().withName(
actual.getStreams().getFirst().getName()).withNamespace(actual.getStreams().getFirst().getNamespace());

ConfiguredAirbyteStream configuredAirbyteStream = new ConfiguredAirbyteStream().withSyncMode(
SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withSyncMode(SyncMode.INCREMENTAL)
.withStream(stream);

Optional<String> oc = MssqlInitialReadUtil.selectOcFieldName(db, configuredAirbyteStream);

assert (oc.isEmpty());

}

}
3 changes: 2 additions & 1 deletion docs/integrations/sources/mssql.md
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,8 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura
<summary>Expand to review</summary>

| Version | Date | Pull Request | Subject |
|:--------|:-----------| :---------------------------------------------------------------------------------------------------------------- |:------------------------------------------------------------------------------------------------------------------------------------------------|
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 4.1.22 | 2025-02-10 | [53217](https://github.com/airbytehq/airbyte/pull/53217) | Default to PK when the clustered index is composite. |
| 4.1.21 | 2025-02-21 | [54189](https://github.com/airbytehq/airbyte/pull/54189) | Print state data only in the debugging log. |
| 4.1.20 | 2025-01-26 | [52556](https://github.com/airbytehq/airbyte/pull/52556) | Improve tables discovery during read. |
| 4.1.19 | 2025-01-16 | [51596](https://github.com/airbytehq/airbyte/pull/51596) | Bump driver versions to latest (jdbc, debezium, cdk) |
Expand Down

0 comments on commit 8bd3607

Please sign in to comment.