@@ -86,15 +86,13 @@ import Cardano.Binary (serialize')
86
86
import Cardano.Crypto.Util (SignableRepresentation (getSignableRepresentation ))
87
87
import Control.Concurrent.Class.MonadSTM (
88
88
MonadSTM (readTQueue , writeTQueue ),
89
- modifyTVar' ,
90
89
newTQueueIO ,
91
90
newTVarIO ,
92
91
readTVarIO ,
93
92
writeTVar ,
94
93
)
95
94
import Control.Tracer (Tracer )
96
95
import Data.IntMap qualified as IMap
97
- import Data.Sequence.Strict ((|>) )
98
96
import Data.Sequence.Strict qualified as Seq
99
97
import Data.Vector (
100
98
Vector ,
@@ -225,9 +223,8 @@ withReliability ::
225
223
-- | Underlying network component providing consuming and sending channels.
226
224
NetworkComponent m (Authenticated (ReliableMsg (Heartbeat inbound ))) (ReliableMsg (Heartbeat outbound )) a ->
227
225
NetworkComponent m (Authenticated (Heartbeat inbound )) (Heartbeat outbound ) a
228
- withReliability tracer MessagePersistence {saveAcks, loadAcks, appendMessage, loadMessages} me otherParties withRawNetwork callback action = do
226
+ withReliability tracer MessagePersistence {saveAcks, loadAcks, loadMessages} me otherParties withRawNetwork callback action = do
229
227
acksCache <- loadAcks >>= newTVarIO
230
- -- FIXME: always growing
231
228
sentMessages <- loadMessages >>= newTVarIO . Seq. fromList
232
229
resendQ <- newTQueueIO
233
230
let ourIndex = fromMaybe (error " This cannot happen because we constructed the list with our party inside." ) (findPartyIndex me)
@@ -237,15 +234,19 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa
237
234
reliableBroadcast sentMessages ourIndex acksCache network
238
235
where
239
236
allParties = fromList $ sort $ me : otherParties
240
- reliableBroadcast sentMessages ourIndex acksCache Network {broadcast} =
237
+
238
+ reliableBroadcast _sentMessages ourIndex acksCache Network {broadcast} =
241
239
action $
242
240
Network
243
241
{ broadcast = \ msg ->
244
242
case msg of
245
243
Data {} -> do
246
- localCounter <- atomically $ cacheMessage msg >> incrementAckCounter
247
- saveAcks localCounter
248
- appendMessage msg
244
+ -- FIXME: No outbound message cache and persistence, resending will be broken
245
+ localCounter <- atomically $ do
246
+ -- cacheMessage msg
247
+ incrementAckCounter
248
+ -- saveAcks localCounter
249
+ -- appendMessage msg
249
250
traceWith tracer BroadcastCounter {ourIndex, localCounter}
250
251
broadcast $ ReliableMsg localCounter msg
251
252
Ping {} -> do
@@ -261,8 +262,8 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa
261
262
writeTVar acksCache newAcks
262
263
pure newAcks
263
264
264
- cacheMessage msg =
265
- modifyTVar' sentMessages (|> msg)
265
+ -- cacheMessage msg =
266
+ -- modifyTVar' sentMessages (|> msg)
266
267
267
268
reliableCallback acksCache sentMessages resend ourIndex (Authenticated (ReliableMsg acknowledged payload) party) = do
268
269
if length acknowledged /= length allParties
0 commit comments