Skip to content

Commit 94fb3f2

Browse files
authored
Split the block cache into block pointer cache and block data cache (#6037)
* split block cache * get timestamp from pointers table * update justfile * use blocktime and fix tests * code review updates * use psql information_schema to check for migration requirements
1 parent 456fc3a commit 94fb3f2

File tree

18 files changed

+554
-168
lines changed

18 files changed

+554
-168
lines changed

chain/ethereum/src/chain.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -400,8 +400,9 @@ impl Chain {
400400
pub async fn block_number(
401401
&self,
402402
hash: &BlockHash,
403-
) -> Result<Option<(String, BlockNumber, Option<u64>, Option<BlockHash>)>, StoreError> {
404-
self.chain_store.block_number(hash).await
403+
) -> Result<Option<(String, BlockNumber, Option<BlockTime>, Option<BlockHash>)>, StoreError>
404+
{
405+
self.chain_store.block_pointer(hash).await
405406
}
406407

407408
// TODO: This is only used to build the block stream which could probably
@@ -1130,6 +1131,9 @@ pub struct FirehoseMapper {
11301131
impl BlockStreamMapper<Chain> for FirehoseMapper {
11311132
fn decode_block(
11321133
&self,
1134+
// We share the trait with substreams but for firehose the timestamp
1135+
// is in the block header so we don't need to use it here.
1136+
_timestamp: BlockTime,
11331137
output: Option<&[u8]>,
11341138
) -> Result<Option<BlockFinality>, BlockStreamError> {
11351139
let block = match output {
@@ -1198,12 +1202,19 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
11981202
// Check about adding basic information about the block in the firehose::Response or maybe
11991203
// define a slimmed down struct that would decode only a few fields and ignore all the rest.
12001204
let block = codec::Block::decode(any_block.value.as_ref())?;
1205+
let timestamp = block
1206+
.header()
1207+
.timestamp
1208+
.map(|ts| BlockTime::since_epoch(ts.seconds, ts.nanos as u32))
1209+
.unwrap_or_default();
12011210

12021211
use firehose::ForkStep::*;
12031212
match step {
12041213
StepNew => {
12051214
// unwrap: Input cannot be None so output will be error or block.
1206-
let block = self.decode_block(Some(any_block.value.as_ref()))?.unwrap();
1215+
let block = self
1216+
.decode_block(timestamp, Some(any_block.value.as_ref()))?
1217+
.unwrap();
12071218
let block_with_triggers = self.block_with_triggers(logger, block).await?;
12081219

12091220
Ok(BlockStreamEvent::ProcessBlock(

chain/near/src/chain.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ use graph::blockchain::client::ChainClient;
33
use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor;
44
use graph::blockchain::substreams_block_stream::SubstreamsBlockStream;
55
use graph::blockchain::{
6-
BasicBlockchainBuilder, BlockIngestor, BlockchainBuilder, BlockchainKind, NoopDecoderHook,
7-
NoopRuntimeAdapter, Trigger, TriggerFilterWrapper,
6+
BasicBlockchainBuilder, BlockIngestor, BlockTime, BlockchainBuilder, BlockchainKind,
7+
NoopDecoderHook, NoopRuntimeAdapter, Trigger, TriggerFilterWrapper,
88
};
99
use graph::cheap_clone::CheapClone;
1010
use graph::components::network_provider::ChainName;
@@ -432,6 +432,7 @@ pub struct FirehoseMapper {
432432
impl BlockStreamMapper<Chain> for FirehoseMapper {
433433
fn decode_block(
434434
&self,
435+
_timestamp: BlockTime,
435436
output: Option<&[u8]>,
436437
) -> Result<Option<codec::Block>, BlockStreamError> {
437438
let block = match output {
@@ -528,7 +529,10 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
528529
// Check about adding basic information about the block in the bstream::BlockResponseV2 or maybe
529530
// define a slimmed down struct that would decode only a few fields and ignore all the rest.
530531
// unwrap: Input cannot be None so output will be error or block.
531-
let block = self.decode_block(Some(any_block.value.as_ref()))?.unwrap();
532+
let block = self
533+
// the block time is inside the block.
534+
.decode_block(BlockTime::MIN, Some(any_block.value.as_ref()))?
535+
.unwrap();
532536

533537
use ForkStep::*;
534538
match step {

chain/substreams/src/chain.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub enum ParsedChanges {
4343
pub struct Block {
4444
pub hash: BlockHash,
4545
pub number: BlockNumber,
46+
pub timestamp: BlockTime,
4647
pub changes: EntityChanges,
4748
pub parsed_changes: Vec<ParsedChanges>,
4849
}
@@ -60,7 +61,7 @@ impl blockchain::Block for Block {
6061
}
6162

6263
fn timestamp(&self) -> BlockTime {
63-
BlockTime::NONE
64+
self.timestamp
6465
}
6566
}
6667

chain/substreams/src/mapper.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ pub struct WasmBlockMapper {
3232
impl BlockStreamMapper<Chain> for WasmBlockMapper {
3333
fn decode_block(
3434
&self,
35+
_timestamp: BlockTime,
3536
_output: Option<&[u8]>,
3637
) -> Result<Option<crate::Block>, BlockStreamError> {
3738
unreachable!("WasmBlockMapper does not do block decoding")
@@ -104,7 +105,11 @@ pub struct Mapper {
104105

105106
#[async_trait]
106107
impl BlockStreamMapper<Chain> for Mapper {
107-
fn decode_block(&self, output: Option<&[u8]>) -> Result<Option<Block>, BlockStreamError> {
108+
fn decode_block(
109+
&self,
110+
timestamp: BlockTime,
111+
output: Option<&[u8]>,
112+
) -> Result<Option<Block>, BlockStreamError> {
108113
let changes: EntityChanges = match output {
109114
Some(msg) => Message::decode(msg).map_err(SubstreamsError::DecodingError)?,
110115
None => EntityChanges {
@@ -125,6 +130,7 @@ impl BlockStreamMapper<Chain> for Mapper {
125130
number,
126131
changes,
127132
parsed_changes,
133+
timestamp,
128134
};
129135

130136
Ok(Some(block))
@@ -152,9 +158,13 @@ impl BlockStreamMapper<Chain> for Mapper {
152158
) -> Result<BlockStreamEvent<Chain>, BlockStreamError> {
153159
let block_number: BlockNumber = clock.number.try_into().map_err(Error::from)?;
154160
let block_hash = clock.id.as_bytes().to_vec().into();
161+
let timestamp = clock
162+
.timestamp
163+
.map(|ts| BlockTime::since_epoch(ts.seconds, ts.nanos as u32))
164+
.unwrap_or_default();
155165

156166
let block = self
157-
.decode_block(Some(&block))?
167+
.decode_block(timestamp, Some(&block))?
158168
.ok_or_else(|| anyhow!("expected block to not be empty"))?;
159169

160170
let block = self.block_with_triggers(logger, block).await.map(|bt| {

graph/src/blockchain/block_stream.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -685,7 +685,11 @@ pub trait FirehoseMapper<C: Blockchain>: Send + Sync {
685685

686686
#[async_trait]
687687
pub trait BlockStreamMapper<C: Blockchain>: Send + Sync {
688-
fn decode_block(&self, output: Option<&[u8]>) -> Result<Option<C::Block>, BlockStreamError>;
688+
fn decode_block(
689+
&self,
690+
timestamp: BlockTime,
691+
output: Option<&[u8]>,
692+
) -> Result<Option<C::Block>, BlockStreamError>;
689693

690694
async fn block_with_triggers(
691695
&self,

graph/src/blockchain/mock.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -546,10 +546,11 @@ impl ChainStore for MockChainStore {
546546
fn confirm_block_hash(&self, _number: BlockNumber, _hash: &BlockHash) -> Result<usize, Error> {
547547
unimplemented!()
548548
}
549-
async fn block_number(
549+
async fn block_pointer(
550550
&self,
551551
_hash: &BlockHash,
552-
) -> Result<Option<(String, BlockNumber, Option<u64>, Option<BlockHash>)>, StoreError> {
552+
) -> Result<Option<(String, BlockNumber, Option<BlockTime>, Option<BlockHash>)>, StoreError>
553+
{
553554
unimplemented!()
554555
}
555556
async fn block_numbers(

graph/src/blockchain/types.rs

Lines changed: 63 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use anyhow::anyhow;
2+
use chrono::DateTime;
23
use diesel::deserialize::FromSql;
34
use diesel::pg::Pg;
45
use diesel::serialize::{Output, ToSql};
@@ -7,6 +8,7 @@ use diesel::sql_types::{Bytea, Nullable, Text};
78
use diesel_derives::{AsExpression, FromSqlRow};
89
use serde::{Deserialize, Deserializer};
910
use std::convert::TryFrom;
11+
use std::num::ParseIntError;
1012
use std::time::Duration;
1113
use std::{fmt, str::FromStr};
1214
use web3::types::{Block, H256, U256, U64};
@@ -16,9 +18,9 @@ use crate::components::store::BlockNumber;
1618
use crate::data::graphql::IntoValue;
1719
use crate::data::store::scalar::Timestamp;
1820
use crate::derive::CheapClone;
19-
use crate::object;
2021
use crate::prelude::{r, Value};
2122
use crate::util::stable_hash_glue::{impl_stable_hash, AsBytes};
23+
use crate::{bail, object};
2224

2325
/// A simple marker for byte arrays that are really block hashes
2426
#[derive(Clone, Default, PartialEq, Eq, Hash, FromSqlRow, AsExpression)]
@@ -477,10 +479,7 @@ impl TryFrom<(Option<H256>, Option<U64>, H256, U256)> for ExtendedBlockPtr {
477479
let block_number =
478480
i32::try_from(number).map_err(|_| anyhow!("Block number out of range"))?;
479481

480-
// Convert `U256` to `BlockTime`
481-
let secs =
482-
i64::try_from(timestamp_u256).map_err(|_| anyhow!("Timestamp out of range for i64"))?;
483-
let block_time = BlockTime::since_epoch(secs, 0);
482+
let block_time = BlockTime::try_from(timestamp_u256)?;
484483

485484
Ok(ExtendedBlockPtr {
486485
hash: hash.into(),
@@ -497,16 +496,13 @@ impl TryFrom<(H256, i32, H256, U256)> for ExtendedBlockPtr {
497496
fn try_from(tuple: (H256, i32, H256, U256)) -> Result<Self, Self::Error> {
498497
let (hash, block_number, parent_hash, timestamp_u256) = tuple;
499498

500-
// Convert `U256` to `BlockTime`
501-
let secs =
502-
i64::try_from(timestamp_u256).map_err(|_| anyhow!("Timestamp out of range for i64"))?;
503-
let block_time = BlockTime::since_epoch(secs, 0);
499+
let timestamp = BlockTime::try_from(timestamp_u256)?;
504500

505501
Ok(ExtendedBlockPtr {
506502
hash: hash.into(),
507503
number: block_number,
508504
parent_hash: parent_hash.into(),
509-
timestamp: block_time,
505+
timestamp,
510506
})
511507
}
512508
}
@@ -562,14 +558,63 @@ impl fmt::Display for ChainIdentifier {
562558
#[diesel(sql_type = Timestamptz)]
563559
pub struct BlockTime(Timestamp);
564560

561+
impl Default for BlockTime {
562+
fn default() -> Self {
563+
BlockTime::NONE
564+
}
565+
}
566+
567+
impl TryFrom<BlockTime> for U256 {
568+
type Error = anyhow::Error;
569+
570+
fn try_from(value: BlockTime) -> Result<Self, Self::Error> {
571+
if value.as_secs_since_epoch() < 0 {
572+
bail!("unable to convert block time into U256");
573+
}
574+
575+
Ok(U256::from(value.as_secs_since_epoch() as u64))
576+
}
577+
}
578+
579+
impl TryFrom<U256> for BlockTime {
580+
type Error = anyhow::Error;
581+
582+
fn try_from(value: U256) -> Result<Self, Self::Error> {
583+
i64::try_from(value)
584+
.map_err(|_| anyhow!("Timestamp out of range for i64"))
585+
.map(|ts| BlockTime::since_epoch(ts, 0))
586+
}
587+
}
588+
589+
impl TryFrom<Option<String>> for BlockTime {
590+
type Error = ParseIntError;
591+
592+
fn try_from(ts: Option<String>) -> Result<Self, Self::Error> {
593+
match ts {
594+
Some(str) => return BlockTime::from_hex_str(&str),
595+
None => return Ok(BlockTime::NONE),
596+
};
597+
}
598+
}
599+
565600
impl BlockTime {
566601
/// A timestamp from a long long time ago used to indicate that we don't
567602
/// have a timestamp
568-
pub const NONE: Self = Self(Timestamp::NONE);
603+
pub const NONE: Self = Self::MIN;
569604

570605
pub const MAX: Self = Self(Timestamp::MAX);
571606

572-
pub const MIN: Self = Self(Timestamp::MIN);
607+
pub const MIN: Self = Self(Timestamp(DateTime::from_timestamp_nanos(0)));
608+
609+
pub fn from_hex_str(ts: &str) -> Result<Self, ParseIntError> {
610+
let (radix, idx) = if ts.starts_with("0x") {
611+
(16, 2)
612+
} else {
613+
(10, 0)
614+
};
615+
616+
u64::from_str_radix(&ts[idx..], radix).map(|ts| BlockTime::since_epoch(ts as i64, 0))
617+
}
573618

574619
/// Construct a block time that is the given number of seconds and
575620
/// nanoseconds after the Unix epoch
@@ -586,7 +631,12 @@ impl BlockTime {
586631
/// hourly rollups in tests
587632
#[cfg(debug_assertions)]
588633
pub fn for_test(ptr: &BlockPtr) -> Self {
589-
Self::since_epoch(ptr.number as i64 * 45 * 60, 0)
634+
Self::for_test_number(&ptr.number)
635+
}
636+
637+
#[cfg(debug_assertions)]
638+
pub fn for_test_number(number: &BlockNumber) -> Self {
639+
Self::since_epoch(*number as i64 * 45 * 60, 0)
590640
}
591641

592642
pub fn as_secs_since_epoch(&self) -> i64 {

graph/src/components/store/traits.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -576,10 +576,10 @@ pub trait ChainStore: ChainHeadStore {
576576
/// Currently, the timestamp is only returned if it's present in the top level block. This format
577577
/// depends on the chain and the implementation of Blockchain::Block for the specific chain.
578578
/// eg: {"block": { "timestamp": 123123123 } }
579-
async fn block_number(
579+
async fn block_pointer(
580580
&self,
581581
hash: &BlockHash,
582-
) -> Result<Option<(String, BlockNumber, Option<u64>, Option<BlockHash>)>, StoreError>;
582+
) -> Result<Option<(String, BlockNumber, Option<BlockTime>, Option<BlockHash>)>, StoreError>;
583583

584584
/// Do the same lookup as `block_pointer`, but in bulk
585585
async fn block_numbers(
@@ -665,10 +665,10 @@ pub trait QueryStore: Send + Sync {
665665
/// Returns the blocknumber, timestamp and the parentHash. Timestamp depends on the chain block type
666666
/// and can have multiple formats, it can also not be present. For now this is only available
667667
/// for EVM chains both firehose and rpc.
668-
async fn block_number_with_timestamp_and_parent_hash(
668+
async fn block_pointer(
669669
&self,
670670
block_hash: &BlockHash,
671-
) -> Result<Option<(BlockNumber, Option<u64>, Option<BlockHash>)>, StoreError>;
671+
) -> Result<Option<(BlockNumber, Option<BlockTime>, Option<BlockHash>)>, StoreError>;
672672

673673
fn wait_stats(&self) -> PoolWaitStats;
674674

graphql/src/runner.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,11 +111,11 @@ where
111111
let latest_block = match store.block_ptr().await.ok().flatten() {
112112
Some(block) => Some(LatestBlockInfo {
113113
timestamp: store
114-
.block_number_with_timestamp_and_parent_hash(&block.hash)
114+
.block_pointer(&block.hash)
115115
.await
116116
.ok()
117117
.flatten()
118-
.and_then(|(_, t, _)| t),
118+
.and_then(|(_, t, _)| t.map(|ts| ts.as_secs_since_epoch() as u64)),
119119
hash: block.hash,
120120
number: block.number,
121121
}),

graphql/src/store/resolver.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ impl StoreResolver {
186186
let (timestamp, parent_hash) = if lookup_needed(field) {
187187
match self
188188
.store
189-
.block_number_with_timestamp_and_parent_hash(&block_ptr.hash)
189+
.block_pointer(&block_ptr.hash)
190190
.await
191191
.map_err(Into::<QueryExecutionError>::into)?
192192
{
@@ -219,7 +219,7 @@ impl StoreResolver {
219219
.unwrap_or(r::Value::Null);
220220

221221
let timestamp = timestamp
222-
.map(|ts| r::Value::Int(ts as i64))
222+
.map(|ts| r::Value::Int(ts.as_secs_since_epoch()))
223223
.unwrap_or(r::Value::Null);
224224

225225
let parent_hash = parent_hash

justfile

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# Name of the environment variable that `_run_in_bash` exports before running a command.
DATABASE_TEST_VAR_NAME := "THEGRAPH_STORE_POSTGRES_DIESEL_URL"
2+
# Connection string exported under that name; it points at Postgres on localhost:5432.
DATABASE_URL := "postgresql://graph-node:let-me-in@localhost:5432/graph-node"
3+
4+
5+
# List the available recipes.
help:
6+
@just -l
7+
8+
# Start the ipfs and postgres services from docker/docker-compose.yml; ARGS are forwarded to `docker compose up`.
local-deps-up *ARGS:
9+
docker compose -f docker/docker-compose.yml up ipfs postgres {{ ARGS }}
10+
11+
# Stop the services defined in docker/docker-compose.yml.
local-deps-down:
12+
docker compose -f docker/docker-compose.yml down
13+
14+
# Start the services from tests/docker-compose.yml; ARGS are forwarded to `docker compose up`.
test-deps-up *ARGS:
15+
docker compose -f tests/docker-compose.yml up {{ ARGS }}
16+
17+
# Stop the services defined in tests/docker-compose.yml.
test-deps-down:
18+
docker compose -f tests/docker-compose.yml down
19+
20+
# Run the workspace tests except the graph-tests package; ARGS are appended after `--nocapture`.
# Requires local-deps, see local-deps-up
test *ARGS:
22+
just _run_in_bash cargo test --workspace --exclude graph-tests -- --nocapture {{ ARGS }}
23+
24+
# Run the runner_tests test target of the graph-tests package; ARGS are appended after `--nocapture`.
runner-test *ARGS:
25+
just _run_in_bash cargo test -p graph-tests --test runner_tests -- --nocapture {{ ARGS }}
26+
27+
# Run the integration_tests test target; ARGS are appended after `--nocapture`.
# Requires test-deps to be running, see test-deps-up
it-test *ARGS:
29+
just _run_in_bash cargo test --test integration_tests -- --nocapture {{ ARGS }}
30+
31+
# Delete the docker/data/postgres directory.
local-rm-db:
32+
rm -r docker/data/postgres
33+
34+
# Generate a new diesel migration called NAME under store/postgres/migrations/.
new-migration NAME:
35+
diesel migration generate {{ NAME }} --migration-dir store/postgres/migrations/
36+
37+
# Internal helper: export the database URL under DATABASE_TEST_VAR_NAME, then run CMD in bash.
_run_in_bash *CMD:
38+
#!/usr/bin/env bash
39+
export {{ DATABASE_TEST_VAR_NAME }}={{ DATABASE_URL }}
40+
{{ CMD }}

0 commit comments

Comments
 (0)