Skip to content

Commit a10f0df

Browse files
committed
feat: add optional parameters for copy table from command
Signed-off-by: Alan Tang <[email protected]>
1 parent 20b5b9b commit a10f0df

File tree

2 files changed

+26
-0
lines changed

2 files changed

+26
-0
lines changed

src/common/datasource/src/file_format.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ pub const FORMAT_COMPRESSION_TYPE: &str = "compression_type";
5252
pub const FORMAT_DELIMITER: &str = "delimiter";
5353
pub const FORMAT_SCHEMA_INFER_MAX_RECORD: &str = "schema_infer_max_record";
5454
pub const FORMAT_HAS_HEADER: &str = "has_header";
55+
pub const FORMAT_HEADER: &str = "header";
56+
pub const FORMAT_SKIP_BAD_RECORDS: &str = "skip_bad_records";
5557
pub const FORMAT_TYPE: &str = "format";
5658
pub const FILE_PATTERN: &str = "pattern";
5759
pub const TIMESTAMP_FORMAT: &str = "timestamp_format";

src/common/datasource/src/file_format/csv.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ use crate::share_buffer::SharedBuffer;
3636
#[derive(Debug, Clone, PartialEq, Eq)]
3737
pub struct CsvFormat {
3838
pub has_header: bool,
39+
pub header: bool,
40+
pub skip_bad_records: bool,
3941
pub delimiter: u8,
4042
pub schema_infer_max_record: Option<usize>,
4143
pub compression_type: CompressionType,
@@ -83,6 +85,24 @@ impl TryFrom<&HashMap<String, String>> for CsvFormat {
8385
.build()
8486
})?;
8587
};
88+
if let Some(header) = value.get(file_format::FORMAT_HEADER) {
89+
format.header = header.parse().map_err(|_| {
90+
error::ParseFormatSnafu {
91+
key: file_format::FORMAT_HEADER,
92+
value: header,
93+
}
94+
.build()
95+
})?;
96+
}
97+
if let Some(skip_bad_records) = value.get(file_format::FORMAT_SKIP_BAD_RECORDS) {
98+
format.header = skip_bad_records.parse().map_err(|_| {
99+
error::ParseFormatSnafu {
100+
key: file_format::FORMAT_SKIP_BAD_RECORDS,
101+
value: skip_bad_records,
102+
}
103+
.build()
104+
})?;
105+
}
86106
if let Some(timestamp_format) = value.get(file_format::TIMESTAMP_FORMAT) {
87107
format.timestamp_format = Some(timestamp_format.clone());
88108
}
@@ -100,6 +120,8 @@ impl Default for CsvFormat {
100120
fn default() -> Self {
101121
Self {
102122
has_header: true,
123+
header: true,
124+
skip_bad_records: false,
103125
delimiter: b',',
104126
schema_infer_max_record: Some(file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD),
105127
compression_type: CompressionType::Uncompressed,
@@ -291,6 +313,8 @@ mod tests {
291313
schema_infer_max_record: Some(2000),
292314
delimiter: b'\t',
293315
has_header: false,
316+
header: true,
317+
skip_bad_records: false,
294318
timestamp_format: None,
295319
time_format: None,
296320
date_format: None

0 commit comments

Comments
 (0)