@@ -34,11 +34,14 @@ use crate::datasets::data_downloads::download_data;
3434use crate :: datasets:: data_downloads:: download_many;
3535use crate :: utils:: file:: temp_download_filepath;
3636
37- /// Benchmark and local data directory name for ClickBench sorted by event date/ time.
37+ /// Benchmark and local data directory name for ClickBench sorted by event time.
3838pub const CLICKBENCH_SORTED_NAME : & str = "clickbench-sorted" ;
3939const CLICKBENCH_PARTITIONED_NAME : & str = "clickbench_partitioned" ;
4040const SORTED_SHARD_COUNT : usize = 100 ;
4141const SORTED_SHARD_COUNT_U64 : u64 = 100 ;
42+ const SORTED_SHARD_FILENAME_WIDTH : usize = 3 ;
43+ const SORTED_SHARD_PERMUTATION_MULTIPLIER : u64 = 37 ;
44+ const SORTED_SHARD_PERMUTATION_OFFSET : u64 = 17 ;
4245
4346/// Zero-based ClickBench query IDs that filter by or order/group on `EventDate`/`EventTime`.
4447pub const CLICKBENCH_SORTED_QUERY_IDS : & [ usize ] = & [ 23 , 24 , 26 , 36 , 37 , 38 , 39 , 40 , 41 , 42 ] ;
@@ -235,13 +238,18 @@ impl Flavor {
235238 }
236239}
237240
238- /// Generate globally sorted ClickBench Parquet shards under `basepath`.
241+ /// Generate sorted ClickBench Parquet shards under `basepath`.
242+ ///
243+ /// Each shard contains a contiguous `EventTime` range, but the shard filenames are deterministically
244+ /// shuffled. That makes ordinary file listing order bad for ascending TopK queries, so file sort
245+ /// pushdown has a visible job to do.
239246pub async fn generate_sorted_clickbench ( basepath : impl AsRef < Path > ) -> anyhow:: Result < ( ) > {
247+ let basepath = basepath. as_ref ( ) ;
240248 let source_base = CLICKBENCH_PARTITIONED_NAME . to_data_path ( ) ;
241249 Flavor :: Partitioned . download ( & source_base) . await ?;
242250
243251 let source_parquet_dir = source_base. join ( Format :: Parquet . name ( ) ) ;
244- let output_parquet_dir = basepath. as_ref ( ) . join ( Format :: Parquet . name ( ) ) ;
252+ let output_parquet_dir = basepath. join ( Format :: Parquet . name ( ) ) ;
245253
246254 if output_parquet_dir. exists ( ) {
247255 info ! (
@@ -374,7 +382,7 @@ PRAGMA temp_directory={temp_dir};
374382CREATE TABLE hits_sorted AS
375383 SELECT *
376384 FROM read_parquet({source_glob})
377- ORDER BY \" EventDate \" , \" EventTime\" , \" WatchID \" ;
385+ ORDER BY \" EventTime\" ;
378386" ,
379387 temp_dir = sql_string_literal( & duckdb_temp_dir. display( ) . to_string( ) ) ,
380388 source_glob = sql_string_literal( & source_glob. display( ) . to_string( ) ) ,
@@ -383,7 +391,7 @@ CREATE TABLE hits_sorted AS
383391 for shard_idx in 0 ..SORTED_SHARD_COUNT_U64 {
384392 let start = shard_idx * rows_per_shard;
385393 let end = ( start + rows_per_shard) . min ( source_rows) ;
386- let output_path = output_parquet_dir. join ( format ! ( "hits_{ shard_idx}.parquet" ) ) ;
394+ let output_path = output_parquet_dir. join ( sorted_shard_file_name ( shard_idx) ) ;
387395 script. push_str ( & format ! (
388396 "\
389397 COPY (
@@ -400,6 +408,21 @@ COPY (
400408 script
401409}
402410
411+ fn sorted_shard_file_name ( sorted_shard_idx : u64 ) -> String {
412+ debug_assert ! ( sorted_shard_idx < SORTED_SHARD_COUNT_U64 ) ;
413+ let listing_shard_idx = sorted_shard_listing_idx ( sorted_shard_idx) ;
414+ format ! (
415+ "hits_{listing_shard_idx:0width$}.parquet" ,
416+ width = SORTED_SHARD_FILENAME_WIDTH
417+ )
418+ }
419+
420+ fn sorted_shard_listing_idx ( sorted_shard_idx : u64 ) -> u64 {
421+ debug_assert ! ( sorted_shard_idx < SORTED_SHARD_COUNT_U64 ) ;
422+ ( sorted_shard_idx * SORTED_SHARD_PERMUTATION_MULTIPLIER + SORTED_SHARD_PERMUTATION_OFFSET )
423+ % SORTED_SHARD_COUNT_U64
424+ }
425+
403426fn parquet_dir_row_count ( parquet_dir : & Path ) -> anyhow:: Result < u64 > {
404427 let files = parquet_files ( parquet_dir) ?;
405428 anyhow:: ensure!(
@@ -450,3 +473,58 @@ fn sql_string_literal(value: &str) -> String {
450473fn quote_identifier ( value : & str ) -> String {
451474 format ! ( "\" {}\" " , value. replace( '"' , "\" \" " ) )
452475}
476+
477+ #[ cfg( test) ]
478+ mod tests {
479+ use vortex:: utils:: aliases:: hash_set:: HashSet ;
480+
481+ use super :: * ;
482+
483+ #[ test]
484+ fn sorted_clickbench_shard_names_use_deterministic_shuffle ( ) {
485+ let names = ( 0 ..SORTED_SHARD_COUNT_U64 )
486+ . map ( sorted_shard_file_name)
487+ . collect :: < Vec < _ > > ( ) ;
488+
489+ assert_eq ! ( names[ 0 ] , "hits_017.parquet" ) ;
490+ assert_eq ! ( names[ 1 ] , "hits_054.parquet" ) ;
491+ assert_eq ! ( names[ 2 ] , "hits_091.parquet" ) ;
492+ assert_eq ! ( names[ 99 ] , "hits_080.parquet" ) ;
493+
494+ let unique_names = names. iter ( ) . collect :: < HashSet < _ > > ( ) ;
495+ assert_eq ! ( unique_names. len( ) , SORTED_SHARD_COUNT ) ;
496+
497+ let mut sorted_names = names. clone ( ) ;
498+ sorted_names. sort ( ) ;
499+ assert_ne ! ( names, sorted_names) ;
500+
501+ let mut reverse_names = sorted_names;
502+ reverse_names. reverse ( ) ;
503+ assert_ne ! ( names, reverse_names) ;
504+ }
505+
506+ #[ test]
507+ fn sorted_clickbench_script_sorts_for_topk_and_writes_shuffled_shards ( ) {
508+ let script = sorted_clickbench_duckdb_script (
509+ Path :: new ( "source" ) ,
510+ Path :: new ( "out" ) ,
511+ Path :: new ( "tmp" ) ,
512+ 1000 ,
513+ ) ;
514+
515+ assert ! ( script. contains( "ORDER BY \" EventTime\" ;" ) ) ;
516+
517+ let first = script
518+ . find ( "hits_017.parquet" )
519+ . expect ( "first sorted shard should use its shuffled shard name" ) ;
520+ let second = script
521+ . find ( "hits_054.parquet" )
522+ . expect ( "second sorted shard should use its shuffled shard name" ) ;
523+ let third = script
524+ . find ( "hits_091.parquet" )
525+ . expect ( "third sorted shard should use its shuffled shard name" ) ;
526+
527+ assert ! ( first < second) ;
528+ assert ! ( second < third) ;
529+ }
530+ }
0 commit comments