diff --git a/core/src/domain/resource_manager.rs b/core/src/domain/resource_manager.rs index ad6d1be2421a..d3bf7087030f 100644 --- a/core/src/domain/resource_manager.rs +++ b/core/src/domain/resource_manager.rs @@ -269,6 +269,19 @@ impl ResourceManager { resource_type: &str, resource_ids: Vec, ) -> Result<()> { + // During backfill we skip per-record fan-out. For models like + // content_identity each UUID triggers 2 DB queries via dependency + // routing, which dominates apply time on large libraries. The backfill + // coordinator emits a single coarse invalidation after the scope ends. + if crate::infra::sync::is_in_backfill() { + tracing::trace!( + resource_type = %resource_type, + count = resource_ids.len(), + "Skipping per-record resource event emission during backfill" + ); + return Ok(()); + } + // For now, delegate to single-resource handler // In future, could optimize by batching virtual resource construction self.emit_resource_events(resource_type, resource_ids).await diff --git a/core/src/infra/db/entities/content_identity.rs b/core/src/infra/db/entities/content_identity.rs index 7ccb7236cf04..4b0d9ffd09ca 100644 --- a/core/src/infra/db/entities/content_identity.rs +++ b/core/src/infra/db/entities/content_identity.rs @@ -246,13 +246,15 @@ impl Syncable for Model { let results = query.all(db).await?; - let mut sync_results = Vec::new(); + let mut sync_results: Vec<(Uuid, serde_json::Value, chrono::DateTime)> = + Vec::with_capacity(results.len()); for content in results { - if content.uuid.is_none() { - continue; - } + let uuid = match content.uuid { + Some(u) => u, + None => continue, + }; - let mut json = match content.to_sync_json() { + let json = match content.to_sync_json() { Ok(j) => j, Err(e) => { tracing::warn!(error = %e, content_hash = %content.content_hash, "Failed to serialize content_identity for sync"); @@ -260,22 +262,61 @@ impl Syncable for Model { } }; - // Convert FK to UUID for cross-device compatibility - for fk in Self::foreign_key_mappings() { - if let Err(e) = - crate::infra::sync::fk_mapper::convert_fk_to_uuid(&mut json, &fk, db).await + sync_results.push((uuid, json, content.last_verified_at)); + } + + // Batch FK → UUID conversion across the whole batch: one DB round trip + // per FK type instead of one per (record × FK). Records that fail + // resolution are dropped so peers never see a sender-local int in the + // payload. + let fk_mappings = Self::foreign_key_mappings(); + if !fk_mappings.is_empty() && !sync_results.is_empty() { + let mut payloads: Vec = sync_results + .iter() + .map(|(_, json, _)| json.clone()) + .collect(); + let mut failed_indices: std::collections::HashSet = + std::collections::HashSet::new(); + + for fk in &fk_mappings { + match crate::infra::sync::fk_mapper::convert_fks_to_uuids_batch( + &mut payloads, + fk, + db, + ) + .await { - tracing::warn!( - error = %e, - uuid = %content.uuid.unwrap(), - fk_field = fk.local_field, - "Failed to convert FK to UUID, skipping content_identity" - ); - continue; + Ok(failed) => failed_indices.extend(failed), + Err(e) => { + tracing::warn!( + error = %e, + fk_field = fk.local_field, + "Batch FK conversion failed for content_identity" + ); + return Err(sea_orm::DbErr::Custom(format!( + "ContentIdentity FK batch conversion failed: {}", + e + ))); + } } } - sync_results.push((content.uuid.unwrap(), json, content.last_verified_at)); + sync_results = sync_results + .into_iter() + .zip(payloads.into_iter()) + .enumerate() + .filter_map(|(idx, ((uuid, _, ts), resolved))| { + if failed_indices.contains(&idx) { + tracing::warn!( + uuid = %uuid, + "Dropping content_identity with unresolved FK from sync batch" + ); + None + } else { + Some((uuid, resolved, ts)) + } + }) + .collect(); } Ok(sync_results) diff --git a/core/src/infra/db/entities/entry.rs b/core/src/infra/db/entities/entry.rs index e931a47b1bf5..66c12b6134b0 100644 --- a/core/src/infra/db/entities/entry.rs +++ b/core/src/infra/db/entities/entry.rs @@ -292,16 +292,31 @@ impl crate::infra::sync::Syncable for Model { std::collections::HashMap::new() }; - // Convert to sync format with FK mapping - let mut sync_results = Vec::new(); + // Serialize each row to JSON with its UUID and timestamp. + let mut staged: Vec<(Uuid, serde_json::Value, chrono::DateTime)> = + Vec::with_capacity(results.len()); for entry in results { let uuid = match entry.uuid { Some(u) => u, - None => continue, // Skip entries without UUIDs + None => continue, + }; + + // The sync cursor filters/orders on `indexed_at`, so a row with a + // NULL `indexed_at` would emit a cursor the next query predicate + // doesn't represent. Skip them — the indexed_at backfill migration + // populated every existing row, so this should be unreachable. + let indexed_at = match entry.indexed_at { + Some(ts) => ts, + None => { + tracing::warn!( + uuid = %uuid, + "Entry has NULL indexed_at; skipping sync until it's populated" + ); + continue; + } }; - // Serialize to JSON let mut json = match entry.to_sync_json() { Ok(j) => j, Err(e) => { @@ -311,9 +326,8 @@ impl crate::infra::sync::Syncable for Model { }; // For directories, include the absolute path from directory_paths - // This ensures receiving devices get identical paths for universal addressing + // so receiving devices get identical paths for universal addressing. if entry.kind == 1 { - // Directory if let Some(path) = directory_paths_map.get(&entry.id) { if let Some(obj) = json.as_object_mut() { obj.insert( @@ -324,27 +338,62 @@ impl crate::infra::sync::Syncable for Model { } } - // Convert FK integer IDs to UUIDs - for fk in ::foreign_key_mappings() { - if let Err(e) = - crate::infra::sync::fk_mapper::convert_fk_to_uuid(&mut json, &fk, db).await + staged.push((uuid, json, indexed_at)); + } + + // Batch-convert FK integer IDs to UUIDs one FK type at a time across + // the whole batch — single DB round trip per FK, not per record × FK. + // Any record that fails resolution (missing target, bad value) is + // dropped before we return so peers never see a sender-local int. + let fk_mappings = ::foreign_key_mappings(); + if !fk_mappings.is_empty() && !staged.is_empty() { + let mut payloads: Vec = + staged.iter().map(|(_, json, _)| json.clone()).collect(); + let mut failed_indices: std::collections::HashSet = + std::collections::HashSet::new(); + + for fk in &fk_mappings { + match crate::infra::sync::fk_mapper::convert_fks_to_uuids_batch( + &mut payloads, + fk, + db, + ) + .await { - tracing::warn!( - error = %e, - uuid = %uuid, - fk_field = fk.local_field, - "Failed to convert FK to UUID, skipping entry" - ); - continue; + Ok(failed) => failed_indices.extend(failed), + Err(e) => { + tracing::warn!( + error = %e, + fk_field = fk.local_field, + "Batch FK conversion failed for entries" + ); + return Err(sea_orm::DbErr::Custom(format!( + "Entry FK batch conversion failed: {}", + e + ))); + } } } - // Use indexed_at for checkpoint/watermark tracking, fallback to modified_at if NULL - let timestamp = entry.indexed_at.unwrap_or(entry.modified_at); - sync_results.push((uuid, json, timestamp)); + staged = staged + .into_iter() + .zip(payloads.into_iter()) + .enumerate() + .filter_map(|(idx, ((uuid, _, ts), resolved))| { + if failed_indices.contains(&idx) { + tracing::warn!( + uuid = %uuid, + "Dropping entry with unresolved FK from sync batch" + ); + None + } else { + Some((uuid, resolved, ts)) + } + }) + .collect(); } - Ok(sync_results) + Ok(staged) } /// Apply state change - already implemented in Model impl block below @@ -564,10 +613,12 @@ impl Model { inserted.id }; - // Rebuild entry_closure for this synced entry - // Without this, the entry only has a self-reference and cannot be queried - // for descendants, breaking subtree operations, location scoping, etc. - Self::rebuild_entry_closure(entry_id, parent_id, db).await?; + // Rebuild entry_closure for this synced entry, unless we're inside a + // backfill apply loop — in that case the post_backfill_rebuild hook + // does a single bulk rebuild at the end, so per-entry work is wasted. + if !crate::infra::sync::is_in_backfill() { + Self::rebuild_entry_closure(entry_id, parent_id, db).await?; + } // If this is a directory, create or update its entry in the directory_paths table if EntryKind::from(kind) == EntryKind::Directory { diff --git a/core/src/infra/db/migration/m20260417_000001_add_entries_sync_cursor_index.rs b/core/src/infra/db/migration/m20260417_000001_add_entries_sync_cursor_index.rs new file mode 100644 index 000000000000..f7ca5a71f75c --- /dev/null +++ b/core/src/infra/db/migration/m20260417_000001_add_entries_sync_cursor_index.rs @@ -0,0 +1,36 @@ +//! Add composite index on entries(indexed_at, uuid) to back the device-owned +//! sync cursor. +//! +//! `Entry::query_for_sync` paginates by `ORDER BY indexed_at ASC, uuid ASC` +//! with a tie-breaker filter of the same shape. Without this index, SQLite +//! does a full table scan per batch request — O(N) per batch, O(N^2) across +//! an initial backfill of a large library. + +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .get_connection() + .execute_unprepared( + "CREATE INDEX IF NOT EXISTS idx_entries_indexed_at_uuid \ + ON entries(indexed_at, uuid)", + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .get_connection() + .execute_unprepared("DROP INDEX IF EXISTS idx_entries_indexed_at_uuid") + .await?; + + Ok(()) + } +} diff --git a/core/src/infra/db/migration/mod.rs b/core/src/infra/db/migration/mod.rs index 1a301e8d93f8..6dea2a1c3597 100644 --- a/core/src/infra/db/migration/mod.rs +++ b/core/src/infra/db/migration/mod.rs @@ -38,6 +38,7 @@ mod m20260104_000001_replace_device_id_with_volume_id; mod m20260105_000001_add_volume_id_to_locations; mod m20260114_000001_fix_search_index_include_directories; mod m20260123_000001_remove_legacy_sync_columns; +mod m20260417_000001_add_entries_sync_cursor_index; pub struct Migrator; @@ -81,6 +82,7 @@ impl MigratorTrait for Migrator { Box::new(m20260105_000001_add_volume_id_to_locations::Migration), Box::new(m20260114_000001_fix_search_index_include_directories::Migration), Box::new(m20260123_000001_remove_legacy_sync_columns::Migration), + Box::new(m20260417_000001_add_entries_sync_cursor_index::Migration), ] } } diff --git a/core/src/infra/sync/backfill_context.rs b/core/src/infra/sync/backfill_context.rs new file mode 100644 index 000000000000..c64bf897d50c --- /dev/null +++ b/core/src/infra/sync/backfill_context.rs @@ -0,0 +1,26 @@ +//! Task-local flag identifying code running inside a backfill apply loop. +//! +//! A few per-record hooks (closure table rebuild, resource event emission) +//! are redundant during backfill because the coordinator does bulk work at +//! the end. Models check this flag to skip that per-record work. + +tokio::task_local! { + static IN_BACKFILL: (); +} + +/// Run `fut` with the in-backfill flag set. Nested scopes are allowed. +pub async fn in_backfill(fut: F) -> T +where + F: std::future::Future, +{ + if is_in_backfill() { + fut.await + } else { + IN_BACKFILL.scope((), fut).await + } +} + +/// True when the current task is inside an `in_backfill` scope. +pub fn is_in_backfill() -> bool { + IN_BACKFILL.try_with(|_| ()).is_ok() +} diff --git a/core/src/infra/sync/fk_mapper.rs b/core/src/infra/sync/fk_mapper.rs index e74f6b225228..840df1a2332c 100644 --- a/core/src/infra/sync/fk_mapper.rs +++ b/core/src/infra/sync/fk_mapper.rs @@ -102,6 +102,112 @@ pub async fn convert_fk_to_uuid( Ok(()) } +/// Batch convert local integer FKs to UUIDs across multiple records. +/// +/// Same per-record semantics as [`convert_fk_to_uuid`] but one DB round trip +/// per FK type instead of one per (record × FK). Records whose target could +/// not be resolved (missing target row, non-integer value) are reported by +/// index — callers MUST drop those records rather than ship them. Silently +/// leaving `local_field` in place would let the receiver's +/// `map_sync_json_to_local` interpret the sender-local integer as +/// already-resolved and write it straight into the receiver's DB, corrupting +/// FKs across devices. +/// +/// Successful records are mutated in place (`local_field` removed, +/// `uuid_field` added). The indices of failed records are returned. +pub async fn convert_fks_to_uuids_batch( + records: &mut [Value], + fk: &FKMapping, + db: &DatabaseConnection, +) -> Result> { + let mut failed: HashSet = HashSet::new(); + if records.is_empty() { + return Ok(failed); + } + + let uuid_field = fk.uuid_field_name(); + + // First pass: collect IDs and flag records that can't be resolved. An + // absent `local_field` is treated as failed (matches `convert_fk_to_uuid` + // semantics — only an explicit JSON `null` is a legitimate null FK). + let mut ids_to_lookup: HashSet = HashSet::new(); + for (idx, json) in records.iter().enumerate() { + match json.get(fk.local_field) { + None => { + tracing::warn!( + fk_field = fk.local_field, + "FK field missing in sync payload; dropping record" + ); + failed.insert(idx); + } + Some(v) if v.is_null() => { /* explicit null — treated as null below */ } + Some(v) => match v.as_i64() { + Some(id) => { + ids_to_lookup.insert(id as i32); + } + None => { + tracing::warn!( + fk_field = fk.local_field, + value = %v, + "FK field has non-integer value in sync payload; dropping record" + ); + failed.insert(idx); + } + }, + } + } + + let id_to_uuid = if ids_to_lookup.is_empty() { + HashMap::new() + } else { + batch_lookup_uuids_for_local_ids(fk.target_table, ids_to_lookup, db).await? + }; + + for (idx, json) in records.iter_mut().enumerate() { + if failed.contains(&idx) { + continue; + } + + let local_field_value = json.get(fk.local_field).cloned(); + + match local_field_value { + None => { + // First pass already flagged this as failed; nothing to do. + continue; + } + Some(v) if v.is_null() => { + json[&uuid_field] = Value::Null; + if let Some(obj) = json.as_object_mut() { + obj.remove(fk.local_field); + } + } + Some(v) => { + // Unwrap is safe: first pass validated i64 or flagged as failed. + let id = v.as_i64().expect("validated in first pass") as i32; + match id_to_uuid.get(&id) { + Some(uuid) => { + json[&uuid_field] = json!(uuid.to_string()); + if let Some(obj) = json.as_object_mut() { + obj.remove(fk.local_field); + } + } + None => { + tracing::warn!( + fk_field = fk.local_field, + target_table = fk.target_table, + id = id, + "FK target not found during batch conversion; dropping record" + ); + failed.insert(idx); + } + } + } + } + } + + Ok(failed) +} + /// Look up UUID for a local integer ID via the registry async fn lookup_uuid_for_local_id( table: &str, diff --git a/core/src/infra/sync/mod.rs b/core/src/infra/sync/mod.rs index 6f0ac461ad30..3ed967061b4a 100644 --- a/core/src/infra/sync/mod.rs +++ b/core/src/infra/sync/mod.rs @@ -10,6 +10,7 @@ //! - Checkpoint persistence for resumable backfill //! +pub mod backfill_context; pub mod checkpoints; pub mod config; pub mod dependency_graph; @@ -27,6 +28,7 @@ pub mod transaction; pub mod transport; pub mod watermarks; +pub use backfill_context::{in_backfill, is_in_backfill}; pub use checkpoints::{BackfillCheckpoint, BackfillCheckpointStore, CheckpointError}; pub use config::{ BatchingConfig, MonitoringConfig, NetworkConfig, PruningStrategy, RetentionConfig, SyncConfig, @@ -42,8 +44,8 @@ pub use event_log::{ SyncEventLogger, SyncEventQuery, SyncEventType, }; pub use fk_mapper::{ - batch_map_sync_json_to_local, convert_fk_to_uuid, map_sync_json_to_local, BatchFkMapResult, - FKMapping, + batch_map_sync_json_to_local, convert_fk_to_uuid, convert_fks_to_uuids_batch, + map_sync_json_to_local, BatchFkMapResult, FKMapping, }; pub use hlc::{HLCGenerator, HLC}; pub use peer_log::{ChangeType, PeerLog, PeerLogError, SharedChangeEntry}; diff --git a/core/src/infra/sync/peer_log.rs b/core/src/infra/sync/peer_log.rs index 3563a2789436..9934930d5261 100644 --- a/core/src/infra/sync/peer_log.rs +++ b/core/src/infra/sync/peer_log.rs @@ -211,21 +211,36 @@ impl PeerLog { Ok(()) } - /// Get all changes since a given HLC + /// Get changes since a given HLC, up to an optional limit + /// + /// Passing `limit = Some(N)` pushes `LIMIT N` into SQL so the sender never + /// materializes the whole log when only a batch is needed. `None` returns + /// every row (used by tests and bulk maintenance paths only). pub async fn get_since( &self, since: Option, + limit: Option, ) -> Result, PeerLogError> { - let query = match since { - Some(hlc) => { - let hlc_str = hlc.to_string(); - Statement::from_sql_and_values( - DbBackend::Sqlite, - "SELECT hlc, model_type, record_uuid, change_type, data FROM shared_changes WHERE hlc > ? ORDER BY hlc ASC", - vec![hlc_str.into()], - ) - } - None => Statement::from_string( + // Clamp to i64::MAX so a ludicrously large usize never wraps into a + // negative value when SQLite binds the parameter. + let sql_limit = limit.map(|lim| i64::try_from(lim).unwrap_or(i64::MAX)); + let query = match (since, sql_limit) { + (Some(hlc), Some(lim)) => Statement::from_sql_and_values( + DbBackend::Sqlite, + "SELECT hlc, model_type, record_uuid, change_type, data FROM shared_changes WHERE hlc > ? ORDER BY hlc ASC LIMIT ?", + vec![hlc.to_string().into(), lim.into()], + ), + (Some(hlc), None) => Statement::from_sql_and_values( + DbBackend::Sqlite, + "SELECT hlc, model_type, record_uuid, change_type, data FROM shared_changes WHERE hlc > ? ORDER BY hlc ASC", + vec![hlc.to_string().into()], + ), + (None, Some(lim)) => Statement::from_sql_and_values( + DbBackend::Sqlite, + "SELECT hlc, model_type, record_uuid, change_type, data FROM shared_changes ORDER BY hlc ASC LIMIT ?", + vec![lim.into()], + ), + (None, None) => Statement::from_string( DbBackend::Sqlite, "SELECT hlc, model_type, record_uuid, change_type, data FROM shared_changes ORDER BY hlc ASC".to_string(), ), @@ -533,7 +548,7 @@ mod tests { peer_log.append(entry.clone()).await.unwrap(); - let entries = peer_log.get_since(None).await.unwrap(); + let entries = peer_log.get_since(None, None).await.unwrap(); assert_eq!(entries.len(), 1); assert_eq!(entries[0].model_type, "tag"); } @@ -556,7 +571,7 @@ mod tests { tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; } - let entries = peer_log.get_since(None).await.unwrap(); + let entries = peer_log.get_since(None, None).await.unwrap(); assert_eq!(entries.len(), 3); // Peer A acks first 2 @@ -571,7 +586,7 @@ mod tests { let pruned = peer_log.prune_acked().await.unwrap(); assert_eq!(pruned, 2); - let remaining = peer_log.get_since(None).await.unwrap(); + let remaining = peer_log.get_since(None, None).await.unwrap(); assert_eq!(remaining.len(), 1); } } diff --git a/core/src/service/sync/backfill.rs b/core/src/service/sync/backfill.rs index 803c8d3b62dc..6c603f96928b 100644 --- a/core/src/service/sync/backfill.rs +++ b/core/src/service/sync/backfill.rs @@ -209,14 +209,22 @@ impl BackfillManager { ); } - // Phase 2: Backfill shared resources FIRST (entries depend on content_identities) - let max_shared_hlc = self.backfill_shared_resources(selected_peer).await?; + // Phases 2 + 3 run inside the in_backfill scope so per-record hooks + // (entry_closure rebuild, resource event emission) can short-circuit + // — the post_backfill_rebuild pass below handles the bulk work. + let (max_shared_hlc, final_state_checkpoint) = crate::infra::sync::in_backfill(async { + // Phase 2: Backfill shared resources FIRST (entries depend on content_identities) + let max_shared_hlc = self.backfill_shared_resources(selected_peer).await?; + + // Phase 3: Backfill device-owned state (after shared dependencies exist) + // For initial backfill, don't use watermark (get everything) + let final_state_checkpoint = self + .backfill_device_owned_state(selected_peer, None) + .await?; - // Phase 3: Backfill device-owned state (after shared dependencies exist) - // For initial backfill, don't use watermark (get everything) - let final_state_checkpoint = self - .backfill_device_owned_state(selected_peer, None) - .await?; + Ok::<_, anyhow::Error>((max_shared_hlc, final_state_checkpoint)) + }) + .await?; // Phase 3.5: Run post-backfill rebuilds via registry (polymorphic) // Models that registered post_backfill_rebuild will have their derived tables rebuilt @@ -230,6 +238,13 @@ impl BackfillManager { // Don't fail backfill, just warn } + // Coarse UI invalidation: per-record resource events were suppressed + // during backfill to avoid O(N) fan-out queries. One Refresh tells the + // frontend to drop any cached views that may now be stale. + self.peer_sync + .event_bus() + .emit(crate::infra::event::Event::Refresh); + // Phase 4: Transition to ready (processes buffer) self.peer_sync.transition_to_ready().await?; @@ -537,65 +552,135 @@ impl BackfillManager { let fk_mappings = crate::infra::sync::get_fk_mappings(&model_type).unwrap_or_default(); - // For hierarchical models (entries), process one at a time to handle parent→child ordering - // Batch FK resolution fails because children can't find parents that haven't been inserted yet + // For hierarchical models (entries), self-referential FKs (parent_id) + // must be resolved per-entry so each child sees its just-inserted + // parent. Everything else (content_id, metadata_id, volume_id, + // device_id, ...) is independent of insertion order, so we batch + // those across the whole batch first — one DB round trip per FK + // type instead of per (record × FK). let processed_data = if model_type == "entry" && !fk_mappings.is_empty() { - // Process entries individually: resolve FK → insert → resolve deps → next - let mut succeeded = Vec::with_capacity(record_data.len()); - - for data in record_data { - // Try to resolve FKs for this single record + let table_name = crate::infra::sync::get_table_name(&model_type).await; + let (self_ref_mappings, non_self_mappings): (Vec<_>, Vec<_>) = fk_mappings + .iter() + .cloned() + .partition(|fk| Some(fk.target_table) == table_name); + + // Batch-resolve non-self FKs across the whole batch. + let pre_resolved = if !non_self_mappings.is_empty() { let result = crate::infra::sync::batch_map_sync_json_to_local( - vec![data.clone()], - fk_mappings.clone(), + record_data, + non_self_mappings, &db, ) .await - .map_err(|e| anyhow::anyhow!("FK mapping failed: {}", e))?; - - if !result.succeeded.is_empty() { - // FK resolution succeeded - add to processing list - succeeded.extend(result.succeeded); - } else if !result.failed.is_empty() { - // FK resolution failed - add to dependency tracker - for (failed_data, fk_field, missing_uuid) in result.failed { - let record_uuid = failed_data - .get("uuid") - .and_then(|v| v.as_str()) - .and_then(|s| Uuid::parse_str(s).ok()); - - let record_timestamp = records - .iter() - .find(|r| Some(r.uuid) == record_uuid) - .map(|r| r.timestamp) - .unwrap_or_else(chrono::Utc::now); - - if let Some(uuid) = record_uuid { - tracing::debug!( - model_type = %model_type, - record_uuid = %uuid, - fk_field = %fk_field, - missing_uuid = %missing_uuid, - "Entry has missing parent - adding to dependency tracker" - ); + .map_err(|e| { + anyhow::anyhow!("Batch FK mapping failed for entries: {}", e) + })?; - let state_change = super::state::StateChangeMessage { - model_type: model_type.clone(), - record_uuid: uuid, - device_id: source_device_id, - data: failed_data, - timestamp: record_timestamp, - }; + for (failed_data, fk_field, missing_uuid) in result.failed { + let record_uuid = failed_data + .get("uuid") + .and_then(|v| v.as_str()) + .and_then(|s| Uuid::parse_str(s).ok()); + + let record_timestamp = records + .iter() + .find(|r| Some(r.uuid) == record_uuid) + .map(|r| r.timestamp) + .unwrap_or_else(chrono::Utc::now); + + if let Some(uuid) = record_uuid { + tracing::debug!( + model_type = %model_type, + record_uuid = %uuid, + fk_field = %fk_field, + missing_uuid = %missing_uuid, + "Entry has missing non-self FK - adding to dependency tracker" + ); + + let state_change = super::state::StateChangeMessage { + model_type: model_type.clone(), + record_uuid: uuid, + device_id: source_device_id, + data: failed_data, + timestamp: record_timestamp, + }; + + self.peer_sync + .dependency_tracker() + .add_dependency( + missing_uuid, + super::state::BufferedUpdate::StateChange(state_change), + ) + .await; + } + } + + result.succeeded + } else { + record_data + }; + + // Resolve self-referential FKs (parent_id) per-entry so children + // can find their just-inserted parents. + let mut succeeded = Vec::with_capacity(pre_resolved.len()); - self.peer_sync - .dependency_tracker() - .add_dependency( - missing_uuid, - super::state::BufferedUpdate::StateChange( - state_change, - ), - ) - .await; + if self_ref_mappings.is_empty() { + succeeded.extend(pre_resolved); + } else { + for data in pre_resolved { + let result = crate::infra::sync::batch_map_sync_json_to_local( + vec![data.clone()], + self_ref_mappings.clone(), + &db, + ) + .await + .map_err(|e| { + anyhow::anyhow!("Self-referential FK mapping failed: {}", e) + })?; + + if !result.succeeded.is_empty() { + succeeded.extend(result.succeeded); + } else if !result.failed.is_empty() { + for (failed_data, fk_field, missing_uuid) in result.failed { + let record_uuid = failed_data + .get("uuid") + .and_then(|v| v.as_str()) + .and_then(|s| Uuid::parse_str(s).ok()); + + let record_timestamp = records + .iter() + .find(|r| Some(r.uuid) == record_uuid) + .map(|r| r.timestamp) + .unwrap_or_else(chrono::Utc::now); + + if let Some(uuid) = record_uuid { + tracing::debug!( + model_type = %model_type, + record_uuid = %uuid, + fk_field = %fk_field, + missing_uuid = %missing_uuid, + "Entry has missing parent - adding to dependency tracker" + ); + + let state_change = super::state::StateChangeMessage { + model_type: model_type.clone(), + record_uuid: uuid, + device_id: source_device_id, + data: failed_data, + timestamp: record_timestamp, + }; + + self.peer_sync + .dependency_tracker() + .add_dependency( + missing_uuid, + super::state::BufferedUpdate::StateChange( + state_change, + ), + ) + .await; + } } } } diff --git a/core/src/service/sync/peer.rs b/core/src/service/sync/peer.rs index 57a03789795a..547d44bc8fff 100644 --- a/core/src/service/sync/peer.rs +++ b/core/src/service/sync/peer.rs @@ -2715,14 +2715,21 @@ impl PeerSync { "Querying shared changes from peer log" ); - // Query peer log (get all since HLC, then limit in memory) + // limit == 0 with our `len > limit` rule would return 0 rows plus + // has_more = true, so a naive caller would spin forever. + if limit == 0 { + return Err(anyhow::anyhow!("shared changes limit must be > 0")); + } + + // Query peer log with SQL-side LIMIT, fetching one extra row to detect has_more + let fetch_limit = limit.saturating_add(1); let mut entries = self .peer_log - .get_since(since_hlc) + .get_since(since_hlc, Some(fetch_limit)) .await .map_err(|e| anyhow::anyhow!("Failed to query peer log: {}", e))?; - // Check if there are more entries beyond the limit + // If we got the extra row, there are more entries beyond this batch let has_more = entries.len() > limit; // Truncate to limit diff --git a/core/src/service/sync/protocol_handler.rs b/core/src/service/sync/protocol_handler.rs index 2e04fbf373f4..2df3c824b178 100644 --- a/core/src/service/sync/protocol_handler.rs +++ b/core/src/service/sync/protocol_handler.rs @@ -114,11 +114,25 @@ impl LogSyncHandler { since_hlc: Option, limit: usize, ) -> Result { - // Get changes from our peer log - let entries = self.peer_sync.peer_log.get_since(since_hlc).await?; + // Reject limit == 0 up front: with our `has_more = entries.len() > limit` + // rule it would return 0 rows with has_more = true and the caller would + // spin forever reissuing the same request. + if limit == 0 { + anyhow::bail!("SharedChangeRequest limit must be > 0"); + } - let has_more = entries.len() >= limit; - let limited: Vec<_> = entries.into_iter().take(limit).collect(); + // Get changes from our peer log, fetching one extra row so we can derive has_more + // without reloading the entire log on every batch request. + let fetch_limit = limit.saturating_add(1); + let mut entries = self + .peer_sync + .peer_log + .get_since(since_hlc, Some(fetch_limit)) + .await?; + + let has_more = entries.len() > limit; + entries.truncate(limit); + let limited = entries; // For initial sync (no watermark), always include current state // This ensures shared resources like content_identities are available