13
13
-----------------------------------------------------------------------------
14
14
{-# LANGUAGE ScopedTypeVariables, CPP #-}
15
15
module Transient.Indeterminism (
16
- choose , choose' , chooseStream , collect , collect' , group , groupByTime
16
+ choose , choose' , chooseStream , collect , collect' , group , groupByTime , burst
17
17
) where
18
18
19
19
import Transient.Internals hiding (retry )
20
20
21
21
import Data.IORef
22
22
import Control.Applicative
23
23
import Data.Monoid
24
- import Control.Concurrent
24
+ import Control.Concurrent
25
25
import Data.Typeable
26
26
import Control.Monad.State
27
27
import GHC.Conc
28
28
import Data.Time.Clock
29
29
import Control.Exception
30
30
31
- #ifndef ETA_VERSION
32
- import Data.Atomics
33
- #endif
31
+
34
32
35
33
36
34
-- | Converts a list of pure values into a transient task set. You can use the
@@ -46,7 +44,7 @@ chooseStream []= empty
46
44
chooseStream xs = do
47
45
evs <- liftIO $ newIORef xs
48
46
parallel $ do
49
- es <- atomicModifyIORefCAS evs $ \ es -> let tes= tail es in (tes,es)
47
+ es <- atomicModifyIORef evs $ \ es -> let tes= tail es in (tes,es)
50
48
case es of
51
49
[x] -> x `seq` return $ SLast x
52
50
x: _ -> x `seq` return $ SMore x
@@ -65,7 +63,7 @@ group num proc = do
65
63
v <- liftIO $ newIORef (0 ,[] )
66
64
x <- proc
67
65
68
- mn <- liftIO $ atomicModifyIORefCAS v $ \ (n,xs) ->
66
+ mn <- liftIO $ atomicModifyIORef v $ \ (n,xs) ->
69
67
let n'= n + 1
70
68
in if n'== num
71
69
@@ -87,7 +85,7 @@ groupByTime1 time proc = do
87
85
88
86
x <- proc
89
87
t' <- liftIO getCurrentTime
90
- mn <- liftIO $ atomicModifyIORefCAS v $ \(n,t,xs) -> let n'=n +1
88
+ mn <- liftIO $ atomicModifyIORef v $ \(n,t,xs) -> let n'=n +1
91
89
in
92
90
if diffUTCTime t' t < fromIntegral time
93
91
then ((n',t, x:xs),Nothing)
@@ -112,7 +110,7 @@ collect n = collect' n 0
112
110
--
113
111
collect' :: Int -> Int -> TransIO a -> TransIO [a ]
114
112
collect' n t search= do
115
- addThreads 1
113
+
116
114
117
115
rv <- liftIO $ newEmptyMVar -- !> "NEWMVAR"
118
116
@@ -124,7 +122,9 @@ collect' n t search= do
124
122
stop
125
123
126
124
timer= do
127
- when (t > 0 ) . async $ threadDelay t >> putMVar rv Nothing
125
+ when (t > 0 ) $ do
126
+ addThreads 1
127
+ async $ threadDelay t >> putMVar rv Nothing
128
128
empty
129
129
130
130
monitor= liftIO loop
@@ -137,7 +137,7 @@ collect' n t search= do
137
137
case mr of
138
138
Nothing -> return rs
139
139
Just r -> do
140
- let n''= n' + 1
140
+ let n''= n' + 1
141
141
let rs'= r: rs
142
142
writeIORef results (n'',rs')
143
143
@@ -149,10 +149,10 @@ collect' n t search= do
149
149
readIORef results >>= return . snd
150
150
151
151
152
- oneThread $ timer <|> worker <|> monitor
152
+ oneThread $ timer <|> worker <|> monitor
153
153
154
154
155
- -- | insert `SDone` response everytime there is a timeout since the last response
155
+ -- | insert `SDone` response every time there is a timeout since the last response
156
156
157
157
burst :: Int -> TransIO a -> TransIO (StreamData a )
158
158
burst timeout comp= do
@@ -166,12 +166,12 @@ groupByTime timeout comp= do
166
166
where
167
167
run v = do
168
168
x <- comp
169
- liftIO $ atomicModifyIORefCAS v $ \ xs -> (xs <> x,() )
169
+ liftIO $ atomicModifyIORef v $ \ xs -> (xs <> x,() )
170
170
empty
171
171
172
172
gather v= waitEvents $ do
173
173
threadDelay timeout
174
- atomicModifyIORefCAS v $ \ xs -> (mempty , xs)
174
+ atomicModifyIORef v $ \ xs -> (mempty , xs)
175
175
176
176
177
177
0 commit comments