Skip to content

Commit

Permalink
now cleaning up conditions even if we get interrupted or error out
Browse files Browse the repository at this point in the history
  • Loading branch information
ssfrr committed May 24, 2017
1 parent 9ec268f commit ae4531f
Showing 1 changed file with 72 additions and 69 deletions.
141 changes: 72 additions & 69 deletions src/RingBuffers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -69,34 +69,37 @@ function write(rbuf::RingBuffer{T}, data::AbstractArray{T}, nframes) where {T}
isopen(rbuf) || return 0

cond = Condition()
push!(rbuf.writers, cond)
if length(rbuf.writers) > 1
# we're behind someone in the queue
wait(cond)
isopen(rbuf) || return 0
end
# now we're in the front of the queue
nwritten = 0
n = PaUtil_WriteRingBuffer(rbuf.pabuf,
pointer(data),
nframes)
nwritten += n
# notify any waiting readers that there's data available
notify(rbuf.datanotify.cond)
while nwritten < nframes
wait(rbuf.datanotify)
isopen(rbuf) || return nwritten
try
push!(rbuf.writers, cond)
if length(rbuf.writers) > 1
# we're behind someone in the queue
wait(cond)
isopen(rbuf) || return 0
end
# now we're in the front of the queue
nwritten = 0
n = PaUtil_WriteRingBuffer(rbuf.pabuf,
pointer(data)+(nwritten*rbuf.nchannels*sizeof(T)),
nframes-nwritten)
pointer(data),
nframes)
nwritten += n
# notify any waiting readers that there's data available
notify(rbuf.datanotify.cond)
end
# we're done, remove our condition and notify the next writer if necessary
shift!(rbuf.writers)
if length(rbuf.writers) > 0
notify(rbuf.writers[1])
while nwritten < nframes
wait(rbuf.datanotify)
isopen(rbuf) || return nwritten
n = PaUtil_WriteRingBuffer(rbuf.pabuf,
pointer(data)+(nwritten*rbuf.nchannels*sizeof(T)),
nframes-nwritten)
nwritten += n
# notify any waiting readers that there's data available
notify(rbuf.datanotify.cond)
end
finally
# we're done, remove our condition and notify the next writer if necessary
shift!(rbuf.writers)
if length(rbuf.writers) > 0
notify(rbuf.writers[1])
end
end

nwritten
Expand Down Expand Up @@ -149,22 +152,25 @@ function flush(rbuf::RingBuffer)
isopen(rbuf) || return

cond = Condition()
push!(rbuf.writers, cond)
if length(rbuf.writers) > 1
# we're behind someone in the queue
wait(cond)
isopen(rbuf) || return
end
# now we're in the front of the queue
while frameswritable(rbuf) < rbuf.pabuf.bufferSize
wait(rbuf.datanotify)
isopen(rbuf) || return
end
try
push!(rbuf.writers, cond)
if length(rbuf.writers) > 1
# we're behind someone in the queue
wait(cond)
isopen(rbuf) || return
end
# now we're in the front of the queue
while frameswritable(rbuf) < rbuf.pabuf.bufferSize
wait(rbuf.datanotify)
isopen(rbuf) || return
end

# we're done, remove our condition and notify the next writer if necessary
shift!(rbuf.writers)
if length(rbuf.writers) > 0
notify(rbuf.writers[1])
finally
# we're done, remove our condition and notify the next writer if necessary
shift!(rbuf.writers)
if length(rbuf.writers) > 0
notify(rbuf.writers[1])
end
end
end

Expand All @@ -191,34 +197,37 @@ function read!(rbuf::RingBuffer{T}, data::AbstractArray{T}, nframes) where {T}
end

cond = Condition()
push!(rbuf.readers, cond)
if length(rbuf.readers) > 1
# we're behind someone in the queue
wait(cond)
isopen(rbuf) || return 0
end
# now we're in the front of the queue
nread = 0
n = PaUtil_ReadRingBuffer(rbuf.pabuf,
pointer(data),
nframes)
nread += n
# notify any waiting writers that there's space available
notify(rbuf.datanotify.cond)
while nread < nframes
wait(rbuf.datanotify)
isopen(rbuf) || return nread
try
push!(rbuf.readers, cond)
if length(rbuf.readers) > 1
# we're behind someone in the queue
wait(cond)
isopen(rbuf) || return 0
end
# now we're in the front of the queue
nread = 0
n = PaUtil_ReadRingBuffer(rbuf.pabuf,
pointer(data)+(nread*rbuf.nchannels*sizeof(T)),
nframes-nread)
pointer(data),
nframes)
nread += n
# notify any waiting writers that there's space available
notify(rbuf.datanotify.cond)
end
# we're done, remove our condition and notify the next reader if necessary
shift!(rbuf.readers)
if length(rbuf.readers) > 0
notify(rbuf.readers[1])
while nread < nframes
wait(rbuf.datanotify)
isopen(rbuf) || return nread
n = PaUtil_ReadRingBuffer(rbuf.pabuf,
pointer(data)+(nread*rbuf.nchannels*sizeof(T)),
nframes-nread)
nread += n
# notify any waiting writers that there's space available
notify(rbuf.datanotify.cond)
end
finally
# we're done, remove our condition and notify the next reader if necessary
shift!(rbuf.readers)
if length(rbuf.readers) > 0
notify(rbuf.readers[1])
end
end

nread
Expand Down Expand Up @@ -332,12 +341,6 @@ function close(rbuf::RingBuffer)
close(rbuf.pabuf)
# wake up any waiting readers or writers
notify(rbuf.datanotify.cond)
for condlist in (rbuf.readers, rbuf.writers)
while length(condlist) > 0
cond = pop!(condlist)
notify(cond)
end
end
end

isopen(rbuf::RingBuffer) = isopen(rbuf.pabuf)
Expand Down

0 comments on commit ae4531f

Please sign in to comment.