Skip to content

Commit c5aaed0

Browse files
feat: streaming response (#1317)
for queries that are expected to return lot of data, we are adding the streaming response mechanism. It uses datafusion's execute_stream function that sends streaming response use query param streaming=true to get the streaming response defaulted to false, for prism or other clients to work as usual query response (with streaming=true) sends multiple batches one after the other split by new line character
1 parent 749a16f commit c5aaed0

File tree

9 files changed

+270
-78
lines changed

9 files changed

+270
-78
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ static-files = "0.2"
121121
thiserror = "2.0"
122122
ulid = { version = "1.0", features = ["serde"] }
123123
xxhash-rust = { version = "0.8", features = ["xxh3"] }
124+
futures-core = "0.3.31"
124125

125126
[build-dependencies]
126127
cargo_toml = "0.21"

src/catalog/mod.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*
1717
*/
1818

19-
use std::{io::ErrorKind, sync::Arc};
19+
use std::sync::Arc;
2020

2121
use chrono::{DateTime, Local, NaiveTime, Utc};
2222
use column::Column;
@@ -259,10 +259,7 @@ async fn create_manifest(
259259
.date_naive()
260260
.and_time(
261261
NaiveTime::from_num_seconds_from_midnight_opt(23 * 3600 + 59 * 60 + 59, 999_999_999)
262-
.ok_or(IOError::new(
263-
ErrorKind::Other,
264-
"Failed to create upper bound for manifest",
265-
))?,
262+
.ok_or(IOError::other("Failed to create upper bound for manifest"))?,
266263
)
267264
.and_utc();
268265

src/handlers/airplane.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,11 @@ impl FlightService for AirServiceImpl {
126126
}
127127

128128
async fn do_get(&self, req: Request<Ticket>) -> Result<Response<Self::DoGetStream>, Status> {
129-
let key = extract_session_key(req.metadata())?;
129+
let key = extract_session_key(req.metadata())
130+
.map_err(|e| Status::unauthenticated(e.to_string()))?;
130131

131-
let ticket = get_query_from_ticket(&req)?;
132+
let ticket =
133+
get_query_from_ticket(&req).map_err(|e| Status::invalid_argument(e.to_string()))?;
132134

133135
info!("query requested to airplane: {:?}", ticket);
134136

@@ -217,10 +219,19 @@ impl FlightService for AirServiceImpl {
217219
})?;
218220
let time = Instant::now();
219221

220-
let (records, _) = execute(query, &stream_name)
222+
let (records, _) = execute(query, &stream_name, false)
221223
.await
222224
.map_err(|err| Status::internal(err.to_string()))?;
223225

226+
let records = match records {
227+
actix_web::Either::Left(rbs) => rbs,
228+
actix_web::Either::Right(_) => {
229+
return Err(Status::failed_precondition(
230+
"Expected batch results, got stream",
231+
))
232+
}
233+
};
234+
224235
/*
225236
* INFO: No returning the schema with the data.
226237
* kept it in case it needs to be sent in the future.
@@ -246,7 +257,7 @@ impl FlightService for AirServiceImpl {
246257
.observe(time);
247258

248259
// Airplane takes off 🛫
249-
out
260+
out.map_err(|e| *e)
250261
}
251262

252263
async fn do_put(

0 commit comments

Comments
 (0)