Skip to content

Commit e4a1ade

Browse files
committed
Disable schema inference when schema evolution is disabled
This will ensure the non-evolution case stays relatively speedy! Sponsored-by: Raft LLC
1 parent 8e77428 commit e4a1ade

File tree

4 files changed

+29
-12
lines changed

4 files changed

+29
-12
lines changed

Cargo.toml

+1-5
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ serde_json = "1"
2424
strum_macros = "0.20"
2525
thiserror = "1"
2626
tokio = { version = "1", features = ["full"] }
27-
tokio-util = "0.6.3"
27+
tokio-util = "0.7.10"
2828
uuid = { version = "1.0", features = ["serde", "v4"] }
2929
url = "2.3"
3030

@@ -35,8 +35,6 @@ deltalake-azure = { git = "https://github.com/delta-io/delta-rs", branch = "main
3535

3636
# s3 feature enabled
3737
dynamodb_lock = { version = "0.6.0", optional = true }
38-
rusoto_core = { version = "0.47", default-features = false, features = ["rustls"], optional = true }
39-
rusoto_credential = { version = "0.47", optional = true }
4038

4139
# sentry
4240
sentry = { version = "0.23.0", optional = true }
@@ -68,8 +66,6 @@ azure = [
6866
s3 = [
6967
"deltalake-aws",
7068
"dynamodb_lock",
71-
"rusoto_core",
72-
"rusoto_credential",
7369
]
7470

7571
[dev-dependencies]

src/dead_letters.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ impl DeltaSinkDeadLetterQueue {
255255
dynamo_lock_options::DYNAMO_LOCK_PARTITION_KEY_VALUE.to_string() => std::env::var(env_vars::DEAD_LETTER_DYNAMO_LOCK_PARTITION_KEY_VALUE)
256256
.unwrap_or_else(|_| "kafka_delta_ingest-dead_letters".to_string()),
257257
};
258-
#[cfg(all(feature = "azure", not(feature="s3")))]
258+
#[cfg(all(feature = "azure", not(feature = "s3")))]
259259
let opts = HashMap::default();
260260

261261
let table = crate::delta_helpers::load_table(table_uri, opts.clone()).await?;

src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
77
#![deny(warnings)]
88
#![deny(missing_docs)]
9+
#![allow(unused)]
910

1011
#[macro_use]
1112
extern crate lazy_static;

src/serialization/mod.rs

+26-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use async_trait::async_trait;
2-
use serde_json::Value;
32
use log::*;
3+
use serde_json::Value;
44

55
use crate::{dead_letters::DeadLetter, MessageDeserializationError, MessageFormat};
66

@@ -24,6 +24,13 @@ pub struct DeserializedMessage {
2424
}
2525

2626
impl DeserializedMessage {
27+
fn new(message: Value) -> Self {
28+
Self {
29+
message,
30+
..Default::default()
31+
}
32+
}
33+
2734
pub fn schema(&self) -> &Option<ArrowSchema> {
2835
&self.schema
2936
}
@@ -41,9 +48,7 @@ impl DeserializedMessage {
4148
/// Allow for `.into()` on [Value] for ease of use
4249
impl From<Value> for DeserializedMessage {
4350
fn from(message: Value) -> Self {
44-
// XXX: This seems wasteful, this function should go away, and the deserializers should
45-
// infer straight from the buffer stream
46-
let iter = vec![message.clone()].into_iter().map(Ok);
51+
let iter = std::iter::once(&message).map(Ok);
4752
let schema =
4853
match deltalake_core::arrow::json::reader::infer_json_schema_from_iterator(iter) {
4954
Ok(schema) => Some(schema),
@@ -169,7 +174,10 @@ impl MessageDeserializer for DefaultDeserializer {
169174
}
170175
};
171176

172-
Ok(value.into())
177+
match self.can_evolve_schema() {
178+
true => Ok(value.into()),
179+
false => Ok(DeserializedMessage::new(value)),
180+
}
173181
}
174182
}
175183

@@ -183,8 +191,20 @@ mod default_tests {
183191
}
184192

185193
#[tokio::test]
186-
async fn deserialize_with_schema() {
194+
async fn deserializer_default_without_evolution() {
187195
let mut deser = DefaultDeserializer::default();
196+
let dm = deser
197+
.deserialize(r#"{"hello" : "world"}"#.as_bytes())
198+
.await
199+
.unwrap();
200+
assert_eq!(true, dm.schema().is_none());
201+
}
202+
203+
#[tokio::test]
204+
async fn deserialize_with_schema() {
205+
let mut deser = DefaultDeserializer {
206+
schema_evolution: true,
207+
};
188208
let message = deser
189209
.deserialize(r#"{"hello" : "world"}"#.as_bytes())
190210
.await

0 commit comments

Comments
 (0)