Skip to content

Commit 26a7606

Browse files
authored
fix: prioritize delete manifests to prevent scan deadlock (#1937)
## Which issue does this PR close? - Closes #. ## What changes are included in this PR? This change ensures that delete manifests are processed before data manifests during the table scan planning phase. Previously, if data manifests were processed first and produced enough entries to fill the channel, the producer would block. Since the delete manifest consumer might still be waiting for its entries (which hadn't been produced yet), this could lead to a deadlock. Prioritizing delete manifests ensures the delete consumer can proceed, allowing the data consumer to eventually drain the channel. ## Are these changes tested? Added a reproduction test case `test_scan_deadlock` to verify the fix.
1 parent 36aedc6 commit 26a7606

File tree

2 files changed

+136
-1
lines changed

2 files changed

+136
-1
lines changed

crates/iceberg/src/scan/context.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,17 @@ impl PlanContext {
194194
delete_file_idx: DeleteFileIndex,
195195
delete_file_tx: Sender<ManifestEntryContext>,
196196
) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>> + 'static>> {
197-
let manifest_files = manifest_list.entries().iter();
197+
let mut manifest_files = manifest_list.entries().iter().collect::<Vec<_>>();
198+
// Sort manifest files to process delete manifests first.
199+
// This avoids a deadlock where the producer blocks on sending data manifest entries
200+
// (because the data channel is full) while the delete manifest consumer is waiting
201+
// for delete manifest entries (which haven't been produced yet).
202+
// By processing delete manifests first, we ensure the delete consumer can finish,
203+
// which then allows the data consumer to start draining the data channel.
204+
manifest_files.sort_by_key(|m| match m.content {
205+
ManifestContentType::Deletes => 0,
206+
ManifestContentType::Data => 1,
207+
});
198208

199209
// TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
200210
let mut filtered_mfcs = vec![];

crates/iceberg/src/scan/mod.rs

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1170,6 +1170,97 @@ pub mod tests {
11701170
writer.close().unwrap();
11711171
}
11721172
}
1173+
1174+
pub async fn setup_deadlock_manifests(&mut self) {
1175+
let current_snapshot = self.table.metadata().current_snapshot().unwrap();
1176+
let _parent_snapshot = current_snapshot
1177+
.parent_snapshot(self.table.metadata())
1178+
.unwrap();
1179+
let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
1180+
let current_partition_spec = self.table.metadata().default_partition_spec();
1181+
1182+
// 1. Write DATA manifest with MULTIPLE entries to fill buffer
1183+
let mut writer = ManifestWriterBuilder::new(
1184+
self.next_manifest_file(),
1185+
Some(current_snapshot.snapshot_id()),
1186+
None,
1187+
current_schema.clone(),
1188+
current_partition_spec.as_ref().clone(),
1189+
)
1190+
.build_v2_data();
1191+
1192+
// Add 10 data entries
1193+
for i in 0..10 {
1194+
writer
1195+
.add_entry(
1196+
ManifestEntry::builder()
1197+
.status(ManifestStatus::Added)
1198+
.data_file(
1199+
DataFileBuilder::default()
1200+
.partition_spec_id(0)
1201+
.content(DataContentType::Data)
1202+
.file_path(format!("{}/{}.parquet", &self.table_location, i))
1203+
.file_format(DataFileFormat::Parquet)
1204+
.file_size_in_bytes(100)
1205+
.record_count(1)
1206+
.partition(Struct::from_iter([Some(Literal::long(100))]))
1207+
.key_metadata(None)
1208+
.build()
1209+
.unwrap(),
1210+
)
1211+
.build(),
1212+
)
1213+
.unwrap();
1214+
}
1215+
let data_manifest = writer.write_manifest_file().await.unwrap();
1216+
1217+
// 2. Write DELETE manifest
1218+
let mut writer = ManifestWriterBuilder::new(
1219+
self.next_manifest_file(),
1220+
Some(current_snapshot.snapshot_id()),
1221+
None,
1222+
current_schema.clone(),
1223+
current_partition_spec.as_ref().clone(),
1224+
)
1225+
.build_v2_deletes();
1226+
1227+
writer
1228+
.add_entry(
1229+
ManifestEntry::builder()
1230+
.status(ManifestStatus::Added)
1231+
.data_file(
1232+
DataFileBuilder::default()
1233+
.partition_spec_id(0)
1234+
.content(DataContentType::PositionDeletes)
1235+
.file_path(format!("{}/del.parquet", &self.table_location))
1236+
.file_format(DataFileFormat::Parquet)
1237+
.file_size_in_bytes(100)
1238+
.record_count(1)
1239+
.partition(Struct::from_iter([Some(Literal::long(100))]))
1240+
.build()
1241+
.unwrap(),
1242+
)
1243+
.build(),
1244+
)
1245+
.unwrap();
1246+
let delete_manifest = writer.write_manifest_file().await.unwrap();
1247+
1248+
// Write to manifest list - DATA FIRST then DELETE
1249+
// This order is crucial for reproduction
1250+
let mut manifest_list_write = ManifestListWriter::v2(
1251+
self.table
1252+
.file_io()
1253+
.new_output(current_snapshot.manifest_list())
1254+
.unwrap(),
1255+
current_snapshot.snapshot_id(),
1256+
current_snapshot.parent_snapshot_id(),
1257+
current_snapshot.sequence_number(),
1258+
);
1259+
manifest_list_write
1260+
.add_manifests(vec![data_manifest, delete_manifest].into_iter())
1261+
.unwrap();
1262+
manifest_list_write.close().await.unwrap();
1263+
}
11731264
}
11741265

11751266
#[test]
@@ -2127,4 +2218,38 @@ pub mod tests {
21272218
"_file column (duplicate) should use RunEndEncoded type"
21282219
);
21292220
}
2221+
2222+
#[tokio::test]
2223+
async fn test_scan_deadlock() {
2224+
let mut fixture = TableTestFixture::new();
2225+
fixture.setup_deadlock_manifests().await;
2226+
2227+
// Create table scan with concurrency limit 1
2228+
// This sets channel size to 1.
2229+
// Data manifest has 10 entries -> will block producer.
2230+
// Delete manifest is 2nd in list -> won't be processed.
2231+
// Consumer 2 (Data) not started -> blocked.
2232+
// Consumer 1 (Delete) waiting -> blocked.
2233+
let table_scan = fixture
2234+
.table
2235+
.scan()
2236+
.with_concurrency_limit(1)
2237+
.build()
2238+
.unwrap();
2239+
2240+
// This should timeout/hang if deadlock exists
2241+
// We can use tokio::time::timeout
2242+
let result = tokio::time::timeout(std::time::Duration::from_secs(5), async {
2243+
table_scan
2244+
.plan_files()
2245+
.await
2246+
.unwrap()
2247+
.try_collect::<Vec<_>>()
2248+
.await
2249+
})
2250+
.await;
2251+
2252+
// Assert it finished (didn't timeout)
2253+
assert!(result.is_ok(), "Scan timed out - deadlock detected");
2254+
}
21302255
}

0 commit comments

Comments
 (0)