Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/applications.yml
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ jobs:
exe:known-graphs \
exe:standalone-pruner \
exe:pact-diff \
exe:pact-replay \
test:chainweb-tests \
test:multi-node-network-tests \
test:remote-tests \
Expand Down Expand Up @@ -417,6 +418,7 @@ jobs:
cp $(cabal list-bin known-graphs) artifacts/chainweb
cp $(cabal list-bin multi-node-network-tests) artifacts/chainweb
cp $(cabal list-bin pact-diff) artifacts/chainweb
cp $(cabal list-bin pact-replay) artifacts/chainweb
cp $(cabal list-bin remote-tests) artifacts/chainweb
cp $(cabal list-bin standalone-pruner) artifacts/chainweb
cp README.md artifacts/chainweb
Expand Down
11 changes: 11 additions & 0 deletions chainweb.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ library
, Chainweb.VerifierPlugin.Hyperlane.Binary
, Chainweb.VerifierPlugin.Hyperlane.Message
, Chainweb.VerifierPlugin.Hyperlane.Message.After225
, Chainweb.VerifierPlugin.Hyperlane.Message.Before225
, Chainweb.VerifierPlugin.Hyperlane.Utils
, Chainweb.Version
, Chainweb.Version.Development
Expand Down Expand Up @@ -375,6 +376,16 @@ library
, Chainweb.Pact4.TransactionExec
, Chainweb.Pact4.Types
, Chainweb.Pact4.Validations
, Chainweb.Pact.Transactions.Mainnet0Transactions
, Chainweb.Pact.Transactions.Mainnet1Transactions
, Chainweb.Pact.Transactions.Mainnet2Transactions
, Chainweb.Pact.Transactions.Mainnet3Transactions
, Chainweb.Pact.Transactions.Mainnet4Transactions
, Chainweb.Pact.Transactions.Mainnet5Transactions
, Chainweb.Pact.Transactions.Mainnet6Transactions
, Chainweb.Pact.Transactions.Mainnet7Transactions
, Chainweb.Pact.Transactions.Mainnet8Transactions
, Chainweb.Pact.Transactions.Mainnet9Transactions

-- utils
, Utils.Logging
Expand Down
39 changes: 39 additions & 0 deletions cwtools/cwtools.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,45 @@ executable db-checksum
, text
, unordered-containers

-- Generate genesis headers.
executable pact-replay
import: warning-flags, debugging-flags
default-language: Haskell2010
ghc-options:
-threaded
-rtsopts
"-with-rtsopts=-N -H1G -A64M"
-Wno-x-partial -Wno-unrecognised-warning-flags
hs-source-dirs:
pact-replay
main-is:
PactReplay.hs
build-depends:
, chainweb
, chainweb:chainweb-test-utils

, aeson
, async
, base
, chainweb-storage
, constraints
, containers
, filepath
, lens
, loglevel
, optparse-applicative
, pact-json
, pact-tng:pact-request-api
, pact-tng
, resource-pool
, resourcet
, safe-exceptions
, streaming
, temporary
, unordered-containers
, text
, vector

-- Generate genesis headers.
executable ea
import: warning-flags, debugging-flags
Expand Down
188 changes: 188 additions & 0 deletions cwtools/pact-replay/PactReplay.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
{-# LANGUAGE ApplicativeDo #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ImportQualifiedPost #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PartialTypeSignatures #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}

module Main(main) where

import Chainweb.BlockHeader
import Chainweb.BlockHeaderDB
import Chainweb.BlockHeaderDB.PruneForks qualified as PruneForks
import Chainweb.BlockHeight (BlockHeight (..))
import Chainweb.Core.Brief
import Chainweb.Cut (cutHeaders, unsafeMkCut)
import Chainweb.Cut.Create hiding (join)
import Chainweb.CutDB (cutHashesTable, readHighestCutHeaders)
import Chainweb.Logger
import Chainweb.Pact.Backend.Utils
import Chainweb.Pact.PactService qualified as PactService
import Chainweb.Pact.Payload.PayloadStore.RocksDB qualified as Pact.Payload.PayloadStore.RocksDB
import Chainweb.Pact.Types
import Chainweb.Parent
import Chainweb.PayloadProvider (blockHeaderToEvaluationCtx)
import Chainweb.PayloadProvider.Pact
import Chainweb.PayloadProvider.Pact.Genesis (genesisPayload)
import Chainweb.Storage.Table.RocksDB (modernDefaultOptions, withReadOnlyRocksDb, withRocksDb)
import Chainweb.Time
import Chainweb.TreeDB qualified as TreeDB
import Chainweb.Utils
import Chainweb.Version
import Chainweb.Version.Registry
import Chainweb.WebBlockHeaderDB
import Control.Concurrent(threadDelay)
import Control.Concurrent.Async (forConcurrently, forConcurrently_)
import Control.Exception.Safe
import Control.Lens
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Trans.Resource
import Data.Constraint
import Data.HashSet qualified as HS
import Data.HashMap.Strict qualified as HM
import Data.IORef
import Data.List qualified as List
import Data.Text qualified as T
import Data.Text.IO qualified as T
import GHC.Stack
import Options.Applicative
import Streaming qualified as S
import Streaming.Prelude qualified as S
import System.FilePath ((</>))
import System.LogLevel
import Text.Printf

main :: IO ()
main = join $
execParser $ info
(parser <**> helper)
(fullDesc
<> progDesc "Replay Pact blocks checking that we get the correct outputs"
<> header "pact-replay")

getRocksDbDir :: HasCallStack => FilePath -> FilePath
getRocksDbDir base = base </> "0" </> "rocksDb"

getPactDbDir :: HasCallStack => FilePath -> FilePath
getPactDbDir base = base </> "0" </> "sqlite"

isPactChain :: HasVersion => ChainId -> Bool
isPactChain cid = payloadProviderTypeForChain cid == PactProvider

parser :: Parser (IO ())
parser = do
version <- option (findKnownVersion =<< textReader) (long "chainweb-version" <> short 'v')
dbDir <- textOption (long "database-directory")
logLevel <- flag' Debug (long "verbose") <|> flag' Warn (long "quiet") <|> pure Info
maybeStart <- optional $ BlockHeight <$> textOption (long "start")
maybeEnd <- optional $ BlockHeight <$> textOption (long "end")
chains :: Dict HasVersion -> [ChainId] <-
fmap const (jsonOption (long "chains"))
<|> pure (\Dict -> filter isPactChain (HS.toList chainIds))
return $ withVersion version $ do
withRocksDb (getRocksDbDir dbDir) modernDefaultOptions $ \rdb -> do
let logger = genericLogger logLevel T.putStrLn
let cutTable = cutHashesTable rdb
let pdb = Pact.Payload.PayloadStore.RocksDB.newPayloadDb rdb
let wbhdb = mkWebBlockHeaderDb rdb (tabulateChains (mkBlockHeaderDb rdb))

initialCut <- unsafeMkCut <$> readHighestCutHeaders (logFunctionText logger) wbhdb cutTable
limitedCut <- maybe (return initialCut) (\end -> limitCut wbhdb end initialCut) maybeEnd

failureCount <- fmap sum $ forConcurrently (chains Dict `List.intersect` HM.keys (view cutHeaders limitedCut)) $ \cid -> runResourceT $ do
let chainLogger = addLabel ("chain", brief cid) logger
let config = defaultPactServiceConfig
PactPayloadProvider _ serviceEnv <- withPactPayloadProvider cid rdb Nothing chainLogger Nothing mempty pdb
(getPactDbDir dbDir)
config
(genesisPayload cid)

failureCountRef <- liftIO $ newIORef (0 :: Word)
speedHeightRef <- liftIO $ newIORef (0, 0)
bhdb <- getWebBlockHeaderDb wbhdb cid
_ <- withAsyncR (logProgress chainLogger cid speedHeightRef)

let upperEndBlock = limitedCut ^?! cutHeaders . ix cid
let upper = HS.singleton (TreeDB.UpperBound $ view blockHash upperEndBlock)
liftIO $ TreeDB.branchEntries bhdb Nothing Nothing Nothing Nothing mempty upper $ \blockStream -> do
blockStream
& S.takeWhile (\blk -> maybe True (\start -> view blockHeight blk >= start) maybeStart)
& withParent
& S.mapM (\(h, ph) ->
fmap (h,) $
try @_ @SomeException $
PactService.execReadOnlyReplay chainLogger serviceEnv
(view blockPayloadHash h <$ blockHeaderToEvaluationCtx ph))
& S.chunksOf 500
& mapsM_ (\blkChunk -> do
startTime <- getCurrentTimeIntegral

count S.:> (Just lastHdr S.:> x) <- blkChunk
& S.mapM (\case
(h, Left err) -> do
modifyIORef failureCountRef succ
logFunctionText chainLogger Error $ "Error block: " <> brief h <> ": " <> sshow err
return h
(h, Right (Just err)) -> do
modifyIORef failureCountRef succ
logFunctionText chainLogger Error $ "Invalid block " <> brief h <> ": " <> sshow err
return h
(h, Right Nothing) -> return h
)
& S.copy
& S.last
& S.length

endTime <- getCurrentTimeIntegral
let !(TimeSpan (timeTaken :: Micros)) = (endTime `diff` startTime)
let !speed :: Double = int count * 1_000_000 / int timeTaken

writeIORef speedHeightRef (speed, view blockHeight lastHdr)

return x
)

liftIO $ logFunctionText chainLogger Info $ "finished replaying chain " <> brief cid

liftIO $ readIORef failureCountRef
when (failureCount > 0) $
error $ sshow failureCount <> " blocks failed"
where
logProgress logger cid speedHeightRef = do
threadDelay 20_000_000
(speed, height) <- readIORef speedHeightRef
logFunctionText logger Info $
"Chain " <> brief cid <>
" speed " <> T.pack (printf "%.2f" speed) <> "/s"
<> " at " <> brief height <> " (desc.)"

-- requires that the input is descending
withParent :: Monad m => S.Stream (S.Of h) m r -> S.Stream (S.Of (h, Parent h)) m r
withParent = \strm -> do
S.lift (S.next strm) >>= \case
Left r -> return r
Right (bh, strm') -> go bh strm'
where
go bh strm = do
S.lift (S.next strm) >>= \case
Left r -> return r
Right (bh', strm') -> do
S.yield (bh, Parent bh')
go bh' strm'

mapsM_ :: Monad m => (forall x. f x -> m x) -> S.Stream f m r -> m r
mapsM_ f = go
where
go strm =
S.inspect strm >>= \case
Left r -> return r
Right fstrm -> do
strm' <- f fstrm
go strm'
9 changes: 1 addition & 8 deletions node/src/ChainwebNode.hs
Original file line number Diff line number Diff line change
Expand Up @@ -319,13 +319,7 @@ node conf logger = do
rocksDbDir <- getRocksDbDir conf
pactDbDir <- getPactDbDir conf
dbBackupsDir <- getBackupsDir conf
withRocksDb' <-
if _configReadOnlyReplay cwConf
then
withReadOnlyRocksDb <$ logFunctionText logger Info "Opening RocksDB in read-only mode"
else
return withRocksDb
withRocksDb' rocksDbDir modernDefaultOptions $ \rocksDb -> do
withRocksDb rocksDbDir modernDefaultOptions $ \rocksDb -> do
logFunctionText logger Info $ "opened rocksdb in directory " <> sshow rocksDbDir
logFunctionText logger Debug $ "backup config: " <> sshow (_configBackup cwConf)
withChainweb cwConf logger rocksDb pactDbDir dbBackupsDir $ \case
Expand Down Expand Up @@ -370,7 +364,6 @@ withNodeLogger logCfg chainwebCfg v f = runManaged $ do
let !txFailureHandler =
if isJust (_cutInitialCutFile (_configCuts chainwebCfg))
|| isJust (_cutInitialBlockHeightLimit (_configCuts chainwebCfg))
|| _configReadOnlyReplay chainwebCfg
then [dropLogHandler (Proxy :: Proxy PactTxFailureLog)]
else []

Expand Down
3 changes: 2 additions & 1 deletion src/Chainweb/BlockHeaderDB.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ module Chainweb.BlockHeaderDB
, Configuration(..)
, BlockHeaderDb
, RankedBlockHeaderDb(..)
, initBlockHeaderDb
, closeBlockHeaderDb
, initBlockHeaderDb
, mkBlockHeaderDb
, withBlockHeaderDb

-- * Misc
Expand Down
15 changes: 10 additions & 5 deletions src/Chainweb/BlockHeaderDB/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ module Chainweb.BlockHeaderDB.Internal
, BlockHeaderDb(..)
, RankedBlockHeaderDb(..)
, initBlockHeaderDb
, mkBlockHeaderDb
, closeBlockHeaderDb
, withBlockHeaderDb

Expand Down Expand Up @@ -274,26 +275,30 @@ dbAddChecked db e = unlessM (tableMember (_chainDbCas db) ek) dbAddCheckedIntern
--
initBlockHeaderDb :: HasVersion => Configuration -> IO BlockHeaderDb
initBlockHeaderDb config = do
let db = mkBlockHeaderDb (_configRocksDb config) (_chainId rootEntry)
dbAddChecked db rootEntry
return db
where
rootEntry = _configRoot config
cid = _chainId rootEntry
cidNs = T.encodeUtf8 (toText cid)

mkBlockHeaderDb :: HasVersion => RocksDb -> ChainId -> BlockHeaderDb
mkBlockHeaderDb rdb cid = db
where
headerTable = newTable
(_configRocksDb config)
rdb
(Codec (runPutS . encodeRankedBlockHeader) (runGetS decodeRankedBlockHeader))
(Codec (runPutS . encodeRankedBlockHash) (runGetS decodeRankedBlockHash))
["BlockHeader", cidNs, "header"]

rankTable = newTable
(_configRocksDb config)
rdb
(Codec (runPutS . encodeBlockHeight) (runGetS decodeBlockHeight))
(Codec (runPutS . encodeBlockHash) (runGetS decodeBlockHash))
["BlockHeader", cidNs, "rank"]

!db = BlockHeaderDb cid
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the strictness annotation not needed anymore?

cidNs = T.encodeUtf8 (toText cid)
db = BlockHeaderDb
cid
implicitVersion
headerTable
rankTable
Expand Down
Loading
Loading