Skip to content

Commit c28c80e

Browse files
authored
fix(s3): parquet file read positioning and text column type conversion (#485)
* fix(s3): parquet file reading positioning and text column type conversion
* fix(s3): parquet file reading positioning and text column type conversion
1 parent 8e7ebaa commit c28c80e

File tree

5 files changed

+67
-10
lines changed

5 files changed

+67
-10
lines changed
Binary file not shown.

wrappers/src/fdw/s3_fdw/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ This is a foreign data wrapper for [AWS S3](https://aws.amazon.com/s3/). It is d
1010

1111
| Version | Date | Notes |
1212
| ------- | ---------- | ---------------------------------------------------- |
13+
| 0.1.5 | 2025-07-25 | Fixed Parquet file reading position issue |
1314
| 0.1.4 | 2024-08-20 | Added `path_style_url` server option |
1415
| 0.1.2 | 2023-07-13 | Added fdw stats collection |
1516
| 0.1.1 | 2023-06-05 | Added Parquet file support |

wrappers/src/fdw/s3_fdw/parquet.rs

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,10 @@ impl AsyncRead for S3ParquetReader {
8888
};
8989

9090
// calculate request range
91-
let object_remaining = object_size - self.pos;
91+
let object_remaining = object_size.saturating_sub(self.pos);
9292
let remaining = min(buf.remaining() as u64, object_remaining);
93-
let range = format!("bytes={}-{}", self.pos, self.pos + remaining - 1);
93+
let upper_bound = self.pos.saturating_add(remaining).saturating_sub(1);
94+
let range = format!("bytes={}-{}", self.pos, upper_bound);
9495

9596
// wait on current thread to get object contents
9697
match futures::executor::block_on(
@@ -102,8 +103,20 @@ impl AsyncRead for S3ParquetReader {
102103
.send(),
103104
) {
104105
Ok(output) => {
106+
let pre_filled = buf.filled().len();
105107
let mut rdr = output.body.into_async_read();
106-
AsyncRead::poll_read(Pin::new(&mut rdr), cx, buf)
108+
match AsyncRead::poll_read(Pin::new(&mut rdr), cx, buf) {
109+
Poll::Ready(result) => match result {
110+
Ok(_) => {
111+
// advance the current position by the number of bytes this read appended to the buffer
112+
let consumed = buf.filled().len().saturating_sub(pre_filled);
113+
self.pos = self.pos.saturating_add(consumed as u64);
114+
Poll::Ready(Ok(()))
115+
}
116+
Err(err) => Poll::Ready(Err(err)),
117+
},
118+
Poll::Pending => Poll::Pending,
119+
}
107120
}
108121
Err(err) => Poll::Ready(Err(to_io_error(err))),
109122
}
@@ -296,15 +309,23 @@ impl S3Parquet {
296309
}
297310
}
298311
pg_sys::TEXTOID => {
299-
let arr = col
300-
.as_any()
301-
.downcast_ref::<array::BinaryArray>()
302-
.ok_or(S3FdwError::ColumnTypeNotMatch(tgt_col.name.clone()))?;
303-
if arr.is_null(self.batch_idx) {
312+
// 'text' type can be converted from StringArray, StringViewArray and
313+
// generic BinaryArray
314+
if col.is_null(self.batch_idx) {
304315
None
305-
} else {
316+
} else if let Some(arr) = col.as_any().downcast_ref::<array::StringArray>()
317+
{
318+
Some(Cell::String(arr.value(self.batch_idx).to_string()))
319+
} else if let Some(arr) =
320+
col.as_any().downcast_ref::<array::StringViewArray>()
321+
{
322+
Some(Cell::String(arr.value(self.batch_idx).to_string()))
323+
} else if let Some(arr) = col.as_any().downcast_ref::<array::BinaryArray>()
324+
{
306325
let s = String::from_utf8_lossy(arr.value(self.batch_idx));
307326
Some(Cell::String(s.to_string()))
327+
} else {
328+
return Err(S3FdwError::ColumnTypeNotMatch(tgt_col.name.clone()));
308329
}
309330
}
310331
pg_sys::DATEOID => {

wrappers/src/fdw/s3_fdw/s3_fdw.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ enum Parser {
2525
}
2626

2727
#[wrappers_fdw(
28-
version = "0.1.4",
28+
version = "0.1.5",
2929
author = "Supabase",
3030
website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/s3_fdw",
3131
error_type = "S3FdwError"

wrappers/src/fdw/s3_fdw/tests.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,29 @@ mod tests {
155155
)
156156
.unwrap();
157157

158+
c.update(
159+
r#"
160+
CREATE FOREIGN TABLE s3_test_table_parquet_titanic (
161+
"PassengerId" bigint,
162+
"Survived" bigint,
163+
"Pclass" bigint,
164+
"Name" text,
165+
"Age" double precision,
166+
"SibSp" bigint,
167+
"Parch" bigint,
168+
"Fare" double precision
169+
)
170+
SERVER s3_server
171+
OPTIONS (
172+
uri 's3://warehouse/test_data_titanic.parquet',
173+
format 'parquet'
174+
)
175+
"#,
176+
None,
177+
&[],
178+
)
179+
.unwrap();
180+
158181
let check_test_table = |table| {
159182
let sql = format!("SELECT * FROM {} ORDER BY name LIMIT 1", table);
160183
let results = c
@@ -191,6 +214,18 @@ mod tests {
191214

192215
check_parquet_table("s3_test_table_parquet");
193216
check_parquet_table("s3_test_table_parquet_gz");
217+
218+
let sql = "SELECT * FROM s3_test_table_parquet_titanic ORDER BY 1 LIMIT 1";
219+
let results = c
220+
.select(sql, None, &[])
221+
.unwrap()
222+
.filter_map(|r| {
223+
r.get_by_name::<i64, _>("PassengerId")
224+
.unwrap()
225+
.zip(r.get_by_name::<&str, _>("Name").unwrap())
226+
})
227+
.collect::<Vec<_>>();
228+
assert_eq!(results, vec![(1, "Braund, Mr. Owen Harris")]);
194229
});
195230
}
196231
}

0 commit comments

Comments
 (0)