Skip to content

Commit 879ed99

Browse files
committed
More WIP
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
1 parent f7a13c0 commit 879ed99

File tree

19 files changed

+17807
-17171
lines changed

19 files changed

+17807
-17171
lines changed

avro/src/bigdecimal.rs

Lines changed: 166 additions & 157 deletions
Original file line numberDiff line numberDiff line change
@@ -22,89 +22,113 @@
2222
pub mod sync {
2323
sync!();
2424
replace!(
25-
tokio::io::AsyncRead => std::io::Read,
25+
bigdecimal::tokio => bigdecimal::sync,
2626
decode::tokio => decode::sync,
27+
encode::tokio => encode::sync,
28+
error::tokio => error::sync,
29+
schema::tokio => schema::sync,
30+
util::tokio => util::sync,
2731
#[tokio::test] => #[test]
2832
);
2933
}
3034
)]
31-
mod bigdecimal {}
32-
use crate::{
33-
AvroResult,
34-
decode::tokio::{decode_len, decode_long},
35-
encode::{encode_bytes, encode_long},
36-
error::Details,
37-
types::Value,
38-
};
39-
pub use bigdecimal::BigDecimal;
40-
use num_bigint::BigInt;
41-
use std::io::Read;
42-
43-
pub(crate) fn big_decimal_as_bytes(decimal: &BigDecimal) -> AvroResult<Vec<u8>> {
44-
let mut buffer: Vec<u8> = Vec::new();
45-
let (big_int, exponent): (BigInt, i64) = decimal.as_bigint_and_exponent();
46-
let big_endian_value: Vec<u8> = big_int.to_signed_bytes_be();
47-
encode_bytes(&big_endian_value, &mut buffer)?;
48-
encode_long(exponent, &mut buffer)?;
49-
50-
Ok(buffer)
51-
}
52-
53-
pub(crate) fn serialize_big_decimal(decimal: &BigDecimal) -> AvroResult<Vec<u8>> {
54-
// encode big decimal, without global size
55-
let buffer = big_decimal_as_bytes(decimal)?;
35+
mod bigdecimal {
36+
use crate::{
37+
AvroResult,
38+
decode::tokio::{decode_len, decode_long},
39+
encode::tokio::{encode_bytes, encode_long},
40+
error::tokio::Details,
41+
types::tokio::Value,
42+
};
43+
pub use bigdecimal::BigDecimal;
44+
use num_bigint::BigInt;
45+
use std::io::Read;
46+
47+
pub(crate) fn big_decimal_as_bytes(decimal: &BigDecimal) -> AvroResult<Vec<u8>> {
48+
let mut buffer: Vec<u8> = Vec::new();
49+
let (big_int, exponent): (BigInt, i64) = decimal.as_bigint_and_exponent();
50+
let big_endian_value: Vec<u8> = big_int.to_signed_bytes_be();
51+
encode_bytes(&big_endian_value, &mut buffer)?;
52+
encode_long(exponent, &mut buffer)?;
53+
54+
Ok(buffer)
55+
}
5656

57-
// encode global size and content
58-
let mut final_buffer: Vec<u8> = Vec::new();
59-
encode_bytes(&buffer, &mut final_buffer)?;
57+
pub(crate) fn serialize_big_decimal(decimal: &BigDecimal) -> AvroResult<Vec<u8>> {
58+
// encode big decimal, without global size
59+
let buffer = big_decimal_as_bytes(decimal)?;
6060

61-
Ok(final_buffer)
62-
}
61+
// encode global size and content
62+
let mut final_buffer: Vec<u8> = Vec::new();
63+
encode_bytes(&buffer, &mut final_buffer)?;
6364

64-
pub(crate) async fn deserialize_big_decimal(bytes: &Vec<u8>) -> AvroResult<BigDecimal> {
65-
let mut bytes: &[u8] = bytes.as_slice();
66-
let mut big_decimal_buffer = match decode_len(&mut bytes).await {
67-
Ok(size) => vec![0u8; size],
68-
Err(err) => return Err(Details::BigDecimalLen(Box::new(err)).into()),
69-
};
70-
71-
bytes
72-
.read_exact(&mut big_decimal_buffer[..])
73-
.map_err(Details::ReadDouble)?;
65+
Ok(final_buffer)
66+
}
7467

75-
match decode_long(&mut bytes).await {
76-
Ok(Value::Long(scale_value)) => {
77-
let big_int: BigInt = BigInt::from_signed_bytes_be(&big_decimal_buffer);
78-
let decimal = BigDecimal::new(big_int, scale_value);
79-
Ok(decimal)
68+
pub(crate) async fn deserialize_big_decimal(bytes: &Vec<u8>) -> AvroResult<BigDecimal> {
69+
let mut bytes: &[u8] = bytes.as_slice();
70+
let mut big_decimal_buffer = match decode_len(&mut bytes).await {
71+
Ok(size) => vec![0u8; size],
72+
Err(err) => return Err(Details::BigDecimalLen(Box::new(err)).into()),
73+
};
74+
75+
bytes
76+
.read_exact(&mut big_decimal_buffer[..])
77+
.map_err(Details::ReadDouble)?;
78+
79+
match decode_long(&mut bytes).await {
80+
Ok(Value::Long(scale_value)) => {
81+
let big_int: BigInt = BigInt::from_signed_bytes_be(&big_decimal_buffer);
82+
let decimal = BigDecimal::new(big_int, scale_value);
83+
Ok(decimal)
84+
}
85+
_ => Err(Details::BigDecimalScale.into()),
8086
}
81-
_ => Err(Details::BigDecimalScale.into()),
8287
}
83-
}
8488

85-
#[cfg(test)]
86-
mod tests {
87-
use super::*;
88-
use crate::{Codec, Reader, Schema, Writer, error::Error, types::Record};
89-
use apache_avro_test_helper::TestResult;
90-
use bigdecimal::{One, Zero};
91-
use pretty_assertions::assert_eq;
92-
use std::{
93-
fs::File,
94-
io::BufReader,
95-
ops::{Div, Mul},
96-
str::FromStr,
97-
};
98-
99-
#[tokio::test]
100-
async fn test_avro_3779_bigdecimal_serial() -> TestResult {
101-
let value: BigDecimal =
102-
bigdecimal::BigDecimal::from(-1421).div(bigdecimal::BigDecimal::from(2));
103-
let mut current: BigDecimal = BigDecimal::one();
104-
105-
for iter in 1..180 {
106-
let buffer: Vec<u8> = serialize_big_decimal(&current)?;
89+
#[cfg(test)]
90+
mod tests {
91+
use super::*;
92+
use crate::{
93+
codec::tokio::Codec, error::tokio::Error, reader::tokio::Reader, schema::tokio::Schema,
94+
types::tokio::Record, writer::tokio::Writer,
95+
};
96+
use apache_avro_test_helper::TestResult;
97+
use bigdecimal::{One, Zero};
98+
use pretty_assertions::assert_eq;
99+
use std::{
100+
fs::File,
101+
io::BufReader,
102+
ops::{Div, Mul},
103+
str::FromStr,
104+
};
105+
106+
#[tokio::test]
107+
async fn test_avro_3779_bigdecimal_serial() -> TestResult {
108+
let value: BigDecimal =
109+
bigdecimal::BigDecimal::from(-1421).div(bigdecimal::BigDecimal::from(2));
110+
let mut current: BigDecimal = BigDecimal::one();
111+
112+
for iter in 1..180 {
113+
let buffer: Vec<u8> = serialize_big_decimal(&current)?;
114+
115+
let mut as_slice = buffer.as_slice();
116+
decode_long(&mut as_slice).await?;
117+
118+
let mut result: Vec<u8> = Vec::new();
119+
result.extend_from_slice(as_slice);
120+
121+
let deserialize_big_decimal: Result<BigDecimal, Error> =
122+
deserialize_big_decimal(&result);
123+
assert!(
124+
deserialize_big_decimal.is_ok(),
125+
"can't deserialize for iter {iter}"
126+
);
127+
assert_eq!(current, deserialize_big_decimal?, "not equals for {iter}");
128+
current = current.mul(&value);
129+
}
107130

131+
let buffer: Vec<u8> = serialize_big_decimal(&BigDecimal::zero())?;
108132
let mut as_slice = buffer.as_slice();
109133
decode_long(&mut as_slice).await?;
110134

@@ -115,36 +139,20 @@ mod tests {
115139
deserialize_big_decimal(&result);
116140
assert!(
117141
deserialize_big_decimal.is_ok(),
118-
"can't deserialize for iter {iter}"
142+
"can't deserialize for zero"
143+
);
144+
assert_eq!(
145+
BigDecimal::zero(),
146+
deserialize_big_decimal?,
147+
"not equals for zero"
119148
);
120-
assert_eq!(current, deserialize_big_decimal?, "not equals for {iter}");
121-
current = current.mul(&value);
122-
}
123149

124-
let buffer: Vec<u8> = serialize_big_decimal(&BigDecimal::zero())?;
125-
let mut as_slice = buffer.as_slice();
126-
decode_long(&mut as_slice).await?;
127-
128-
let mut result: Vec<u8> = Vec::new();
129-
result.extend_from_slice(as_slice);
130-
131-
let deserialize_big_decimal: Result<BigDecimal, Error> = deserialize_big_decimal(&result);
132-
assert!(
133-
deserialize_big_decimal.is_ok(),
134-
"can't deserialize for zero"
135-
);
136-
assert_eq!(
137-
BigDecimal::zero(),
138-
deserialize_big_decimal?,
139-
"not equals for zero"
140-
);
141-
142-
Ok(())
143-
}
150+
Ok(())
151+
}
144152

145-
#[tokio::test]
146-
async fn test_avro_3779_record_with_bg() -> TestResult {
147-
let schema_str = r#"
153+
#[tokio::test]
154+
async fn test_avro_3779_record_with_bg() -> TestResult {
155+
let schema_str = r#"
148156
{
149157
"type": "record",
150158
"name": "test",
@@ -157,66 +165,67 @@ mod tests {
157165
]
158166
}
159167
"#;
160-
let schema = Schema::parse_str(schema_str)?;
161-
162-
// build record with big decimal value
163-
let mut record = Record::new(&schema).unwrap();
164-
let val = BigDecimal::new(BigInt::from(12), 2);
165-
record.put("field_name", val.clone());
166-
167-
// write a record
168-
let codec = Codec::Null;
169-
let mut writer = Writer::builder()
170-
.schema(&schema)
171-
.codec(codec)
172-
.writer(Vec::new())
173-
.build();
174-
175-
writer.append(record.clone())?;
176-
writer.flush()?;
177-
178-
// read record
179-
let wrote_data = writer.into_inner()?;
180-
let mut reader = Reader::new(&wrote_data[..])?;
181-
182-
let value = reader.next().await.unwrap()?;
183-
184-
// extract field value
185-
let big_decimal_value: &Value = match value {
186-
Value::Record(ref fields) => Ok(&fields[0].1),
187-
other => Err(format!("Expected a Value::Record, got: {other:?}")),
188-
}?;
189-
190-
let x1res: &BigDecimal = match big_decimal_value {
191-
Value::BigDecimal(s) => Ok(s),
192-
other => Err(format!("Expected Value::BigDecimal, got: {other:?}")),
193-
}?;
194-
assert_eq!(&val, x1res);
195-
196-
Ok(())
197-
}
168+
let schema = Schema::parse_str(schema_str)?;
169+
170+
// build record with big decimal value
171+
let mut record = Record::new(&schema).unwrap();
172+
let val = BigDecimal::new(BigInt::from(12), 2);
173+
record.put("field_name", val.clone());
174+
175+
// write a record
176+
let codec = Codec::Null;
177+
let mut writer = Writer::builder()
178+
.schema(&schema)
179+
.codec(codec)
180+
.writer(Vec::new())
181+
.build();
182+
183+
writer.append(record.clone())?;
184+
writer.flush()?;
185+
186+
// read record
187+
let wrote_data = writer.into_inner()?;
188+
let mut reader = Reader::new(&wrote_data[..])?;
189+
190+
let value = reader.next().await.unwrap()?;
191+
192+
// extract field value
193+
let big_decimal_value: &Value = match value {
194+
Value::Record(ref fields) => Ok(&fields[0].1),
195+
other => Err(format!("Expected a Value::Record, got: {other:?}")),
196+
}?;
197+
198+
let x1res: &BigDecimal = match big_decimal_value {
199+
Value::BigDecimal(s) => Ok(s),
200+
other => Err(format!("Expected Value::BigDecimal, got: {other:?}")),
201+
}?;
202+
assert_eq!(&val, x1res);
203+
204+
Ok(())
205+
}
198206

199-
#[tokio::test]
200-
async fn test_avro_3779_from_java_file() -> TestResult {
201-
// Open file generated with Java code to ensure compatibility
202-
// with Java big decimal logical type.
203-
let file: File = File::open("./tests/bigdec.avro")?;
204-
let mut reader = Reader::new(BufReader::new(&file))?;
205-
let next_element = reader.next().await;
206-
assert!(next_element.is_some());
207-
let value = next_element.unwrap()?;
208-
let bg = match value {
209-
Value::Record(ref fields) => Ok(&fields[0].1),
210-
other => Err(format!("Expected a Value::Record, got: {other:?}")),
211-
}?;
212-
let value_big_decimal = match bg {
213-
Value::BigDecimal(val) => Ok(val),
214-
other => Err(format!("Expected a Value::BigDecimal, got: {other:?}")),
215-
}?;
216-
217-
let ref_value = BigDecimal::from_str("2.24")?;
218-
assert_eq!(&ref_value, value_big_decimal);
219-
220-
Ok(())
207+
#[tokio::test]
208+
async fn test_avro_3779_from_java_file() -> TestResult {
209+
// Open file generated with Java code to ensure compatibility
210+
// with Java big decimal logical type.
211+
let file: File = File::open("./tests/bigdec.avro")?;
212+
let mut reader = Reader::new(BufReader::new(&file))?;
213+
let next_element = reader.next().await;
214+
assert!(next_element.is_some());
215+
let value = next_element.unwrap()?;
216+
let bg = match value {
217+
Value::Record(ref fields) => Ok(&fields[0].1),
218+
other => Err(format!("Expected a Value::Record, got: {other:?}")),
219+
}?;
220+
let value_big_decimal = match bg {
221+
Value::BigDecimal(val) => Ok(val),
222+
other => Err(format!("Expected a Value::BigDecimal, got: {other:?}")),
223+
}?;
224+
225+
let ref_value = BigDecimal::from_str("2.24")?;
226+
assert_eq!(&ref_value, value_big_decimal);
227+
228+
Ok(())
229+
}
221230
}
222231
}

0 commit comments

Comments
 (0)