Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Publishing #34

Merged
merged 10 commits into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions .github/workflows/hedwig.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@ on:

jobs:
lint:
runs-on: ${{ matrix.os }}
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest, macOS-latest]
timeout-minutes: 10
steps:
- uses: actions/checkout@v2
Expand All @@ -31,7 +29,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: clippy
args: -- -Dclippy::correctness -Dclippy::complexity -Dclippy::perf -Dunsafe_code -Dunreachable_pub -Dunused
args: --all-features -- -Dclippy::correctness -Dclippy::complexity -Dclippy::perf -Dunsafe_code -Dunreachable_pub -Dunused

doc:
runs-on: ubuntu-latest
Expand All @@ -50,14 +48,14 @@ jobs:
command: doc
args: --all-features --manifest-path=Cargo.toml
env:
RUSTDOCFLAGS: --cfg docsrs -Dmissing_docs -Dbroken_intra_doc_links
RUSTDOCFLAGS: --cfg docsrs -Dmissing_docs -Drustdoc::broken_intra_doc_links

test:
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
rust_toolchain: [nightly, stable, 1.49.0]
rust_toolchain: [nightly, stable, 1.53.0]
os: [ubuntu-latest, windows-latest, macOS-latest]
timeout-minutes: 20
steps:
Expand Down
46 changes: 23 additions & 23 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ name = "hedwig"
version = "4.1.0"
authors = [
"Aniruddha Maru <[email protected]>",
"Simonas Kazlauskas <[email protected]>"
"Simonas Kazlauskas <[email protected]>",
"Renar Narubin <[email protected]>",
]
edition = "2018"
repository = "https://github.com/standard-ai/hedwig-rust.git"
Expand All @@ -19,50 +20,49 @@ categories = ["asynchronous", "web-programming"]
maintenance = { status = "actively-developed" }

[features]
default = ["consume", "sink"]
default = []

# Whether publishing/consuming is enabled
publish = []
consume = ["async-trait", "either"]

# Publishers
google = ["base64", "yup-oauth2", "hyper", "http", "serde_json", "serde", "serde/derive", "uuid/serde"]
# Backends
google = ["ya-gcp", "tracing", "parking_lot"]
mock = ["async-channel", "parking_lot"]

# Validators
json-schema = ["valico", "serde_json", "serde"]
protobuf = ["prost"]

# Convenience API
sink = ["futures-util/sink", "either", "publish"]

[[example]]
name = "publish"
required-features = ["google", "json-schema"]
name = "googlepubsub"
required-features = ["google", "protobuf"]

[dependencies]
async-trait = { version = "0.1" }
bytes = "1"
futures-util = { version = "0.3", features = ["std"], default-features = false }
either = { version = "1", features = ["use_std"], default-features = false }
futures-util = { version = "0.3.17", features = ["std", "sink"], default-features = false }
pin-project = "1"
thiserror = { version = "1", default-features = false }
url = { version = "2", default-features = false }
uuid = { version = "^0.8", features = ["v4"], default-features = false }

async-trait = { version = "0.1", optional = true }
either = { version = "1", optional = true, features = ["use_std"], default-features = false }
async-channel = { version = "1.6", optional = true }
serde = { version = "^1.0", optional = true, default-features = false }
serde_json = { version = "^1", features = ["std"], optional = true, default-features = false }
parking_lot = { version = "0.11", optional = true }
prost = { version = "0.8", optional = true, features = ["std"], default-features = false }
tracing = { version = "0.1.26", optional = true }
valico = { version = "^3.2", optional = true, default-features = false }
base64 = { version = "^0.13", optional = true, default-features = false }
http = { version = "^0.2", optional = true, default-features = false }
hyper = { version = "^0.14.4", optional = true, features = ["client", "stream"], default-features = false }
yup-oauth2 = { version = "5.1", optional = true, features = ["hyper-rustls"], default-features = false }
prost = { version = "0.7", optional = true, features = ["std"], default-features = false }
ya-gcp = { version = "0.6.3", features = ["pubsub"], optional = true }

[dev-dependencies]
hyper-tls = "0.5.0"
prost = { version = "0.7", features = ["std", "prost-derive"] }
async-channel = { version = "1.6" }
futures-channel = "0.3.17"
parking_lot = { version = "0.11" }
prost = { version = "0.8", features = ["std", "prost-derive"] }
tokio = { version = "1", features = ["macros", "rt"] }
tonic = "0.5"
serde = { version = "1", features = ["derive"] }
ya-gcp = { version = "0.6.3", features = ["pubsub", "emulators"] }
structopt = "0.3"

[package.metadata.docs.rs]
all-features = true
Expand Down
241 changes: 241 additions & 0 deletions examples/googlepubsub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
//! An example of ingesting messages from a PubSub subscription, applying a
//! transformation, then submitting those transformations to another PubSub topic.

use futures_util::{SinkExt, StreamExt, TryFutureExt};
use hedwig::{
googlepubsub::{
AcknowledgeToken, AuthFlow, ClientBuilder, ClientBuilderConfig, PubSubConfig,
PubSubMessage, PublishError, ServiceAccountAuth, StreamSubscriptionConfig,
SubscriptionConfig, SubscriptionName, TopicConfig, TopicName,
},
validators, Consumer, DecodableMessage, EncodableMessage, Headers, Publisher,
};
use std::{error::Error as StdError, time::SystemTime};
use structopt::StructOpt;

const USER_CREATED_TOPIC: &str = "user.created";
const USER_UPDATED_TOPIC: &str = "user.updated";

/// The input data, representing some user being created with the given name
#[derive(PartialEq, Eq, prost::Message)]
struct UserCreatedMessage {
#[prost(string, tag = "1")]
name: String,
}

impl EncodableMessage for UserCreatedMessage {
type Error = validators::ProstValidatorError;
type Validator = validators::ProstValidator;
fn topic(&self) -> hedwig::Topic {
USER_CREATED_TOPIC.into()
}
fn encode(&self, validator: &Self::Validator) -> Result<hedwig::ValidatedMessage, Self::Error> {
Ok(validator.validate(
uuid::Uuid::new_v4(),
SystemTime::now(),
"user.created/1.0",
Headers::new(),
self,
)?)
}
}

impl DecodableMessage for UserCreatedMessage {
type Error = validators::ProstDecodeError<validators::prost::SchemaMismatchError>;
type Decoder =
validators::ProstDecoder<validators::prost::ExactSchemaMatcher<UserCreatedMessage>>;

fn decode(msg: hedwig::ValidatedMessage, decoder: &Self::Decoder) -> Result<Self, Self::Error> {
decoder.decode(msg)
}
}

/// The output data, where the given user has now been assigned an ID and some metadata
#[derive(PartialEq, Eq, prost::Message)]
struct UserUpdatedMessage {
#[prost(string, tag = "1")]
name: String,

#[prost(int64, tag = "2")]
id: i64,

#[prost(string, tag = "3")]
metadata: String,
}

#[derive(Debug)]
struct TransformedMessage {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like something that might make sense to have provided by the library in a generic way, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's the AcknowledgeableMessage type that the user could employ (and a pubsub-specific alias PubSubMessage). Now that i think about it, I should alias the TransformedMessage type to one of those.

I've also thought about adding a map function to AcknowledgeableMessage so that a user could operate on the message and leave the token as-is, but then this goes down the rabbit hole of monad-ish types (implement flatmap? flatten? fallible mapping? async mapping?). Instead i left the fields as public so the user can construct them as they wish

// keep the input's ack token to ack when the output is successfully published, or nack on
// failure
input_token: AcknowledgeToken,
output: UserUpdatedMessage,
}

impl EncodableMessage for TransformedMessage {
type Error = validators::ProstValidatorError;
type Validator = validators::ProstValidator;

fn topic(&self) -> hedwig::Topic {
USER_UPDATED_TOPIC.into()
}

fn encode(&self, validator: &Self::Validator) -> Result<hedwig::ValidatedMessage, Self::Error> {
Ok(validator.validate(
uuid::Uuid::new_v4(),
SystemTime::now(),
"user.updated/1.0",
Headers::new(),
&self.output,
)?)
}
}

#[derive(Debug, StructOpt)]
struct Args {
/// The name of the pubsub project
#[structopt(long)]
project_name: String,
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn StdError>> {
let args = Args::from_args();

println!("Building PubSub clients");

let builder = ClientBuilder::new(
ClientBuilderConfig::new().auth_flow(AuthFlow::ServiceAccount(ServiceAccountAuth::EnvVar)),
PubSubConfig::default(),
)
.await?;

let input_topic_name = TopicName::new(USER_CREATED_TOPIC);
let subscription_name = SubscriptionName::new("user-metadata-updaters");

let output_topic_name = TopicName::new(USER_UPDATED_TOPIC);
const APP_NAME: &str = "user-metadata-updater";

let mut publisher_client = builder
.build_publisher(&args.project_name, APP_NAME)
.await?;
let mut consumer_client = builder.build_consumer(&args.project_name, APP_NAME).await?;

for topic_name in [&input_topic_name, &output_topic_name] {
println!("Creating topic {:?}", topic_name);

publisher_client
.create_topic(TopicConfig {
name: topic_name.clone(),
..TopicConfig::default()
})
.await?;
}

println!("Creating subscription {:?}", &subscription_name);

consumer_client
.create_subscription(SubscriptionConfig {
topic: input_topic_name.clone(),
name: subscription_name.clone(),
..SubscriptionConfig::default()
})
.await?;

println!(
"Synthesizing input messages for topic {:?}",
&input_topic_name
);

{
let validator = validators::ProstValidator::new();
let mut input_sink =
Publisher::<UserCreatedMessage>::publish_sink(publisher_client.publisher(), validator);

for i in 1..=10 {
let message = UserCreatedMessage {
name: format!("Example Name #{}", i),
};

input_sink.feed(message).await?;
}
input_sink.flush().await?;
}

println!("Ingesting input messages, applying transformations, and publishing to destination");

let mut read_stream = consumer_client
.stream_subscription(
subscription_name.clone(),
StreamSubscriptionConfig::default(),
)
.consume::<UserCreatedMessage>(hedwig::validators::ProstDecoder::new(
hedwig::validators::prost::ExactSchemaMatcher::new("user.created/1.0"),
));

let mut output_sink = Publisher::<TransformedMessage, _>::publish_sink_with_responses(
publisher_client.publisher(),
validators::ProstValidator::new(),
futures_util::sink::unfold((), |_, message: TransformedMessage| async move {
// if the output is successfully sent, ack the input to mark it as processed
message.input_token.ack().await
}),
);

for i in 1..=10 {
let PubSubMessage { ack_token, message } = read_stream
.next()
.await
.expect("stream should have 10 elements")?;

assert_eq!(&message.name, &format!("Example Name #{}", i));

let transformed = TransformedMessage {
output: UserUpdatedMessage {
name: message.name,
id: random_id(),
metadata: "some metadata".into(),
},
input_token: ack_token,
};

output_sink
.feed(transformed)
.or_else(|publish_error| async move {
// if publishing fails, nack the failed messages to allow later retries
Err(match publish_error {
PublishError::Publish { cause, messages } => {
for failed_transform in messages {
failed_transform.input_token.nack().await?;
}
Box::<dyn StdError>::from(cause)
}
err => Box::<dyn StdError>::from(err),
})
})
.await?
}
output_sink.flush().await?;

println!("All messages matched and published successfully!");

println!("Deleting subscription {:?}", &subscription_name);

consumer_client
.delete_subscription(subscription_name)
.await?;

for topic_name in [input_topic_name, output_topic_name] {
println!("Deleting topic {:?}", &topic_name);

publisher_client.delete_topic(topic_name).await?;
}

println!("Done");

Ok(())
}

fn random_id() -> i64 {
4 // chosen by fair dice roll.
// guaranteed to be random.
}
Loading