
Commit 4084894

fix: create file for empty stream (#16342)
* fix: create csv for empty stream
* clippy
* fmt
* update test
1 parent 4c3b847 commit 4084894
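
The change makes the shared FileSink write path emit an empty output file when the input stream produces zero rows, instead of writing nothing. A minimal sketch of the user-visible behavior (assuming the datafusion and tokio crates; the /tmp/empty.csv path is illustrative), exercising the same API the tests below use for CSV, JSON, and Parquet:

use datafusion::config::CsvOptions;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // A query that returns zero rows.
    let df = ctx.sql("SELECT 1 LIMIT 0").await?;
    let write_opts = DataFrameWriteOptions::new().with_single_file_output(true);
    let csv_opts = CsvOptions::default().with_has_header(true);
    // Before this commit, no file was created for an empty result;
    // after it, an empty file exists at the target path.
    df.write_csv("/tmp/empty.csv", write_opts, Some(csv_opts)).await?;
    assert!(std::path::Path::new("/tmp/empty.csv").exists());
    Ok(())
}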

File tree

4 files changed: +198 -7 lines changed


datafusion/core/src/datasource/file_format/csv.rs

Lines changed: 57 additions & 0 deletions
@@ -33,6 +33,7 @@ mod tests {
     use arrow_schema::{DataType, Field, Schema, SchemaRef};
     use datafusion_catalog::Session;
     use datafusion_common::cast::as_string_array;
+    use datafusion_common::config::CsvOptions;
     use datafusion_common::internal_err;
     use datafusion_common::stats::Precision;
     use datafusion_common::test_util::{arrow_test_data, batches_to_string};
@@ -795,6 +796,62 @@ mod tests {
         Ok(())
     }

+    #[tokio::test]
+    async fn test_csv_write_empty_file() -> Result<()> {
+        // Case 1. write to a single file
+        // Expect: an empty file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}/empty.csv", tmp_dir.path().to_string_lossy());
+
+        let ctx = SessionContext::new();
+
+        let df = ctx.sql("SELECT 1 limit 0").await?;
+
+        let cfg1 =
+            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+        let cfg2 = CsvOptions::default().with_has_header(true);
+
+        df.write_csv(&path, cfg1, Some(cfg2)).await?;
+        assert!(std::path::Path::new(&path).exists());
+
+        // Case 2. write to a directory without partition columns
+        // Expect: under the directory, an empty file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+        let cfg1 =
+            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+        let cfg2 = CsvOptions::default().with_has_header(true);
+
+        let df = ctx.sql("SELECT 1 limit 0").await?;
+
+        df.write_csv(&path, cfg1, Some(cfg2)).await?;
+        assert!(std::path::Path::new(&path).exists());
+
+        let files = std::fs::read_dir(&path).unwrap();
+        assert!(files.count() == 1);
+
+        // Case 3. write to a directory with partition columns
+        // Expect: no file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+        let df = ctx.sql("SELECT 1 as col1, 2 as col2 limit 0").await?;
+
+        let cfg1 = crate::dataframe::DataFrameWriteOptions::new()
+            .with_single_file_output(true)
+            .with_partition_by(vec!["col1".to_string()]);
+        let cfg2 = CsvOptions::default().with_has_header(true);
+
+        df.write_csv(&path, cfg1, Some(cfg2)).await?;
+
+        assert!(std::path::Path::new(&path).exists());
+        let files = std::fs::read_dir(&path).unwrap();
+        assert!(files.count() == 0);
+
+        Ok(())
+    }
+
     /// Read a single empty csv file with header
     ///
     /// empty.csv:

datafusion/core/src/datasource/file_format/json.rs

Lines changed: 56 additions & 0 deletions
@@ -31,6 +31,7 @@ mod tests {
     use arrow_schema::Schema;
     use bytes::Bytes;
     use datafusion_catalog::Session;
+    use datafusion_common::config::JsonOptions;
     use datafusion_common::test_util::batches_to_string;
     use datafusion_datasource::decoder::{
         BatchDeserializer, DecoderDeserializer, DeserializerOutput,
@@ -257,6 +258,61 @@ mod tests {
         Ok(())
     }

+    #[tokio::test]
+    async fn test_json_write_empty_file() -> Result<()> {
+        // Case 1. write to a single file
+        // Expect: an empty file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}/empty.json", tmp_dir.path().to_string_lossy());
+
+        let ctx = SessionContext::new();
+
+        let df = ctx.sql("SELECT 1 limit 0").await?;
+
+        let cfg1 =
+            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+        let cfg2 = JsonOptions::default();
+
+        df.write_json(&path, cfg1, Some(cfg2)).await?;
+        assert!(std::path::Path::new(&path).exists());
+
+        // Case 2. write to a directory without partition columns
+        // Expect: under the directory, an empty file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+        let cfg1 =
+            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+        let cfg2 = JsonOptions::default();
+
+        let df = ctx.sql("SELECT 1 limit 0").await?;
+
+        df.write_json(&path, cfg1, Some(cfg2)).await?;
+        assert!(std::path::Path::new(&path).exists());
+
+        let files = std::fs::read_dir(&path).unwrap();
+        assert!(files.count() == 1);
+
+        // Case 3. write to a directory with partition columns
+        // Expect: no file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+        let df = ctx.sql("SELECT 1 as col1, 2 as col2 limit 0").await?;
+
+        let cfg1 = crate::dataframe::DataFrameWriteOptions::new()
+            .with_single_file_output(true)
+            .with_partition_by(vec!["col1".to_string()]);
+        let cfg2 = JsonOptions::default();
+
+        df.write_json(&path, cfg1, Some(cfg2)).await?;
+
+        assert!(std::path::Path::new(&path).exists());
+        let files = std::fs::read_dir(&path).unwrap();
+        assert!(files.count() == 0);
+        Ok(())
+    }
+
     #[test]
     fn test_json_deserializer_finish() -> Result<()> {
         let schema = Arc::new(Schema::new(vec![

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 55 additions & 0 deletions
@@ -1246,6 +1246,61 @@ mod tests {
         Ok(())
     }

+    #[tokio::test]
+    async fn test_parquet_write_empty_file() -> Result<()> {
+        // Case 1. write to a single file
+        // Expect: an empty file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}/empty.parquet", tmp_dir.path().to_string_lossy());
+
+        let ctx = SessionContext::new();
+
+        let df = ctx.sql("SELECT 1 limit 0").await?;
+
+        let cfg1 =
+            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+        let cfg2 = TableParquetOptions::default();
+
+        df.write_parquet(&path, cfg1, Some(cfg2)).await?;
+        assert!(std::path::Path::new(&path).exists());
+
+        // Case 2. write to a directory without partition columns
+        // Expect: under the directory, an empty file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+        let cfg1 =
+            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+        let cfg2 = TableParquetOptions::default();
+
+        let df = ctx.sql("SELECT 1 limit 0").await?;
+
+        df.write_parquet(&path, cfg1, Some(cfg2)).await?;
+        assert!(std::path::Path::new(&path).exists());
+
+        let files = std::fs::read_dir(&path).unwrap();
+        assert!(files.count() == 1);
+
+        // Case 3. write to a directory with partition columns
+        // Expect: no file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+        let df = ctx.sql("SELECT 1 as col1, 2 as col2 limit 0").await?;
+
+        let cfg1 = crate::dataframe::DataFrameWriteOptions::new()
+            .with_single_file_output(true)
+            .with_partition_by(vec!["col1".to_string()]);
+        let cfg2 = TableParquetOptions::default();
+
+        df.write_parquet(&path, cfg1, Some(cfg2)).await?;
+
+        assert!(std::path::Path::new(&path).exists());
+        let files = std::fs::read_dir(&path).unwrap();
+        assert!(files.count() == 0);
+        Ok(())
+    }
+
     #[tokio::test]
     async fn parquet_sink_write_insert_schema_into_metadata() -> Result<()> {
         // expected kv metadata without schema
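
Note: in all three format tests above, Case 3 expects no output file when partition columns are set. The empty-batch fallback added below in file_sink_config.rs still runs, but an empty batch yields no partition values for the demuxer to split on, so no partition directories or files get created.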

datafusion/datasource/src/file_sink_config.rs

Lines changed: 30 additions & 7 deletions
@@ -22,12 +22,14 @@ use crate::sink::DataSink;
 use crate::write::demux::{start_demuxer_task, DemuxedStreamReceiver};
 use crate::ListingTableUrl;

+use arrow::array::RecordBatch;
 use arrow::datatypes::{DataType, SchemaRef};
 use datafusion_common::Result;
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr::dml::InsertOp;
+use datafusion_physical_plan::stream::RecordBatchStreamAdapter;

 use async_trait::async_trait;
 use object_store::ObjectStore;
@@ -77,13 +79,34 @@ pub trait FileSink: DataSink {
             .runtime_env()
             .object_store(&config.object_store_url)?;
         let (demux_task, file_stream_rx) = start_demuxer_task(config, data, context);
-        self.spawn_writer_tasks_and_join(
-            context,
-            demux_task,
-            file_stream_rx,
-            object_store,
-        )
-        .await
+        let mut num_rows = self
+            .spawn_writer_tasks_and_join(
+                context,
+                demux_task,
+                file_stream_rx,
+                Arc::clone(&object_store),
+            )
+            .await?;
+        if num_rows == 0 {
+            // If no rows were written, then no files are output either.
+            // In this case, send an empty record batch through to ensure the output file is generated
+            let schema = Arc::clone(&config.output_schema);
+            let empty_batch = RecordBatch::new_empty(Arc::clone(&schema));
+            let data = Box::pin(RecordBatchStreamAdapter::new(
+                schema,
+                futures::stream::iter(vec![Ok(empty_batch)]),
+            ));
+            let (demux_task, file_stream_rx) = start_demuxer_task(config, data, context);
+            num_rows = self
+                .spawn_writer_tasks_and_join(
+                    context,
+                    demux_task,
+                    file_stream_rx,
+                    Arc::clone(&object_store),
+                )
+                .await?;
+        }
+        Ok(num_rows)
     }
 }

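The core of the fix is the second demuxer pass over a synthetic stream holding one empty RecordBatch, which forces the writers to open, and thus create, the output file. A standalone sketch of that stream construction, assuming the arrow, datafusion-execution, datafusion-physical-plan, and futures crates (empty_stream is an illustrative helper, not part of the commit):

use std::sync::Arc;

use arrow::array::RecordBatch;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_execution::SendableRecordBatchStream;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;

// Build a stream that yields a single zero-row batch. The batch still
// carries the schema, so format writers can produce a valid (empty) file.
fn empty_stream(schema: Arc<Schema>) -> SendableRecordBatchStream {
    let empty_batch = RecordBatch::new_empty(Arc::clone(&schema));
    Box::pin(RecordBatchStreamAdapter::new(
        schema,
        futures::stream::iter(vec![Ok(empty_batch)]),
    ))
}

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("col1", DataType::Int32, false)]));
    let _stream = empty_stream(schema);
    // In the sink above, a stream like this is handed to start_demuxer_task
    // for the retry pass when the first pass reports num_rows == 0.
}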