fix(source-postgres): use REPEATABLE READ snapshot for CTID batch queries to prevent silent row drops#74062
Conversation
…ries Previously, each CTID batch query in InitialSyncCtidIterator opened a new database connection via database.unsafeQuery(), which meant each batch got its own MVCC snapshot under PostgreSQL's default READ COMMITTED isolation. When rows were updated by concurrent transactions during a multi-batch scan, the old CTID became dead in later snapshots and the new CTID could land in a range already scanned — causing rows to be silently dropped. This fix opens a single dedicated connection with REPEATABLE READ isolation at the start of the CTID scan and reuses it for all batch queries. This ensures all batches see the same consistent snapshot, eliminating the window where concurrent updates can cause rows to be missed. Key changes: - Add getDataSource() to DefaultJdbcDatabase for connection access - Open a REPEATABLE READ snapshot connection in initSubQueries() - Reuse the snapshot connection for all CTID batch queries - Properly close and reopen on VACUUM resets - Close the snapshot connection on iterator close Co-Authored-By: bot_apk <apk@cognition.ai>
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. 💡 Show Tips and TricksPR Slash CommandsAirbyte Maintainers (that's you!) can execute the following slash commands on your PR:
📚 Show Repo GuidanceHelpful Resources
|
|
- Change dataSource from protected to public in DefaultJdbcDatabase.kt (Kotlin auto-generates getDataSource() from val property; explicit getDataSource() method caused JVM signature clash) - Apply Google Java Format conventions to InitialSyncCtidIterator.java (line wrapping, Javadoc formatting) Co-Authored-By: bot_apk <apk@cognition.ai>
Version bump for CTID snapshot isolation fix. Uses -rc.1 suffix for progressive rollout. Co-Authored-By: bot_apk <apk@cognition.ai>
Co-Authored-By: bot_apk <apk@cognition.ai>
|
Deploy preview for airbyte-docs ready! ✅ Preview Built with commit 27f2f6f. |
…pply) Co-Authored-By: bot_apk <apk@cognition.ai>
…otected field Thread DataSource through the constructor chain: PostgresSource → CtidUtils.createInitialLoader → PostgresCtidHandler → InitialSyncCtidIterator This avoids the CDK dependency issue where the connector CI builds against published CDK artifacts that don't have the getDataSource() method public. Co-Authored-By: bot_apk <apk@cognition.ai>
The DataSource is now threaded via constructor params, so the CDK visibility change is no longer needed. Co-Authored-By: bot_apk <apk@cognition.ai>
|
|
↪️ Triggering Reason: Draft PR with CI passing (36/36 checks, 331 tests passed). Fix uses REPEATABLE READ snapshot isolation for CTID batch queries to prevent silent row drops during concurrent writes. |
Fix Validation EvidenceOutcome: Could not Run Tests (live connection testing blocked on approval; regression tests passed) Evidence SummaryRegression tests ran the pre-release version ( Live connection testing on 2 qualified internal canary connections was blocked because the Next Steps
Connector & PR DetailsConnector: Evidence PlanProving Criteria
Disproving Criteria
Cases Attempted
Pre-flight Checks
Design Intent Note: The current behavior (READ COMMITTED per batch) is clearly unintentional. No existing code uses REPEATABLE READ or snapshot isolation for CTID scans. Detailed Evidence LogRegression Tests (2026-02-26T15:10–15:17 UTC)
Workflow: https://github.com/airbytehq/airbyte-ops-mcp/actions/runs/22448037363 Live Connection TestsNot executed — blocked on approval comment from |
|
What
Resolves https://github.com/airbytehq/oncall/issues/11440:
The Postgres source connector's CTID-based full refresh scan silently drops rows that are updated by concurrent transactions during multi-batch scans. Each batch query previously opened a new connection (and therefore a new MVCC snapshot) via
database.unsafeQuery(). Under PostgreSQL's defaultREAD COMMITTEDisolation, a row updated mid-scan could become invisible to all batches: the old CTID is dead in later snapshots, and the new CTID may fall in a range already scanned.How
Core fix:
InitialSyncCtidIteratornow opens a single dedicated connection withREPEATABLE READisolation before the first batch, and reuses it for all subsequent batch queries. This guarantees all batches see the same consistent MVCC snapshot.DataSource threading: The
DataSourceis passed explicitly as a constructor parameter through the chain:PostgresSource→CtidUtils.createInitialLoader→PostgresCtidHandler→InitialSyncCtidIterator. The iterator usesdataSource.getConnection()directly to obtain the snapshot connection, avoiding any need to cast or access CDK internals.CDK change:⚠️ The diff also contains a one-line CDK visibility change (
protected val dataSource→val dataSourceinDefaultJdbcDatabase.kt) from an earlier commit. This change is now redundant since the DataSource is threaded via constructor params instead. Reviewer should decide whether to keep or revert this.VACUUM handling: On VACUUM-triggered resets (
resetSubQueries), the snapshot connection is closed and a fresh one is opened, since VACUUM invalidates the physical page layout anyway.Streaming: A custom
toStream()replacesdatabase.unsafeQuery()to streamResultSetrows without closing the shared connection on stream completion. A fixed fetch size of 10,000 is used.Version: Bumped to
3.7.3-rc.1(PATCH, with-rc.1suffix for progressive rollout).Review guide
DefaultJdbcDatabase.kt—dataSourcepublic. This was part of an earlier approach but is no longer needed since DataSource is now threaded via constructor params. Consider reverting this change to avoid expanding CDK API surface unnecessarily.PostgresSource.java— Stores the most recently createdDataSourceincurrentDataSourcefield and passes it to allcreateInitialLoadercall sites (3 locations). Note: This field is not synchronized; ifcreateDatabase()is called concurrently, race conditions could occur (though unlikely in practice).CtidUtils.java— AddsDataSourceparameter tocreateInitialLoader()and passes it toPostgresCtidHandlerconstructor.PostgresCtidHandler.java— AddsDataSourcefield and constructor parameter, passes it toInitialSyncCtidIteratorconstructor.PostgresCdcCtidInitializer.java— AddsDataSourceparameter tocdcCtidIteratorsCombined()and passes it tocreateInitialLoader().InitialSyncCtidIterator.java— The main fix. Key areas:openSnapshotConnection()/closeSnapshotConnection()— Connection lifecycle management. UsesdataSource.getConnection()directly (no moreinstanceofcast).getStream()— Now uses the sharedsnapshotConnectiondirectly instead ofdatabase.unsafeQuery().toStream()— Custom spliterator. Note:PreparedStatements are not explicitly closed per-batch; they are cleaned up when the snapshot connection closes. Verify this is acceptable for scans with many batches (potential memory accumulation).SNAPSHOT_FETCH_SIZE = 10_000— Replaces adaptive fetch size tuning fromAdaptiveStreamingQueryConfig. For tables with very wide rows this could increase memory usage vs. the previous adaptive approach.initSubQueries()andresetSubQueries()— Snapshot connection is opened at scan start and reopened on VACUUM resets. Both now throwSQLException.close()— Properly closes both the current iterator and the snapshot connection.metadata.yaml/postgres.md— Version bump to3.7.3-rc.1and changelog entry.DefaultJdbcDatabase.ktchange is no longer needed. Should it be reverted to avoid expanding CDK API surface?PostgresSource.currentDataSourceis not synchronized. Verify thatcreateDatabase()is never called concurrently in production.User Impact
Positive: Eliminates silent data loss for full refresh syncs on tables with concurrent writes. Rows updated during the scan will no longer be dropped.
Negative:
REPEATABLE READtransaction is held open for the entire CTID scan (potentially minutes to hours for large tables). This prevents PostgreSQL from VACUUMing dead tuples created during the scan, which could cause table bloat if scans are frequent and long-running. This is a known trade-off for correctness.Can this PR be safely reverted and rolled back?
The change is isolated to CTID scanning logic. Reverting would restore the previous behavior (separate snapshots per batch, with the silent row drop bug).
Session: https://app.devin.ai/sessions/be8cc3db060a4bc3b6e8d4b5b271879a
Requested by: bot_apk (apk@cognition.ai)