Skip to content

Commit

Permalink
Remove avro from the default features and feature gate its cpde
Browse files Browse the repository at this point in the history
This change is a little wrapped up in the introduction of
DeserializedMessage but the trade-off for development targeting S3 is
that I am linking 382 crates every cycle as opposed to 451.

Fixes #163
  • Loading branch information
rtyler committed Jan 9, 2024
1 parent 4ed2c8a commit 72aff33
Show file tree
Hide file tree
Showing 7 changed files with 231 additions and 182 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ azure_storage_blobs = { version = "0.18.0", optional = true }

[features]
default = [
"avro",
]
avro = [
"apache-avro",
Expand All @@ -78,6 +77,7 @@ utime = "0.3"
serial_test = "*"
tempfile = "3"
time = "0.3.20"
rusoto_s3 = { version = "0.47", default-features = false, features = ["rustls"]}

[profile.release]
lto = true
17 changes: 13 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,7 @@ enum MessageDeserializationError {
EmptyPayload,
#[error("Kafka message deserialization failed")]
JsonDeserialization { dead_letter: DeadLetter },
#[cfg(feature = "avro")]
#[error("Kafka message deserialization failed")]
AvroDeserialization { dead_letter: DeadLetter },
}
Expand Down Expand Up @@ -845,10 +846,18 @@ impl IngestProcessor {
partition, offset
);
}
Err(
MessageDeserializationError::JsonDeserialization { dead_letter }
| MessageDeserializationError::AvroDeserialization { dead_letter },
) => {
Err(MessageDeserializationError::JsonDeserialization { dead_letter }) => {
warn!(
"Deserialization failed - partition {}, offset {}, dead_letter {}",
partition,
offset,
dead_letter.error.as_ref().unwrap_or(&String::from("_")),
);
self.ingest_metrics.message_deserialization_failed();
self.dlq.write_dead_letter(dead_letter).await?;
}
#[cfg(feature = "avro")]
Err(MessageDeserializationError::AvroDeserialization { dead_letter }) => {
warn!(
"Deserialization failed - partition {}, offset {}, dead_letter {}",
partition,
Expand Down
179 changes: 5 additions & 174 deletions src/serialization.rs → src/serialization/avro.rs
Original file line number Diff line number Diff line change
@@ -1,192 +1,23 @@
use std::{borrow::BorrowMut, convert::TryFrom, io::Cursor, path::PathBuf};

use super::{DeserializedMessage, MessageDeserializationError, MessageDeserializer};
use crate::dead_letters::DeadLetter;
use async_trait::async_trait;
use schema_registry_converter::async_impl::{
easy_avro::EasyAvroDecoder, easy_json::EasyJsonDecoder, schema_registry::SrSettings,
};
use serde_json::Value;

use crate::{dead_letters::DeadLetter, MessageDeserializationError, MessageFormat};

use deltalake_core::arrow::datatypes::Schema as ArrowSchema;

/// Structure which contains the [serde_json::Value] and the inferred schema of the message
///
/// The [ArrowSchema] helps with schema evolution
#[derive(Clone, Debug, Default, PartialEq)]
pub struct DeserializedMessage {
message: Value,
schema: Option<ArrowSchema>,
}

impl DeserializedMessage {
pub fn schema(&self) -> &Option<ArrowSchema> {
&self.schema
}
pub fn message(self) -> Value {
self.message
}
pub fn get(&self, key: &str) -> Option<&Value> {
self.message.get(key)
}
pub fn as_object_mut(&mut self) -> Option<&mut serde_json::Map<String, Value>> {
self.message.as_object_mut()
}
}

/// Allow for `.into()` on [Value] for ease of use
impl From<Value> for DeserializedMessage {
fn from(message: Value) -> Self {
// XXX: This seems wasteful, this function should go away, and the deserializers should
// infer straight from the buffer stream
let iter = vec![message.clone()].into_iter().map(|v| Ok(v));
let schema =
match deltalake_core::arrow::json::reader::infer_json_schema_from_iterator(iter) {
Ok(schema) => Some(schema),
_ => None,
};
Self { message, schema }
}
}

#[async_trait]
pub(crate) trait MessageDeserializer {
async fn deserialize(
&mut self,
message_bytes: &[u8],
) -> Result<DeserializedMessage, MessageDeserializationError>;
}

pub(crate) struct MessageDeserializerFactory {}
impl MessageDeserializerFactory {
pub fn try_build(
input_format: &MessageFormat,
) -> Result<Box<dyn MessageDeserializer + Send>, anyhow::Error> {
match input_format {
MessageFormat::Json(data) => match data {
crate::SchemaSource::None => Ok(Self::json_default()),
crate::SchemaSource::SchemaRegistry(sr) => {
match Self::build_sr_settings(sr).map(JsonDeserializer::from_schema_registry) {
Ok(s) => Ok(Box::new(s)),
Err(e) => Err(e),
}
}
crate::SchemaSource::File(_) => Ok(Self::json_default()),
},
MessageFormat::Avro(data) => match data {
crate::SchemaSource::None => Ok(Box::<AvroSchemaDeserializer>::default()),
crate::SchemaSource::SchemaRegistry(sr) => {
match Self::build_sr_settings(sr).map(AvroDeserializer::from_schema_registry) {
Ok(s) => Ok(Box::new(s)),
Err(e) => Err(e),
}
}
crate::SchemaSource::File(f) => {
match AvroSchemaDeserializer::try_from_schema_file(f) {
Ok(s) => Ok(Box::new(s)),
Err(e) => Err(e),
}
}
},
_ => Ok(Box::new(DefaultDeserializer {})),
}
}

fn json_default() -> Box<dyn MessageDeserializer + Send> {
Box::new(DefaultDeserializer {})
}

fn build_sr_settings(registry_url: &url::Url) -> Result<SrSettings, anyhow::Error> {
let mut url_string = registry_url.as_str();
if url_string.ends_with('/') {
url_string = &url_string[0..url_string.len() - 1];
}

let mut builder = SrSettings::new_builder(url_string.to_owned());
if let Ok(username) = std::env::var("SCHEMA_REGISTRY_USERNAME") {
builder.set_basic_authorization(
username.as_str(),
std::option_env!("SCHEMA_REGISTRY_PASSWORD"),
);
}

if let Ok(proxy_url) = std::env::var("SCHEMA_REGISTRY_PROXY") {
builder.set_proxy(proxy_url.as_str());
}

match builder.build() {
Ok(s) => Ok(s),
Err(e) => Err(anyhow::Error::new(e)),
}
}
}

#[derive(Clone, Debug, Default)]
struct DefaultDeserializer {}

#[async_trait]
impl MessageDeserializer for DefaultDeserializer {
async fn deserialize(
&mut self,
payload: &[u8],
) -> Result<DeserializedMessage, MessageDeserializationError> {
let value: Value = match serde_json::from_slice(payload) {
Ok(v) => v,
Err(e) => {
return Err(MessageDeserializationError::JsonDeserialization {
dead_letter: DeadLetter::from_failed_deserialization(payload, e.to_string()),
});
}
};

Ok(value.into())
}
}

#[cfg(test)]
mod default_tests {
use super::*;

#[tokio::test]
async fn deserialize_with_schema() {
let mut deser = DefaultDeserializer::default();
let message = deser
.deserialize(r#"{"hello" : "world"}"#.as_bytes())
.await
.expect("Failed to deserialize trivial JSON");
assert!(
message.schema().is_some(),
"The DeserializedMessage doesn't have a schema!"
);
}

#[tokio::test]
async fn deserialize_simple_json() {
#[derive(serde::Deserialize)]
struct HW {
hello: String,
}

let mut deser = DefaultDeserializer::default();
let message = deser
.deserialize(r#"{"hello" : "world"}"#.as_bytes())
.await
.expect("Failed to deserialize trivial JSON");
let value: HW = serde_json::from_value(message.message).expect("Failed to coerce");
assert_eq!("world", value.hello);
}
}

struct AvroDeserializer {
pub(crate) struct AvroDeserializer {
decoder: EasyAvroDecoder,
}

#[derive(Default)]
struct AvroSchemaDeserializer {
pub(crate) struct AvroSchemaDeserializer {
schema: Option<apache_avro::Schema>,
}

struct JsonDeserializer {
pub(crate) struct JsonDeserializer {
decoder: EasyJsonDecoder,
}

Expand Down
Loading

0 comments on commit 72aff33

Please sign in to comment.