Skip to content

Commit fbb543e

Browse files
authored
Replace indexmap with vec (#90)
1 parent b7d36ee commit fbb543e

File tree

167 files changed

+1742
-1579
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

167 files changed

+1742
-1579
lines changed

CHANGELOG.md

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22

33
## Unreleased
44

5+
- All "map" types in the protocol that were previously of type `IndexMap<K, V>` are now of type `Vec<V>`, the value of `K` is stored as a field within `V`.
6+
- This was done to resolve <https://github.com/tychedelia/kafka-protocol-rs/issues/84> and improve decoding speed.
7+
- If you were previously calling `.get()` on the map, the best way to migrate is to refactor your code to avoid the need to lookup by iterating over the items in the response instead.
8+
- Alternatively, you could replace `responses.get(name)` with something like `responses.iter().find(|x| x.name == name)` to achieve the same result. But note that this access is now O(N) instead of O(1).
9+
- Alternatively, you could use an intermediate hashmap before converting to a Vec to retain the O(1) lookup.
510
- Update protocol to kafka 3.8.0
611
- The Debug impl for new type wrappers now passes directly to the inner type.
712
The full list of new type wrappers is BrokerId, GroupId, ProducerId, TopicName and TransactionalId.
@@ -22,11 +27,11 @@
2227
- Use `IntoIterator` instead of `Iterator` for `RecordBatchEncoder::encode`
2328
- Use CRC-32 ISO/HDLC instead of CRC-32 CKSUM.
2429
- Add `Display` and more `From<T>` implementations for `StrBytes`.
25-
- Avoid redunand variant names in RequestKind/ResponseKind.
30+
- Avoid redundant variant names in RequestKind/ResponseKind.
2631

2732
## v0.10.2
2833

29-
- Implement From<T> for RequestKind and ResponseKind.
34+
- Implement `From<T>` for RequestKind and ResponseKind.
3035

3136
## v0.10.1
3237

@@ -60,7 +65,7 @@ and other misc improvements.
6065

6166
## v0.7.0
6267

63-
- Switch to [crc32c](https://crates.io/crates/crc32c) crate, providing hardware accelration for crc operations
68+
- Switch to [crc32c](https://crates.io/crates/crc32c) crate, providing hardware acceleration for crc operations
6469
on supported platforms.
6570
- Formatting fixes.
6671
- Miscellaneous dependency updates.

protocol_codegen/src/generate_messages/generate.rs

Lines changed: 54 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ use std::cmp::Ordering;
1515
struct PreparedStruct {
1616
spec_type: SpecType,
1717
name: String,
18-
map_key: Option<Box<PreparedType>>,
1918
prepared_fields: Vec<PreparedField>,
2019
valid_versions: VersionSpec,
2120
flexible_msg_versions: VersionSpec,
@@ -83,7 +82,6 @@ enum PreparedType {
8382
Entity(EntityType),
8483
Struct(PreparedStruct),
8584
Array(Box<PreparedType>),
86-
Map(Box<PreparedType>, String),
8785
}
8886

8987
impl PreparedType {
@@ -93,7 +91,6 @@ impl PreparedType {
9391
Self::Entity(entity_type) => format!("super::{}", entity_type.name),
9492
Self::Struct(inner) => inner.name.clone(),
9593
Self::Array(inner) => format!("Vec<{}>", inner.rust_name()),
96-
Self::Map(key, value) => format!("indexmap::IndexMap<{}, {}>", key.rust_name(), value),
9794
}
9895
}
9996
fn name(&self, flexible: bool, optional: bool) -> String {
@@ -114,13 +111,6 @@ impl PreparedType {
114111
format!("types::Array({})", inner.name(flexible, false))
115112
}
116113
}
117-
Self::Map(_, _) => {
118-
if flexible {
119-
"types::CompactArray(types::Struct { version })".into()
120-
} else {
121-
"types::Array(types::Struct { version })".into()
122-
}
123-
}
124114
}
125115
}
126116
pub fn has_compact_form(&self) -> bool {
@@ -129,15 +119,14 @@ impl PreparedType {
129119
Self::Entity(entity_type) => entity_type.inner.has_compact_form(),
130120
Self::Struct(_) => false,
131121
Self::Array(_) => true,
132-
Self::Map(_, _) => true,
133122
}
134123
}
135124
fn default(&self) -> PreparedDefault {
136125
match self {
137126
Self::Primitive(prim) => primitive_default(*prim),
138127
Self::Entity(entity_type) => primitive_default(entity_type.inner),
139128
Self::Struct(_) => PreparedDefault::EmptyStruct,
140-
Self::Array(_) | Self::Map(_, _) => PreparedDefault::Empty,
129+
Self::Array(_) => PreparedDefault::Empty,
141130
}
142131
}
143132
fn is_entity(&self) -> bool {
@@ -224,18 +213,13 @@ struct PreparedField {
224213
default: PreparedDefault,
225214
ignorable: bool,
226215
_entity_type: Option<String>,
227-
map_key: bool,
228216
about: String,
229217
flexible_versions: VersionSpec,
230218
}
231219

232220
impl PreparedField {
233221
fn var_name(&self) -> Expr {
234-
if self.map_key {
235-
Expr::new_atom("key")
236-
} else {
237-
Expr::new_atom("self").field(&self.name).by_ref()
238-
}
222+
Expr::new_atom("self").field(&self.name).by_ref()
239223
}
240224
}
241225

@@ -275,7 +259,6 @@ fn prepare_field_type<W: Write>(
275259
} else {
276260
PreparedStruct {
277261
name: name.clone(),
278-
map_key: None,
279262
prepared_fields: vec![],
280263
valid_versions,
281264
flexible_msg_versions,
@@ -313,14 +296,7 @@ fn prepare_field_type<W: Write>(
313296
prepared_structs_output,
314297
spec_type,
315298
)?;
316-
match prepared_field {
317-
PreparedType::Struct(PreparedStruct {
318-
name,
319-
map_key: Some(map_key),
320-
..
321-
}) => PreparedType::Map(map_key, name),
322-
other => PreparedType::Array(Box::new(other)),
323-
}
299+
PreparedType::Array(Box::new(prepared_field))
324300
}
325301
})
326302
}
@@ -755,11 +731,7 @@ fn write_decode_field<W: Write>(
755731
field: &PreparedField,
756732
valid_versions: VersionSpec,
757733
) -> Result<(), Error> {
758-
let var_name = if field.map_key {
759-
"key_field"
760-
} else {
761-
&field.name
762-
};
734+
let var_name = &field.name;
763735

764736
if field.tagged_versions.is_none() {
765737
write!(w, "let {} = ", var_name)?;
@@ -884,11 +856,7 @@ fn write_decode_tag_buffer<W: Write>(
884856
write!(w, "match tag ")?;
885857
w.block(|w| {
886858
for (&k, field) in &sorted_tagged_fields {
887-
let var_name = if field.map_key {
888-
"key_field"
889-
} else {
890-
&field.name
891-
};
859+
let var_name = &field.name;
892860
write!(w, "{} => ", k)?;
893861
w.block(|w| {
894862
let tagged_field_versions =
@@ -985,9 +953,6 @@ fn prepared_struct_def<W: Write>(
985953
spec_type: SpecType,
986954
) -> Result<PreparedStruct, Error> {
987955
let mut prepared_fields = Vec::new();
988-
let mut map_key = None;
989-
990-
let num_map_keys = fields.iter().filter(|field| field.map_key).count();
991956

992957
for field in fields {
993958
let type_ = prepare_field_type(
@@ -1003,10 +968,6 @@ fn prepared_struct_def<W: Write>(
1003968
spec_type,
1004969
)?;
1005970

1006-
if field.map_key && num_map_keys == 1 {
1007-
map_key = Some(Box::new(type_.clone()))
1008-
}
1009-
1010971
let mut name = field.name.to_snake_case();
1011972
if name == "type" {
1012973
name = "_type".to_string();
@@ -1068,7 +1029,6 @@ fn prepared_struct_def<W: Write>(
10681029
default,
10691030
ignorable: field.ignorable,
10701031
_entity_type: field.entity_type.clone(),
1071-
map_key: field.map_key && num_map_keys == 1,
10721032
about: field.about.clone(),
10731033
flexible_versions,
10741034
});
@@ -1077,7 +1037,6 @@ fn prepared_struct_def<W: Write>(
10771037
let prepared_struct = PreparedStruct {
10781038
spec_type,
10791039
name: name.into(),
1080-
map_key,
10811040
prepared_fields,
10821041
valid_versions,
10831042
flexible_msg_versions,
@@ -1095,9 +1054,6 @@ impl PreparedStruct {
10951054
write!(w, "pub struct {} ", self.name)?;
10961055
w.block(|w| {
10971056
for prepared_field in &self.prepared_fields {
1098-
if prepared_field.map_key {
1099-
continue;
1100-
}
11011057
writeln!(w, "/// {}", prepared_field.about)?;
11021058
writeln!(w, "/// ")?;
11031059
writeln!(w, "/// Supported API versions: {}", prepared_field.versions)?;
@@ -1132,10 +1088,6 @@ impl PreparedStruct {
11321088
write!(w, "impl {} ", self.name)?;
11331089
w.block(|w| {
11341090
for prepared_field in &self.prepared_fields {
1135-
if prepared_field.map_key {
1136-
continue;
1137-
}
1138-
11391091
writeln!(w, "/// Sets `{}` to the passed value.", prepared_field.name)?;
11401092
writeln!(w, "/// ")?;
11411093
writeln!(w, "/// {}", prepared_field.about)?;
@@ -1200,38 +1152,41 @@ impl PreparedStruct {
12001152
SpecType::Response => writeln!(w, "#[cfg(feature = \"broker\")]")?,
12011153
_ => {}
12021154
}
1203-
if self.map_key.is_some() {
1204-
write!(w, "impl MapEncodable for {} ", self.name)?;
1205-
} else {
1206-
write!(w, "impl Encodable for {} ", self.name)?;
1207-
}
1155+
1156+
write!(w, "impl Encodable for {} ", self.name)?;
12081157
w.block(|w| {
1209-
if let Some(key) = &self.map_key {
1210-
writeln!(w, "type Key = {};", key.rust_name())?;
1211-
write!(w, "fn encode<B: ByteBufMut>(&self, key: &Self::Key, buf: &mut B, version: i16) -> Result<()> ")?;
1212-
} else {
1213-
write!(w, "fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> ")?;
1214-
}
1158+
write!(
1159+
w,
1160+
"fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> "
1161+
)?;
12151162
w.block(|w| {
12161163
for prepared_field in &self.prepared_fields {
12171164
write_encode_field(w, prepared_field, self.valid_versions, false)?;
12181165
}
1219-
write_encode_tag_buffer(w, &self.prepared_fields, self.valid_versions, self.flexible_msg_versions, false)?;
1166+
write_encode_tag_buffer(
1167+
w,
1168+
&self.prepared_fields,
1169+
self.valid_versions,
1170+
self.flexible_msg_versions,
1171+
false,
1172+
)?;
12201173
write!(w, "Ok(())")?;
12211174
Ok(())
12221175
})?;
12231176
writeln!(w)?;
1224-
if self.map_key.is_some() {
1225-
write!(w, "fn compute_size(&self, key: &Self::Key, version: i16) -> Result<usize> ")?;
1226-
} else {
1227-
write!(w, "fn compute_size(&self, version: i16) -> Result<usize> ")?;
1228-
}
1177+
write!(w, "fn compute_size(&self, version: i16) -> Result<usize> ")?;
12291178
w.block(|w| {
12301179
writeln!(w, "let mut total_size = 0;")?;
12311180
for prepared_field in &self.prepared_fields {
12321181
write_encode_field(w, prepared_field, self.valid_versions, true)?;
12331182
}
1234-
write_encode_tag_buffer(w, &self.prepared_fields, self.valid_versions, self.flexible_msg_versions, true)?;
1183+
write_encode_tag_buffer(
1184+
w,
1185+
&self.prepared_fields,
1186+
self.valid_versions,
1187+
self.flexible_msg_versions,
1188+
true,
1189+
)?;
12351190
write!(w, "Ok(total_size)")?;
12361191
Ok(())
12371192
})?;
@@ -1245,36 +1200,29 @@ impl PreparedStruct {
12451200
SpecType::Response => writeln!(w, "#[cfg(feature = \"client\")]")?,
12461201
_ => {}
12471202
}
1248-
if self.map_key.is_some() {
1249-
write!(w, "impl MapDecodable for {} ", self.name)?;
1250-
} else {
1251-
write!(w, "impl Decodable for {} ", self.name)?;
1252-
}
1203+
write!(w, "impl Decodable for {} ", self.name)?;
12531204
w.block(|w| {
1254-
if let Some(key) = &self.map_key {
1255-
writeln!(w, "type Key = {};", key.rust_name())?;
1256-
write!(w, "fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<(Self::Key, Self)> ")?;
1257-
} else {
1258-
write!(w, "fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> ")?;
1259-
}
1205+
write!(
1206+
w,
1207+
"fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> "
1208+
)?;
12601209
w.block(|w| {
12611210
for prepared_field in &self.prepared_fields {
12621211
write_decode_field(w, prepared_field, self.valid_versions)?;
12631212
}
12641213
if !self.flexible_msg_versions.is_none() {
12651214
writeln!(w, "let mut unknown_tagged_fields = BTreeMap::new();")?;
1266-
write_decode_tag_buffer(w, &self.prepared_fields, self.valid_versions, self.flexible_msg_versions)?;
1267-
}
1268-
if self.map_key.is_some() {
1269-
write!(w, "Ok((key_field, Self ")?;
1270-
} else {
1271-
write!(w, "Ok(Self ")?;
1215+
write_decode_tag_buffer(
1216+
w,
1217+
&self.prepared_fields,
1218+
self.valid_versions,
1219+
self.flexible_msg_versions,
1220+
)?;
12721221
}
1222+
write!(w, "Ok(Self ")?;
12731223
w.block(|w| {
12741224
for prepared_field in &self.prepared_fields {
1275-
if !prepared_field.map_key {
1276-
writeln!(w, "{},", prepared_field.name)?;
1277-
}
1225+
writeln!(w, "{},", prepared_field.name)?;
12781226
}
12791227

12801228
if !self.flexible_msg_versions.is_none() {
@@ -1283,11 +1231,8 @@ impl PreparedStruct {
12831231

12841232
Ok(())
12851233
})?;
1286-
if self.map_key.is_some() {
1287-
write!(w, "))")?;
1288-
} else {
1289-
write!(w, ")")?;
1290-
}
1234+
1235+
write!(w, ")")?;
12911236
Ok(())
12921237
})
12931238
})?;
@@ -1301,17 +1246,15 @@ impl PreparedStruct {
13011246
write!(w, "Self ")?;
13021247
w.block(|w| {
13031248
for prepared_field in &self.prepared_fields {
1304-
if !prepared_field.map_key {
1305-
writeln!(
1306-
w,
1307-
"{}: {},",
1308-
prepared_field.name,
1309-
prepared_field.default.gen_default(
1310-
prepared_field.optional,
1311-
prepared_field.type_.is_entity(),
1312-
)
1313-
)?;
1314-
}
1249+
writeln!(
1250+
w,
1251+
"{}: {},",
1252+
prepared_field.name,
1253+
prepared_field.default.gen_default(
1254+
prepared_field.optional,
1255+
prepared_field.type_.is_entity(),
1256+
)
1257+
)?;
13151258
}
13161259

13171260
if !self.flexible_msg_versions.is_none() {
@@ -1373,7 +1316,10 @@ fn write_file_header<W: Write>(w: &mut CodeWriter<W>, name: &str) -> Result<(),
13731316
writeln!(w, "use anyhow::{{bail, Result}};")?;
13741317
writeln!(w)?;
13751318
writeln!(w, "use crate::protocol::{{")?;
1376-
writeln!(w, " Encodable, Decodable, MapEncodable, MapDecodable, Encoder, Decoder, Message, HeaderVersion, VersionRange,")?;
1319+
writeln!(
1320+
w,
1321+
" Encodable, Decodable, Encoder, Decoder, Message, HeaderVersion, VersionRange,"
1322+
)?;
13771323
writeln!(w, " types, write_unknown_tagged_fields, compute_unknown_tagged_fields_size, StrBytes, buf::{{ByteBuf, ByteBufMut}}")?;
13781324
writeln!(w, "}};")?;
13791325
writeln!(w)?;

src/messages/add_offsets_to_txn_request.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use uuid::Uuid;
1414
use crate::protocol::{
1515
buf::{ByteBuf, ByteBufMut},
1616
compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17-
Encodable, Encoder, HeaderVersion, MapDecodable, MapEncodable, Message, StrBytes, VersionRange,
17+
Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
1818
};
1919

2020
/// Valid versions: 0-4

src/messages/add_offsets_to_txn_response.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use uuid::Uuid;
1414
use crate::protocol::{
1515
buf::{ByteBuf, ByteBufMut},
1616
compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17-
Encodable, Encoder, HeaderVersion, MapDecodable, MapEncodable, Message, StrBytes, VersionRange,
17+
Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
1818
};
1919

2020
/// Valid versions: 0-4

0 commit comments

Comments
 (0)