From fa3897c4b389d16b922910d29d911be45f3df6ef Mon Sep 17 00:00:00 2001 From: Sean Lawlor Date: Sat, 16 Mar 2024 21:51:56 -0400 Subject: [PATCH] Format doctests and add doctests for factory module (#216) --- ractor/rustfmt.toml | 4 ++ ractor/src/factory/mod.rs | 116 +++++++++++++++++++++++++++++++++++++ ractor/src/lib.rs | 12 +++- ractor/src/pg/mod.rs | 16 +++-- ractor/src/registry/mod.rs | 20 +++++-- ractor/src/rpc/mod.rs | 35 ++++++++--- ractor/src/time/mod.rs | 23 ++++++-- 7 files changed, 197 insertions(+), 29 deletions(-) create mode 100644 ractor/rustfmt.toml diff --git a/ractor/rustfmt.toml b/ractor/rustfmt.toml new file mode 100644 index 00000000..89b1c6cf --- /dev/null +++ b/ractor/rustfmt.toml @@ -0,0 +1,4 @@ +# To format doc tests, you need to run cargo +nightly fmt +# and uncomment the following line +# +# format_code_in_doc_comments = true \ No newline at end of file diff --git a/ractor/src/factory/mod.rs b/ractor/src/factory/mod.rs index 5a16a7aa..980437c9 100644 --- a/ractor/src/factory/mod.rs +++ b/ractor/src/factory/mod.rs @@ -36,6 +36,122 @@ //! replace the worker with a new instance and continue processing jobs for that worker. The //! factory also maintains the worker's message queue's so messages won't be lost which were in the //! "worker"'s queue. +#![cfg_attr( + not(feature = "cluster"), + doc = " +## Example Factory +```rust +use ractor::concurrency::Duration; +use ractor::factory::{ + Factory, FactoryMessage, Job, JobOptions, RoutingMode, WorkerBuilder, WorkerMessage, + WorkerStartContext, +}; +use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort}; +#[derive(Debug)] +enum ExampleMessage { + PrintValue(u64), + EchoValue(u64, RpcReplyPort), +} +/// The worker's specification for the factory. This defines +/// the business logic for each message that will be done in parallel. +struct ExampleWorker; +#[ractor::async_trait] +impl Actor for ExampleWorker { + type Msg = WorkerMessage<(), ExampleMessage>; + type State = WorkerStartContext<(), ExampleMessage>; + type Arguments = WorkerStartContext<(), ExampleMessage>; + async fn pre_start( + &self, + _myself: ActorRef, + startup_context: Self::Arguments, + ) -> Result { + Ok(startup_context) + } + async fn handle( + &self, + _myself: ActorRef, + message: Self::Msg, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + match message { + WorkerMessage::FactoryPing(time) => { + // This is a message which all factory workers **must** + // adhere to. It is a background processing message from the + // factory which is used for (a) metrics and (b) detecting + // stuck workers, i.e. workers which aren't making progress + // processing their messages + state + .factory + .cast(FactoryMessage::WorkerPong(state.wid, time.elapsed()))?; + } + WorkerMessage::Dispatch(job) => { + // Actual business logic that we want to parallelize + tracing::trace!(\"Worker {} received {:?}\", state.wid, job.msg); + match job.msg { + ExampleMessage::PrintValue(value) => { + tracing::info!(\"Worker {} printing value {value}\", state.wid); + } + ExampleMessage::EchoValue(value, reply) => { + tracing::info!(\"Worker {} echoing value {value}\", state.wid); + let _ = reply.send(value); + } + } + // job finished, on success or err we report back to the factory + state + .factory + .cast(FactoryMessage::Finished(state.wid, job.key))?; + } + } + Ok(()) + } +} +/// Used by the factory to build new [ExampleWorker]s. +struct ExampleWorkerBuilder; +impl WorkerBuilder for ExampleWorkerBuilder { + fn build(&self, _wid: usize) -> ExampleWorker { + ExampleWorker + } +} +#[tokio::main] +async fn main() { + let factory_def = Factory::<(), ExampleMessage, ExampleWorker> { + worker_count: 5, + routing_mode: RoutingMode::<()>::Queuer, + ..Default::default() + }; + let (factory, handle) = Actor::spawn(None, factory_def, Box::new(ExampleWorkerBuilder)) + .await + .expect(\"Failed to startup factory\"); + for i in 0..99 { + factory + .cast(FactoryMessage::Dispatch(Job { + key: (), + msg: ExampleMessage::PrintValue(i), + options: JobOptions::default(), + })) + .expect(\"Failed to send to factory\"); + } + let reply = factory + .call( + |prt| { + FactoryMessage::Dispatch(Job { + key: (), + msg: ExampleMessage::EchoValue(123, prt), + options: JobOptions::default(), + }) + }, + None, + ) + .await + .expect(\"Failed to send to factory\") + .expect(\"Failed to parse reply\"); + assert_eq!(reply, 123); + factory.stop(None); + handle.await.unwrap(); +} +``` +" +)] use std::collections::{HashMap, VecDeque}; use std::marker::PhantomData; diff --git a/ractor/src/lib.rs b/ractor/src/lib.rs index 50663a84..15a2b5fa 100644 --- a/ractor/src/lib.rs +++ b/ractor/src/lib.rs @@ -23,7 +23,7 @@ //! An example "ping-pong" actor might be the following //! //! ```rust -//! use ractor::{Actor, ActorRef, ActorProcessingErr}; +//! use ractor::{Actor, ActorProcessingErr, ActorRef}; //! //! /// [PingPong] is a basic actor that will print //! /// ping..pong.. repeatedly until some exit @@ -70,7 +70,11 @@ //! // Initially we need to create our state, and potentially //! // start some internal processing (by posting a message for //! // example) -//! async fn pre_start(&self, myself: ActorRef, _: ()) -> Result { +//! async fn pre_start( +//! &self, +//! myself: ActorRef, +//! _: (), +//! ) -> Result { //! // startup the event processing //! myself.send_message(Message::Ping).unwrap(); //! Ok(0u8) @@ -96,7 +100,9 @@ //! } //! //! async fn run() { -//! let (_, actor_handle) = Actor::spawn(None, PingPong, ()).await.expect("Failed to start actor"); +//! let (_, actor_handle) = Actor::spawn(None, PingPong, ()) +//! .await +//! .expect("Failed to start actor"); //! actor_handle.await.expect("Actor failed to exit cleanly"); //! } //! ``` diff --git a/ractor/src/pg/mod.rs b/ractor/src/pg/mod.rs index b26db79f..f2629588 100644 --- a/ractor/src/pg/mod.rs +++ b/ractor/src/pg/mod.rs @@ -21,8 +21,8 @@ //! ## Examples //! //! ```rust -//! use ractor::{Actor, ActorRef, ActorProcessingErr}; //! use ractor::pg; +//! use ractor::{Actor, ActorProcessingErr, ActorRef}; //! //! struct ExampleActor; //! @@ -32,7 +32,11 @@ //! type State = (); //! type Arguments = (); //! -//! async fn pre_start(&self, _myself: ActorRef, _args: Self::Arguments) -> Result { +//! async fn pre_start( +//! &self, +//! _myself: ActorRef, +//! _args: Self::Arguments, +//! ) -> Result { //! println!("Starting"); //! Ok(()) //! } @@ -40,8 +44,10 @@ //! //! #[tokio::main] //! async fn main() { -//! let (actor, handle) = Actor::spawn(None, ExampleActor, ()).await.expect("Failed to startup dummy actor"); -//! let group = "the_group".to_string(); +//! let (actor, handle) = Actor::spawn(None, ExampleActor, ()) +//! .await +//! .expect("Failed to startup dummy actor"); +//! let group = "the_group".to_string(); //! //! // Join the actor to a group. This is also commonly done in `pre_start` or `post_start` //! // of the actor itself without having to do it externally by some coordinator @@ -55,7 +61,7 @@ //! // wait for actor exit //! actor.stop(None); //! handle.await.unwrap(); -//! +//! //! // The actor will automatically be removed from the group upon shutdown. //! let members = pg::get_members(&group); //! assert_eq!(members.len(), 0); diff --git a/ractor/src/registry/mod.rs b/ractor/src/registry/mod.rs index 0bc33d1f..27ca4b42 100644 --- a/ractor/src/registry/mod.rs +++ b/ractor/src/registry/mod.rs @@ -36,8 +36,8 @@ //! **Full example** //! //! ```rust -//! use ractor::{Actor, ActorRef, ActorProcessingErr}; //! use ractor::registry; +//! use ractor::{Actor, ActorProcessingErr, ActorRef}; //! //! struct ExampleActor; //! @@ -47,7 +47,11 @@ //! type State = (); //! type Arguments = (); //! -//! async fn pre_start(&self, _myself: ActorRef, _args: Self::Arguments) -> Result { +//! async fn pre_start( +//! &self, +//! _myself: ActorRef, +//! _args: Self::Arguments, +//! ) -> Result { //! println!("Starting"); //! Ok(()) //! } @@ -55,16 +59,20 @@ //! //! #[tokio::main] //! async fn main() { -//! let (actor, handle) = Actor::spawn(Some("my_actor".to_string()), ExampleActor, ()).await.expect("Failed to startup dummy actor"); -//! +//! let (actor, handle) = Actor::spawn(Some("my_actor".to_string()), ExampleActor, ()) +//! .await +//! .expect("Failed to startup dummy actor"); +//! //! // Retrieve the actor by name from the registry -//! let who: ActorRef<()> = registry::where_is("my_actor".to_string()).expect("Failed to find actor").into(); +//! let who: ActorRef<()> = registry::where_is("my_actor".to_string()) +//! .expect("Failed to find actor") +//! .into(); //! who.cast(()).expect("Failed to send message"); //! //! // wait for actor exit //! actor.stop(None); //! handle.await.unwrap(); -//! +//! //! // Automatically removed from the registry upon shutdown //! assert!(registry::where_is("my_actor".to_string()).is_none()); //! } diff --git a/ractor/src/rpc/mod.rs b/ractor/src/rpc/mod.rs index b158f200..01fde829 100644 --- a/ractor/src/rpc/mod.rs +++ b/ractor/src/rpc/mod.rs @@ -13,9 +13,9 @@ //! ## Examples //! //! ```rust -//! use ractor::{cast, call, call_t}; //! use ractor::concurrency::Duration; -//! use ractor::{Actor, ActorRef, ActorProcessingErr, RpcReplyPort}; +//! use ractor::{call, call_t, cast}; +//! use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort}; //! //! struct ExampleActor; //! @@ -33,12 +33,21 @@ //! type State = (); //! type Arguments = (); //! -//! async fn pre_start(&self, _myself: ActorRef, _args: Self::Arguments) -> Result { +//! async fn pre_start( +//! &self, +//! _myself: ActorRef, +//! _args: Self::Arguments, +//! ) -> Result { //! println!("Starting"); //! Ok(()) //! } //! -//! async fn handle(&self, _myself: ActorRef, message: Self::Msg, _state: &mut Self::State) -> Result<(), ActorProcessingErr> { +//! async fn handle( +//! &self, +//! _myself: ActorRef, +//! message: Self::Msg, +//! _state: &mut Self::State, +//! ) -> Result<(), ActorProcessingErr> { //! match message { //! ExampleMessage::Cast => println!("Cast message"), //! ExampleMessage::Call(reply) => { @@ -52,17 +61,25 @@ //! //! #[tokio::main] //! async fn main() { -//! let (actor, handle) = Actor::spawn(None, ExampleActor, ()).await.expect("Failed to startup dummy actor"); -//! +//! let (actor, handle) = Actor::spawn(None, ExampleActor, ()) +//! .await +//! .expect("Failed to startup dummy actor"); +//! //! // send a 1-way message (equivalent patterns) -//! actor.cast(ExampleMessage::Cast).expect("Failed to send message"); +//! actor +//! .cast(ExampleMessage::Cast) +//! .expect("Failed to send message"); //! cast!(actor, ExampleMessage::Cast).expect("Failed to send message"); //! //! // Send a message to the actor, with an associated reply channel, //! // and wait for the reply from the actor (optionally up to a timeout) -//! let _result = actor.call(ExampleMessage::Call, Some(Duration::from_millis(100))).await.expect("Failed to call actor"); +//! let _result = actor +//! .call(ExampleMessage::Call, Some(Duration::from_millis(100))) +//! .await +//! .expect("Failed to call actor"); //! let _result = call!(actor, ExampleMessage::Call).expect("Failed to call actor"); -//! let _result = call_t!(actor, ExampleMessage::Call, 100).expect("Failed to call actor with timeout"); +//! let _result = +//! call_t!(actor, ExampleMessage::Call, 100).expect("Failed to call actor with timeout"); //! //! // wait for actor exit //! actor.stop(None); diff --git a/ractor/src/time/mod.rs b/ractor/src/time/mod.rs index 0234d841..c0941899 100644 --- a/ractor/src/time/mod.rs +++ b/ractor/src/time/mod.rs @@ -18,7 +18,7 @@ //! //! ```rust //! use ractor::concurrency::Duration; -//! use ractor::{Actor, ActorRef, ActorProcessingErr}; +//! use ractor::{Actor, ActorProcessingErr, ActorRef}; //! //! struct ExampleActor; //! @@ -36,12 +36,21 @@ //! type State = (); //! type Arguments = (); //! -//! async fn pre_start(&self, _myself: ActorRef, _args: Self::Arguments) -> Result { +//! async fn pre_start( +//! &self, +//! _myself: ActorRef, +//! _args: Self::Arguments, +//! ) -> Result { //! println!("Starting"); //! Ok(()) //! } //! -//! async fn handle(&self, _myself: ActorRef, message: Self::Msg, _state: &mut Self::State) -> Result<(), ActorProcessingErr> { +//! async fn handle( +//! &self, +//! _myself: ActorRef, +//! message: Self::Msg, +//! _state: &mut Self::State, +//! ) -> Result<(), ActorProcessingErr> { //! match message { //! ExampleMessage::AfterDelay => println!("After delay"), //! ExampleMessage::OnPeriod => println!("On period"), @@ -52,11 +61,13 @@ //! //! #[tokio::main] //! async fn main() { -//! let (actor, handle) = Actor::spawn(None, ExampleActor, ()).await.expect("Failed to startup dummy actor"); -//! +//! let (actor, handle) = Actor::spawn(None, ExampleActor, ()) +//! .await +//! .expect("Failed to startup dummy actor"); +//! //! // send the message after a 100ms delay //! actor.send_after(Duration::from_millis(100), || ExampleMessage::AfterDelay); -//! +//! //! // send this message every 10ms //! actor.send_interval(Duration::from_millis(10), || ExampleMessage::OnPeriod); //!