File tree Expand file tree Collapse file tree 1 file changed +3
-16
lines changed
datafusion/physical-plan/src/repartition Expand file tree Collapse file tree 1 file changed +3
-16
lines changed Original file line number Diff line number Diff line change @@ -656,22 +656,9 @@ impl OnDemandRepartitionExec {
656
656
} ) ?;
657
657
658
658
// Fetch the batch from the buffer, ideally this should reduce the time gap between the requester and the input stream
659
- let batch_opt = loop {
660
- match buffer_rx. try_recv ( ) {
661
- Ok ( batch) => break Some ( batch) ,
662
- Err ( tokio:: sync:: mpsc:: error:: TryRecvError :: Empty ) => {
663
- tokio:: task:: yield_now ( ) . await ;
664
- }
665
- Err ( tokio:: sync:: mpsc:: error:: TryRecvError :: Disconnected ) => {
666
- break None
667
- }
668
- }
669
- } ;
670
-
671
- let batch = if let Some ( batch) = batch_opt {
672
- batch
673
- } else {
674
- break ;
659
+ let batch = match buffer_rx. recv ( ) . await {
660
+ Some ( batch) => batch,
661
+ None => break ,
675
662
} ;
676
663
677
664
let size = batch. get_array_memory_size ( ) ;
You can’t perform that action at this time.
0 commit comments