Skip to content

Commit f81849c

Browse files
committed
feat: add continue_on_error option to skip files with invalid headers
Signed-off-by: Alan Tang <[email protected]>
1 parent f89d37f commit f81849c

File tree

4 files changed: +42 −2 lines changed

src/common/datasource/src/file_format.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ pub const FORMAT_SCHEMA_INFER_MAX_RECORD: &str = "schema_infer_max_record";
5454
pub const FORMAT_HAS_HEADER: &str = "has_header";
5555
pub const FORMAT_HEADER: &str = "header";
5656
pub const FORMAT_SKIP_BAD_RECORDS: &str = "skip_bad_records";
57+
/// Option key for the boolean `continue_on_error` setting.
///
/// In this change the value is parsed as a `bool` into `CsvFormat::continue_on_error`, and
/// the copy-from path (`copy_table_from.rs`) reads it to decide what happens when a file's
/// schema is not equivalent to the table schema: `true` skips that file, `false` returns
/// an `InvalidHeader` error.
// NOTE(review): `CsvFormat::default()` sets the field to `true`, while the copy-from path
// falls back to `false` when the key is absent or fails to parse — confirm which default
// is intended, since the previous behavior was to always skip.
pub const FORMAT_CONTINUE_ON_ERROR: &str = "continue_on_error";
5758
pub const FORMAT_TYPE: &str = "format";
5859
pub const FILE_PATTERN: &str = "pattern";
5960
pub const TIMESTAMP_FORMAT: &str = "timestamp_format";

src/common/datasource/src/file_format/csv.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ pub struct CsvFormat {
3838
pub has_header: bool,
3939
pub header: bool,
4040
pub skip_bad_records: bool,
41+
pub continue_on_error: bool,
4142
pub delimiter: u8,
4243
pub schema_infer_max_record: Option<usize>,
4344
pub compression_type: CompressionType,
@@ -103,6 +104,15 @@ impl TryFrom<&HashMap<String, String>> for CsvFormat {
103104
.build()
104105
})?;
105106
}
107+
if let Some(continue_on_error) = value.get(file_format::FORMAT_CONTINUE_ON_ERROR) {
108+
format.continue_on_error = continue_on_error.parse().map_err(|_| {
109+
error::ParseFormatSnafu {
110+
key: file_format::FORMAT_CONTINUE_ON_ERROR,
111+
value: continue_on_error,
112+
}
113+
.build()
114+
})?;
115+
}
106116
if let Some(timestamp_format) = value.get(file_format::TIMESTAMP_FORMAT) {
107117
format.timestamp_format = Some(timestamp_format.clone());
108118
}
@@ -122,6 +132,7 @@ impl Default for CsvFormat {
122132
has_header: true,
123133
header: true,
124134
skip_bad_records: false,
135+
continue_on_error: true,
125136
delimiter: b',',
126137
schema_infer_max_record: Some(file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD),
127138
compression_type: CompressionType::Uncompressed,
@@ -313,6 +324,7 @@ mod tests {
313324
schema_infer_max_record: Some(2000),
314325
delimiter: b'\t',
315326
has_header: false,
327+
continue_on_error: true,
316328
header: true,
317329
skip_bad_records: false,
318330
timestamp_format: None,

src/operator/src/error.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,18 @@ pub enum Error {
595595
location: Location,
596596
},
597597

598+
#[snafu(display(
599+
"Header from CSV file not match, expected table schema: {}, actual file schema: {}",
600+
table_schema,
601+
file_schema
602+
))]
603+
InvalidHeader {
604+
table_schema: String,
605+
file_schema: String,
606+
#[snafu(implicit)]
607+
location: Location,
608+
},
609+
598610
#[snafu(display("Failed to project schema"))]
599611
ProjectSchema {
600612
#[snafu(source)]
@@ -929,6 +941,7 @@ impl ErrorExt for Error {
929941
| Error::InvalidTableName { .. }
930942
| Error::InvalidViewName { .. }
931943
| Error::InvalidFlowName { .. }
944+
| Error::InvalidHeader { .. }
932945
| Error::InvalidView { .. }
933946
| Error::InvalidExpr { .. }
934947
| Error::AdminFunctionNotFound { .. }

src/operator/src/statement/copy_table_from.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ use common_base::readable_size::ReadableSize;
2222
use common_datasource::file_format::csv::CsvFormat;
2323
use common_datasource::file_format::orc::{ReaderAdapter, infer_orc_schema, new_orc_stream_reader};
2424
use common_datasource::file_format::{
25-
FORMAT_HAS_HEADER, FORMAT_SKIP_BAD_RECORDS, FORMAT_TYPE, FileFormat, Format,
25+
FORMAT_CONTINUE_ON_ERROR, FORMAT_HAS_HEADER, FORMAT_SKIP_BAD_RECORDS, FORMAT_TYPE, FileFormat,
26+
Format,
2627
};
2728
use common_datasource::lister::{Lister, Source};
2829
use common_datasource::object_store::{FS_SCHEMA, build_backend, parse_url};
@@ -384,6 +385,11 @@ impl StatementExecutor {
384385
.get(FORMAT_SKIP_BAD_RECORDS)
385386
.and_then(|v| v.parse::<bool>().ok())
386387
.unwrap_or(false);
388+
let continue_on_error = req
389+
.with
390+
.get(FORMAT_CONTINUE_ON_ERROR)
391+
.and_then(|v| v.parse::<bool>().ok())
392+
.unwrap_or(false);
387393
let table = self.get_table(&table_ref).await?;
388394
let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;
389395
let (object_store, entries) = self.list_copy_from_entries(&req).await?;
@@ -412,7 +418,15 @@ impl StatementExecutor {
412418
&& has_header
413419
&& !file_schema.equivalent_names_and_types(&table_schema)
414420
{
415-
continue;
421+
if continue_on_error {
422+
continue;
423+
} else {
424+
return error::InvalidHeaderSnafu {
425+
table_schema: table_schema.to_string(),
426+
file_schema: file_schema.to_string(),
427+
}
428+
.fail();
429+
}
416430
}
417431
let (file_schema_projection, table_schema_projection, compat_schema) =
418432
generated_schema_projection_and_compatible_file_schema(file_schema, &table_schema);

0 commit comments

Comments
 (0)