1 | | -use blueprint_sdk::Job; |
2 | | -use blueprint_sdk::Router; |
3 | | -use blueprint_sdk::contexts::tangle::TangleClientContext; |
4 | | -use blueprint_sdk::crypto::sp_core::SpSr25519; |
5 | | -use blueprint_sdk::crypto::tangle_pair_signer::TanglePairSigner; |
6 | | -use blueprint_sdk::keystore::backends::Backend; |
7 | | -use blueprint_sdk::runner::BlueprintRunner; |
8 | | -use blueprint_sdk::runner::config::BlueprintEnvironment; |
9 | | -use blueprint_sdk::runner::tangle::config::TangleConfig; |
10 | | -use blueprint_sdk::tangle::consumer::TangleConsumer; |
11 | | -use blueprint_sdk::tangle::filters::MatchesServiceId; |
12 | | -use blueprint_sdk::tangle::layers::TangleLayer; |
13 | | -use blueprint_sdk::tangle::producer::TangleProducer; |
14 | | -use pico_coprocessor_service_blueprint_lib::{MyContext, SAY_HELLO_JOB_ID, say_hello}; |
| 1 | +// pico-coprocessor-service-bin/src/main.rs |
| 2 | +use blueprint_sdk::{ |
| 3 | + Job, |
| 4 | +    Router, // `Job` provides the `.layer()` adapter; `Router` maps job IDs to handlers
| 5 | +    alloy::primitives::Address, // EVM address type for the registry contract
| 6 | + contexts::tangle::TangleClientContext, |
| 7 | + crypto::{sp_core::SpSr25519, tangle_pair_signer::TanglePairSigner}, |
| 8 | + keystore::backends::Backend, |
| 9 | + runner::{BlueprintRunner, config::BlueprintEnvironment, tangle::config::TangleConfig}, |
| 10 | + tangle::{ |
| 11 | + consumer::TangleConsumer, filters::MatchesServiceId, layers::TangleLayer, |
| 12 | + producer::TangleProducer, |
| 13 | + }, |
| 14 | +}; |
| 15 | +// Import new types and jobs from lib |
| 16 | +use pico_coprocessor_service_blueprint_lib::{
| 17 | +    GENERATE_COPROCESSOR_PROOF_JOB_ID,
| 18 | +    GENERATE_PROOF_JOB_ID,
| 19 | +    SAY_HELLO_JOB_ID,
| 20 | +    ServiceContext,
| 21 | +    generate_coprocessor_proof,
| 22 | +    generate_proof, say_hello, // Jobs
| 23 | +};
| 24 | +use std::{path::PathBuf, str::FromStr}; // PathBuf for temp paths, FromStr for parsing the registry Address
15 | 25 | use tower::filter::FilterLayer; |
16 | 26 | use tracing::error; |
17 | 27 | use tracing::level_filters::LevelFilter; |
| 28 | +use url::Url; // For parsing RPC URL |
18 | 29 |
19 | 30 | #[tokio::main] |
20 | | -async fn main() -> Result<(), blueprint_sdk::Error> { |
| 31 | +async fn main() -> Result<(), Box<dyn std::error::Error>> { |
| 32 | + // Use Box<dyn Error> for broader error handling |
21 | 33 | setup_log(); |
| 34 | + tracing::info!("Starting Pico Coprocessor Service Blueprint Runner..."); |
22 | 35 |
23 | | - let env = BlueprintEnvironment::load()?; |
| 36 | + // --- Load Configuration --- |
| 37 | + let env = BlueprintEnvironment::load() |
| 38 | + .map_err(|e| format!("Failed to load blueprint environment: {}", e))?; |
| 39 | + |
| 40 | + // Tangle Signer Setup |
24 | 41 | let sr25519_signer = env.keystore().first_local::<SpSr25519>()?; |
25 | 42 | let sr25519_pair = env.keystore().get_secret::<SpSr25519>(&sr25519_signer)?; |
26 | | - let st25519_signer = TanglePairSigner::new(sr25519_pair.0); |
| 43 | + let tangle_signer = TanglePairSigner::new(sr25519_pair.0); |
| 44 | + tracing::info!("Tangle signer configured."); |
27 | 45 |
| 46 | + // Tangle Client Setup |
28 | 47 | let tangle_client = env.tangle_client().await?; |
29 | 48 | let tangle_producer = |
30 | 49 | TangleProducer::finalized_blocks(tangle_client.rpc_client.clone()).await?; |
31 | | - let tangle_consumer = TangleConsumer::new(tangle_client.rpc_client.clone(), st25519_signer); |
32 | | - |
33 | | - let tangle_config = TangleConfig::default(); |
34 | | - |
35 | | - let service_id = env.protocol_settings.tangle()?.service_id.unwrap(); |
36 | | - let result = BlueprintRunner::builder(tangle_config, env) |
37 | | - .router( |
38 | | - // A router |
39 | | - // |
40 | | - // Each "route" is a job ID and the job function. We can also support arbitrary `Service`s from `tower`, |
41 | | - // which may make it easier for people to port over existing services to a blueprint. |
42 | | - Router::new() |
43 | | - // The route defined here has a `TangleLayer`, which adds metadata to the |
44 | | - // produced `JobResult`s, making it visible to a `TangleConsumer`. |
45 | | - .route(SAY_HELLO_JOB_ID, say_hello.layer(TangleLayer)) |
46 | | - // Add the `FilterLayer` to filter out job calls that don't match the service ID |
47 | | - // |
48 | | - // This layer is global to the router, and is applied to every job call. |
49 | | - .layer(FilterLayer::new(MatchesServiceId(service_id))) |
50 | | - // We can add a context to the router, which will be passed to all job functions |
51 | | - // that have the `Context` extractor. |
52 | | - // |
53 | | - // A context can be used for global state between job calls, such as a database. |
54 | | - // |
55 | | - // It is important to note that the context is **cloned** for each job call, so |
56 | | - // the context must be cheaply cloneable. |
57 | | - .with_context(MyContext::new()), |
58 | | - ) |
59 | | - // Add potentially many producers |
60 | | - // |
61 | | - // A producer is simply a `Stream` that outputs `JobCall`s, which are passed down to the intended |
62 | | - // job functions. |
| 50 | + let tangle_consumer = TangleConsumer::new(tangle_client.rpc_client.clone(), tangle_signer); |
| 51 | + tracing::info!("Tangle producer and consumer configured."); |
| 52 | + |
| 53 | +    let tangle_settings = env.protocol_settings.tangle()?; // Tangle protocol settings (service ID, etc.)
| 54 | +    let service_id = tangle_settings
| 55 | +        .service_id
| 56 | +        .ok_or("Tangle Service ID not configured")?;
| 57 | +    tracing::info!(%service_id, "Using Tangle Service ID");
| 58 | + |
| 59 | + // --- Service Specific Configuration --- |
| 60 | + // Get these from environment variables or a config file via BlueprintEnvironment extensions |
| 61 | +    // Example using environment variables, each validated before use
| 62 | + let eth_rpc_env = |
| 63 | + std::env::var("ETH_RPC_URL").map_err(|_| "ETH_RPC_URL environment variable not set")?; |
| 64 | + let eth_rpc_url = |
| 65 | +        Url::parse(&eth_rpc_env).map_err(|e| format!("Invalid ETH_RPC_URL: {}", e))?;
| 66 | + |
| 67 | + let registry_addr_env = std::env::var("REGISTRY_CONTRACT_ADDRESS") |
| 68 | + .map_err(|_| "REGISTRY_CONTRACT_ADDRESS environment variable not set")?; |
| 69 | +    let registry_contract_address = Address::from_str(&registry_addr_env)
| 70 | + .map_err(|e| format!("Invalid REGISTRY_CONTRACT_ADDRESS: {}", e))?; |
| 71 | + |
| 72 | + let temp_dir_base_env = |
| 73 | + std::env::var("TEMP_DIR_BASE").unwrap_or_else(|_| "/tmp/pico-service".to_string()); |
| 74 | + let temp_dir_base = PathBuf::from(temp_dir_base_env); |
| 75 | + |
| 76 | + tracing::info!(rpc_url = %eth_rpc_url, registry = %registry_contract_address, temp_dir = ?temp_dir_base, "Service configuration loaded"); |
| 77 | + |
| 78 | + // --- Create Service Context --- |
| 79 | + let service_context = |
| 80 | + ServiceContext::new(eth_rpc_url, registry_contract_address, temp_dir_base) |
| 81 | + .map_err(|e| format!("Failed to create service context: {:?}", e))?; |
| 82 | + tracing::info!("Service context created."); |
| 83 | + |
| 84 | + // --- Build Router --- |
| 85 | +    let router = Router::new()
| 86 | +        // One route per job ID; `TangleLayer` makes the results visible to the `TangleConsumer`
| 87 | +        .route(SAY_HELLO_JOB_ID, say_hello.layer(TangleLayer))
| 88 | +        .route(GENERATE_PROOF_JOB_ID, generate_proof.layer(TangleLayer))
| 89 | +        .route(
| 90 | +            GENERATE_COPROCESSOR_PROOF_JOB_ID,
| 91 | +            generate_coprocessor_proof.layer(TangleLayer),
| 92 | +        )
| 93 | +        // Global filter: drop job calls that don't match this service ID
| 94 | +        .layer(FilterLayer::new(MatchesServiceId(service_id)))
| 95 | +        .with_context(service_context); // Shared context, cloned for each job call
| 96 | +    tracing::info!("Router configured with 3 jobs.");
| 97 | + |
| 98 | + // --- Build and Run Runner --- |
| 99 | +    let runner_result = BlueprintRunner::builder(TangleConfig::default(), env)
| 100 | + .router(router) |
63 | 101 | .producer(tangle_producer) |
64 | | - // Add potentially many consumers |
65 | | - // |
66 | | - // A consumer is simply a `Sink` that consumes `JobResult`s, which are the output of the job functions. |
67 | | - // Every result will be passed to every consumer. It is the responsibility of the consumer |
68 | | - // to determine whether or not to process a result. |
69 | 102 | .consumer(tangle_consumer) |
70 | | - // Custom shutdown handlers |
71 | | - // |
72 | | - // Now users can specify what to do when an error occurs and the runner is shutting down. |
73 | | - // That can be cleanup logic, finalizing database transactions, etc. |
74 | | - .with_shutdown_handler(async { println!("Shutting down!") }) |
| 103 | + .with_shutdown_handler(async { println!("Shutting down Pico Coprocessor Service!") }) |
75 | 104 | .run() |
76 | 105 | .await; |
77 | 106 |
78 | | - if let Err(e) = result { |
79 | | - error!("Runner failed! {e:?}"); |
| 107 | + if let Err(e) = runner_result { |
| 108 | + error!("Blueprint runner failed: {:?}", e); |
| 109 | + // Convert specific blueprint errors if needed, otherwise return the boxed error |
| 110 | + return Err(e.into()); |
80 | 111 | } |
81 | 112 |
| 113 | + tracing::info!("Blueprint runner finished successfully."); |
82 | 114 | Ok(()) |
83 | 115 | } |
84 | 116 |
85 | 117 | pub fn setup_log() { |
86 | 118 | use tracing_subscriber::util::SubscriberInitExt; |
| 119 | + let filter = tracing_subscriber::EnvFilter::builder() |
| 120 | + .with_default_directive(LevelFilter::INFO.into()) |
| 121 | + .from_env_lossy(); |
87 | 122 |
88 | | - let _ = tracing_subscriber::fmt::SubscriberBuilder::default() |
89 | | - .without_time() |
90 | | - .with_span_events(tracing_subscriber::fmt::format::FmtSpan::NONE) |
91 | | - .with_env_filter( |
92 | | - tracing_subscriber::EnvFilter::builder() |
93 | | - .with_default_directive(LevelFilter::INFO.into()) |
94 | | - .from_env_lossy(), |
95 | | - ) |
96 | | - .finish() |
97 | | - .try_init(); |
| 123 | +    let _ = tracing_subscriber::fmt()
| 124 | +        // Timestamps are kept (the old builder stripped them with `.without_time()`)
| 125 | +        .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE) // Log span durations on close
| 126 | +        .with_env_filter(filter)
| 127 | +        // `try_init` installs the subscriber without panicking if one is already set
| 128 | +        .try_init();
| 129 | + tracing::info!("Logging initialized."); |
98 | 130 | } |
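
For completeness, the call `ServiceContext::new(eth_rpc_url, registry_contract_address, temp_dir_base)` above implies a constructor shape on the lib side. The sketch below is only an inference from this call site, not the actual `pico-coprocessor-service-blueprint-lib` code; the field names, the error type, and the directory-creation step are assumptions.

// Hypothetical lib-side sketch, inferred from the call site in main().
// Field names and the error type are assumptions; consult the lib crate for the real definition.
use blueprint_sdk::alloy::primitives::Address;
use std::path::PathBuf;
use url::Url;

#[derive(Clone)] // the router clones the context for every job call, so it must be cheap to clone
pub struct ServiceContext {
    pub eth_rpc_url: Url,
    pub registry_contract_address: Address,
    pub temp_dir_base: PathBuf,
}

impl ServiceContext {
    pub fn new(
        eth_rpc_url: Url,
        registry_contract_address: Address,
        temp_dir_base: PathBuf,
    ) -> Result<Self, std::io::Error> {
        // A plausible reason the constructor is fallible: it prepares the temp directory up front.
        std::fs::create_dir_all(&temp_dir_base)?;
        Ok(Self {
            eth_rpc_url,
            registry_contract_address,
            temp_dir_base,
        })
    }
}

Whatever the real definition looks like, the binary expects `ETH_RPC_URL` and `REGISTRY_CONTRACT_ADDRESS` to be set in its environment, and optionally `TEMP_DIR_BASE` (defaulting to `/tmp/pico-service`).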