Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,13 @@ public static Set<StreamTransform> getCatalogDiff(final AirbyteCatalog oldCatalo
.findFirst();

if (!streamOld.equals(streamNew)) {
streamTransforms.add(StreamTransform.createUpdateStreamTransform(descriptor,
getStreamDiff(streamOld, streamNew, stream)));
// getStreamDiff only checks for differences in the stream's field name or field type
// but there are a number of reasons the streams might be different (such as a source-defined
// primary key or cursor changing). These should not be expressed as "stream updates".
UpdateStreamTransform streamTransform = getStreamDiff(streamOld, streamNew, stream);
if (streamTransform.getFieldTransforms().size() > 0) {
streamTransforms.add(StreamTransform.createUpdateStreamTransform(descriptor, streamTransform));
}
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,4 +287,27 @@ void testCatalogDiffWithoutStreamConfig() throws IOException {
Assertions.assertThat(diff).containsAll(expectedDiff);
}

@Test
void testCatalogDiffStreamChangeWithNoFieldTransform() throws IOException {
final JsonNode schema1 = Jsons.deserialize(readResource(VALID_SCHEMA_JSON));

final AirbyteCatalog catalog1 = new AirbyteCatalog().withStreams(List.of(
new AirbyteStream().withName(USERS).withJsonSchema(schema1),
new AirbyteStream().withName(SALES).withJsonSchema(schema1)));
final AirbyteCatalog catalog2 = new AirbyteCatalog().withStreams(List.of(
new AirbyteStream().withName(USERS).withJsonSchema(schema1).withSourceDefinedPrimaryKey(List.of(List.of("id")))));

final ConfiguredAirbyteCatalog configuredAirbyteCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(USERS).withJsonSchema(schema1)).withSyncMode(SyncMode.FULL_REFRESH),
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(SALES).withJsonSchema(schema1))
.withSyncMode(SyncMode.FULL_REFRESH)));

final Set<StreamTransform> actualDiff = CatalogHelpers.getCatalogDiff(catalog1, catalog2, configuredAirbyteCatalog);

final List<StreamTransform> expectedDiff = Stream.of(
StreamTransform.createRemoveStreamTransform(new StreamDescriptor().withName(SALES)))
.toList();

Assertions.assertThat(actualDiff).containsExactlyInAnyOrderElementsOf(expectedDiff);
}
}