Skip to content

Commit 28312fd

Browse files
authored
Merge pull request #30 from graphprotocol/tmigone/deployment-cache
feat: implement deployment cache to reduce IPFS request load
2 parents 61f6aa6 + 05bb802 commit 28312fd

File tree

2 files changed

+75
-14
lines changed

2 files changed

+75
-14
lines changed

availability-oracle/src/main.rs

Lines changed: 73 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use manifest::{Abi, DataSource, Manifest, Mapping};
1818
use network_subgraph::*;
1919
use secp256k1::SecretKey;
2020
use std::sync::Arc;
21+
use std::time::SystemTime;
2122
use std::time::{Duration, Instant};
2223
use std::{fmt::Display, str::FromStr};
2324
use structopt::StructOpt;
@@ -148,6 +149,8 @@ struct Config {
148149
pub oracle_index: Option<u64>,
149150
}
150151

152+
const VALID_DEPLOYMENT_CACHE_TTL: Duration = Duration::from_secs(60 * 60 * 24);
153+
151154
#[tokio::main]
152155
async fn main() -> Result<()> {
153156
common::main(run).await
@@ -159,6 +162,10 @@ async fn run(logger: Logger, config: Config) -> Result<()> {
159162
let epoch_subgraph =
160163
EpochBlockOracleSubgraphImpl::new(logger.clone(), config.epoch_block_oracle_subgraph);
161164
let contract: Box<dyn StateManager> = if config.dry_run {
165+
info!(
166+
logger,
167+
"Running in dry mode: no transactions will be submitted on chain!"
168+
);
162169
Box::new(StateManagerDryRun::new(logger.clone()))
163170
} else {
164171
let signing_key: &SecretKey = &config.signing_key.unwrap().parse()?;
@@ -181,6 +188,10 @@ async fn run(logger: Logger, config: Config) -> Result<()> {
181188
if config.period > Duration::from_secs(0) {
182189
let mut interval = tokio::time::interval(config.period);
183190
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
191+
192+
// Valid deployments get checked only every VALID_DEPLOYMENT_CACHE_TTL seconds
193+
let mut valid_deployment_cache: Vec<(Cid, SystemTime)> = Vec::new();
194+
184195
loop {
185196
interval.tick().await;
186197

@@ -197,11 +208,16 @@ async fn run(logger: Logger, config: Config) -> Result<()> {
197208
grace_period,
198209
epoch_subgraph.clone(),
199210
&config.supported_data_source_kinds,
211+
valid_deployment_cache.clone(),
200212
)
201213
.await
202214
{
203-
Ok(()) => {
215+
Ok(updated_deployment_cache) => {
204216
METRICS.reconcile_runs_ok.inc();
217+
valid_deployment_cache = updated_deployment_cache;
218+
info!(logger, "Deployment cache updated";
219+
"count" => valid_deployment_cache.len()
220+
);
205221
}
206222
Err(e) => {
207223
METRICS.reconcile_runs_err.inc();
@@ -221,7 +237,7 @@ async fn run(logger: Logger, config: Config) -> Result<()> {
221237
ipfs.invalidate_cache();
222238
}
223239
}
224-
reconcile_deny_list(
240+
match reconcile_deny_list(
225241
&logger,
226242
&ipfs,
227243
&*contract,
@@ -230,8 +246,13 @@ async fn run(logger: Logger, config: Config) -> Result<()> {
230246
grace_period,
231247
epoch_subgraph.clone(),
232248
&config.supported_data_source_kinds,
249+
Vec::new(),
233250
)
234251
.await
252+
{
253+
Ok(_) => return Ok(()),
254+
Err(e) => return Err(e),
255+
}
235256
}
236257

237258
// This function is used to create a state manager based on the configuration.
@@ -281,7 +302,8 @@ pub async fn reconcile_deny_list(
281302
grace_period: Duration,
282303
epoch_subgraph: Arc<impl EpochBlockOracleSubgraph>,
283304
supported_ds_kinds: &[String],
284-
) -> Result<(), Error> {
305+
valid_deployment_cache: Vec<(Cid, SystemTime)>,
306+
) -> Result<Vec<(Cid, SystemTime)>, Error> {
285307
let logger = logger.clone();
286308

287309
// Fetch supported networks
@@ -300,20 +322,35 @@ pub async fn reconcile_deny_list(
300322
);
301323

302324
// Check the availability status of all subgraphs, and gather which should flip the deny flag.
303-
let status_changes: Vec<([u8; 32], bool)> = subgraph
325+
let deployment_status: Vec<([u8; 32], bool, bool, SystemTime)> = subgraph
304326
.deployments_over_threshold(min_signal, grace_period)
305327
.map(|deployment| async {
306328
let deployment = deployment?;
307329
let id = bytes32_to_cid_v0(deployment.id);
308-
let validity = match check(ipfs, id, &supported_networks, supported_ds_kinds).await {
309-
Ok(()) => Valid::Yes,
310-
Err(CheckError::Invalid(e)) => Valid::No(e),
311-
Err(CheckError::Other(e)) => return Err(e),
330+
331+
// Valid subgraphs are only checked every VALID_DEPLOYMENT_CACHE_TTL seconds to reduce IPFS requests
332+
let cached = valid_deployment_cache
333+
.iter()
334+
.filter(|(_, last_validated)| {
335+
last_validated.elapsed().unwrap() < VALID_DEPLOYMENT_CACHE_TTL
336+
})
337+
.find(|(cid, _)| *cid == id);
338+
339+
if cached.is_some() {
340+
METRICS.valid_deployment_cache_hits.inc();
341+
return Ok((deployment, Valid::Yes, cached.unwrap().1));
342+
} else {
343+
let validity = match check(ipfs, id, &supported_networks, supported_ds_kinds).await
344+
{
345+
Ok(()) => Valid::Yes,
346+
Err(CheckError::Invalid(e)) => Valid::No(e),
347+
Err(CheckError::Other(e)) => return Err(e),
348+
};
349+
return Ok((deployment, validity, SystemTime::now()));
312350
};
313-
Result::<_, Error>::Ok((deployment, validity))
314351
})
315352
.buffered(100)
316-
.try_filter_map(move |(deployment, validity)| {
353+
.try_filter_map(move |(deployment, validity, last_validated)| {
317354
let logger = logger.clone();
318355
async move {
319356
info!(logger, "Check subgraph";
@@ -336,7 +373,7 @@ pub async fn reconcile_deny_list(
336373
);
337374
}
338375
};
339-
None
376+
Some((deployment.id, should_deny, false, last_validated))
340377
}
341378

342379
// The validity status changed, flip the deny flag.
@@ -347,15 +384,32 @@ pub async fn reconcile_deny_list(
347384
"status" => should_deny,
348385
"reason" => validity.to_string(),
349386
);
350-
Some((deployment.id, should_deny))
387+
Some((deployment.id, should_deny, true, last_validated))
351388
}
352389
})
353390
}
354391
})
355392
.try_collect()
356393
.await?;
357394

358-
state_manager.deny_many(status_changes).await
395+
// Flip on chain status for those deployments that changed
396+
let changed_deployments = deployment_status
397+
.iter()
398+
.filter(|(_, _, status_changed, _)| *status_changed)
399+
.map(|(cid, should_deny, _, _)| (*cid, *should_deny))
400+
.collect();
401+
match state_manager.deny_many(changed_deployments).await {
402+
Ok(_) => {}
403+
Err(e) => return Err(e),
404+
};
405+
406+
// Return updated deployment cache
407+
let updated_deployment_cache: Vec<(Cid, SystemTime)> = deployment_status
408+
.iter()
409+
.filter(|(_, should_deny, _, _)| !*should_deny)
410+
.map(|(cid, _, _, last_validated)| (bytes32_to_cid_v0(*cid), *last_validated))
411+
.collect();
412+
Ok(updated_deployment_cache)
359413
}
360414

361415
enum Valid {
@@ -537,6 +591,7 @@ struct Metrics {
537591
reconcile_runs_total: prometheus::IntCounter,
538592
reconcile_runs_ok: prometheus::IntCounter,
539593
reconcile_runs_err: prometheus::IntCounter,
594+
valid_deployment_cache_hits: prometheus::IntCounter,
540595
}
541596

542597
lazy_static! {
@@ -561,6 +616,11 @@ impl Metrics {
561616
"Total reconcile runs with errors"
562617
)
563618
.unwrap(),
619+
valid_deployment_cache_hits: prometheus::register_int_counter!(
620+
"valid_deployment_cache_hits",
621+
"Total valid deployment cache hits"
622+
)
623+
.unwrap(),
564624
}
565625
}
566626
}

availability-oracle/src/test.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,10 @@ mod tests {
6161
"file/ipfs".into(),
6262
"substreams".into(),
6363
],
64+
vec![],
6465
)
6566
.await
66-
.unwrap()
67+
.unwrap();
6768
}
6869

6970
struct MockSubgraph;

0 commit comments

Comments
 (0)