docs/tutorials/custom-implementations.md (52 changes: 48 additions & 4 deletions)
@@ -482,8 +482,16 @@ impl Destination for HttpDestination {
        ).await
    }

-    /// Called when ETL has a batch of rows to send to the destination
-    /// This is the main data flow method - gets called frequently during replication
+    /// Writes a batch of table rows to the destination.
+    ///
+    /// This method is used during initial table synchronization to bulk load existing
+    /// data. Rows are provided as [`TableRow`] instances with typed cell values.
+    /// Implementations should optimize batch insertion performance while maintaining
+    /// data consistency.
+    ///
+    /// Note that this method is called even when the source table has no data; in that
+    /// case it receives an empty list of rows. This is by design, so that the destination
+    /// can prepare the initial tables before streaming starts.
    async fn write_table_rows(
        &self,
        table_id: TableId,
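To make the empty-batch contract concrete, here is a minimal sketch of how an implementation might branch on it. `ensure_table_exists` and `bulk_insert` are hypothetical helpers on the destination, not part of the etl API:

```rust
// Minimal sketch, not part of this diff. `ensure_table_exists` and
// `bulk_insert` are hypothetical helpers on the destination.
async fn write_table_rows(
    &self,
    table_id: TableId,
    table_rows: Vec<TableRow>,
) -> EtlResult<()> {
    // An empty batch is delivered by design: use it to create or prepare
    // the destination table before streaming begins.
    self.ensure_table_exists(table_id).await?;

    if table_rows.is_empty() {
        return Ok(());
    }

    // Insert the whole batch in one request to amortize per-row overhead.
    self.bulk_insert(table_id, &table_rows).await
}
```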
@@ -528,8 +536,24 @@ impl Destination for HttpDestination {
        ).await
    }

-    /// Called when ETL has replication events to send (e.g., transaction markers)
-    /// These are metadata events about the replication process itself
+    /// Writes streaming replication events to the destination.
+    ///
+    /// This method handles real-time changes from the Postgres replication stream.
+    /// Events include inserts, updates, deletes, and transaction boundaries.
+    ///
+    /// Note that the events vector may contain events for multiple tables. Events for
+    /// the same table must be processed in order to maintain consistency, but events
+    /// for different tables can be applied in any order (e.g. concurrently).
+    ///
+    /// The `events` vector contains `Event` enums which can be:
+    /// - `Event::Insert`: a new row was added
+    /// - `Event::Update`: an existing row was modified
+    /// - `Event::Delete`: a row was removed
+    /// - `Event::Begin` / `Event::Commit`: transaction boundaries
+    ///
+    /// Your implementation should handle these events appropriately. For example, you
+    /// might ignore `Commit` events if your destination doesn't support transactions
+    /// (like BigQuery), or use them to batch updates if it does.
    async fn write_events(&self, events: Vec<Event>) -> EtlResult<()> {
        // Skip if no events to process
        if events.is_empty() {
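The ordering contract above (per-table order preserved, cross-table concurrency allowed) can be sketched as follows. `event_table_id` and `apply_table_events` are hypothetical helpers, not part of the etl API; data events are grouped by table and the groups are applied concurrently:

```rust
use std::collections::HashMap;

// Sketch only: group events by table, preserving per-table order, then
// apply the groups concurrently. `event_table_id` and `apply_table_events`
// are hypothetical helpers, not part of the etl API.
async fn apply_events(&self, events: Vec<Event>) -> EtlResult<()> {
    let mut per_table: HashMap<TableId, Vec<Event>> = HashMap::new();
    for event in events {
        // Begin/Commit carry no table ID and would be handled separately.
        if let Some(table_id) = event_table_id(&event) {
            per_table.entry(table_id).or_default().push(event);
        }
    }

    // Each group's Vec keeps arrival order, so per-table consistency is
    // preserved while the tables themselves proceed in parallel.
    futures::future::try_join_all(
        per_table
            .into_iter()
            .map(|(table_id, events)| self.apply_table_events(table_id, events)),
    )
    .await?;

    Ok(())
}
```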
@@ -542,6 +566,26 @@
        let events_json: Vec<_> = events
            .iter()
            .map(|event| {
+                // Example of how to handle specific event types:
+                match event {
+                    Event::Insert(insert) => {
+                        // Handle insert: insert.table_row contains the new data
+                    }
+                    Event::Update(update) => {
+                        // Handle update: update.table_row has the new data;
+                        // update.old_table_row has the previous data (if replica identity is full)
+                    }
+                    Event::Delete(delete) => {
+                        // Handle delete: delete.old_table_row identifies the row to remove
+                    }
+                    Event::Begin(_) | Event::Commit(_) => {
+                        // Transaction boundaries.
+                        // Note: these events may be delivered multiple times since they don't
+                        // carry table IDs. Destinations should check LSNs to avoid re-processing.
+                    }
+                    _ => {} // Handle other events (Relation, Truncate)
+                }
+
                json!({
                    "event_type": format!("{:?}", event), // Convert event to string for demo
                })
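Since `Begin`/`Commit` events may be delivered more than once, a destination can keep the last applied commit LSN and skip duplicates. A minimal sketch, assuming a hypothetical `last_applied_lsn: u64` field and that the commit LSN has already been read off the event:

```rust
// Sketch of idempotent commit handling. `last_applied_lsn` is a
// hypothetical field; how the commit LSN is extracted from the event
// depends on the etl event types.
fn should_apply(&mut self, commit_lsn: u64) -> bool {
    if commit_lsn <= self.last_applied_lsn {
        // This commit was already processed in an earlier delivery.
        return false;
    }
    self.last_applied_lsn = commit_lsn;
    true
}
```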