@@ -9,40 +9,55 @@ module Hydra.Chain.Direct (
9
9
10
10
import Hydra.Prelude
11
11
12
+ import Blockfrost.Client qualified as Blockfrost
12
13
import Cardano.Ledger.Shelley.API qualified as Ledger
13
- import Cardano.Ledger.Slot (EpochInfo )
14
+ import Cardano.Ledger.Slot (EpochInfo , SlotNo )
14
15
import Cardano.Slotting.EpochInfo (hoistEpochInfo )
15
16
import Control.Concurrent.Class.MonadSTM (
16
17
newEmptyTMVar ,
17
18
newTQueueIO ,
19
+ newTVarIO ,
18
20
putTMVar ,
19
21
readTQueue ,
22
+ readTVarIO ,
20
23
takeTMVar ,
21
24
writeTQueue ,
25
+ writeTVar ,
22
26
)
23
27
import Control.Exception (IOException )
24
28
import Control.Monad.Trans.Except (runExcept )
29
+ import Control.Retry (constantDelay , retrying )
30
+ import Data.ByteString.Base16 qualified as Base16
31
+ import Data.Text qualified as T
25
32
import Hydra.Cardano.Api (
33
+ BlockHeader (.. ),
26
34
BlockInMode (.. ),
27
35
CardanoEra (.. ),
28
- ChainPoint ,
36
+ ChainPoint ( .. ) ,
29
37
ChainTip ,
30
38
ConsensusModeParams (.. ),
31
39
EpochSlots (.. ),
32
40
EraHistory (EraHistory ),
41
+ Hash ,
33
42
IsShelleyBasedEra (.. ),
34
43
LocalChainSyncClient (.. ),
35
44
LocalNodeClientProtocols (.. ),
36
45
LocalNodeConnectInfo (.. ),
46
+ NetworkId (.. ),
47
+ SerialiseAsCBOR (.. ),
48
+ SlotNo (.. ),
37
49
Tx ,
38
50
TxInMode (.. ),
39
51
TxValidationErrorInCardanoMode ,
52
+ UTxO ,
40
53
chainTipToChainPoint ,
41
54
connectToLocalNode ,
42
55
getBlockHeader ,
43
56
getBlockTxs ,
44
57
getTxBody ,
45
58
getTxId ,
59
+ proxyToAsType ,
60
+ serialiseToRawBytes ,
46
61
toLedgerUTxO ,
47
62
)
48
63
import Hydra.Chain (
@@ -51,6 +66,7 @@ import Hydra.Chain (
51
66
PostTxError (FailedToPostTx , failureReason ),
52
67
currentState ,
53
68
)
69
+ import Hydra.Chain.Blockfrost.Client qualified as Blockfrost
54
70
import Hydra.Chain.CardanoClient (
55
71
QueryPoint (.. ),
56
72
)
@@ -182,7 +198,9 @@ withDirectChain tracer config ctx wallet chainStateHistory callback action = do
182
198
connectToLocalNode
183
199
(connectInfo networkId nodeSocket)
184
200
(clientProtocols chainPoint queue handler)
185
- BlockfrostBackend {} -> undefined
201
+ BlockfrostBackend {projectPath} -> do
202
+ prj <- Blockfrost. projectFromFile projectPath
203
+ blockfrostChainFollow prj chainPoint handler wallet
186
204
)
187
205
(action chainHandle)
188
206
case res of
@@ -191,6 +209,22 @@ withDirectChain tracer config ctx wallet chainStateHistory callback action = do
191
209
where
192
210
CardanoChainConfig {chainBackend, startChainFrom} = config
193
211
212
+ blockfrostChainFollow prj chainPoint handler wallet = do
213
+ Blockfrost. Genesis {_genesisSlotLength, _genesisActiveSlotsCoefficient} <- Blockfrost. runBlockfrostM prj Blockfrost. getLedgerGenesis
214
+
215
+ Blockfrost. Block {_blockHash = (Blockfrost. BlockHash genesisBlockHash)} <-
216
+ Blockfrost. runBlockfrostM prj (Blockfrost. getBlock (Left 0 ))
217
+
218
+ let blockTime = realToFrac _genesisSlotLength / realToFrac _genesisActiveSlotsCoefficient
219
+
220
+ let blockHash = fromChainPoint chainPoint genesisBlockHash
221
+
222
+ void $
223
+ retrying (retryPolicy blockTime) shouldRetry $ \ _ -> do
224
+ loop tracer prj blockTime handler wallet 1 blockHash
225
+ `catch` \ (ex :: APIBlockfrostError ) ->
226
+ pure $ Left ex
227
+
194
228
connectInfo networkId nodeSocket =
195
229
LocalNodeConnectInfo
196
230
{ -- REVIEW: This was 432000 before, but all usages in the
@@ -224,6 +258,117 @@ withDirectChain tracer config ctx wallet chainStateHistory callback action = do
224
258
{ ioException
225
259
}
226
260
261
+ shouldRetry _ = \ case
262
+ Right {} -> pure False
263
+ Left err -> pure $ isRetryable err
264
+
265
+ retryPolicy blockTime = constantDelay (truncate blockTime * 1000 * 1000 )
266
+
267
+ loop ::
268
+ (MonadIO m , MonadThrow m , MonadSTM m ) =>
269
+ Tracer IO DirectChainLog ->
270
+ Blockfrost. Project ->
271
+ DiffTime ->
272
+ ChainSyncHandler m ->
273
+ TinyWallet m ->
274
+ Integer ->
275
+ Blockfrost. BlockHash ->
276
+ m a
277
+ loop tracer prj blockTime handler wallet blockConfirmations current = do
278
+ next <- rollForward tracer prj handler wallet blockConfirmations current
279
+ loop tracer prj blockTime handler wallet blockConfirmations next
280
+
281
+ rollForward ::
282
+ (MonadIO m , MonadThrow m ) =>
283
+ Tracer IO DirectChainLog ->
284
+ Blockfrost. Project ->
285
+ ChainSyncHandler m ->
286
+ TinyWallet m ->
287
+ Integer ->
288
+ Blockfrost. BlockHash ->
289
+ m Blockfrost. BlockHash
290
+ rollForward tracer prj handler wallet blockConfirmations blockHash = do
291
+ Blockfrost. Block
292
+ { _blockHash
293
+ , _blockConfirmations
294
+ , _blockNextBlock
295
+ , _blockHeight
296
+ , _blockSlot
297
+ } <-
298
+ Blockfrost. runBlockfrostM prj $ Blockfrost. getBlock (Right blockHash)
299
+
300
+ -- Check if block within the safe zone to be processes
301
+ when (_blockConfirmations < blockConfirmations) $
302
+ throwIO (NotEnoughBlockConfirmations _blockHash)
303
+
304
+ -- Search block transactions
305
+ txHashesCBOR <- Blockfrost. runBlockfrostM prj . Blockfrost. allPages $ \ p ->
306
+ Blockfrost. getBlockTxsCBOR' (Right _blockHash) p Blockfrost. def
307
+
308
+ -- Convert to cardano-api Tx
309
+ receivedTxs <- mapM (toTx . (\ (Blockfrost. TxHashCBOR (_txHash, cbor)) -> cbor)) txHashesCBOR
310
+
311
+ -- Check if block contains a reference to its next
312
+ nextBlockHash <- maybe (throwIO $ MissingNextBlockHash _blockHash) pure _blockNextBlock
313
+
314
+ blockNo <- maybe (throwIO $ MissingBlockNo _blockHash) (pure . fromInteger ) _blockHeight
315
+ let Blockfrost. BlockHash blockHash' = _blockHash
316
+ let blockHash'' = fromString $ T. unpack blockHash'
317
+ blockSlot <- maybe (throwIO $ MissingBlockSlot _blockSlot) (pure . fromInteger . Blockfrost. unSlot) _blockSlot
318
+ let header = BlockHeader (SlotNo blockSlot) blockHash'' blockNo
319
+ update wallet header receivedTxs
320
+ -- Observe Hydra transactions
321
+ onRollForward handler header receivedTxs
322
+
323
+ pure nextBlockHash
324
+
325
+ -- * Helpers
326
+
327
+ data APIBlockfrostError
328
+ = BlockfrostError Text
329
+ | DecodeError Text
330
+ | NotEnoughBlockConfirmations Blockfrost. BlockHash
331
+ | MissingBlockNo Blockfrost. BlockHash
332
+ | MissingBlockSlot (Maybe Blockfrost. Slot )
333
+ | MissingNextBlockHash Blockfrost. BlockHash
334
+ deriving (Show , Exception )
335
+
336
+ isRetryable :: APIBlockfrostError -> Bool
337
+ isRetryable (BlockfrostError _) = True
338
+ isRetryable (DecodeError _) = False
339
+ isRetryable (NotEnoughBlockConfirmations _) = True
340
+ isRetryable (MissingBlockNo _) = True
341
+ isRetryable (MissingBlockSlot _) = True
342
+ isRetryable (MissingNextBlockHash _) = True
343
+ toChainPoint :: Blockfrost. Block -> ChainPoint
344
+ toChainPoint Blockfrost. Block {_blockSlot, _blockHash} =
345
+ ChainPoint slotNo headerHash
346
+ where
347
+ slotNo :: SlotNo
348
+ slotNo = maybe 0 (fromInteger . Blockfrost. unSlot) _blockSlot
349
+
350
+ headerHash :: Hash BlockHeader
351
+ headerHash = fromString . toString $ Blockfrost. unBlockHash _blockHash
352
+
353
+ fromNetworkMagic :: Integer -> NetworkId
354
+ fromNetworkMagic = \ case
355
+ 0 -> Mainnet
356
+ magicNbr -> Testnet (NetworkMagic (fromInteger magicNbr))
357
+
358
+ toTx :: MonadThrow m => Blockfrost. TransactionCBOR -> m Tx
359
+ toTx (Blockfrost. TransactionCBOR txCbor) =
360
+ case decodeBase16 txCbor of
361
+ Left decodeErr -> throwIO . DecodeError $ " Bad Base16 Tx CBOR: " <> decodeErr
362
+ Right bytes ->
363
+ case deserialiseFromCBOR (proxyToAsType (Proxy @ Tx )) bytes of
364
+ Left deserializeErr -> throwIO . DecodeError $ " Bad Tx CBOR: " <> show deserializeErr
365
+ Right tx -> pure tx
366
+
367
+ fromChainPoint :: ChainPoint -> Text -> Blockfrost. BlockHash
368
+ fromChainPoint chainPoint genesisBlockHash = case chainPoint of
369
+ ChainPoint _ headerHash -> Blockfrost. BlockHash (decodeUtf8 . Base16. encode . serialiseToRawBytes $ headerHash)
370
+ ChainPointAtGenesis -> Blockfrost. BlockHash genesisBlockHash
371
+
227
372
newtype ConnectException = ConnectException
228
373
{ ioException :: IOException
229
374
}
0 commit comments