Skip to content

Commit 9e84fcc

Browse files
authored
Attempt to make CSV decoding faster (#30305)
Attempt to make CSV decoding faster by avoiding ephemeral allocations and copying where possible. <!-- Describe the contents of the PR briefly but completely. If you write detailed commit messages, it is acceptable to copy/paste them here, or write "see commit messages for details." If there is only one commit in the PR, GitHub will have already added its commit message above. --> ### Motivation Copying about 760MiB of data into Materialize: Before: ``` materialize=> \COPY tuple FROM 'data.csv' WITH (FORMAT csv, HEADER true,NULL '\N'); COPY 11010101 Time: 335687.917 ms (05:35.688) ``` After: ``` materialize=> \COPY tuple FROM 'data.csv' WITH (FORMAT csv, HEADER true,NULL '\N'); COPY 11010101 Time: 317502.991 ms (05:17.503) ``` Well, not a substantial improvement! <!-- Which of the following best describes the motivation behind this PR? * This PR fixes a recognized bug. [Ensure issue is linked somewhere.] * This PR adds a known-desirable feature. [Ensure issue is linked somewhere.] * This PR fixes a previously unreported bug. [Describe the bug in detail, as if you were filing a bug report.] * This PR adds a feature that has not yet been specified. [Write a brief specification for the feature, including justification for its inclusion in Materialize, as if you were writing the original feature specification.] * This PR refactors existing code. [Describe what was wrong with the existing code, if it is not obvious.] --> ### Tips for reviewer <!-- Leave some tips for your reviewer, like: * The diff is much smaller if viewed with whitespace hidden. * [Some function/module/file] deserves extra attention. * [Some function/module/file] is pure code movement and only needs a skim. Delete this section if no tips. --> ### Checklist - [ ] This PR has adequate test coverage / QA involvement has been duly considered. 
([trigger-ci for additional test/nightly runs](https://trigger-ci.dev.materialize.com/)) - [ ] This PR has an associated up-to-date [design doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md), is a design doc ([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)), or is sufficiently small to not require a design. <!-- Reference the design in the description. --> - [ ] If this PR evolves [an existing `$T ⇔ Proto$T` mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md) (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label. - [ ] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label ([example](MaterializeInc/cloud#5021)). <!-- Ask in #team-cloud on Slack if you need help preparing the cloud PR. --> - [ ] If this PR includes major [user-facing behavior changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note), I have pinged the relevant PM to schedule a changelog post. --------- Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent c04d15a commit 9e84fcc

File tree

13 files changed

+221
-56
lines changed

13 files changed

+221
-56
lines changed

src/adapter/src/catalog/builtin_table_updates.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1153,7 +1153,7 @@ impl CatalogState {
11531153
let progress_topic = kafka.progress_topic(&self.config.connection_context, id);
11541154
let mut row = Row::default();
11551155
row.packer()
1156-
.push_array(
1156+
.try_push_array(
11571157
&[ArrayDimension {
11581158
lower_bound: 1,
11591159
length: kafka.brokers.len(),
@@ -1757,7 +1757,7 @@ impl CatalogState {
17571757

17581758
let mut row = Row::default();
17591759
row.packer()
1760-
.push_array(
1760+
.try_push_array(
17611761
&[ArrayDimension {
17621762
lower_bound: 1,
17631763
length: arg_type_ids.len(),
@@ -1825,7 +1825,7 @@ impl CatalogState {
18251825

18261826
let mut row = Row::default();
18271827
row.packer()
1828-
.push_array(
1828+
.try_push_array(
18291829
&[ArrayDimension {
18301830
lower_bound: 1,
18311831
length: arg_type_ids.len(),
@@ -2136,7 +2136,7 @@ impl CatalogState {
21362136
let mut row = Row::default();
21372137
let flat_privileges: Vec<_> = privileges.all_values_owned().collect();
21382138
row.packer()
2139-
.push_array(
2139+
.try_push_array(
21402140
&[ArrayDimension {
21412141
lower_bound: 1,
21422142
length: flat_privileges.len(),
@@ -2254,7 +2254,7 @@ impl CatalogState {
22542254
]);
22552255
if reference.columns.len() > 0 {
22562256
packer
2257-
.push_array(
2257+
.try_push_array(
22582258
&[ArrayDimension {
22592259
lower_bound: 1,
22602260
length: reference.columns.len(),

src/adapter/src/coord/statement_logging.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,7 @@ impl Coordinator {
467467
},
468468
]);
469469
packer
470-
.push_array(
470+
.try_push_array(
471471
&[ArrayDimension {
472472
lower_bound: 1,
473473
length: params.len(),

src/adapter/src/optimize/dataflows.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,7 @@ fn eval_unmaterializable_func(
530530
let pack_1d_array = |datums: Vec<Datum>| {
531531
let mut row = Row::default();
532532
row.packer()
533-
.push_array(
533+
.try_push_array(
534534
&[ArrayDimension {
535535
lower_bound: 1,
536536
length: datums.len(),
@@ -640,7 +640,7 @@ fn eval_unmaterializable_func(
640640
row.packer().push_dict_with(|row| {
641641
for (role_id, role_membership) in &role_memberships {
642642
row.push(Datum::from(role_id.as_str()));
643-
row.push_array(
643+
row.try_push_array(
644644
&[ArrayDimension {
645645
lower_bound: 1,
646646
length: role_membership.len(),

src/expr/src/relation/func.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ where
381381
length: datums.len(),
382382
};
383383
temp_storage.make_datum(|packer| {
384-
packer.push_array(&[dims], datums).unwrap();
384+
packer.try_push_array(&[dims], datums).unwrap();
385385
})
386386
}
387387

src/expr/src/scalar/func.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2115,7 +2115,7 @@ fn parse_ident<'a>(
21152115
}
21162116

21172117
Ok(temp_storage.try_make_datum(|packer| {
2118-
packer.push_array(
2118+
packer.try_push_array(
21192119
&[ArrayDimension {
21202120
lower_bound: 1,
21212121
length: elems.len(),
@@ -2146,7 +2146,7 @@ fn regexp_split_to_array_re<'a>(
21462146
let found = mz_regexp::regexp_split_to_array(text, regexp);
21472147
let mut row = Row::default();
21482148
let mut packer = row.packer();
2149-
packer.push_array(
2149+
packer.try_push_array(
21502150
&[ArrayDimension {
21512151
lower_bound: 1,
21522152
length: found.len(),
@@ -6550,7 +6550,7 @@ fn regexp_match_static<'a>(
65506550
// participate in the match.
65516551
match needle.captures(haystack.unwrap_str()) {
65526552
None => packer.push(Datum::Null),
6553-
Some(captures) => packer.push_array(
6553+
Some(captures) => packer.try_push_array(
65546554
&[ArrayDimension {
65556555
lower_bound: 1,
65566556
length: captures.len() - 1,
@@ -6567,7 +6567,7 @@ fn regexp_match_static<'a>(
65676567
// containing the match, or null if there is no match.
65686568
match needle.find(haystack.unwrap_str()) {
65696569
None => packer.push(Datum::Null),
6570-
Some(mtch) => packer.push_array(
6570+
Some(mtch) => packer.try_push_array(
65716571
&[ArrayDimension {
65726572
lower_bound: 1,
65736573
length: 1,
@@ -6817,7 +6817,7 @@ fn array_create_multidim<'a>(
68176817
if datums.iter().all(|d| d.unwrap_array().dims().is_empty()) {
68186818
let dims = &[];
68196819
let datums = &[];
6820-
let datum = temp_storage.try_make_datum(|packer| packer.push_array(dims, datums))?;
6820+
let datum = temp_storage.try_make_datum(|packer| packer.try_push_array(dims, datums))?;
68216821
return Ok(datum);
68226822
}
68236823

@@ -6831,7 +6831,8 @@ fn array_create_multidim<'a>(
68316831
let elements = datums
68326832
.iter()
68336833
.flat_map(|d| d.unwrap_array().elements().iter());
6834-
let datum = temp_storage.try_make_datum(move |packer| packer.push_array(&dims, elements))?;
6834+
let datum =
6835+
temp_storage.try_make_datum(move |packer| packer.try_push_array(&dims, elements))?;
68356836
Ok(datum)
68366837
}
68376838

@@ -6855,7 +6856,7 @@ fn array_create_scalar<'a>(
68556856
// strangely to satisfy the borrow checker while avoiding an allocation.
68566857
dims = &[];
68576858
}
6858-
let datum = temp_storage.try_make_datum(|packer| packer.push_array(dims, datums))?;
6859+
let datum = temp_storage.try_make_datum(|packer| packer.try_push_array(dims, datums))?;
68596860
Ok(datum)
68606861
}
68616862

@@ -7351,7 +7352,7 @@ fn array_remove<'a>(
73517352
length: elems.len(),
73527353
};
73537354

7354-
Ok(temp_storage.try_make_datum(|packer| packer.push_array(&dims, elems))?)
7355+
Ok(temp_storage.try_make_datum(|packer| packer.try_push_array(&dims, elems))?)
73557356
}
73567357

73577358
// TODO(benesch): remove potentially dangerous usage of `as`.
@@ -7535,7 +7536,7 @@ fn array_array_concat<'a>(
75357536

75367537
let elems = a_array.elements().iter().chain(b_array.elements().iter());
75377538

7538-
Ok(temp_storage.try_make_datum(|packer| packer.push_array(&dims, elems))?)
7539+
Ok(temp_storage.try_make_datum(|packer| packer.try_push_array(&dims, elems))?)
75397540
}
75407541

75417542
fn list_list_concat<'a>(a: Datum<'a>, b: Datum<'a>, temp_storage: &'a RowArena) -> Datum<'a> {
@@ -7779,8 +7780,9 @@ fn array_fill<'a>(
77797780
.collect()
77807781
};
77817782

7782-
Ok(temp_storage
7783-
.try_make_datum(|packer| packer.push_array(&array_dimensions, vec![fill; fill_count]))?)
7783+
Ok(temp_storage.try_make_datum(|packer| {
7784+
packer.try_push_array(&array_dimensions, vec![fill; fill_count])
7785+
})?)
77847786
}
77857787

77867788
#[derive(Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]

src/expr/src/scalar/func/impls/array.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ impl LazyUnaryFunc for CastArrayToArray {
272272
.map(|datum| self.cast_expr.eval(&[datum], temp_storage))
273273
.collect::<Result<Vec<Datum<'a>>, EvalError>>()?;
274274

275-
Ok(temp_storage.try_make_datum(|packer| packer.push_array(&dims, casted_datums))?)
275+
Ok(temp_storage.try_make_datum(|packer| packer.try_push_array(&dims, casted_datums))?)
276276
}
277277

278278
fn output_type(&self, _input_type: ColumnType) -> ColumnType {

src/expr/src/scalar/func/impls/string.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ impl LazyUnaryFunc for CastStringToArray {
333333
},
334334
)?;
335335

336-
Ok(temp_storage.try_make_datum(|packer| packer.push_array(&dims, datums))?)
336+
Ok(temp_storage.try_make_datum(|packer| packer.try_push_array(&dims, datums))?)
337337
}
338338

339339
/// The output ColumnType of this function

src/pgcopy/src/copy.rs

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ use std::io;
1313
use bytes::BytesMut;
1414
use csv::{ByteRecord, ReaderBuilder};
1515
use mz_proto::{ProtoType, RustType, TryFromProtoError};
16-
use mz_repr::{ColumnType, Datum, RelationDesc, RelationType, Row, RowArena, RowRef, ScalarType};
16+
use mz_repr::{
17+
ColumnType, Datum, RelationDesc, RelationType, Row, RowArena, RowRef, ScalarType, SharedRow,
18+
};
1719
use proptest::prelude::{any, Arbitrary, Just};
1820
use proptest::strategy::{BoxedStrategy, Strategy, Union};
1921
use serde::Deserialize;
@@ -785,6 +787,7 @@ pub fn decode_copy_format_csv(
785787

786788
let mut record = ByteRecord::new();
787789

790+
let buf = RowArena::new();
788791
while rdr.read_byte_record(&mut record)? {
789792
if record.len() == 1 && record.iter().next() == Some(END_OF_COPY_MARKER) {
790793
break;
@@ -802,23 +805,32 @@ pub fn decode_copy_format_csv(
802805
std::cmp::Ordering::Equal => Ok(()),
803806
}?;
804807

805-
let mut row = Vec::new();
806-
let buf = RowArena::new();
808+
let binding = SharedRow::get();
809+
let mut row_builder = binding.borrow_mut();
810+
let mut row_packer = row_builder.packer();
811+
buf.clear();
807812

808813
for (typ, raw_value) in column_types.iter().zip(record.iter()) {
809814
if raw_value == null_as_bytes {
810-
row.push(Datum::Null);
815+
row_packer.push(Datum::Null);
811816
} else {
812-
match mz_pgrepr::Value::decode_text(typ, raw_value) {
813-
Ok(value) => row.push(value.into_datum(&buf, typ)),
817+
let s = match std::str::from_utf8(raw_value) {
818+
Ok(s) => s,
819+
Err(err) => {
820+
let msg = format!("invalid utf8 data in column: {}", err);
821+
return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
822+
}
823+
};
824+
match mz_pgrepr::Value::decode_text_into_row(typ, s, &mut row_packer) {
825+
Ok(()) => {}
814826
Err(err) => {
815827
let msg = format!("unable to decode column: {}", err);
816828
return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
817829
}
818830
}
819831
}
820832
}
821-
rows.push(Row::pack(row));
833+
rows.push(row_builder.clone());
822834
}
823835

824836
Ok(rows)

src/pgrepr/src/value.rs

Lines changed: 116 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use mz_repr::adt::pg_legacy_name::NAME_MAX_BYTES;
2424
use mz_repr::adt::range::{Range, RangeInner};
2525
use mz_repr::adt::timestamp::CheckedTimestamp;
2626
use mz_repr::strconv::{self, Nestable};
27-
use mz_repr::{Datum, RelationType, RowArena, RowRef, ScalarType};
27+
use mz_repr::{Datum, RelationType, RowArena, RowPacker, RowRef, ScalarType};
2828
use postgres_types::{FromSql, IsNull, ToSql, Type as PgType};
2929
use uuid::Uuid;
3030

@@ -227,7 +227,7 @@ impl Value {
227227
};
228228
buf.make_datum(|packer| {
229229
packer
230-
.push_array(
230+
.try_push_array(
231231
&dims,
232232
elements.into_iter().map(|element| match element {
233233
Some(element) => element.into_datum(buf, element_pg_type),
@@ -678,6 +678,120 @@ impl Value {
678678
})
679679
}
680680

681+
/// Deserializes a value of type `ty` from `s` using the [text encoding format](Format::Text).
682+
pub fn decode_text_into_row<'a>(
683+
ty: &'a Type,
684+
s: &'a str,
685+
packer: &mut RowPacker,
686+
) -> Result<(), Box<dyn Error + Sync + Send>> {
687+
Ok(match ty {
688+
Type::Array(elem_type) => {
689+
let (elements, dims) =
690+
strconv::parse_array(s, || None, |elem_text| Ok::<_, String>(Some(elem_text)))?;
691+
// SAFETY: The function returns the number of times it called `push` on the packer.
692+
unsafe {
693+
packer.push_array_with_unchecked(&dims, |packer| {
694+
let mut nelements = 0;
695+
for element in elements {
696+
match element {
697+
Some(elem_text) => {
698+
Value::decode_text_into_row(elem_type, &elem_text, packer)?
699+
}
700+
701+
None => packer.push(Datum::Null),
702+
}
703+
nelements += 1;
704+
}
705+
Ok::<_, Box<dyn Error + Sync + Send>>(nelements)
706+
})?
707+
}
708+
}
709+
Type::Int2Vector { .. } => {
710+
return Err("input of Int2Vector types is not implemented".into())
711+
}
712+
Type::Bool => packer.push(Datum::from(strconv::parse_bool(s)?)),
713+
Type::Bytea => packer.push(Datum::Bytes(&strconv::parse_bytes(s)?)),
714+
Type::Char => packer.push(Datum::UInt8(s.as_bytes().get(0).copied().unwrap_or(0))),
715+
Type::Date => packer.push(Datum::Date(strconv::parse_date(s)?)),
716+
Type::Float4 => packer.push(Datum::Float32(strconv::parse_float32(s)?.into())),
717+
Type::Float8 => packer.push(Datum::Float64(strconv::parse_float64(s)?.into())),
718+
Type::Int2 => packer.push(Datum::Int16(strconv::parse_int16(s)?)),
719+
Type::Int4 => packer.push(Datum::Int32(strconv::parse_int32(s)?)),
720+
Type::Int8 => packer.push(Datum::Int64(strconv::parse_int64(s)?)),
721+
Type::UInt2 => packer.push(Datum::UInt16(strconv::parse_uint16(s)?)),
722+
Type::UInt4 => packer.push(Datum::UInt32(strconv::parse_uint32(s)?)),
723+
Type::UInt8 => packer.push(Datum::UInt64(strconv::parse_uint64(s)?)),
724+
Type::Interval { .. } => packer.push(Datum::Interval(strconv::parse_interval(s)?)),
725+
Type::Json => return Err("input of json types is not implemented".into()),
726+
Type::Jsonb => packer.push(strconv::parse_jsonb(s)?.into_row().unpack_first()),
727+
Type::List(elem_type) => {
728+
let elems = strconv::parse_list(
729+
s,
730+
matches!(**elem_type, Type::List(..)),
731+
|| None,
732+
|elem_text| Ok::<_, String>(Some(elem_text)),
733+
)?;
734+
packer.push_list_with(|packer| {
735+
for elem in elems {
736+
match elem {
737+
Some(elem) => Value::decode_text_into_row(elem_type, &elem, packer)?,
738+
None => packer.push(Datum::Null),
739+
}
740+
}
741+
Ok::<_, Box<dyn Error + Sync + Send>>(())
742+
})?;
743+
}
744+
Type::Map { value_type } => {
745+
let map =
746+
strconv::parse_map(s, matches!(**value_type, Type::Map { .. }), |elem_text| {
747+
elem_text.map(Ok::<_, String>).transpose()
748+
})?;
749+
packer.push_dict_with(|row| {
750+
for (k, v) in map {
751+
row.push(Datum::String(&k));
752+
match v {
753+
Some(elem) => Value::decode_text_into_row(value_type, &elem, row)?,
754+
None => row.push(Datum::Null),
755+
}
756+
}
757+
Ok::<_, Box<dyn Error + Sync + Send>>(())
758+
})?;
759+
}
760+
Type::Name => packer.push(Datum::String(&strconv::parse_pg_legacy_name(s))),
761+
Type::Numeric { .. } => packer.push(Datum::Numeric(strconv::parse_numeric(s)?)),
762+
Type::Oid | Type::RegClass | Type::RegProc | Type::RegType => {
763+
packer.push(Datum::UInt32(strconv::parse_oid(s)?))
764+
}
765+
Type::Record(_) => {
766+
return Err("input of anonymous composite types is not implemented".into())
767+
}
768+
Type::Text => packer.push(Datum::String(s)),
769+
Type::BpChar { .. } => packer.push(Datum::String(s.trim_end())),
770+
Type::VarChar { .. } => packer.push(Datum::String(s)),
771+
Type::Time { .. } => packer.push(Datum::Time(strconv::parse_time(s)?)),
772+
Type::TimeTz { .. } => return Err("input of timetz types is not implemented".into()),
773+
Type::Timestamp { .. } => packer.push(Datum::Timestamp(strconv::parse_timestamp(s)?)),
774+
Type::TimestampTz { .. } => {
775+
packer.push(Datum::TimestampTz(strconv::parse_timestamptz(s)?))
776+
}
777+
Type::Uuid => packer.push(Datum::Uuid(Uuid::parse_str(s)?)),
778+
Type::MzTimestamp => packer.push(Datum::MzTimestamp(strconv::parse_mz_timestamp(s)?)),
779+
Type::Range { element_type } => {
780+
let range = strconv::parse_range(s, |elem_text| {
781+
Value::decode_text(element_type, elem_text.as_bytes()).map(Box::new)
782+
})?;
783+
// TODO: We should be able to push ranges without scratch space, but that requires
784+
// a different `push_range` API.
785+
let buf = RowArena::new();
786+
let range = range.into_bounds(|elem| elem.into_datum(&buf, element_type));
787+
788+
packer.push_range(range).unwrap()
789+
}
790+
Type::MzAclItem => packer.push(Datum::MzAclItem(strconv::parse_mz_acl_item(s)?)),
791+
Type::AclItem => packer.push(Datum::AclItem(strconv::parse_acl_item(s)?)),
792+
})
793+
}
794+
681795
/// Deserializes a value of type `ty` from `raw` using the [binary encoding
682796
/// format](Format::Binary).
683797
pub fn decode_binary(ty: &Type, raw: &[u8]) -> Result<Value, Box<dyn Error + Sync + Send>> {

0 commit comments

Comments (0)