@@ -9,7 +9,12 @@ import Blockfrost.Client (
9
9
runBlockfrost ,
10
10
)
11
11
import Blockfrost.Client qualified as Blockfrost
12
- import Control.Retry (RetryPolicyM , RetryStatus , exponentialBackoff , limitRetries , retrying )
12
+ import Control.Concurrent.Class.MonadSTM (
13
+ MonadSTM (readTVarIO ),
14
+ newTVarIO ,
15
+ writeTVar ,
16
+ )
17
+ import Control.Retry (RetryPolicyM , RetryStatus (.. ), exponentialBackoff , limitRetries , retrying )
13
18
import Hydra.Cardano.Api (
14
19
BlockHeader ,
15
20
ChainPoint (.. ),
@@ -50,7 +55,7 @@ runBlockfrostM ::
50
55
BlockfrostClientT IO a ->
51
56
ExceptT APIBlockfrostError IO a
52
57
runBlockfrostM prj action = do
53
- result <- liftIO $ runBlockfrost prj action
58
+ result <- lift $ runBlockfrost prj action
54
59
case result of
55
60
Left err -> throwError (BlockfrostError $ show err)
56
61
Right val -> pure val
@@ -93,26 +98,43 @@ blockfrostClient tracer projectPath startFromBlockHash = do
93
98
94
99
let blockTime = realToFrac _genesisSlotLength / realToFrac _genesisActiveSlotsCoefficient
95
100
101
+ stateTVar <- newTVarIO (block, mempty )
96
102
void $
97
- retrying retryPolicy shouldRetry $ \ _ ->
103
+ retrying retryPolicy shouldRetry $ \ RetryStatus {rsIterNumber} -> do
104
+ -- XXX: wait on any iteration number, except 0 as it's the first try.
105
+ when (rsIterNumber > 0 ) $ threadDelay blockTime
98
106
either (error . show ) id
99
- <$> runExceptT
100
- ( do
101
- threadDelay blockTime
102
- loop tracer prj block networkId blockTime observerHandler mempty
103
- )
107
+ <$> runExceptT (loop tracer prj networkId blockTime observerHandler stateTVar)
104
108
}
105
109
110
+ -- | Iterative process that follows the chain using a naive roll-forward approach,
111
+ -- keeping track of the latest known current block and UTxO view.
112
+ -- This process operates at full speed without waiting between calls,
113
+ -- favoring the catch-up process.
106
114
loop ::
107
115
Tracer IO ChainObserverLog ->
108
116
Blockfrost. Project ->
109
- Blockfrost. Block ->
110
117
NetworkId ->
111
118
DiffTime ->
112
119
ObserverHandler IO ->
113
- UTxO ->
120
+ TVar IO ( Blockfrost. Block , UTxO ) ->
114
121
ExceptT APIBlockfrostError IO a
115
- loop tracer prj block networkId blockTime observerHandler utxo = do
122
+ loop tracer prj networkId blockTime observerHandler stateTVar = do
123
+ current <- lift $ readTVarIO stateTVar
124
+ next <- rollForward tracer prj networkId observerHandler current
125
+ atomically $ writeTVar stateTVar next
126
+ loop tracer prj networkId blockTime observerHandler stateTVar
127
+
128
+ -- | From the current block and UTxO view, we collect Hydra observations
129
+ -- and yield the next block and adjusted UTxO view.
130
+ rollForward ::
131
+ Tracer IO ChainObserverLog ->
132
+ Blockfrost. Project ->
133
+ NetworkId ->
134
+ ObserverHandler IO ->
135
+ (Blockfrost. Block , UTxO ) ->
136
+ ExceptT APIBlockfrostError IO (Blockfrost. Block , UTxO )
137
+ rollForward tracer prj networkId observerHandler (block, utxo) = do
116
138
let Blockfrost. Block
117
139
{ _blockHash
118
140
, _blockConfirmations
@@ -151,11 +173,11 @@ loop tracer prj block networkId blockTime observerHandler utxo = do
151
173
then [Tick point blockNo]
152
174
else observationsAt
153
175
154
- -- [7] Loop next .
176
+ -- [7] Next .
155
177
case _blockNextBlock of
156
178
Just nextBlockHash -> do
157
179
block' <- runBlockfrostM prj (Blockfrost. getBlock $ Right nextBlockHash)
158
- loop tracer prj block' networkId blockTime observerHandler adjustedUTxO
180
+ pure ( block', adjustedUTxO)
159
181
Nothing ->
160
182
throwError (MissingNextBlockHash _blockHash)
161
183
0 commit comments