Skip to content

Commit

Permalink
Format doctests and add doctests for factory module (#216)
Browse files Browse the repository at this point in the history
  • Loading branch information
slawlor authored Mar 17, 2024
1 parent 02e8338 commit fa3897c
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 29 deletions.
4 changes: 4 additions & 0 deletions ractor/rustfmt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# To format doc tests, you need to run cargo +nightly fmt
# and uncomment the following line
#
# format_code_in_doc_comments = true
116 changes: 116 additions & 0 deletions ractor/src/factory/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,122 @@
//! replace the worker with a new instance and continue processing jobs for that worker. The
//! factory also maintains the worker's message queue's so messages won't be lost which were in the
//! "worker"'s queue.
#![cfg_attr(
not(feature = "cluster"),
doc = "
## Example Factory
```rust
use ractor::concurrency::Duration;
use ractor::factory::{
Factory, FactoryMessage, Job, JobOptions, RoutingMode, WorkerBuilder, WorkerMessage,
WorkerStartContext,
};
use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort};
#[derive(Debug)]
enum ExampleMessage {
PrintValue(u64),
EchoValue(u64, RpcReplyPort<u64>),
}
/// The worker's specification for the factory. This defines
/// the business logic for each message that will be done in parallel.
struct ExampleWorker;
#[ractor::async_trait]
impl Actor for ExampleWorker {
type Msg = WorkerMessage<(), ExampleMessage>;
type State = WorkerStartContext<(), ExampleMessage>;
type Arguments = WorkerStartContext<(), ExampleMessage>;
async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
startup_context: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
Ok(startup_context)
}
async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
WorkerMessage::FactoryPing(time) => {
// This is a message which all factory workers **must**
// adhere to. It is a background processing message from the
// factory which is used for (a) metrics and (b) detecting
// stuck workers, i.e. workers which aren't making progress
// processing their messages
state
.factory
.cast(FactoryMessage::WorkerPong(state.wid, time.elapsed()))?;
}
WorkerMessage::Dispatch(job) => {
// Actual business logic that we want to parallelize
tracing::trace!(\"Worker {} received {:?}\", state.wid, job.msg);
match job.msg {
ExampleMessage::PrintValue(value) => {
tracing::info!(\"Worker {} printing value {value}\", state.wid);
}
ExampleMessage::EchoValue(value, reply) => {
tracing::info!(\"Worker {} echoing value {value}\", state.wid);
let _ = reply.send(value);
}
}
// job finished, on success or err we report back to the factory
state
.factory
.cast(FactoryMessage::Finished(state.wid, job.key))?;
}
}
Ok(())
}
}
/// Used by the factory to build new [ExampleWorker]s.
struct ExampleWorkerBuilder;
impl WorkerBuilder<ExampleWorker> for ExampleWorkerBuilder {
fn build(&self, _wid: usize) -> ExampleWorker {
ExampleWorker
}
}
#[tokio::main]
async fn main() {
let factory_def = Factory::<(), ExampleMessage, ExampleWorker> {
worker_count: 5,
routing_mode: RoutingMode::<()>::Queuer,
..Default::default()
};
let (factory, handle) = Actor::spawn(None, factory_def, Box::new(ExampleWorkerBuilder))
.await
.expect(\"Failed to startup factory\");
for i in 0..99 {
factory
.cast(FactoryMessage::Dispatch(Job {
key: (),
msg: ExampleMessage::PrintValue(i),
options: JobOptions::default(),
}))
.expect(\"Failed to send to factory\");
}
let reply = factory
.call(
|prt| {
FactoryMessage::Dispatch(Job {
key: (),
msg: ExampleMessage::EchoValue(123, prt),
options: JobOptions::default(),
})
},
None,
)
.await
.expect(\"Failed to send to factory\")
.expect(\"Failed to parse reply\");
assert_eq!(reply, 123);
factory.stop(None);
handle.await.unwrap();
}
```
"
)]

use std::collections::{HashMap, VecDeque};
use std::marker::PhantomData;
Expand Down
12 changes: 9 additions & 3 deletions ractor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
//! An example "ping-pong" actor might be the following
//!
//! ```rust
//! use ractor::{Actor, ActorRef, ActorProcessingErr};
//! use ractor::{Actor, ActorProcessingErr, ActorRef};
//!
//! /// [PingPong] is a basic actor that will print
//! /// ping..pong.. repeatedly until some exit
Expand Down Expand Up @@ -70,7 +70,11 @@
//! // Initially we need to create our state, and potentially
//! // start some internal processing (by posting a message for
//! // example)
//! async fn pre_start(&self, myself: ActorRef<Self::Msg>, _: ()) -> Result<Self::State, ActorProcessingErr> {
//! async fn pre_start(
//! &self,
//! myself: ActorRef<Self::Msg>,
//! _: (),
//! ) -> Result<Self::State, ActorProcessingErr> {
//! // startup the event processing
//! myself.send_message(Message::Ping).unwrap();
//! Ok(0u8)
Expand All @@ -96,7 +100,9 @@
//! }
//!
//! async fn run() {
//! let (_, actor_handle) = Actor::spawn(None, PingPong, ()).await.expect("Failed to start actor");
//! let (_, actor_handle) = Actor::spawn(None, PingPong, ())
//! .await
//! .expect("Failed to start actor");
//! actor_handle.await.expect("Actor failed to exit cleanly");
//! }
//! ```
Expand Down
16 changes: 11 additions & 5 deletions ractor/src/pg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
//! ## Examples
//!
//! ```rust
//! use ractor::{Actor, ActorRef, ActorProcessingErr};
//! use ractor::pg;
//! use ractor::{Actor, ActorProcessingErr, ActorRef};
//!
//! struct ExampleActor;
//!
Expand All @@ -32,16 +32,22 @@
//! type State = ();
//! type Arguments = ();
//!
//! async fn pre_start(&self, _myself: ActorRef<Self::Msg>, _args: Self::Arguments) -> Result<Self::State, ActorProcessingErr> {
//! async fn pre_start(
//! &self,
//! _myself: ActorRef<Self::Msg>,
//! _args: Self::Arguments,
//! ) -> Result<Self::State, ActorProcessingErr> {
//! println!("Starting");
//! Ok(())
//! }
//! }
//!
//! #[tokio::main]
//! async fn main() {
//! let (actor, handle) = Actor::spawn(None, ExampleActor, ()).await.expect("Failed to startup dummy actor");
//! let group = "the_group".to_string();
//! let (actor, handle) = Actor::spawn(None, ExampleActor, ())
//! .await
//! .expect("Failed to startup dummy actor");
//! let group = "the_group".to_string();
//!
//! // Join the actor to a group. This is also commonly done in `pre_start` or `post_start`
//! // of the actor itself without having to do it externally by some coordinator
Expand All @@ -55,7 +61,7 @@
//! // wait for actor exit
//! actor.stop(None);
//! handle.await.unwrap();
//!
//!
//! // The actor will automatically be removed from the group upon shutdown.
//! let members = pg::get_members(&group);
//! assert_eq!(members.len(), 0);
Expand Down
20 changes: 14 additions & 6 deletions ractor/src/registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
//! **Full example**
//!
//! ```rust
//! use ractor::{Actor, ActorRef, ActorProcessingErr};
//! use ractor::registry;
//! use ractor::{Actor, ActorProcessingErr, ActorRef};
//!
//! struct ExampleActor;
//!
Expand All @@ -47,24 +47,32 @@
//! type State = ();
//! type Arguments = ();
//!
//! async fn pre_start(&self, _myself: ActorRef<Self::Msg>, _args: Self::Arguments) -> Result<Self::State, ActorProcessingErr> {
//! async fn pre_start(
//! &self,
//! _myself: ActorRef<Self::Msg>,
//! _args: Self::Arguments,
//! ) -> Result<Self::State, ActorProcessingErr> {
//! println!("Starting");
//! Ok(())
//! }
//! }
//!
//! #[tokio::main]
//! async fn main() {
//! let (actor, handle) = Actor::spawn(Some("my_actor".to_string()), ExampleActor, ()).await.expect("Failed to startup dummy actor");
//!
//! let (actor, handle) = Actor::spawn(Some("my_actor".to_string()), ExampleActor, ())
//! .await
//! .expect("Failed to startup dummy actor");
//!
//! // Retrieve the actor by name from the registry
//! let who: ActorRef<()> = registry::where_is("my_actor".to_string()).expect("Failed to find actor").into();
//! let who: ActorRef<()> = registry::where_is("my_actor".to_string())
//! .expect("Failed to find actor")
//! .into();
//! who.cast(()).expect("Failed to send message");
//!
//! // wait for actor exit
//! actor.stop(None);
//! handle.await.unwrap();
//!
//!
//! // Automatically removed from the registry upon shutdown
//! assert!(registry::where_is("my_actor".to_string()).is_none());
//! }
Expand Down
35 changes: 26 additions & 9 deletions ractor/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
//! ## Examples
//!
//! ```rust
//! use ractor::{cast, call, call_t};
//! use ractor::concurrency::Duration;
//! use ractor::{Actor, ActorRef, ActorProcessingErr, RpcReplyPort};
//! use ractor::{call, call_t, cast};
//! use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort};
//!
//! struct ExampleActor;
//!
Expand All @@ -33,12 +33,21 @@
//! type State = ();
//! type Arguments = ();
//!
//! async fn pre_start(&self, _myself: ActorRef<Self::Msg>, _args: Self::Arguments) -> Result<Self::State, ActorProcessingErr> {
//! async fn pre_start(
//! &self,
//! _myself: ActorRef<Self::Msg>,
//! _args: Self::Arguments,
//! ) -> Result<Self::State, ActorProcessingErr> {
//! println!("Starting");
//! Ok(())
//! }
//!
//! async fn handle(&self, _myself: ActorRef<Self::Msg>, message: Self::Msg, _state: &mut Self::State) -> Result<(), ActorProcessingErr> {
//! async fn handle(
//! &self,
//! _myself: ActorRef<Self::Msg>,
//! message: Self::Msg,
//! _state: &mut Self::State,
//! ) -> Result<(), ActorProcessingErr> {
//! match message {
//! ExampleMessage::Cast => println!("Cast message"),
//! ExampleMessage::Call(reply) => {
Expand All @@ -52,17 +61,25 @@
//!
//! #[tokio::main]
//! async fn main() {
//! let (actor, handle) = Actor::spawn(None, ExampleActor, ()).await.expect("Failed to startup dummy actor");
//!
//! let (actor, handle) = Actor::spawn(None, ExampleActor, ())
//! .await
//! .expect("Failed to startup dummy actor");
//!
//! // send a 1-way message (equivalent patterns)
//! actor.cast(ExampleMessage::Cast).expect("Failed to send message");
//! actor
//! .cast(ExampleMessage::Cast)
//! .expect("Failed to send message");
//! cast!(actor, ExampleMessage::Cast).expect("Failed to send message");
//!
//! // Send a message to the actor, with an associated reply channel,
//! // and wait for the reply from the actor (optionally up to a timeout)
//! let _result = actor.call(ExampleMessage::Call, Some(Duration::from_millis(100))).await.expect("Failed to call actor");
//! let _result = actor
//! .call(ExampleMessage::Call, Some(Duration::from_millis(100)))
//! .await
//! .expect("Failed to call actor");
//! let _result = call!(actor, ExampleMessage::Call).expect("Failed to call actor");
//! let _result = call_t!(actor, ExampleMessage::Call, 100).expect("Failed to call actor with timeout");
//! let _result =
//! call_t!(actor, ExampleMessage::Call, 100).expect("Failed to call actor with timeout");
//!
//! // wait for actor exit
//! actor.stop(None);
Expand Down
23 changes: 17 additions & 6 deletions ractor/src/time/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//!
//! ```rust
//! use ractor::concurrency::Duration;
//! use ractor::{Actor, ActorRef, ActorProcessingErr};
//! use ractor::{Actor, ActorProcessingErr, ActorRef};
//!
//! struct ExampleActor;
//!
Expand All @@ -36,12 +36,21 @@
//! type State = ();
//! type Arguments = ();
//!
//! async fn pre_start(&self, _myself: ActorRef<Self::Msg>, _args: Self::Arguments) -> Result<Self::State, ActorProcessingErr> {
//! async fn pre_start(
//! &self,
//! _myself: ActorRef<Self::Msg>,
//! _args: Self::Arguments,
//! ) -> Result<Self::State, ActorProcessingErr> {
//! println!("Starting");
//! Ok(())
//! }
//!
//! async fn handle(&self, _myself: ActorRef<Self::Msg>, message: Self::Msg, _state: &mut Self::State) -> Result<(), ActorProcessingErr> {
//! async fn handle(
//! &self,
//! _myself: ActorRef<Self::Msg>,
//! message: Self::Msg,
//! _state: &mut Self::State,
//! ) -> Result<(), ActorProcessingErr> {
//! match message {
//! ExampleMessage::AfterDelay => println!("After delay"),
//! ExampleMessage::OnPeriod => println!("On period"),
Expand All @@ -52,11 +61,13 @@
//!
//! #[tokio::main]
//! async fn main() {
//! let (actor, handle) = Actor::spawn(None, ExampleActor, ()).await.expect("Failed to startup dummy actor");
//!
//! let (actor, handle) = Actor::spawn(None, ExampleActor, ())
//! .await
//! .expect("Failed to startup dummy actor");
//!
//! // send the message after a 100ms delay
//! actor.send_after(Duration::from_millis(100), || ExampleMessage::AfterDelay);
//!
//!
//! // send this message every 10ms
//! actor.send_interval(Duration::from_millis(10), || ExampleMessage::OnPeriod);
//!
Expand Down

0 comments on commit fa3897c

Please sign in to comment.