12 changes: 11 additions & 1 deletion crates/iceberg/src/scan/context.rs
@@ -194,7 +194,17 @@ impl PlanContext {
        delete_file_idx: DeleteFileIndex,
        delete_file_tx: Sender<ManifestEntryContext>,
    ) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>> + 'static>> {
-       let manifest_files = manifest_list.entries().iter();
+       let mut manifest_files = manifest_list.entries().iter().collect::<Vec<_>>();
+       // Sort manifest files to process delete manifests first.
+       // This avoids a deadlock where the producer blocks on sending data manifest entries
+       // (because the data channel is full) while the delete manifest consumer is waiting
+       // for delete manifest entries (which haven't been produced yet).
+       // By processing delete manifests first, we ensure the delete consumer can finish,
+       // which then allows the data consumer to start draining the data channel.
+       manifest_files.sort_by_key(|m| match m.content {
+           ManifestContentType::Deletes => 0,
+           ManifestContentType::Data => 1,
+       });

        // TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
        let mut filtered_mfcs = vec![];
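The comment in this hunk describes a two-channel pipeline deadlock. A minimal, self-contained sketch of the same failure mode (plain `tokio` channels rather than iceberg-rust's actual plumbing; the channel capacities, the task layout, and the `oneshot` "delete index ready" signal are all illustrative assumptions, not the crate's real internals):

```rust
// Minimal sketch of the deadlock, not iceberg-rust's actual plumbing.
use tokio::sync::{mpsc, oneshot};

#[tokio::main]
async fn main() {
    // Bounded channels, mirroring a concurrency limit of 1.
    let (data_tx, mut data_rx) = mpsc::channel::<u32>(1);
    let (del_tx, mut del_rx) = mpsc::channel::<u32>(1);
    let (done_tx, done_rx) = oneshot::channel::<()>();

    // Producer walks manifests in list order: data manifest first (the buggy order).
    let producer = tokio::spawn(async move {
        for i in 0..10 {
            // Fills the one-slot buffer, then awaits forever: nothing drains it yet.
            data_tx.send(i).await.unwrap();
        }
        // Never reached, so the delete channel never closes.
        del_tx.send(0).await.unwrap();
        drop(del_tx);
    });

    // Delete consumer: drains every delete entry, then signals "delete index ready".
    tokio::spawn(async move {
        while del_rx.recv().await.is_some() {}
        let _ = done_tx.send(());
    });

    // Data consumer: must wait for the complete delete index before draining data.
    tokio::spawn(async move {
        let _ = done_rx.await; // never resolves under the buggy ordering
        while data_rx.recv().await.is_some() {}
    });

    // Hangs as written; move the delete send (and the drop of del_tx) ahead of the
    // data loop, the ordering the sort above enforces, and everything completes.
    producer.await.unwrap();
}
```

Because there is a single producer and the data channel is bounded, reordering the producer's work (deletes before data) breaks the wait cycle without adding any buffering, which is what the `sort_by_key` change does at the manifest level.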
125 changes: 125 additions & 0 deletions crates/iceberg/src/scan/mod.rs
@@ -1170,6 +1170,97 @@ pub mod tests {
                writer.close().unwrap();
            }
        }

        pub async fn setup_deadlock_manifests(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let _parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = self.table.metadata().default_partition_spec();

            // 1. Write a DATA manifest with multiple entries, enough to fill the channel buffer.
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();

            // Add 10 data entries.
            for i in 0..10 {
                writer
                    .add_entry(
                        ManifestEntry::builder()
                            .status(ManifestStatus::Added)
                            .data_file(
                                DataFileBuilder::default()
                                    .partition_spec_id(0)
                                    .content(DataContentType::Data)
                                    .file_path(format!("{}/{}.parquet", &self.table_location, i))
                                    .file_format(DataFileFormat::Parquet)
                                    .file_size_in_bytes(100)
                                    .record_count(1)
                                    .partition(Struct::from_iter([Some(Literal::long(100))]))
                                    .key_metadata(None)
                                    .build()
                                    .unwrap(),
                            )
                            .build(),
                    )
                    .unwrap();
            }
            let data_manifest = writer.write_manifest_file().await.unwrap();

            // 2. Write a DELETE manifest with a single entry.
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_deletes();

            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::PositionDeletes)
                                .file_path(format!("{}/del.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(100))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            let delete_manifest = writer.write_manifest_file().await.unwrap();

            // Write the manifest list with the DATA manifest first, then the DELETE manifest.
            // This ordering is crucial for reproducing the deadlock.
            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_manifest, delete_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();
        }
    }

    #[test]
@@ -2127,4 +2218,38 @@ pub mod tests {
"_file column (duplicate) should use RunEndEncoded type"
);
}

#[tokio::test]
async fn test_scan_deadlock() {
let mut fixture = TableTestFixture::new();
fixture.setup_deadlock_manifests().await;

// Create table scan with concurrency limit 1
// This sets channel size to 1.
// Data manifest has 10 entries -> will block producer.
// Delete manifest is 2nd in list -> won't be processed.
// Consumer 2 (Data) not started -> blocked.
// Consumer 1 (Delete) waiting -> blocked.
let table_scan = fixture
.table
.scan()
.with_concurrency_limit(1)
.build()
.unwrap();

// This should timeout/hang if deadlock exists
// We can use tokio::time::timeout
let result = tokio::time::timeout(std::time::Duration::from_secs(5), async {
table_scan
.plan_files()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
})
.await;

// Assert it finished (didn't timeout)
assert!(result.is_ok(), "Scan timed out - deadlock detected");
}
}