Skip to content

Conversation

martin-g
Copy link
Member

@martin-g martin-g commented Jul 28, 2025

This PR should be reviewed with whitespaces ignored - https://github.com/apache/avro-rs/pull/246/files?w=1

What

The purpose of this PR is to experiment with https://github.com/sgr-team/rs_synca crate to generate both async and sync/blocking APIs of the apache_avro SDK. Synca uses Rust procedural macros to generate the blocking version of the code from the manually written asynchronous version.

How it works

Synca works by "annotating" a module with async functions with #[synca::synca] attribute and some rules what to do when generating the sync/blocking version of the code, e.g. to remove all occurrences of async and .await and to replace some uses (imports) with others. In addition to removing async/await it also removes any usages of Box::pin() for the awaited calls which are needed to solve the issue with recursion in async methods.

Most of the data structures, like schema::Schema, types::Value, error::Error are out of Synca's scope and are reusable from both sync and async callers. Some of their methods need to be async and they are extracted to XyzExt, e.g. SchemaExt structs which are processed by Synca. For example Schema::parse_str(&str) -> AvroResult<Self> needs to be async now and that's why it is moved to SchemaExt::parse_str(&str) -> AvroResult<Schema>. Same is valid for Value's validate() and resolve() methods.

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Convert more methods to async

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
items

We need to avoid using `syn::UseRename` because it may cause issues
after `synca` replacements

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Now `clear && cargo expand --no-default-features --features xyz 2>&1 >
generated_xyz.rs` works for both xyz=sync and xyz=tokio in non-test code

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
… --features tokio`

The `sync` feature build fails due to the new `Box::pin(...)` wrappers
for all recursive calls

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
This way we could use .await without extra troubles

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
TODO: examples, doc tests and IT tests

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
It is not finished - somehow two lines of code lead rustc to run forever

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
This way we need more cloning but otherwise Rustc complains that it
cannot resolve whether &schema::tokio::Schema is Send

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Introduce XyzExt structs for the Synca managed functionality, e.g.
SchemaExt with an `impl` that contains all async methods
(SchemaExt::parse_str(...) -> AvroResult<Schema>)

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Revert async for ser_schema.rs

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Value::Decimal(ref d) => <Vec<u8>>::try_from(d)
.map(|vec| Self::Array(vec.into_iter().map(|v| v.into()).collect())),
Value::BigDecimal(ref bg) => {
let vec1: Vec<u8> = serialize_big_decimal(bg)?;
Ok(Self::Array(vec1.into_iter().map(|b| b.into()).collect()))
let vec1: Vec<u8> = crate::bigdecimal::sync::serialize_big_decimal(bg)?;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Kriskras99 I need some help here.
This is an impl of TryFrom (i.e. cannot be async) and it needs to use an async function here (serialize_big_decimal that uses several encode_xyz() async functions).
Ideas how to make it work ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can't. You need to add the try_from function to a regular impl block instead of implementing the trait. Or as you did, always use the sync version because you're writing to a Vec anyway.

This is one of the reasons I separate the reading from creating the final output in my state machine implementation.

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
ValueExt::try_from(types::Value) -> serde_json::Value seems unused

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
@martin-g
Copy link
Member Author

martin-g commented Aug 14, 2025

Woot!
The build and tests pass for both sync and async!

TODO:

  • fix the WASM demo
  • optimize the new dependencies features (use only the ones we actually need)
  • see whether the newly introduced .clone()s can be reverted to borrowings again, e.g. in ResolvedSchema
  • decide what to do with Synca. At the moment I use a fork with some improvements (replacement of use/imports, removal of Box::pin(...).async). There is no feedback from the original author about the proposed changes but I have no added unit tests too :-/. Also we can add cargo features for the new functionality (replace use and remove Box::pin())
  • add async examples and IT tests
  • decide how to export the sync and async APIs. For example reqwest crate exports the async APIs in request::* and the sync ones in request::sync::*. Exporting the sync APIs in the root would be backward-compatible/friendlier though.
  • figure out replacement for the thread locals in bytes.rs. Those are related to serde so they should be used only in sync APIs
  • document that the Serde way works only with sync APIs
  • ...

@Kriskras99
Copy link
Contributor

Nice, congrats! I do question if Schema::parse_str and some other functions really need to be async, as they don't do any IO?

Schema::parse_str is only async because Parser::parse is async, but that is async because it recursively depends on itself (I think?).

Anyway, really nice work!

@martin-g
Copy link
Member Author

Good point!
I also would prefer to minimize the API breaks as much as possible!
But parse_reader() will need to stay in SchemaExt.

That reminds me: is SchemaExt a good name for the IO related functionality ? Or maybe SchemaIO ?! Or something else ?

@Kriskras99
Copy link
Contributor

I think SchemaExt is good, SchemaIo and SchemaIO just look weird.

FYI, SchemaExt is currently not exported. It doesn't show up when I run cargo doc (even with --document-private-items) so not sure what's going on there.

@martin-g
Copy link
Member Author

FYI, SchemaExt is currently not exported. It doesn't show up when I run cargo doc (even with --document-private-items) so not sure what's going on there.

I will add some async IT tests and this should expose such issues!

BTW it is still not decided that the Synca approach wins over SansIO! Take your time and once you are ready we will decide what to do!

In the meantime we could release a new version(s) from main!

@Kriskras99
Copy link
Contributor

Even better would be SchemaSync and SchemaAsync traits that you implement directly on Schema. That way you don't need the extra struct.

@Kriskras99
Copy link
Contributor

As there are some people waiting for a release, might be good to start one. There's nothing important coming that should be included in the next release right?

@martin-g
Copy link
Member Author

As there are some people waiting for a release, might be good to start one. There's nothing important coming that should be included in the next release right?

Nothing I am aware of!
Just our PRs with the small improvements

@Kriskras99
Copy link
Contributor

#259 and #260 both are not user visible changes, so don't need to be included in the release if they make rebasing too annoying.

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
@martin-g
Copy link
Member Author

Schema::parse_str is only async because Parser::parse is async, but that is async because it recursively depends on itself (I think?).

It appeared to be more complex.

Parser::parse_record calls RecordFieldExt::parse() and it calls Self::resolve_default_value() and this one calls ValueExt::resolve_internal() and this calls bigdecimal::deserialize_big_decimal() and this calls decode_long() that needs to be async...

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
@Kriskras99
Copy link
Contributor

That's a shame, I missed that branch

@martin-g
Copy link
Member Author

One simple call like the one to encode_long leads to many additions of async in the APIs...
And in this case it is always used with a Vec<u8> in bigdecimal.rs!
Maybe it could be optimized/reworked somehow ...

@martin-g
Copy link
Member Author

martin-g commented Aug 15, 2025

Not sure whether it is bad or not but I needed to replace in several places usage of iterator combinators with for/while loops because I needed to call an async method in their closures.
IMO this way it is simpler/clearer than dealing with futures::join_all!() for the async version of the code.
Now the code is the same for both async and sync versions and SyncA removes the .awaits for the sync version.

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
This way Tokio could be just a dev-dependency and users could use
another async runtime if they want!

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
This way "tokio" is not advertized.

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants