Skip to content

Commit a2acdaa

Browse files
committed
use blocktime and fix tests
1 parent 7c80bfb commit a2acdaa

File tree

10 files changed

+240
-83
lines changed

10 files changed

+240
-83
lines changed

chain/ethereum/src/chain.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1131,6 +1131,9 @@ pub struct FirehoseMapper {
11311131
impl BlockStreamMapper<Chain> for FirehoseMapper {
11321132
fn decode_block(
11331133
&self,
1134+
// We share the trait with substreams but for firehose the timestamp
1135+
// is in the block header so we don't need to use it here.
1136+
_timestamp: BlockTime,
11341137
output: Option<&[u8]>,
11351138
) -> Result<Option<BlockFinality>, BlockStreamError> {
11361139
let block = match output {
@@ -1199,12 +1202,19 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
11991202
// Check about adding basic information about the block in the firehose::Response or maybe
12001203
// define a slimmed down stuct that would decode only a few fields and ignore all the rest.
12011204
let block = codec::Block::decode(any_block.value.as_ref())?;
1205+
let timestamp = block
1206+
.header()
1207+
.timestamp
1208+
.map(|ts| BlockTime::since_epoch(ts.seconds, ts.nanos as u32))
1209+
.unwrap_or_default();
12021210

12031211
use firehose::ForkStep::*;
12041212
match step {
12051213
StepNew => {
12061214
// unwrap: Input cannot be None so output will be error or block.
1207-
let block = self.decode_block(Some(any_block.value.as_ref()))?.unwrap();
1215+
let block = self
1216+
.decode_block(timestamp, Some(any_block.value.as_ref()))?
1217+
.unwrap();
12081218
let block_with_triggers = self.block_with_triggers(logger, block).await?;
12091219

12101220
Ok(BlockStreamEvent::ProcessBlock(

chain/near/src/chain.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ use graph::blockchain::client::ChainClient;
33
use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor;
44
use graph::blockchain::substreams_block_stream::SubstreamsBlockStream;
55
use graph::blockchain::{
6-
BasicBlockchainBuilder, BlockIngestor, BlockchainBuilder, BlockchainKind, NoopDecoderHook,
7-
NoopRuntimeAdapter, Trigger, TriggerFilterWrapper,
6+
BasicBlockchainBuilder, BlockIngestor, BlockTime, BlockchainBuilder, BlockchainKind,
7+
NoopDecoderHook, NoopRuntimeAdapter, Trigger, TriggerFilterWrapper,
88
};
99
use graph::cheap_clone::CheapClone;
1010
use graph::components::network_provider::ChainName;
@@ -432,6 +432,7 @@ pub struct FirehoseMapper {
432432
impl BlockStreamMapper<Chain> for FirehoseMapper {
433433
fn decode_block(
434434
&self,
435+
_timestamp: BlockTime,
435436
output: Option<&[u8]>,
436437
) -> Result<Option<codec::Block>, BlockStreamError> {
437438
let block = match output {
@@ -528,7 +529,10 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
528529
// Check about adding basic information about the block in the bstream::BlockResponseV2 or maybe
529530
// define a slimmed down stuct that would decode only a few fields and ignore all the rest.
530531
// unwrap: Input cannot be None so output will be error or block.
531-
let block = self.decode_block(Some(any_block.value.as_ref()))?.unwrap();
532+
let block = self
533+
// the block time is inside the block.
534+
.decode_block(BlockTime::MIN, Some(any_block.value.as_ref()))?
535+
.unwrap();
532536

533537
use ForkStep::*;
534538
match step {

chain/substreams/src/chain.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub enum ParsedChanges {
4343
pub struct Block {
4444
pub hash: BlockHash,
4545
pub number: BlockNumber,
46+
pub timestamp: BlockTime,
4647
pub changes: EntityChanges,
4748
pub parsed_changes: Vec<ParsedChanges>,
4849
}
@@ -60,7 +61,7 @@ impl blockchain::Block for Block {
6061
}
6162

6263
fn timestamp(&self) -> BlockTime {
63-
BlockTime::NONE
64+
self.timestamp
6465
}
6566
}
6667

chain/substreams/src/mapper.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ pub struct WasmBlockMapper {
3232
impl BlockStreamMapper<Chain> for WasmBlockMapper {
3333
fn decode_block(
3434
&self,
35+
_timestamp: BlockTime,
3536
_output: Option<&[u8]>,
3637
) -> Result<Option<crate::Block>, BlockStreamError> {
3738
unreachable!("WasmBlockMapper does not do block decoding")
@@ -104,7 +105,11 @@ pub struct Mapper {
104105

105106
#[async_trait]
106107
impl BlockStreamMapper<Chain> for Mapper {
107-
fn decode_block(&self, output: Option<&[u8]>) -> Result<Option<Block>, BlockStreamError> {
108+
fn decode_block(
109+
&self,
110+
timestamp: BlockTime,
111+
output: Option<&[u8]>,
112+
) -> Result<Option<Block>, BlockStreamError> {
108113
let changes: EntityChanges = match output {
109114
Some(msg) => Message::decode(msg).map_err(SubstreamsError::DecodingError)?,
110115
None => EntityChanges {
@@ -125,6 +130,7 @@ impl BlockStreamMapper<Chain> for Mapper {
125130
number,
126131
changes,
127132
parsed_changes,
133+
timestamp,
128134
};
129135

130136
Ok(Some(block))
@@ -152,9 +158,13 @@ impl BlockStreamMapper<Chain> for Mapper {
152158
) -> Result<BlockStreamEvent<Chain>, BlockStreamError> {
153159
let block_number: BlockNumber = clock.number.try_into().map_err(Error::from)?;
154160
let block_hash = clock.id.as_bytes().to_vec().into();
161+
let timestamp = clock
162+
.timestamp
163+
.map(|ts| BlockTime::since_epoch(ts.seconds, ts.nanos as u32))
164+
.unwrap_or_default();
155165

156166
let block = self
157-
.decode_block(Some(&block))?
167+
.decode_block(timestamp, Some(&block))?
158168
.ok_or_else(|| anyhow!("expected block to not be empty"))?;
159169

160170
let block = self.block_with_triggers(logger, block).await.map(|bt| {

graph/src/blockchain/block_stream.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -685,7 +685,11 @@ pub trait FirehoseMapper<C: Blockchain>: Send + Sync {
685685

686686
#[async_trait]
687687
pub trait BlockStreamMapper<C: Blockchain>: Send + Sync {
688-
fn decode_block(&self, output: Option<&[u8]>) -> Result<Option<C::Block>, BlockStreamError>;
688+
fn decode_block(
689+
&self,
690+
timestamp: BlockTime,
691+
output: Option<&[u8]>,
692+
) -> Result<Option<C::Block>, BlockStreamError>;
689693

690694
async fn block_with_triggers(
691695
&self,

graph/src/blockchain/types.rs

Lines changed: 69 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use anyhow::anyhow;
2+
use chrono::DateTime;
23
use diesel::deserialize::FromSql;
34
use diesel::pg::Pg;
45
use diesel::serialize::{Output, ToSql};
@@ -7,6 +8,7 @@ use diesel::sql_types::{Bytea, Nullable, Text};
78
use diesel_derives::{AsExpression, FromSqlRow};
89
use serde::{Deserialize, Deserializer};
910
use std::convert::TryFrom;
11+
use std::num::ParseIntError;
1012
use std::time::Duration;
1113
use std::{fmt, str::FromStr};
1214
use web3::types::{Block, H256, U256, U64};
@@ -16,9 +18,9 @@ use crate::components::store::BlockNumber;
1618
use crate::data::graphql::IntoValue;
1719
use crate::data::store::scalar::Timestamp;
1820
use crate::derive::CheapClone;
19-
use crate::object;
2021
use crate::prelude::{r, Value};
2122
use crate::util::stable_hash_glue::{impl_stable_hash, AsBytes};
23+
use crate::{bail, object};
2224

2325
/// A simple marker for byte arrays that are really block hashes
2426
#[derive(Clone, Default, PartialEq, Eq, Hash, FromSqlRow, AsExpression)]
@@ -477,10 +479,7 @@ impl TryFrom<(Option<H256>, Option<U64>, H256, U256)> for ExtendedBlockPtr {
477479
let block_number =
478480
i32::try_from(number).map_err(|_| anyhow!("Block number out of range"))?;
479481

480-
// Convert `U256` to `BlockTime`
481-
let secs =
482-
i64::try_from(timestamp_u256).map_err(|_| anyhow!("Timestamp out of range for i64"))?;
483-
let block_time = BlockTime::since_epoch(secs, 0);
482+
let block_time = BlockTime::try_from(timestamp_u256)?;
484483

485484
Ok(ExtendedBlockPtr {
486485
hash: hash.into(),
@@ -497,16 +496,13 @@ impl TryFrom<(H256, i32, H256, U256)> for ExtendedBlockPtr {
497496
fn try_from(tuple: (H256, i32, H256, U256)) -> Result<Self, Self::Error> {
498497
let (hash, block_number, parent_hash, timestamp_u256) = tuple;
499498

500-
// Convert `U256` to `BlockTime`
501-
let secs =
502-
i64::try_from(timestamp_u256).map_err(|_| anyhow!("Timestamp out of range for i64"))?;
503-
let block_time = BlockTime::since_epoch(secs, 0);
499+
let timestamp = BlockTime::try_from(timestamp_u256)?;
504500

505501
Ok(ExtendedBlockPtr {
506502
hash: hash.into(),
507503
number: block_number,
508504
parent_hash: parent_hash.into(),
509-
timestamp: block_time,
505+
timestamp,
510506
})
511507
}
512508
}
@@ -562,14 +558,67 @@ impl fmt::Display for ChainIdentifier {
562558
#[diesel(sql_type = Timestamptz)]
563559
pub struct BlockTime(Timestamp);
564560

561+
impl Default for BlockTime {
562+
fn default() -> Self {
563+
BlockTime::NONE
564+
}
565+
}
566+
567+
impl TryFrom<BlockTime> for U256 {
568+
type Error = anyhow::Error;
569+
570+
fn try_from(value: BlockTime) -> Result<Self, Self::Error> {
571+
if value.as_secs_since_epoch() < 0 {
572+
bail!("unable to convert block time into U256");
573+
}
574+
575+
Ok(U256::from(value.as_secs_since_epoch() as u64))
576+
}
577+
}
578+
579+
impl TryFrom<U256> for BlockTime {
580+
type Error = anyhow::Error;
581+
582+
fn try_from(value: U256) -> Result<Self, Self::Error> {
583+
i64::try_from(value)
584+
.map_err(|_| anyhow!("Timestamp out of range for i64"))
585+
.map(|ts| BlockTime::since_epoch(ts, 0))
586+
}
587+
}
588+
589+
impl TryFrom<Option<String>> for BlockTime {
590+
type Error = ParseIntError;
591+
592+
fn try_from(ts: Option<String>) -> Result<Self, Self::Error> {
593+
match ts {
594+
Some(str) => return BlockTime::from_str(&str),
595+
None => return Ok(BlockTime::NONE),
596+
};
597+
}
598+
}
599+
600+
impl FromStr for BlockTime {
601+
type Err = ParseIntError;
602+
603+
fn from_str(ts: &str) -> Result<Self, Self::Err> {
604+
let (radix, idx) = if ts.starts_with("0x") {
605+
(16, 2)
606+
} else {
607+
(10, 0)
608+
};
609+
610+
u64::from_str_radix(&ts[idx..], radix).map(|ts| BlockTime::since_epoch(ts as i64, 0))
611+
}
612+
}
613+
565614
impl BlockTime {
566-
/// A timestamp from a long long time ago used to indicate that we don't
567-
/// have a timestamp
568-
pub const NONE: Self = Self(Timestamp::NONE);
615+
// /// A timestamp from a long long time ago used to indicate that we don't
616+
// /// have a timestamp
617+
pub const NONE: Self = Self::MIN;
569618

570619
pub const MAX: Self = Self(Timestamp::MAX);
571620

572-
pub const MIN: Self = Self(Timestamp::MIN);
621+
pub const MIN: Self = Self(Timestamp(DateTime::from_timestamp_nanos(0)));
573622

574623
/// Construct a block time that is the given number of seconds and
575624
/// nanoseconds after the Unix epoch
@@ -586,7 +635,12 @@ impl BlockTime {
586635
/// hourly rollups in tests
587636
#[cfg(debug_assertions)]
588637
pub fn for_test(ptr: &BlockPtr) -> Self {
589-
Self::since_epoch(ptr.number as i64 * 45 * 60, 0)
638+
Self::for_test_number(&ptr.number)
639+
}
640+
641+
#[cfg(debug_assertions)]
642+
pub fn for_test_number(number: &BlockNumber) -> Self {
643+
Self::since_epoch(*number as i64 * 45 * 60, 0)
590644
}
591645

592646
pub fn as_secs_since_epoch(&self) -> i64 {

graph/src/data_source/offchain.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ impl DataSource {
216216
data_source::MappingTrigger::Offchain(trigger.clone()),
217217
self.mapping.handler.clone(),
218218
BlockPtr::new(Default::default(), self.creation_block.unwrap_or(0)),
219-
BlockTime::NONE,
219+
BlockTime::MIN,
220220
))
221221
}
222222

0 commit comments

Comments
 (0)