diff --git a/hydra-cluster/src/HydraNode.hs b/hydra-cluster/src/HydraNode.hs index 8456b2e2e1f..af901b9827a 100644 --- a/hydra-cluster/src/HydraNode.hs +++ b/hydra-cluster/src/HydraNode.hs @@ -398,6 +398,7 @@ prepareHydraNode chainConfig workDir hydraNodeId hydraSKey hydraVKeys allNodeIds , hydraSigningKey , hydraVerificationKeys , persistenceDir = stateDir + , persistenceRotateAfter = Nothing , chainConfig , ledgerConfig = CardanoLedgerConfig @@ -517,6 +518,7 @@ withHydraNode tracer chainConfig workDir hydraNodeId hydraSKey hydraVKeys allNod , hydraSigningKey , hydraVerificationKeys , persistenceDir = stateDir + , persistenceRotateAfter = Nothing , chainConfig , ledgerConfig = CardanoLedgerConfig diff --git a/hydra-cluster/test/Test/EndToEndSpec.hs b/hydra-cluster/test/Test/EndToEndSpec.hs index b37cdd7653f..7b038de42ef 100644 --- a/hydra-cluster/test/Test/EndToEndSpec.hs +++ b/hydra-cluster/test/Test/EndToEndSpec.hs @@ -20,9 +20,11 @@ import CardanoNode ( withCardanoNodeDevnet, ) import Control.Lens ((^..), (^?)) +import Control.Monad (foldM_) import Data.Aeson (Result (..), Value (Null, Object, String), fromJSON, object, (.=)) import Data.Aeson qualified as Aeson import Data.Aeson.Lens (AsJSON (_JSON), key, values, _JSON) +import Data.Aeson.Types (parseMaybe) import Data.ByteString qualified as BS import Data.List qualified as List import Data.Map qualified as Map @@ -88,6 +90,7 @@ import HydraNode ( getSnapshotUTxO, input, output, + prepareHydraNode, requestCommitTx, send, waitFor, @@ -96,6 +99,7 @@ import HydraNode ( waitMatch, withHydraCluster, withHydraNode, + withPreparedHydraNode, ) import System.Directory (removeDirectoryRecursive, removeFile) import System.FilePath (()) @@ -159,6 +163,57 @@ spec = around (showLogsOnFailure "EndToEndSpec") $ do waitMatch 10 node $ \v -> do guard $ v ^? key "tag" == Just "SnapshotConfirmed" + it "rotates persistence on start up" $ \tracer -> do + withClusterTempDir $ \tmpDir -> do + (aliceCardanoVk, aliceCardanoSk) <- keysFor Alice + initialUTxO <- generate $ genUTxOFor aliceCardanoVk + Aeson.encodeFile (tmpDir "utxo.json") initialUTxO + let offlineConfig = + Offline + OfflineChainConfig + { offlineHeadSeed = "test" + , initialUTxOFile = tmpDir "utxo.json" + , ledgerGenesisFile = Nothing + } + -- Start a hydra-node in offline mode and submit several self-txs + withHydraNode (contramap FromHydraNode tracer) offlineConfig tmpDir 1 aliceSk [] [] $ \node -> do + foldM_ + ( \utxo i -> do + let Just (aliceTxIn, aliceTxOut) = UTxO.find (isVkTxOut aliceCardanoVk) utxo + let Right selfTx = + mkSimpleTx + (aliceTxIn, aliceTxOut) + (mkVkAddress testNetworkId aliceCardanoVk, txOutValue aliceTxOut) + aliceCardanoSk + send node $ input "NewTx" ["transaction" .= selfTx] + waitMatch 10 node $ \v -> do + guard $ v ^? key "tag" == Just "SnapshotConfirmed" + guard $ v ^? key "snapshot" . key "number" == Just (toJSON (i :: Integer)) + v ^? key "snapshot" . key "utxo" >>= parseMaybe parseJSON + ) + initialUTxO + [1 .. (200 :: Integer)] + + -- Measure restart time + t0 <- getCurrentTime + diff1 <- withHydraNode (contramap FromHydraNode tracer) offlineConfig tmpDir 1 aliceSk [] [] $ \_ -> do + t1 <- getCurrentTime + let diff = diffUTCTime t1 t0 + pure diff + + -- Measure restart after rotation + options <- prepareHydraNode offlineConfig tmpDir 1 aliceSk [] [] id + let options' = options{persistenceRotateAfter = Just 10} + t1 <- getCurrentTime + diff2 <- withPreparedHydraNode (contramap FromHydraNode tracer) tmpDir 1 options' $ \_ -> do + t2 <- getCurrentTime + let diff = diffUTCTime t2 t1 + pure diff + + unless (diff2 < diff1 * 0.9) $ + failure $ + "Expected to start up 10% quicker than original " <> show diff1 <> ", but it took " <> show diff2 + it "supports multi-party networked heads" $ \tracer -> do withClusterTempDir $ \tmpDir -> do (aliceCardanoVk, aliceCardanoSk) <- keysFor Alice diff --git a/hydra-node/golden/RunOptions.json b/hydra-node/golden/RunOptions.json index 7769854745e..e1bc9c96f65 100644 --- a/hydra-node/golden/RunOptions.json +++ b/hydra-node/golden/RunOptions.json @@ -2,79 +2,59 @@ "samples": [ { "advertise": { - "hostname": "0.0.0.143", - "port": 16367 + "hostname": "0.0.18.3", + "port": 10552 }, "apiHost": { - "ipv4": "0.0.125.56", + "ipv4": "0.0.123.55", "tag": "IPv4" }, - "apiPort": 14924, + "apiPort": 32056, "chainConfig": { "initialUTxOFile": "c/c/c/a/a/c.json", "ledgerGenesisFile": "a/c/c/c.json", "offlineHeadSeed": "3c628286fcf0615fa5ffda3a7bae19ce", "tag": "OfflineChainConfig" }, - "hydraSigningKey": "c/c.sk", + "hydraSigningKey": "b/b.sk", "hydraVerificationKeys": [ - "b/b.vk", - "b.vk", - "c/b.vk", - "b/a.vk" + "c/b.vk" ], "ledgerConfig": { "cardanoLedgerProtocolParametersFile": "a/a.json" }, "listen": { - "hostname": "0.0.12.2", - "port": 6738 + "hostname": "0.0.48.206", + "port": 3297 }, - "monitoringPort": 28641, - "nodeId": "kjnedflwzpysgbpmeglqhiipbhg", + "monitoringPort": 21081, + "nodeId": "cgrkltadnqpcgshakdovdfcvi", "peers": [ { - "hostname": "0.0.0.5", - "port": 2 - }, - { - "hostname": "0.0.0.4", - "port": 1 - }, - { - "hostname": "0.0.0.2", - "port": 1 - }, - { - "hostname": "0.0.0.6", - "port": 2 - }, - { - "hostname": "0.0.0.4", - "port": 6 + "hostname": "0.0.0.0", + "port": 4 }, { - "hostname": "0.0.0.5", - "port": 4 + "hostname": "0.0.0.8", + "port": 0 } ], - "persistenceDir": "b/c/a/a", - "tlsCertPath": null, - "tlsKeyPath": "a/b/b.key", + "persistenceDir": "b/a/c/c/c", + "persistenceRotateAfter": 17, + "tlsCertPath": "b/b/a/b/b/a.pem", + "tlsKeyPath": null, "verbosity": { - "tag": "Quiet" + "contents": "HydraNode", + "tag": "Verbose" } }, { - "advertise": { - "hostname": "0.0.42.133", - "port": 12408 - }, + "advertise": null, "apiHost": { - "ipv4": "0.0.75.60", + "ipv4": "0.0.7.220", "tag": "IPv4" }, - "apiPort": 14910, + "apiPort": 19260, "chainConfig": { "cardanoSigningKey": "c.sk", "cardanoVerificationKeys": [ @@ -115,40 +95,52 @@ "startChainFrom": null, "tag": "DirectChainConfig" }, - "hydraSigningKey": "b/b/a.sk", + "hydraSigningKey": "a/b.sk", "hydraVerificationKeys": [ - "c/b/b.vk", - "b/b/a.vk", - "a/a/c.vk" + "b.vk", + "a/a.vk" ], "ledgerConfig": { "cardanoLedgerProtocolParametersFile": "b/c/b/b/a/b.json" }, "listen": { - "hostname": "0.0.62.172", - "port": 17754 + "hostname": "0.0.97.166", + "port": 32317 }, - "monitoringPort": 1256, - "nodeId": "onjlkjokxncuqjwmzorzjc", - "peers": [], - "persistenceDir": "c/c/c/c/a", - "tlsCertPath": null, + "monitoringPort": 2185, + "nodeId": "upjgsnwfaunetjfwkvwhb", + "peers": [ + { + "hostname": "0.0.0.7", + "port": 8 + }, + { + "hostname": "0.0.0.1", + "port": 5 + }, + { + "hostname": "0.0.0.2", + "port": 2 + } + ], + "persistenceDir": "b/c/a/a", + "persistenceRotateAfter": 0, + "tlsCertPath": "b/b/c/c/b.pem", "tlsKeyPath": null, "verbosity": { - "contents": "HydraNode", - "tag": "Verbose" + "tag": "Quiet" } }, { "advertise": { - "hostname": "0.0.121.25", - "port": 16440 + "hostname": "0.0.59.85", + "port": 3562 }, "apiHost": { - "ipv4": "0.0.70.171", + "ipv4": "0.0.98.227", "tag": "IPv4" }, - "apiPort": 4914, + "apiPort": 18091, "chainConfig": { "initialUTxOFile": "b.json", "ledgerGenesisFile": "a/c/c.json", @@ -157,36 +149,51 @@ }, "hydraSigningKey": "a.sk", "hydraVerificationKeys": [ - "b/b.vk", - "b/b.vk" + "b.vk", + "a/b/b.vk", + "b.vk", + "a/a/c.vk", + "a/b/c.vk", + "b/a/c.vk" ], "ledgerConfig": { "cardanoLedgerProtocolParametersFile": "a/a/c/c/a.json" }, "listen": { - "hostname": "0.0.73.68", - "port": 2102 + "hostname": "0.0.24.63", + "port": 4654 }, - "monitoringPort": null, - "nodeId": "crvsxxqfwiacm", - "peers": [], - "persistenceDir": "a/c/b/a/c/c", - "tlsCertPath": "c/c.pem", - "tlsKeyPath": "a/c/c/a.key", + "monitoringPort": 11367, + "nodeId": "vvfntzhppwcsbrdznrhjfbugiyzw", + "peers": [ + { + "hostname": "0.0.0.3", + "port": 8 + }, + { + "hostname": "0.0.0.3", + "port": 8 + }, + { + "hostname": "0.0.0.0", + "port": 5 + } + ], + "persistenceDir": "b/a/c", + "persistenceRotateAfter": 12, + "tlsCertPath": null, + "tlsKeyPath": "c/c.key", "verbosity": { "tag": "Quiet" } }, { - "advertise": { - "hostname": "0.0.66.12", - "port": 3079 - }, + "advertise": null, "apiHost": { - "ipv4": "0.0.43.74", + "ipv4": "0.0.74.85", "tag": "IPv4" }, - "apiPort": 11925, + "apiPort": 11082, "chainConfig": { "initialUTxOFile": "a/b/a.json", "ledgerGenesisFile": null, @@ -199,40 +206,29 @@ "cardanoLedgerProtocolParametersFile": "b/c/b/b/b/b.json" }, "listen": { - "hostname": "0.0.62.18", - "port": 26337 + "hostname": "0.0.11.38", + "port": 6135 }, - "monitoringPort": null, - "nodeId": "uyom", + "monitoringPort": 29242, + "nodeId": "bv", "peers": [ { - "hostname": "0.0.0.1", - "port": 0 - }, - { - "hostname": "0.0.0.5", - "port": 5 - }, - { - "hostname": "0.0.0.4", + "hostname": "0.0.0.2", "port": 7 }, { - "hostname": "0.0.0.5", - "port": 2 + "hostname": "0.0.0.3", + "port": 7 }, { "hostname": "0.0.0.0", "port": 5 - }, - { - "hostname": "0.0.0.1", - "port": 6 } ], - "persistenceDir": "b/a/b/a/a/b", + "persistenceDir": "c", + "persistenceRotateAfter": 2, "tlsCertPath": null, - "tlsKeyPath": "b/b/b.key", + "tlsKeyPath": null, "verbosity": { "contents": "HydraNode", "tag": "Verbose" @@ -240,47 +236,56 @@ }, { "advertise": { - "hostname": "0.0.114.198", - "port": 17907 + "hostname": "0.0.120.68", + "port": 8566 }, "apiHost": { - "ipv4": "0.0.23.86", + "ipv4": "0.0.56.52", "tag": "IPv4" }, - "apiPort": 22603, + "apiPort": 5974, "chainConfig": { "initialUTxOFile": "a/a/a/c/b.json", "ledgerGenesisFile": "c/a/b/b.json", "offlineHeadSeed": "0d53a78b05b4521daf80cecb4b7ceedd", "tag": "OfflineChainConfig" }, - "hydraSigningKey": "a/b/a/b/c.sk", - "hydraVerificationKeys": [], + "hydraSigningKey": "b/b/b/a.sk", + "hydraVerificationKeys": [ + "c/c/a.vk", + "a.vk", + "b/c/c.vk", + "a/b.vk" + ], "ledgerConfig": { "cardanoLedgerProtocolParametersFile": "b/c/b/a/b/c.json" }, "listen": { - "hostname": "0.0.42.39", - "port": 6767 + "hostname": "0.0.13.234", + "port": 23840 }, - "monitoringPort": 9249, - "nodeId": "unyaibyymqkxmdszwrweduhbja", + "monitoringPort": null, + "nodeId": "qsruzemcalmtersm", "peers": [ { - "hostname": "0.0.0.3", - "port": 1 + "hostname": "0.0.0.6", + "port": 5 }, { - "hostname": "0.0.0.8", - "port": 0 + "hostname": "0.0.0.6", + "port": 2 + }, + { + "hostname": "0.0.0.0", + "port": 7 } ], - "persistenceDir": "a/b/c/b/c/a", - "tlsCertPath": "b/c/c/a.pem", - "tlsKeyPath": null, + "persistenceDir": "b", + "persistenceRotateAfter": 29, + "tlsCertPath": "c/c.pem", + "tlsKeyPath": "b/c/c/a.key", "verbosity": { - "contents": "HydraNode", - "tag": "Verbose" + "tag": "Quiet" } } ], diff --git a/hydra-node/golden/StateChanged/Checkpoint.json b/hydra-node/golden/StateChanged/Checkpoint.json new file mode 100644 index 00000000000..6738143e7ee --- /dev/null +++ b/hydra-node/golden/StateChanged/Checkpoint.json @@ -0,0 +1,51 @@ +{ + "samples": [ + { + "state": { + "contents": { + "chainState": { + "recordedAt": null, + "spendableUTxO": { + "0001000100010100000001000001000100000101010000010001010001010100#80": { + "address": "addr1qx8huutcd5g0u2gu0ljtprgs7n6v0kjcl3c4rlnewxcm5z7sfz8unq97n9jvp0cts5fl6ljz7kpw0tctq0zxtd00938s2awnhc", + "datum": null, + "datumhash": "7db78ff4217211a2917576fdb68fe0a477237a597cbaac02357089f1b7c12e53", + "inlineDatum": null, + "inlineDatumRaw": null, + "referenceScript": { + "script": { + "cborHex": "820501", + "description": "", + "type": "SimpleScript" + }, + "scriptLanguage": "SimpleScriptLanguage" + }, + "value": { + "c8bc8235919ee180bfb554c23272f2039fe8099d10d2b411ca037685": { + "d745fa26448afb12954472e78cda07a50f1476": 7021196238619351361 + }, + "lovelace": 2948773106183291363 + } + } + } + }, + "committed": {}, + "headId": "00010001000100000101000001000101", + "headSeed": "01010001010000010001000101000101", + "parameters": { + "contestationPeriod": 31536000, + "parties": [ + { + "vkey": "db49d252804cba73a6bd2c98682122edf30013a5f52389c50a383c00946c38dc" + } + ] + }, + "pendingCommits": [] + }, + "tag": "Initial" + }, + "tag": "Checkpoint" + } + ], + "seed": -1287578243 +} \ No newline at end of file diff --git a/hydra-node/hydra-node.cabal b/hydra-node/hydra-node.cabal index 4ffe338c544..0ce92056c29 100644 --- a/hydra-node/hydra-node.cabal +++ b/hydra-node/hydra-node.cabal @@ -67,6 +67,7 @@ library Hydra.Chain.ScriptRegistry Hydra.Events Hydra.Events.FileBased + Hydra.Events.Rotation Hydra.HeadLogic Hydra.HeadLogic.Error Hydra.HeadLogic.Input @@ -273,6 +274,7 @@ test-suite tests Hydra.ContestationPeriodSpec Hydra.CryptoSpec Hydra.Events.FileBasedSpec + Hydra.Events.RotationSpec Hydra.HeadLogicSnapshotSpec Hydra.HeadLogicSpec Hydra.JSONSchemaSpec diff --git a/hydra-node/json-schemas/logs.yaml b/hydra-node/json-schemas/logs.yaml index 87e55d65727..4e26c2d8bf8 100644 --- a/hydra-node/json-schemas/logs.yaml +++ b/hydra-node/json-schemas/logs.yaml @@ -1660,6 +1660,18 @@ definitions: $ref: "api.yaml#/components/schemas/HeadId" snapshotNumber: $ref: "api.yaml#/components/schemas/SnapshotNumber" + - title: "Checkpoint" + additionalProperties: false + required: + - tag + - state + properties: + tag: + type: string + enum: ["Checkpoint"] + state: + $ref: "logs.yaml#/definitions/HeadState" + IdleState: type: object additionalProperties: false @@ -2396,6 +2408,7 @@ definitions: - hydraSigningKey - hydraVerificationKeys - persistenceDir + - persistenceRotateAfter - chainConfig - ledgerConfig properties: @@ -2443,6 +2456,10 @@ definitions: type: string persistenceDir: type: string + persistenceRotateAfter: + oneOf: + - type: "null" + - type: integer chainConfig: $ref: "logs.yaml#/definitions/ChainConfig" ledgerConfig: diff --git a/hydra-node/src/Hydra/API/Server.hs b/hydra-node/src/Hydra/API/Server.hs index 866dc569e51..50ca7f4d78b 100644 --- a/hydra-node/src/Hydra/API/Server.hs +++ b/hydra-node/src/Hydra/API/Server.hs @@ -162,6 +162,7 @@ withAPIServer config env party eventSource tracer chain pparams serverOutputFilt Nothing -> pure () Just timedOutput -> do atomically $ writeTChan responseChannel (Left timedOutput) + , rotate = const . const $ pure () } , Server{sendMessage = atomically . writeTChan responseChannel . Right} ) @@ -255,6 +256,7 @@ mkTimedServerOutputFromStateEvent event = StateChanged.ChainRolledBack{} -> Nothing StateChanged.TickObserved{} -> Nothing StateChanged.LocalStateCleared{..} -> Just SnapshotSideLoaded{..} + StateChanged.Checkpoint{} -> Nothing -- diff --git a/hydra-node/src/Hydra/Events.hs b/hydra-node/src/Hydra/Events.hs index c773211192b..f6959d910f4 100644 --- a/hydra-node/src/Hydra/Events.hs +++ b/hydra-node/src/Hydra/Events.hs @@ -34,9 +34,13 @@ newtype EventSource e m = EventSource getEvents :: (HasEventId e, MonadUnliftIO m) => EventSource e m -> m [e] getEvents EventSource{sourceEvents} = runResourceT $ sourceToList sourceEvents -newtype EventSink e m = EventSink +type LogId = Natural + +data EventSink e m = EventSink { putEvent :: HasEventId e => e -> m () -- ^ Send a single event to the event sink. + , rotate :: LogId -> e -> m () + -- ^ Rotate existing events into a given log id and start a new log from given e. } -- | Put a list of events to a list of event sinks in a round-robin fashion. diff --git a/hydra-node/src/Hydra/Events/FileBased.hs b/hydra-node/src/Hydra/Events/FileBased.hs index 6b5a773baa0..f64a066a3eb 100644 --- a/hydra-node/src/Hydra/Events/FileBased.hs +++ b/hydra-node/src/Hydra/Events/FileBased.hs @@ -6,17 +6,20 @@ module Hydra.Events.FileBased where import Hydra.Prelude import Conduit (mapMC, (.|)) -import Control.Concurrent.Class.MonadSTM (newTVarIO, writeTVar) +import Control.Concurrent.Class.MonadSTM (newTVarIO, readTVarIO, writeTVar) import Hydra.Events (EventSink (..), EventSource (..), HasEventId (..)) +import Hydra.Events.Rotation (EventStore) import Hydra.Persistence (PersistenceIncremental (..)) --- | A basic file based event source and sink defined using an +-- | A basic file based event source and sink defined using a rotated -- 'PersistenceIncremental' handle. -eventPairFromPersistenceIncremental :: - (ToJSON e, FromJSON e, HasEventId e, MonadSTM m) => - PersistenceIncremental e m -> - m (EventSource e m, EventSink e m) -eventPairFromPersistenceIncremental PersistenceIncremental{append, source} = do +mkFileBasedEventStore :: + (ToJSON e, FromJSON e, HasEventId e) => + FilePath -> + (FilePath -> IO (PersistenceIncremental e IO)) -> + IO (EventStore e IO) +mkFileBasedEventStore fp mkPersistenceIncremental = do + persistenceV <- newTVarIO =<< mkPersistenceIncremental fp eventIdV <- newTVarIO Nothing let getLastSeenEventId = readTVar eventIdV @@ -25,8 +28,9 @@ eventPairFromPersistenceIncremental PersistenceIncremental{append, source} = do writeTVar eventIdV (Just $ getEventId evt) -- Keep track of the last seen event id when loading - sourceEvents = - source + sourceEvents = do + persistence <- liftIO (readTVarIO persistenceV) + source persistence .| mapMC ( \event -> lift . atomically $ do setLastSeenEventId event @@ -42,7 +46,14 @@ eventPairFromPersistenceIncremental PersistenceIncremental{append, source} = do | otherwise -> pure () store e = do - append e + persistence <- readTVarIO persistenceV + append persistence e atomically $ setLastSeenEventId e - pure (EventSource{sourceEvents}, EventSink{putEvent}) + rotate nextLogId checkpointEvt = do + let fp' = fp <> "-" <> show nextLogId + persistence' <- mkPersistenceIncremental fp' + append persistence' checkpointEvt + atomically $ writeTVar persistenceV persistence' + + pure (EventSource{sourceEvents}, EventSink{putEvent, rotate}) diff --git a/hydra-node/src/Hydra/Events/Rotation.hs b/hydra-node/src/Hydra/Events/Rotation.hs new file mode 100644 index 00000000000..41161e76618 --- /dev/null +++ b/hydra-node/src/Hydra/Events/Rotation.hs @@ -0,0 +1,104 @@ +module Hydra.Events.Rotation where + +import Hydra.Prelude + +import Conduit (MonadUnliftIO) +import Control.Concurrent.Class.MonadSTM (newTVarIO, readTVarIO, writeTVar) +import Hydra.Chain.ChainState (ChainStateType, IsChainState) +import Hydra.Events (EventSink (..), EventSource (..), HasEventId, LogId, StateEvent (..), getEvents) +import Hydra.HeadLogic (StateChanged (Checkpoint), aggregate) +import Hydra.HeadLogic.State (HeadState (..), IdleState (..)) + +newtype RotationConfig = RotateAfter Natural + +-- | An EventSource and EventSink combined +type EventStore e m = (EventSource e m, EventSink e m) + +type Checkpointer e = [e] -> e + +-- | Creates an event store that rotates according to given config and 'Checkpointer'. +mkRotatedEventStore :: + (HasEventId e, MonadSTM m, MonadUnliftIO m) => + RotationConfig -> + Checkpointer e -> + LogId -> + EventStore e m -> + m (EventStore e m) +mkRotatedEventStore config checkpointer logId eventStore = do + logIdV <- newTVarIO logId + -- Rules for any event store: + -- - sourceEvents will be called in the beginning of the application and whenever the api server wans to load history + -- -> might be called multiple times!! + -- - putEvent will be called on application start with all events returned by sourceEvents and during processing + currentEvents <- getEvents eventSource + let currentNumberOfEvents = toInteger $ length currentEvents + numberOfEventsV <- newTVarIO currentNumberOfEvents + -- XXX: check rotation on startup + when (currentNumberOfEvents >= toInteger rotateAfterX) $ do + rotateEventLog logIdV numberOfEventsV + pure + ( EventSource + { sourceEvents = rotatedSourceEvents + } + , EventSink + { putEvent = rotatedPutEvent logIdV numberOfEventsV + , -- NOTE: Don't allow rotation on-demand + rotate = const . const $ pure () + } + ) + where + RotateAfter rotateAfterX = config + -- TODO: if this turns out to be equal to sourceEvents, then the whole algorithm can just work on each 'EventSink' + rotatedSourceEvents = sourceEvents eventSource + + rotatedPutEvent logIdV numberOfEventsV event = do + putEvent event + -- XXX: bump numberOfEvents + numberOfEvents' <- atomically $ do + numberOfEvents <- readTVar numberOfEventsV + let numberOfEvents' = numberOfEvents + 1 + writeTVar numberOfEventsV numberOfEvents' + pure numberOfEvents' + -- XXX: check rotation + when (numberOfEvents' >= toInteger rotateAfterX) $ do + rotateEventLog logIdV numberOfEventsV + + rotateEventLog logIdV numberOfEventsV = do + -- XXX: build checkpoint event + history <- getEvents eventSource + let checkpoint = checkpointer history + -- XXX: rotate with checkpoint + currentLogId <- readTVarIO logIdV + let currentLogId' = currentLogId + 1 + rotate currentLogId' checkpoint + -- XXX: clear numberOfEvents + bump logId + atomically $ do + writeTVar numberOfEventsV 0 + writeTVar logIdV currentLogId' + + (eventSource, EventSink{putEvent, rotate}) = eventStore + +prepareRotatedEventStore :: + (IsChainState tx, MonadTime m, MonadSTM m, MonadUnliftIO m) => + RotationConfig -> + ChainStateType tx -> + EventStore (StateEvent tx) m -> + m (EventStore (StateEvent tx) m) +prepareRotatedEventStore rotationConfig initialChainState eventStore = do + now <- getCurrentTime + let checkpointer = mkChechpointer initialChainState now + -- FIXME! + let logId = 0 + mkRotatedEventStore rotationConfig checkpointer logId eventStore + +mkChechpointer :: IsChainState tx => ChainStateType tx -> UTCTime -> Checkpointer (StateEvent tx) +mkChechpointer initialChainState time events = + StateEvent + { eventId = maybe 0 (succ . last) (nonEmpty $ (\StateEvent{eventId} -> eventId) <$> events) + , stateChanged = + Checkpoint . foldl' aggregate initialState $ + (\StateEvent{stateChanged} -> stateChanged) <$> events + , time + } + where + initialState = Idle IdleState{chainState = initialChainState} diff --git a/hydra-node/src/Hydra/HeadLogic.hs b/hydra-node/src/Hydra/HeadLogic.hs index 4115aeb76da..94524d54d2a 100644 --- a/hydra-node/src/Hydra/HeadLogic.hs +++ b/hydra-node/src/Hydra/HeadLogic.hs @@ -33,6 +33,7 @@ import Hydra.Chain ( ChainStateHistory, OnChainTx (..), PostChainTx (..), + initHistory, pushNewState, rollbackHistory, ) @@ -65,6 +66,7 @@ import Hydra.HeadLogic.State ( OpenState (..), PendingCommits, SeenSnapshot (..), + getChainState, seenSnapshotNumber, setChainState, ) @@ -1704,6 +1706,7 @@ aggregate st = \case _otherState -> st IgnoredHeadInitializing{} -> st TxInvalid{} -> st + Checkpoint state' -> state' aggregateState :: IsChainState tx => @@ -1753,3 +1756,4 @@ aggregateChainStateHistory history = \case IgnoredHeadInitializing{} -> history TxInvalid{} -> history LocalStateCleared{} -> history + Checkpoint state' -> initHistory $ getChainState state' diff --git a/hydra-node/src/Hydra/HeadLogic/Outcome.hs b/hydra-node/src/Hydra/HeadLogic/Outcome.hs index ae7e21b6055..a3e781b71cf 100644 --- a/hydra-node/src/Hydra/HeadLogic/Outcome.hs +++ b/hydra-node/src/Hydra/HeadLogic/Outcome.hs @@ -143,6 +143,7 @@ data StateChanged tx } | TxInvalid {headId :: HeadId, utxo :: UTxOType tx, transaction :: tx, validationError :: ValidationError} | LocalStateCleared {headId :: HeadId, snapshotNumber :: SnapshotNumber} + | Checkpoint {state :: HeadState tx} deriving stock (Generic) deriving stock instance (IsChainState tx, IsTx tx, Eq (HeadState tx), Eq (ChainStateType tx)) => Eq (StateChanged tx) diff --git a/hydra-node/src/Hydra/HeadLogic/State.hs b/hydra-node/src/Hydra/HeadLogic/State.hs index 11b4725307f..478eecdf372 100644 --- a/hydra-node/src/Hydra/HeadLogic/State.hs +++ b/hydra-node/src/Hydra/HeadLogic/State.hs @@ -60,6 +60,14 @@ setChainState chainState = \case Open st -> Open st{chainState} Closed st -> Closed st{chainState} +-- | Get the chain state in any 'HeadState'. +getChainState :: HeadState tx -> ChainStateType tx +getChainState = \case + Idle IdleState{chainState} -> chainState + Initial InitialState{chainState} -> chainState + Open OpenState{chainState} -> chainState + Closed ClosedState{chainState} -> chainState + -- | Get the head parameters in any 'HeadState'. getHeadParameters :: HeadState tx -> Maybe HeadParameters getHeadParameters = \case diff --git a/hydra-node/src/Hydra/Node.hs b/hydra-node/src/Hydra/Node.hs index 46e8b942edd..25118951e8c 100644 --- a/hydra-node/src/Hydra/Node.hs +++ b/hydra-node/src/Hydra/Node.hs @@ -31,6 +31,7 @@ import Hydra.Chain ( ) import Hydra.Chain.ChainState (ChainStateType, IsChainState) import Hydra.Events (EventId, EventSink (..), EventSource (..), StateEvent (..), getEventId, putEventsToSinks, stateChanged) +import Hydra.Events.Rotation (EventStore) import Hydra.HeadLogic ( Effect (..), HeadState (..), @@ -168,10 +169,11 @@ hydrate :: Environment -> Ledger tx -> ChainStateType tx -> - EventSource (StateEvent tx) m -> + EventStore (StateEvent tx) m -> [EventSink (StateEvent tx) m] -> m (DraftHydraNode tx m) -hydrate tracer env ledger initialChainState eventSource eventSinks = do +hydrate tracer env ledger initialChainState (eventSource, eventSink) eventSinks = do + let allSinks = eventSink : eventSinks traceWith tracer LoadingState (lastEventId, (headState, chainStateHistory)) <- runConduitRes $ @@ -188,7 +190,7 @@ hydrate tracer env ledger initialChainState eventSource eventSinks = do -- (Re-)submit events to sinks; de-duplication is handled by the sinks traceWith tracer ReplayingState runConduitRes $ - sourceEvents eventSource .| mapM_C (\e -> lift $ putEventsToSinks eventSinks [e]) + sourceEvents eventSource .| mapM_C (\e -> lift $ putEventsToSinks allSinks [e]) nodeState <- createNodeState (getLast lastEventId) headState inputQueue <- createInputQueue @@ -200,12 +202,13 @@ hydrate tracer env ledger initialChainState eventSource eventSinks = do , nodeState , inputQueue , eventSource - , eventSinks + , eventSinks = allSinks , chainStateHistory } where initialState = Idle IdleState{chainState = initialChainState} + -- REVIEW! recoverHeadStateC = mapC stateChanged .| getZipSink diff --git a/hydra-node/src/Hydra/Node/Run.hs b/hydra-node/src/Hydra/Node/Run.hs index 68d91e16eca..83a4bfb0897 100644 --- a/hydra-node/src/Hydra/Node/Run.hs +++ b/hydra-node/src/Hydra/Node/Run.hs @@ -20,7 +20,8 @@ import Hydra.Chain.CardanoClient (QueryPoint (..), queryGenesisParameters) import Hydra.Chain.Direct (loadChainContext, mkTinyWallet, withDirectChain) import Hydra.Chain.Direct.State (initialChainState) import Hydra.Chain.Offline (loadGenesisFile, withOfflineChain) -import Hydra.Events.FileBased (eventPairFromPersistenceIncremental) +import Hydra.Events.FileBased (mkFileBasedEventStore) +import Hydra.Events.Rotation (RotationConfig (..), prepareRotatedEventStore) import Hydra.Ledger.Cardano (cardanoLedger, newLedgerEnv) import Hydra.Logging (traceWith, withTracer) import Hydra.Logging.Messages (HydraLog (..)) @@ -76,17 +77,15 @@ run opts = do pparams <- readJsonFileThrow parseJSON (cardanoLedgerProtocolParametersFile ledgerConfig) globals <- getGlobalsForChain chainConfig withCardanoLedger pparams globals $ \ledger -> do - incPersistence <- createPersistenceIncremental (persistenceDir <> "/state") -- Hydrate with event source and sinks - (eventSource, filePersistenceSink) <- eventPairFromPersistenceIncremental incPersistence + eventStore@(eventSource, _) <- + prepareEventStore + =<< mkFileBasedEventStore (persistenceDir <> "/state") createPersistenceIncremental -- NOTE: Add any custom sink setup code here -- customSink <- createCustomSink - let eventSinks = - [ filePersistenceSink - -- NOTE: Add any custom sinks here - -- , customSink - ] - wetHydraNode <- hydrate (contramap Node tracer) env ledger initialChainState eventSource eventSinks + -- NOTE: Add any customSink here + let eventSinks = [] + wetHydraNode <- hydrate (contramap Node tracer) env ledger initialChainState eventStore eventSinks -- Chain withChain <- prepareChainComponent tracer env chainConfig withChain (chainStateHistory wetHydraNode) (wireChainInput wetHydraNode) $ \chain -> do @@ -128,10 +127,18 @@ run opts = do wallet <- mkTinyWallet (contramap DirectChain tracer) cfg pure $ withDirectChain (contramap DirectChain tracer) cfg ctx wallet + prepareEventStore eventStore = do + case RotateAfter <$> persistenceRotateAfter of + Nothing -> + pure eventStore + Just rotationConfig -> do + prepareRotatedEventStore rotationConfig initialChainState eventStore + RunOptions { verbosity , monitoringPort , persistenceDir + , persistenceRotateAfter , chainConfig , ledgerConfig , listen diff --git a/hydra-node/src/Hydra/Options.hs b/hydra-node/src/Hydra/Options.hs index 2c8bdda6a37..be866d2aefd 100644 --- a/hydra-node/src/Hydra/Options.hs +++ b/hydra-node/src/Hydra/Options.hs @@ -183,6 +183,7 @@ data RunOptions = RunOptions , hydraSigningKey :: FilePath , hydraVerificationKeys :: [FilePath] , persistenceDir :: FilePath + , persistenceRotateAfter :: Maybe Natural , chainConfig :: ChainConfig , ledgerConfig :: LedgerConfig } @@ -209,6 +210,7 @@ instance Arbitrary RunOptions where hydraSigningKey <- genFilePath "sk" hydraVerificationKeys <- reasonablySized (listOf (genFilePath "vk")) persistenceDir <- genDirPath + persistenceRotateAfter <- arbitrary chainConfig <- arbitrary ledgerConfig <- arbitrary pure $ @@ -226,6 +228,7 @@ instance Arbitrary RunOptions where , hydraSigningKey , hydraVerificationKeys , persistenceDir + , persistenceRotateAfter , chainConfig , ledgerConfig } @@ -249,6 +252,7 @@ defaultRunOptions = , hydraSigningKey = "hydra.sk" , hydraVerificationKeys = [] , persistenceDir = "./" + , persistenceRotateAfter = Nothing , chainConfig = Direct defaultDirectChainConfig , ledgerConfig = defaultLedgerConfig } @@ -272,6 +276,7 @@ runOptionsParser = <*> hydraSigningKeyFileParser <*> many hydraVerificationKeyFileParser <*> persistenceDirParser + <*> optional persistenceRotateAfterParser <*> chainConfigParser <*> ledgerConfigParser @@ -776,6 +781,16 @@ persistenceDirParser = \Do not edit these files manually!" ) +persistenceRotateAfterParser :: Parser Natural +persistenceRotateAfterParser = + option + auto + ( long "persistence-rotate-after" + <> metavar "NATURAL" + <> help + "The number of Hydra events to trigger rotation (default: no rotation)" + ) + hydraNodeCommand :: ParserInfo Command hydraNodeCommand = info @@ -901,6 +916,7 @@ toArgs , hydraSigningKey , hydraVerificationKeys , persistenceDir + , persistenceRotateAfter , chainConfig , ledgerConfig } = @@ -917,6 +933,7 @@ toArgs <> concatMap toArgPeer peers <> maybe [] (\mport -> ["--monitoring-port", show mport]) monitoringPort <> ["--persistence-dir", persistenceDir] + <> maybe [] (\rotateAfter -> ["--persistence-rotate-after", show rotateAfter]) persistenceRotateAfter <> argsChainConfig chainConfig <> argsLedgerConfig where diff --git a/hydra-node/test/Hydra/BehaviorSpec.hs b/hydra-node/test/Hydra/BehaviorSpec.hs index 9f56cdbf227..2bb1f64ec83 100644 --- a/hydra-node/test/Hydra/BehaviorSpec.hs +++ b/hydra-node/test/Hydra/BehaviorSpec.hs @@ -1218,6 +1218,7 @@ createHydraNode tracer ledger chainState signingKey otherParties outputs message Just TimedServerOutput{output} -> atomically $ do writeTQueue outputs output modifyTVar' outputHistory (output :) + , rotate = const . const $ pure () } -- NOTE: Not using 'hydrate' as we don't want to run the event source conduit. let headState = Idle IdleState{chainState} diff --git a/hydra-node/test/Hydra/Events/FileBasedSpec.hs b/hydra-node/test/Hydra/Events/FileBasedSpec.hs index 3d54f4d2fc4..c252093ad3f 100644 --- a/hydra-node/test/Hydra/Events/FileBasedSpec.hs +++ b/hydra-node/test/Hydra/Events/FileBasedSpec.hs @@ -10,7 +10,7 @@ import Hydra.Chain.Direct.State () import Conduit (runConduitRes, sinkList, (.|)) import Data.List (zipWith3) import Hydra.Events (EventSink (..), EventSource (..), StateEvent (..), getEvents, putEvent) -import Hydra.Events.FileBased (eventPairFromPersistenceIncremental) +import Hydra.Events.FileBased (mkFileBasedEventStore) import Hydra.HeadLogic (StateChanged) import Hydra.Ledger.Cardano (Tx) import Hydra.Ledger.Simple (SimpleTx) @@ -31,7 +31,7 @@ spec = do roundtripAndGoldenSpecsWithSettings (defaultSettings{sampleSize = 1}) (Proxy @(MinimumSized (StateEvent Tx))) roundtripAndGoldenADTSpecsWithSettings (defaultSettings{sampleSize = 1}) (Proxy @(MinimumSized (StateChanged Tx))) - describe "eventPairFromPersistenceIncremental" $ do + describe "mkFileBasedEventStore" $ do prop "can stream events" $ forAllShrink genContinuousEvents shrink $ \events -> ioProperty $ @@ -81,8 +81,7 @@ spec = do forM_ events append -- Load and store events through the event source interface (src, EventSink{putEvent}) <- - eventPairFromPersistenceIncremental - =<< createPersistenceIncremental (tmpDir <> "/data") + mkFileBasedEventStore (tmpDir <> "/data") createPersistenceIncremental loadedEvents <- getEvents src -- Store all loaded events like the node would do forM_ loadedEvents putEvent @@ -96,6 +95,5 @@ genContinuousEvents = withEventSourceAndSink :: (EventSource (StateEvent SimpleTx) IO -> EventSink (StateEvent SimpleTx) IO -> IO b) -> IO b withEventSourceAndSink action = withTempDir "hydra-persistence" $ \tmpDir -> do - persistence <- createPersistenceIncremental (tmpDir <> "/data") - (eventSource, eventSink) <- eventPairFromPersistenceIncremental persistence + (eventSource, eventSink) <- mkFileBasedEventStore (tmpDir <> "/data") createPersistenceIncremental action eventSource eventSink diff --git a/hydra-node/test/Hydra/Events/RotationSpec.hs b/hydra-node/test/Hydra/Events/RotationSpec.hs new file mode 100644 index 00000000000..6b2b035a67e --- /dev/null +++ b/hydra-node/test/Hydra/Events/RotationSpec.hs @@ -0,0 +1,107 @@ +module Hydra.Events.RotationSpec where + +import Hydra.Prelude +import Test.Hydra.Prelude + +import Data.List qualified as List +import Hydra.Chain.ChainState (ChainSlot (..)) +import Hydra.Events (EventSink (..), HasEventId (..), getEvents) +import Hydra.Events.Rotation +import Hydra.Ledger.Simple (SimpleChainState (..), simpleLedger) +import Hydra.Logging (showLogsOnFailure) +import Hydra.Node (hydrate) +import Hydra.NodeSpec (createMockSourceSink, inputsToOpenHead, notConnect, primeWith, runToCompletion) +import Test.Hydra.Tx.Fixture (testEnvironment) +import Test.QuickCheck (Positive (..), (==>)) +import Test.QuickCheck.Instances.Natural () + +spec :: Spec +spec = parallel $ do + describe "Node" $ do + -- Set up a hydrate function with fixtures curried + let setupHydrate action = + showLogsOnFailure "NodeSpec" $ \tracer -> do + let testHydrate = hydrate tracer testEnvironment simpleLedger SimpleChainState{slot = ChainSlot 0} + action testHydrate + around setupHydrate $ do + it "rotates while running" $ \testHydrate -> do + failAfter 1 $ do + now <- getCurrentTime + let initialChainState = SimpleChainState{slot = ChainSlot 0} + let checkpointer = mkChechpointer initialChainState now + let logId = 0 + let rotationConfig = RotateAfter 4 + eventStore <- createMockSourceSink + rotatingEventStore <- mkRotatedEventStore rotationConfig checkpointer logId eventStore + testHydrate rotatingEventStore [] + >>= notConnect + >>= primeWith inputsToOpenHead + >>= runToCompletion + rotatedHistory <- getEvents (fst rotatingEventStore) + length rotatedHistory `shouldBe` 2 + it "consistent state after restarting with rotation" $ \_testHydrate -> do + pendingWith "TODO" + prop "a rotated an non rotated node have consistent state" $ pendingWith "TODO" + + describe "Rotation algorithm" $ do + prop "rotates on startup" $ + \(Positive x, Positive y) -> + (y > x) ==> do + eventStore@(eventSource, eventSink) <- createMockSourceSink + let totalEvents = toInteger y + let events = TrivialEvent <$> [1 .. fromInteger totalEvents] + mapM_ (putEvent eventSink) events + unrotatedHistory <- getEvents eventSource + toInteger (length unrotatedHistory) `shouldBe` totalEvents + let logId = 0 + let rotationConfig = RotateAfter x + (rotatedEventSource, _) <- mkRotatedEventStore rotationConfig trivialCheckpoint logId eventStore + rotatedHistory <- getEvents rotatedEventSource + length rotatedHistory `shouldBe` 1 + + -- given some event store (source + sink) + -- lets configure a rotated event store that rotates after x events + -- forall y > 0: put x*y events + -- load all events returns a suffix of put events with length <= x + prop "rotates after configured number of events" $ + \(Positive x, Positive y) -> do + let rotationConfig = RotateAfter x + mockEventStore <- createMockSourceSink + let logId = 0 + rotatingEventStore <- mkRotatedEventStore rotationConfig trivialCheckpoint logId mockEventStore + let (eventSource, EventSink{putEvent}) = rotatingEventStore + let totalEvents = toInteger x * y + let events = TrivialEvent . fromInteger <$> [1 .. totalEvents] + forM_ events putEvent + currentHistory <- getEvents eventSource + let rotatedElements = fromInteger totalEvents + let expectRotated = take rotatedElements events + let expectRemaining = drop rotatedElements events + let expectedCurrentHistory = trivialCheckpoint expectRotated : expectRemaining + expectedCurrentHistory `shouldBe` currentHistory + + -- forall y. y > 0 && y < x: put x+y events (= ensures rotation) + -- load one event === checkpoint of first x of events + prop "puts checkpoint event as first event" $ + \(Positive x, Positive y) -> + (y < x) ==> do + let rotationConfig = RotateAfter x + mockEventStore <- createMockSourceSink + let logId = 0 + rotatingEventStore <- mkRotatedEventStore rotationConfig trivialCheckpoint logId mockEventStore + let (eventSource, EventSink{putEvent}) = rotatingEventStore + let totalEvents = toInteger x + toInteger y + let events = TrivialEvent . fromInteger <$> [1 .. totalEvents] + forM_ events putEvent + currentHistory <- getEvents eventSource + let expectRotated = take (fromInteger $ toInteger x) events + trivialCheckpoint expectRotated `shouldBe` List.head currentHistory + +newtype TrivialEvent = TrivialEvent Word64 + deriving newtype (Num, Show, Eq) + +instance HasEventId TrivialEvent where + getEventId (TrivialEvent w) = w + +trivialCheckpoint :: [TrivialEvent] -> TrivialEvent +trivialCheckpoint = sum diff --git a/hydra-node/test/Hydra/NodeSpec.hs b/hydra-node/test/Hydra/NodeSpec.hs index e17829b45a4..7bb00b1ac97 100644 --- a/hydra-node/test/Hydra/NodeSpec.hs +++ b/hydra-node/test/Hydra/NodeSpec.hs @@ -6,7 +6,7 @@ import Hydra.Prelude hiding (label) import Test.Hydra.Prelude import Conduit (MonadUnliftIO, yieldMany) -import Control.Concurrent.Class.MonadSTM (MonadLabelledSTM, labelTVarIO, modifyTVar, newTVarIO, readTVarIO) +import Control.Concurrent.Class.MonadSTM (MonadLabelledSTM, labelTVarIO, modifyTVar, newTVarIO, readTVarIO, writeTVar) import Hydra.API.ClientInput (ClientInput (..)) import Hydra.API.Server (Server (..), mkTimedServerOutputFromStateEvent) import Hydra.API.ServerOutput (ClientMessage (..), ServerOutput (..), TimedServerOutput (..)) @@ -14,6 +14,7 @@ import Hydra.Cardano.Api (SigningKey) import Hydra.Chain (Chain (..), ChainEvent (..), OnChainTx (..), PostTxError (NoSeedInput)) import Hydra.Chain.ChainState (ChainSlot (ChainSlot), IsChainState) import Hydra.Events (EventSink (..), EventSource (..), StateEvent (..), genStateEvent, getEventId) +import Hydra.Events.Rotation (EventStore) import Hydra.HeadLogic (Input (..)) import Hydra.HeadLogic.Outcome (StateChanged (HeadInitialized), genStateChanged) import Hydra.HeadLogicSpec (inInitialState, receiveMessage, receiveMessageFrom, testSnapshot) @@ -73,8 +74,7 @@ spec = parallel $ do \someEvents -> do (mockSink1, getMockSinkEvents1) <- createRecordingSink (mockSink2, getMockSinkEvents2) <- createRecordingSink - - void $ testHydrate (mockSource someEvents) [mockSink1, mockSink2] + void $ testHydrate (mockEventStore someEvents) [mockSink1, mockSink2] getMockSinkEvents1 `shouldReturn` someEvents getMockSinkEvents2 `shouldReturn` someEvents @@ -84,7 +84,7 @@ spec = parallel $ do \someEvents -> do (sink, getSinkEvents) <- createRecordingSink - void $ testHydrate (mockSource someEvents) [sink] + void $ testHydrate (mockEventStore someEvents) [sink] seenEvents <- getSinkEvents getEventId <$> seenEvents `shouldBe` getEventId <$> someEvents @@ -93,9 +93,13 @@ spec = parallel $ do forAllShrink (listOf1 $ genStateChanged testEnvironment >>= genStateEvent) shrink $ \someEvents -> do let genSinks = elements [mockSink, failingSink] - failingSink = EventSink{putEvent = \_ -> failure "failing sink called"} + failingSink = + EventSink + { putEvent = \_ -> failure "failing putEvent sink called" + , rotate = \_ _ -> failure "failing rotate sink called" + } forAllBlind (listOf genSinks) $ \sinks -> - testHydrate (mockSource someEvents) (sinks <> [failingSink]) + testHydrate (mockEventStore someEvents) (sinks <> [failingSink]) `shouldThrow` \(_ :: HUnitFailure) -> True it "checks head state" $ \testHydrate -> @@ -110,7 +114,7 @@ spec = parallel $ do <*> (HeadInitialized (mkHeadParameters env) <$> arbitrary <*> arbitrary <*> arbitrary <*> arbitrary) <*> pure now forAllShrink genEvent shrink $ \incompatibleEvent -> - testHydrate (mockSource [incompatibleEvent]) [] + testHydrate (mockEventStore [incompatibleEvent]) [] `shouldThrow` \(_ :: ParameterMismatch) -> True describe "stepHydraNode" $ do @@ -119,7 +123,7 @@ spec = parallel $ do (mockSink1, getMockSinkEvents1) <- createRecordingSink (mockSink2, getMockSinkEvents2) <- createRecordingSink - testHydrate (mockSource []) [mockSink1, mockSink2] + testHydrate (mockEventStore []) [mockSink1, mockSink2] >>= notConnect >>= primeWith inputsToOpenHead >>= runToCompletion @@ -139,7 +143,7 @@ spec = parallel $ do forAllShrinkBlind genInputs shrink $ \someInputs -> idempotentIOProperty $ do (sink, getSinkEvents) <- createRecordingSink - testHydrate (mockSource []) [sink] + testHydrate (mockEventStore []) [sink] >>= notConnect >>= primeWith someInputs >>= runToCompletion @@ -156,9 +160,9 @@ spec = parallel $ do it "can continue after re-hydration" $ \testHydrate -> failAfter 1 $ do - (eventSource, eventSink) <- createMockSourceSink + eventStore <- createMockSourceSink - testHydrate eventSource [eventSink] + testHydrate eventStore [] >>= notConnect >>= primeWith inputsToOpenHead >>= runToCompletion @@ -169,7 +173,7 @@ spec = parallel $ do (recordingSink, getRecordedEvents) <- createRecordingSink (node, getServerOutputs) <- - testHydrate eventSource [eventSink, recordingSink] + testHydrate eventStore [recordingSink] >>= notConnect >>= primeWith [reqTx] >>= recordServerOutputs @@ -315,13 +319,13 @@ spec = parallel $ do -- | Add given list of inputs to the 'InputQueue'. This is returning the node to -- allow for chaining with 'runToCompletion'. -primeWith :: Monad m => [Input SimpleTx] -> HydraNode SimpleTx m -> m (HydraNode SimpleTx m) +primeWith :: Monad m => [Input tx] -> HydraNode tx m -> m (HydraNode tx m) primeWith inputs node@HydraNode{inputQueue = InputQueue{enqueue}} = do forM_ inputs enqueue pure node -- | Convert a 'DraftHydraNode' to a 'HydraNode' by providing mock implementations. -notConnect :: MonadThrow m => DraftHydraNode SimpleTx m -> m (HydraNode SimpleTx m) +notConnect :: MonadThrow m => DraftHydraNode tx m -> m (HydraNode tx m) notConnect = connect mockChain mockNetwork mockServer @@ -331,11 +335,11 @@ mockServer = { sendMessage = \_ -> pure () } -mockNetwork :: Monad m => Network m (Message SimpleTx) +mockNetwork :: Monad m => Network m (Message tx) mockNetwork = Network{broadcast = \_ -> pure ()} -mockChain :: MonadThrow m => Chain SimpleTx m +mockChain :: MonadThrow m => Chain tx m mockChain = Chain { postTx = \_ -> pure () @@ -345,7 +349,14 @@ mockChain = } mockSink :: Monad m => EventSink a m -mockSink = EventSink{putEvent = const $ pure ()} +mockSink = EventSink{putEvent = const $ pure (), rotate = const . const $ pure ()} + +mockEventStore :: Monad m => [a] -> EventStore a m +mockEventStore events = + (eventSource, eventSink) + where + eventSource = mockSource events + eventSink = mockSink mockSource :: Monad m => [a] -> EventSource a m mockSource events = @@ -356,7 +367,9 @@ mockSource events = createRecordingSink :: IO (EventSink a IO, IO [a]) createRecordingSink = do (putEvent, getAll) <- messageRecorder - pure (EventSink{putEvent}, getAll) + pure (EventSink{putEvent, rotate}, getAll) + where + rotate = const . const $ pure () createMockSourceSink :: MonadLabelledSTM m => m (EventSource a m, EventSink a m) createMockSourceSink = do @@ -372,6 +385,8 @@ createMockSourceSink = do EventSink { putEvent = \x -> atomically $ modifyTVar tvar (<> [x]) + , rotate = \_ checkpoint -> + atomically $ writeTVar tvar [checkpoint] } pure (source, sink) @@ -420,7 +435,8 @@ testHydraNode :: [Input SimpleTx] -> m (HydraNode SimpleTx m) testHydraNode tracer signingKey otherParties contestationPeriod depositDeadline inputs = do - hydrate tracer env simpleLedger SimpleChainState{slot = ChainSlot 0} (mockSource []) [] + let eventStore = mockEventStore [] + hydrate tracer env simpleLedger SimpleChainState{slot = ChainSlot 0} eventStore [] >>= notConnect >>= primeWith inputs where @@ -454,6 +470,7 @@ recordServerOutputs node = do case mkTimedServerOutputFromStateEvent event of Nothing -> pure () Just TimedServerOutput{output} -> record $ Left output + , rotate = const . const $ pure () } pure ( node{eventSinks = apiSink : eventSinks node, server = Server{sendMessage = record . Right}}