@@ -7,7 +7,7 @@ import Hydra.Prelude hiding (TVar, readTVar, seq)
7
7
import Cardano.Ledger.Core (PParams )
8
8
import Control.Concurrent.MVar (newEmptyMVar , putMVar , takeMVar )
9
9
import Control.Concurrent.STM.TChan (newBroadcastTChanIO , writeTChan )
10
- import Control.Concurrent.STM.TVar (modifyTVar' , newTVarIO )
10
+ import Control.Concurrent.STM.TVar (modifyTVar' , newTVar , newTVarIO , readTVar )
11
11
import Control.Exception (IOException )
12
12
import Hydra.API.APIServerLog (APIServerLog (.. ))
13
13
import Hydra.API.ClientInput (ClientInput )
@@ -79,17 +79,19 @@ withAPIServer ::
79
79
withAPIServer config party persistence tracer chain pparams callback action =
80
80
handle onIOException $ do
81
81
responseChannel <- newBroadcastTChanIO
82
+ -- Intialize our read models from stored events
83
+ -- NOTE: we do not keep the stored events around in memory
82
84
timedOutputEvents <- loadAll
83
-
84
- -- Intialize our read model from stored events
85
85
headStatusP <- mkProjection Idle (output <$> timedOutputEvents) projectHeadStatus
86
86
snapshotUtxoP <- mkProjection Nothing (output <$> timedOutputEvents) projectSnapshotUtxo
87
87
headIdP <- mkProjection Nothing (output <$> timedOutputEvents) projectInitializingHeadId
88
88
89
- -- NOTE: we need to reverse the list because we store history in a reversed
90
- -- list in memory but in order on disk
91
- -- FIXME: always growing
92
- history <- newTVarIO (reverse timedOutputEvents)
89
+ nextSeqVar <- newTVarIO 0
90
+ let nextSeq = atomically $ do
91
+ seq <- readTVar nextSeqVar
92
+ modifyTVar' nextSeqVar (+ 1 )
93
+ pure seq
94
+
93
95
(notifyServerRunning, waitForServerRunning) <- setupServerNotification
94
96
95
97
let serverSettings =
@@ -106,15 +108,15 @@ withAPIServer config party persistence tracer chain pparams callback action =
106
108
. simpleCors
107
109
$ websocketsOr
108
110
defaultConnectionOptions
109
- (wsApp party tracer history callback headStatusP snapshotUtxoP responseChannel)
111
+ (wsApp party tracer nextSeq callback headStatusP snapshotUtxoP responseChannel)
110
112
(httpApp tracer chain pparams (atomically $ getLatest headIdP) (atomically $ getLatest snapshotUtxoP) callback)
111
113
)
112
114
( do
113
115
waitForServerRunning
114
116
action $
115
117
Server
116
118
{ sendOutput = \ output -> do
117
- timedOutput <- appendToHistory history output
119
+ timedOutput <- persistOutput nextSeq output
118
120
atomically $ do
119
121
update headStatusP output
120
122
update snapshotUtxoP output
@@ -125,7 +127,7 @@ withAPIServer config party persistence tracer chain pparams callback action =
125
127
where
126
128
APIServerConfig {host, port, tlsCertPath, tlsKeyPath} = config
127
129
128
- PersistenceIncremental {loadAll, append } = persistence
130
+ PersistenceIncremental {loadAll} = persistence
129
131
130
132
startServer settings app =
131
133
case (tlsCertPath, tlsKeyPath) of
@@ -139,13 +141,11 @@ withAPIServer config party persistence tracer chain pparams callback action =
139
141
_ ->
140
142
runSettings settings app
141
143
142
- appendToHistory history output = do
144
+ persistOutput nextSeq output = do
143
145
time <- getCurrentTime
144
146
timedOutput <- atomically $ do
145
- seq <- nextSequenceNumber history
146
- let timedOutput = TimedServerOutput {output, time, seq }
147
- modifyTVar' history (timedOutput : )
148
- pure timedOutput
147
+ seq <- nextSeq
148
+ pure TimedServerOutput {output, time, seq }
149
149
append timedOutput
150
150
pure timedOutput
151
151
0 commit comments