Skip to content

Commit 051074a

Browse files
committed
Expose inlining info so users can specialize
1 parent b40cbc9 commit 051074a

File tree

2 files changed

+20
-2
lines changed

2 files changed

+20
-2
lines changed

consumers/src/Database/PostgreSQL/Consumers/Components.hs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ runConsumer
5050
-- ^ The consumer.
5151
-> ConnectionSourceM m
5252
-> m (m ())
53+
{-# INLINEABLE runConsumer #-}
5354
runConsumer cc cs = runConsumerWithMaybeIdleSignal cc cs Nothing
5455

5556
runConsumerWithIdleSignal
@@ -67,6 +68,7 @@ runConsumerWithIdleSignal
6768
-> ConnectionSourceM m
6869
-> TMVar Bool
6970
-> m (m ())
71+
{-# INLINEABLE runConsumerWithIdleSignal #-}
7072
runConsumerWithIdleSignal cc cs idleSignal = runConsumerWithMaybeIdleSignal cc cs (Just idleSignal)
7173

7274
-- | Run the consumer and also signal whenever the consumer is waiting for
@@ -85,6 +87,7 @@ runConsumerWithMaybeIdleSignal
8587
-> ConnectionSourceM m
8688
-> Maybe (TMVar Bool)
8789
-> m (m ())
90+
{-# INLINEABLE runConsumerWithMaybeIdleSignal #-}
8891
runConsumerWithMaybeIdleSignal cc0 cs mIdleSignal
8992
| ccMaxRunningJobs cc < 1 = do
9093
logInfo_ "ccMaxRunningJobs < 1, not starting the consumer"
@@ -186,6 +189,7 @@ spawnListener
186189
-> ConnectionSourceM m
187190
-> TriggerNotification m
188191
-> m ThreadId
192+
{-# INLINEABLE spawnListener #-}
189193
spawnListener cc cs outbox =
190194
forkP "listener" $
191195
case ccNotificationChannel cc of
@@ -227,6 +231,7 @@ spawnMonitor
227231
-> ConnectionSourceM m
228232
-> ConsumerID
229233
-> m ThreadId
234+
{-# INLINEABLE spawnMonitor #-}
230235
spawnMonitor ConsumerConfig {..} cs cid = forkP "monitor" . forever $ do
231236
runDBT cs ts $ do
232237
now <- currentTime
@@ -313,9 +318,13 @@ spawnDispatcher
313318
-> TVar Int
314319
-> Maybe (TMVar Bool)
315320
-> m ThreadId
321+
{-# INLINEABLE spawnDispatcher #-}
316322
spawnDispatcher ConsumerConfig {..} cs cid inbox runningJobsInfo runningJobs mIdleSignal =
317323
forkP "dispatcher" . forever $ do
318324
listenNotification inbox
325+
-- When awoken, we always start slow, processing only a single job in a
326+
-- batch. Each time we can fill a batch completely with jobs, we grow the maximum
327+
-- batch size.
319328
someJobWasProcessed <- loop 1
320329
if someJobWasProcessed
321330
then setIdle False
@@ -350,9 +359,11 @@ spawnDispatcher ConsumerConfig {..} cs cid inbox runningJobsInfo runningJobs mId
350359
. forkP "batch processor"
351360
. (`finally` subtractJobs)
352361
. restore
353-
$ do
354-
mapM startJob batch >>= mapM joinJob >>= updateJobs
362+
$ mapM startJob batch >>= mapM joinJob >>= updateJobs
355363

364+
-- Induce some backpressure. If the number of running jobs by all batch
365+
-- processors exceed the global limit, we wait. If it does not, start a
366+
-- new iteration with a double the limit
356367
when (batchSize == limit) $ do
357368
maxBatchSize <- atomically $ do
358369
jobs <- readTVar runningJobs
@@ -434,6 +445,7 @@ updateJobsQuery
434445
-> [(idx, Result)]
435446
-> UTCTime
436447
-> SQL
448+
{-# INLINEABLE updateJobsQuery #-}
437449
updateJobsQuery jobsTable results now =
438450
smconcat
439451
[ "WITH removed AS ("

consumers/src/Database/PostgreSQL/Consumers/Utils.hs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import Database.PostgreSQL.PQTypes.SQL.Raw
2626
-- | Run an action 'm' that returns a finalizer and perform the returned
2727
-- finalizer after the action 'action' completes.
2828
finalize :: (MonadMask m, MonadBase IO m) => m (m ()) -> m a -> m a
29+
{-# INLINEABLE finalize #-}
2930
finalize m action = do
3031
finalizer <- newEmptyMVar
3132
flip finally (tryTakeMVar finalizer >>= fromMaybe (pure ())) $ do
@@ -53,13 +54,15 @@ instance Exception ThrownFrom
5354

5455
-- | Stop execution of a thread.
5556
stopExecution :: MonadBase IO m => ThreadId -> m ()
57+
{-# INLINEABLE stopExecution #-}
5658
stopExecution = flip throwTo StopExecution
5759

5860
----------------------------------------
5961

6062
-- | Modified version of 'fork' that propagates thrown exceptions to the parent
6163
-- thread.
6264
forkP :: MonadBaseControl IO m => String -> m () -> m ThreadId
65+
{-# INLINEABLE forkP #-}
6366
forkP = forkImpl fork
6467

6568
-- | Modified version of 'TG.fork' that propagates thrown exceptions to the
@@ -70,6 +73,7 @@ gforkP
7073
-> String
7174
-> m ()
7275
-> m (ThreadId, m (T.Result ()))
76+
{-# INLINEABLE gforkP #-}
7377
gforkP = forkImpl . TG.fork
7478

7579
----------------------------------------
@@ -80,6 +84,7 @@ forkImpl
8084
-> String
8185
-> m ()
8286
-> m a
87+
{-# INLINEABLE forkImpl #-}
8388
forkImpl ffork tname m = E.mask $ \release -> do
8489
parent <- myThreadId
8590
ffork $
@@ -98,6 +103,7 @@ newtype TriggerNotification m = TriggerNotification {triggerNotification :: m ()
98103
newtype ListenNotification m = ListenNotification {listenNotification :: m ()}
99104

100105
mkNotification :: MonadBaseControl IO m => m (TriggerNotification m, ListenNotification m)
106+
{-# INLINEABLE mkNotification #-}
101107
mkNotification = do
102108
notificationRef <- newEmptyMVar
103109
pure

0 commit comments

Comments
 (0)