Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions core/src/domain/resource_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,19 @@ impl ResourceManager {
resource_type: &str,
resource_ids: Vec<Uuid>,
) -> Result<()> {
// During backfill we skip per-record fan-out. For models like
// content_identity each UUID triggers 2 DB queries via dependency
// routing, which dominates apply time on large libraries. The backfill
// coordinator emits a single coarse invalidation after the scope ends.
if crate::infra::sync::is_in_backfill() {
tracing::trace!(
resource_type = %resource_type,
count = resource_ids.len(),
"Skipping per-record resource event emission during backfill"
);
return Ok(());
}

// For now, delegate to single-resource handler
// In future, could optimize by batching virtual resource construction
self.emit_resource_events(resource_type, resource_ids).await
Expand Down
75 changes: 58 additions & 17 deletions core/src/infra/db/entities/content_identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,36 +246,77 @@ impl Syncable for Model {

let results = query.all(db).await?;

let mut sync_results = Vec::new();
let mut sync_results: Vec<(Uuid, serde_json::Value, chrono::DateTime<chrono::Utc>)> =
Vec::with_capacity(results.len());
for content in results {
if content.uuid.is_none() {
continue;
}
let uuid = match content.uuid {
Some(u) => u,
None => continue,
};

let mut json = match content.to_sync_json() {
let json = match content.to_sync_json() {
Ok(j) => j,
Err(e) => {
tracing::warn!(error = %e, content_hash = %content.content_hash, "Failed to serialize content_identity for sync");
continue;
}
};

// Convert FK to UUID for cross-device compatibility
for fk in Self::foreign_key_mappings() {
if let Err(e) =
crate::infra::sync::fk_mapper::convert_fk_to_uuid(&mut json, &fk, db).await
sync_results.push((uuid, json, content.last_verified_at));
}

// Batch FK → UUID conversion across the whole batch: one DB round trip
// per FK type instead of one per (record × FK). Records that fail
// resolution are dropped so peers never see a sender-local int in the
// payload.
let fk_mappings = Self::foreign_key_mappings();
if !fk_mappings.is_empty() && !sync_results.is_empty() {
let mut payloads: Vec<serde_json::Value> = sync_results
.iter()
.map(|(_, json, _)| json.clone())
.collect();
let mut failed_indices: std::collections::HashSet<usize> =
std::collections::HashSet::new();

for fk in &fk_mappings {
match crate::infra::sync::fk_mapper::convert_fks_to_uuids_batch(
&mut payloads,
fk,
db,
)
.await
{
tracing::warn!(
error = %e,
uuid = %content.uuid.unwrap(),
fk_field = fk.local_field,
"Failed to convert FK to UUID, skipping content_identity"
);
continue;
Ok(failed) => failed_indices.extend(failed),
Err(e) => {
tracing::warn!(
error = %e,
fk_field = fk.local_field,
"Batch FK conversion failed for content_identity"
);
return Err(sea_orm::DbErr::Custom(format!(
"ContentIdentity FK batch conversion failed: {}",
e
)));
}
}
}

sync_results.push((content.uuid.unwrap(), json, content.last_verified_at));
sync_results = sync_results
.into_iter()
.zip(payloads.into_iter())
.enumerate()
.filter_map(|(idx, ((uuid, _, ts), resolved))| {
if failed_indices.contains(&idx) {
tracing::warn!(
uuid = %uuid,
"Dropping content_identity with unresolved FK from sync batch"
);
None
} else {
Some((uuid, resolved, ts))
}
})
.collect();
Comment thread
jamiepine marked this conversation as resolved.
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

Ok(sync_results)
Expand Down
101 changes: 76 additions & 25 deletions core/src/infra/db/entities/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,16 +292,31 @@ impl crate::infra::sync::Syncable for Model {
std::collections::HashMap::new()
};

// Convert to sync format with FK mapping
let mut sync_results = Vec::new();
// Serialize each row to JSON with its UUID and timestamp.
let mut staged: Vec<(Uuid, serde_json::Value, chrono::DateTime<chrono::Utc>)> =
Vec::with_capacity(results.len());

for entry in results {
let uuid = match entry.uuid {
Some(u) => u,
None => continue, // Skip entries without UUIDs
None => continue,
};

// The sync cursor filters/orders on `indexed_at`, so a row with a
// NULL `indexed_at` would emit a cursor the next query predicate
// doesn't represent. Skip them — the indexed_at backfill migration
// populated every existing row, so this should be unreachable.
let indexed_at = match entry.indexed_at {
Some(ts) => ts,
None => {
tracing::warn!(
uuid = %uuid,
"Entry has NULL indexed_at; skipping sync until it's populated"
);
continue;
}
};

// Serialize to JSON
let mut json = match entry.to_sync_json() {
Ok(j) => j,
Err(e) => {
Expand All @@ -311,9 +326,8 @@ impl crate::infra::sync::Syncable for Model {
};

// For directories, include the absolute path from directory_paths
// This ensures receiving devices get identical paths for universal addressing
// so receiving devices get identical paths for universal addressing.
if entry.kind == 1 {
// Directory
if let Some(path) = directory_paths_map.get(&entry.id) {
if let Some(obj) = json.as_object_mut() {
obj.insert(
Expand All @@ -324,27 +338,62 @@ impl crate::infra::sync::Syncable for Model {
}
}

// Convert FK integer IDs to UUIDs
for fk in <Model as Syncable>::foreign_key_mappings() {
if let Err(e) =
crate::infra::sync::fk_mapper::convert_fk_to_uuid(&mut json, &fk, db).await
staged.push((uuid, json, indexed_at));
}

// Batch-convert FK integer IDs to UUIDs one FK type at a time across
// the whole batch — single DB round trip per FK, not per record × FK.
// Any record that fails resolution (missing target, bad value) is
// dropped before we return so peers never see a sender-local int.
let fk_mappings = <Model as Syncable>::foreign_key_mappings();
if !fk_mappings.is_empty() && !staged.is_empty() {
let mut payloads: Vec<serde_json::Value> =
staged.iter().map(|(_, json, _)| json.clone()).collect();
let mut failed_indices: std::collections::HashSet<usize> =
std::collections::HashSet::new();

for fk in &fk_mappings {
match crate::infra::sync::fk_mapper::convert_fks_to_uuids_batch(
&mut payloads,
fk,
db,
)
.await
{
tracing::warn!(
error = %e,
uuid = %uuid,
fk_field = fk.local_field,
"Failed to convert FK to UUID, skipping entry"
);
continue;
Ok(failed) => failed_indices.extend(failed),
Err(e) => {
tracing::warn!(
error = %e,
fk_field = fk.local_field,
"Batch FK conversion failed for entries"
);
return Err(sea_orm::DbErr::Custom(format!(
"Entry FK batch conversion failed: {}",
e
)));
}
}
}

// Use indexed_at for checkpoint/watermark tracking, fallback to modified_at if NULL
let timestamp = entry.indexed_at.unwrap_or(entry.modified_at);
sync_results.push((uuid, json, timestamp));
staged = staged
.into_iter()
.zip(payloads.into_iter())
.enumerate()
.filter_map(|(idx, ((uuid, _, ts), resolved))| {
if failed_indices.contains(&idx) {
tracing::warn!(
uuid = %uuid,
"Dropping entry with unresolved FK from sync batch"
);
None
} else {
Some((uuid, resolved, ts))
}
})
.collect();
Comment thread
jamiepine marked this conversation as resolved.
}

Ok(sync_results)
Ok(staged)
}

/// Apply state change - already implemented in Model impl block below
Expand Down Expand Up @@ -564,10 +613,12 @@ impl Model {
inserted.id
};

// Rebuild entry_closure for this synced entry
// Without this, the entry only has a self-reference and cannot be queried
// for descendants, breaking subtree operations, location scoping, etc.
Self::rebuild_entry_closure(entry_id, parent_id, db).await?;
// Rebuild entry_closure for this synced entry, unless we're inside a
// backfill apply loop — in that case the post_backfill_rebuild hook
// does a single bulk rebuild at the end, so per-entry work is wasted.
if !crate::infra::sync::is_in_backfill() {
Self::rebuild_entry_closure(entry_id, parent_id, db).await?;
}

// If this is a directory, create or update its entry in the directory_paths table
if EntryKind::from(kind) == EntryKind::Directory {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
//! Add composite index on entries(indexed_at, uuid) to back the device-owned
//! sync cursor.
//!
//! `Entry::query_for_sync` paginates by `ORDER BY indexed_at ASC, uuid ASC`
//! with a tie-breaker filter of the same shape. Without this index, SQLite
//! does a full table scan per batch request — O(N) per batch, O(N^2) across
//! an initial backfill of a large library.

use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.get_connection()
.execute_unprepared(
"CREATE INDEX IF NOT EXISTS idx_entries_indexed_at_uuid \
ON entries(indexed_at, uuid)",
)
.await?;

Ok(())
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.get_connection()
.execute_unprepared("DROP INDEX IF EXISTS idx_entries_indexed_at_uuid")
.await?;

Ok(())
}
}
2 changes: 2 additions & 0 deletions core/src/infra/db/migration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ mod m20260104_000001_replace_device_id_with_volume_id;
mod m20260105_000001_add_volume_id_to_locations;
mod m20260114_000001_fix_search_index_include_directories;
mod m20260123_000001_remove_legacy_sync_columns;
mod m20260417_000001_add_entries_sync_cursor_index;

pub struct Migrator;

Expand Down Expand Up @@ -81,6 +82,7 @@ impl MigratorTrait for Migrator {
Box::new(m20260105_000001_add_volume_id_to_locations::Migration),
Box::new(m20260114_000001_fix_search_index_include_directories::Migration),
Box::new(m20260123_000001_remove_legacy_sync_columns::Migration),
Box::new(m20260417_000001_add_entries_sync_cursor_index::Migration),
]
}
}
26 changes: 26 additions & 0 deletions core/src/infra/sync/backfill_context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
//! Task-local flag identifying code running inside a backfill apply loop.
//!
//! A few per-record hooks (closure table rebuild, resource event emission)
//! are redundant during backfill because the coordinator does bulk work at
//! the end. Models check this flag to skip that per-record work.

tokio::task_local! {
static IN_BACKFILL: ();
}

/// Run `fut` with the in-backfill flag set. Nested scopes are allowed.
pub async fn in_backfill<F, T>(fut: F) -> T
where
F: std::future::Future<Output = T>,
{
if is_in_backfill() {
fut.await
} else {
IN_BACKFILL.scope((), fut).await
}
}

/// True when the current task is inside an `in_backfill` scope.
pub fn is_in_backfill() -> bool {
IN_BACKFILL.try_with(|_| ()).is_ok()
}
Loading
Loading