diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index fe3f5c8f7e..f28b6b0901 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -194,7 +194,17 @@ impl PlanContext { delete_file_idx: DeleteFileIndex, delete_file_tx: Sender<DeleteFileContext>, ) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>> + 'static>> { - let manifest_files = manifest_list.entries().iter(); + let mut manifest_files = manifest_list.entries().iter().collect::<Vec<_>>(); + // Sort manifest files to process delete manifests first. + // This avoids a deadlock where the producer blocks on sending data manifest entries + // (because the data channel is full) while the delete manifest consumer is waiting + // for delete manifest entries (which haven't been produced yet). + // By processing delete manifests first, we ensure the delete consumer can finish, + // which then allows the data consumer to start draining the data channel. + manifest_files.sort_by_key(|m| match m.content { + ManifestContentType::Deletes => 0, + ManifestContentType::Data => 1, + }); // TODO: Ideally we could ditch this intermediate Vec as we return an iterator. let mut filtered_mfcs = vec![]; diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index d83da8a879..1f7fa50df8 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -1170,6 +1170,97 @@ pub mod tests { writer.close().unwrap(); } } + + pub async fn setup_deadlock_manifests(&mut self) { + let current_snapshot = self.table.metadata().current_snapshot().unwrap(); + let _parent_snapshot = current_snapshot + .parent_snapshot(self.table.metadata()) + .unwrap(); + let current_schema = current_snapshot.schema(self.table.metadata()).unwrap(); + let current_partition_spec = self.table.metadata().default_partition_spec(); + + // 1. 
Write DATA manifest with MULTIPLE entries to fill buffer + let mut writer = ManifestWriterBuilder::new( + self.next_manifest_file(), + Some(current_snapshot.snapshot_id()), + None, + current_schema.clone(), + current_partition_spec.as_ref().clone(), + ) + .build_v2_data(); + + // Add 10 data entries + for i in 0..10 { + writer + .add_entry( + ManifestEntry::builder() + .status(ManifestStatus::Added) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::Data) + .file_path(format!("{}/{}.parquet", &self.table_location, i)) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition(Struct::from_iter([Some(Literal::long(100))])) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + } + let data_manifest = writer.write_manifest_file().await.unwrap(); + + // 2. Write DELETE manifest + let mut writer = ManifestWriterBuilder::new( + self.next_manifest_file(), + Some(current_snapshot.snapshot_id()), + None, + current_schema.clone(), + current_partition_spec.as_ref().clone(), + ) + .build_v2_deletes(); + + writer + .add_entry( + ManifestEntry::builder() + .status(ManifestStatus::Added) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::PositionDeletes) + .file_path(format!("{}/del.parquet", &self.table_location)) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition(Struct::from_iter([Some(Literal::long(100))])) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + let delete_manifest = writer.write_manifest_file().await.unwrap(); + + // Write to manifest list - DATA FIRST then DELETE + // This order is crucial for reproduction + let mut manifest_list_write = ManifestListWriter::v2( + self.table + .file_io() + .new_output(current_snapshot.manifest_list()) + .unwrap(), + current_snapshot.snapshot_id(), + current_snapshot.parent_snapshot_id(), + 
current_snapshot.sequence_number(), + ); + manifest_list_write + .add_manifests(vec![data_manifest, delete_manifest].into_iter()) + .unwrap(); + manifest_list_write.close().await.unwrap(); + } } #[test] @@ -2127,4 +2218,38 @@ pub mod tests { "_file column (duplicate) should use RunEndEncoded type" ); } + + #[tokio::test] + async fn test_scan_deadlock() { + let mut fixture = TableTestFixture::new(); + fixture.setup_deadlock_manifests().await; + + // Create table scan with concurrency limit 1 + // This sets channel size to 1. + // Data manifest has 10 entries -> will block producer. + // Delete manifest is 2nd in list -> won't be processed. + // Consumer 2 (Data) not started -> blocked. + // Consumer 1 (Delete) waiting -> blocked. + let table_scan = fixture + .table + .scan() + .with_concurrency_limit(1) + .build() + .unwrap(); + + // This should timeout/hang if deadlock exists + // We can use tokio::time::timeout + let result = tokio::time::timeout(std::time::Duration::from_secs(5), async { + table_scan + .plan_files() + .await + .unwrap() + .try_collect::<Vec<_>>() + .await + }) + .await; + + // Assert it finished (didn't timeout) + assert!(result.is_ok(), "Scan timed out - deadlock detected"); + } }