From 9747ea0587310a60e5258f71793fba8ab7a0437e Mon Sep 17 00:00:00 2001 From: "Bradley C. Kuszmaul" Date: Mon, 17 Feb 2025 17:47:25 +0000 Subject: [PATCH 1/3] Bradley's simpler lock, but it breaks in weird ways --- src/ConcurrentUtilities.jl | 3 +- src/simplefifolock.jl | 124 +++++++++++++++++++++++++++++++++++++ test/runtests.jl | 48 ++++++++++++++ 3 files changed, 174 insertions(+), 1 deletion(-) create mode 100644 src/simplefifolock.jl diff --git a/src/ConcurrentUtilities.jl b/src/ConcurrentUtilities.jl index f92505a..b7645b1 100644 --- a/src/ConcurrentUtilities.jl +++ b/src/ConcurrentUtilities.jl @@ -3,7 +3,7 @@ module ConcurrentUtilities import Base: AbstractLock, islocked, trylock, lock, unlock export Lockable, OrderedSynchronizer, reset!, ReadWriteLock, readlock, readunlock, @wkspawn, Workers, remote_eval, remote_fetch, Worker, terminate!, WorkerTerminatedException, - Pool, acquire, release, drain!, try_with_timeout, TimeoutException, FIFOLock + Pool, acquire, release, drain!, try_with_timeout, TimeoutException, FIFOLock, SimpleFIFOLock macro samethreadpool_spawn(expr) if VERSION >= v"1.9.2" @@ -23,6 +23,7 @@ include("rwlock.jl") include("pools.jl") using .Pools include("fifolock.jl") +include("simplefifolock.jl") function clear_current_task() current_task().storage = nothing diff --git a/src/simplefifolock.jl b/src/simplefifolock.jl new file mode 100644 index 0000000..0df21b1 --- /dev/null +++ b/src/simplefifolock.jl @@ -0,0 +1,124 @@ +mutable struct SimpleFIFOLock <: AbstractLock + cond_wait::Base.ThreadSynchronizer + reentrancy_count::UInt # 0 iff the lock is not held + locked_by::Union{Task,Nothing} # nothing iff the lock is not held + SimpleFIFOLock() = new(Base.ThreadSynchronizer(), 0, nothing) +end + +@inline function Base.trylock(l::SimpleFIFOLock) + GC.disable_finalizers(c) + ct = current_task() + lock(l.cond_wait) + locked_by = l.locked_by + if locked_by === nothing || locked_by === ct + l.rentrancy_count += 1 + l.locked_by = ct + unlock(l.cond_wait) + return true + end + unlock(l.cond_wait) + GC.enable_finalizers(c) + return false +end + +@inline function Base.lock(l::SimpleFIFOLock) + GC.disable_finalizers() + ct = current_task() + lock(l.cond_wait) + locked_by = l.locked_by + if locked_by === nothing || locked_by == ct + # We unroll the first iteration of the loop to avoid the overyhead of `try-finally` + # in the uncontended lock case. + l.reentrancy_count += 1 + l.locked_by = ct + unlock(l.cond_wait) + else + try + while l.locked_by !== nothing && l.locked_by !== ct + wait(l.cond_wait) + end + l.reentrancy_count += 1 + l.locked_by = ct + finally + unlock(l.cond_wait) + end + end + return nothing +end + +@inline function Base.unlock(l::SimpleFIFOLock) + ct = current_task() + lock(l.cond_wait) + #@info ":$(@__LINE__()) unlocking" task=current_task() locked_by=l.locked_by count=l.reentrancy_count ql=length(l.cond_wait.waitq) + println(@__LINE__()) + println(@__LINE__()) + if l.locked_by !== ct + @info ":$(@__LINE__())" + error("unlock from wrong thread") + end + if l.reentrancy_count == 0 + @info ":$(@__LINE__())" + error("unlock count must equal lock count") + end + @info ":$(@__LINE__())" + l.reentrancy_count -= 1 + @info ":$(@__LINE__())" + @info ":$(@__LINE__()) now count is" task=current_task() count=l.reentrancy_count + if l.reentrancy_count == 0 + @info "count becomes zero" task=current_task() ql=length(l.cond_wait.waitq) + l.locked_by = nothing + if !isempty(l.cond_wait.waitq) + @info "waitq not empty" ql=length(l.cond_wait.waitq) + t = popfirst!(l.cond_wait.waitq) + l.locked_by = t + l.reentrancy_count = 1 + schedule(t) + end + GC.enable_finalizers() + end + unlock(l.cond_wait) + return nothing +end + +# mutable struct FLock2 <: AbstractLock +# lock::ReentrantLock +# locked_by::Union{Task,Nothing} # nothing iff the lock is not held +# rentrancy_count::UInt # 0 iff the lock is not held +# tasks::Vector{Task} +# FLock2() = new(ReentrantLock(), nothing, 0, Task[]) +# end + +# @inline function Base.trylock(l::SimpleFIFOLock) +# GC.disable_finalizers(c) +# ct = current_task() +# lock(l.lock) +# locked_by = l.locked_by +# if locked_by === nothing || locked_by === ct +# l.rentrancy_count += 1 +# l.locked_by = ct +# unlock(l.lock) +# return true +# end +# unlock(l.lock) +# GC.enable_finalizers(c) +# return false +# end + +# @inline function Base.lock(l::SimpleFIFOLock) +# GC.disable_finalizers() +# ct = current_task() +# lock(l.lock) +# locked_by = l.locked_by +# if locked_by === nothing || locked_by == ct +# l.reentrancy_count += 1 +# l.locked_by = ct +# unlock(l.lock) +# else +# push!(l.tasks, ct) +# unlock(l.lock) # Little race here since someone could sneak in and yield to me. +# wait() +# l.reentrancy_count += 1 +# l.locked_by = ct +# end +# return nothing +# end diff --git a/test/runtests.jl b/test/runtests.jl index dba5f19..891991a 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -269,6 +269,54 @@ else end # @static if VERSION < v"1.10" end + @testset "SimpleFIFOLock" begin +@static if false # VERSION < v"1.10-" + @warn "skipping FIFOLock tests since VERSION ($VERSION) < v\"1.10\"" +else + @warn "Doing SimpleFIFOLock" + ctr_in = Threads.Atomic{Int}(1) + ctr_out = Threads.Atomic{Int}(1) + test_tasks = Task[] + sizehint!(test_tasks, 16) + tasks_in = zeros(Int, 16) + tasks_out = zeros(Int, 16) + tot = zeros(Int, 1) + fl = SimpleFIFOLock() + lock(fl) + try + for i in 1:16 + t = Threads.@spawn begin + tasks_in[i] = Threads.atomic_add!(ctr_in, 1) + @info "Locking $i" + lock(fl) + try + tot[1] += 1 + tasks_out[i] = Threads.atomic_add!(ctr_out, 1) + finally + unlock(fl) + end + end + push!(test_tasks, t) + end + finally + unlock(fl) + end + println("Mid SimpleFIFOLock") + for t in test_tasks + @test try + wait(t) + true + catch + false + end + end + @test tot[1] == 16 + @test tasks_out == tasks_in + @warn "Did SimpleFIFOLock" +end # @static if VERSION < v"1.10" + end + + # track all workers every created ALL_WORKERS = [] ConcurrentUtilities.Workers.GLOBAL_CALLBACK_PER_WORKER[] = w -> push!(ALL_WORKERS, w) From 0139377f14af3e3e65f3975b5c8aa5df0f8ff6e0 Mon Sep 17 00:00:00 2001 From: "Bradley C. Kuszmaul" Date: Wed, 19 Feb 2025 21:59:30 +0000 Subject: [PATCH 2/3] Simpler version of FIFOLock --- Project.toml | 1 + src/simplefifolock.jl | 156 +++++++++++++++++++----------------------- test/runtests.jl | 1 - 3 files changed, 73 insertions(+), 85 deletions(-) diff --git a/Project.toml b/Project.toml index 826d4bb..a1a1099 100644 --- a/Project.toml +++ b/Project.toml @@ -4,6 +4,7 @@ authors = ["Jacob Quinn "] version = "2.5.0" [deps] +BenchmarkTools = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf" Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b" Sockets = "6462fe0b-24de-5631-8697-dd941f90decc" diff --git a/src/simplefifolock.jl b/src/simplefifolock.jl index 0df21b1..958a557 100644 --- a/src/simplefifolock.jl +++ b/src/simplefifolock.jl @@ -1,124 +1,112 @@ mutable struct SimpleFIFOLock <: AbstractLock - cond_wait::Base.ThreadSynchronizer + cond::Threads.Condition reentrancy_count::UInt # 0 iff the lock is not held - locked_by::Union{Task,Nothing} # nothing iff the lock is not held - SimpleFIFOLock() = new(Base.ThreadSynchronizer(), 0, nothing) + locked_by::Union{Task,Nothing} # nothing iff the lock is not held + SimpleFIFOLock() = new(Threads.Condition(), 0, nothing) end - + @inline function Base.trylock(l::SimpleFIFOLock) - GC.disable_finalizers(c) + GC.disable_finalizers() ct = current_task() - lock(l.cond_wait) + lock(l.cond) locked_by = l.locked_by if locked_by === nothing || locked_by === ct - l.rentrancy_count += 1 + l.reentrancy_count += 1 l.locked_by = ct - unlock(l.cond_wait) + unlock(l.cond) return true - end - unlock(l.cond_wait) - GC.enable_finalizers(c) - return false + end + unlock(l.cond) + GC.enable_finalizers() + return false end @inline function Base.lock(l::SimpleFIFOLock) GC.disable_finalizers() ct = current_task() - lock(l.cond_wait) - locked_by = l.locked_by - if locked_by === nothing || locked_by == ct - # We unroll the first iteration of the loop to avoid the overyhead of `try-finally` - # in the uncontended lock case. - l.reentrancy_count += 1 - l.locked_by = ct - unlock(l.cond_wait) - else - try - while l.locked_by !== nothing && l.locked_by !== ct - wait(l.cond_wait) - end + lock(l.cond) + while true + if l.locked_by === nothing || l.locked_by === ct l.reentrancy_count += 1 l.locked_by = ct - finally - unlock(l.cond_wait) + unlock(l.cond) + return nothing + end + # Don't pay for the try-catch unless we `wait`. + try + wait(l.cond) + catch + unlock(l.cond) + rethrow() end end - return nothing end @inline function Base.unlock(l::SimpleFIFOLock) ct = current_task() - lock(l.cond_wait) - #@info ":$(@__LINE__()) unlocking" task=current_task() locked_by=l.locked_by count=l.reentrancy_count ql=length(l.cond_wait.waitq) - println(@__LINE__()) - println(@__LINE__()) + lock(l.cond) + if l.locked_by === nothing + unlock(l.cond) + error("unlocking an unlocked lock") + end if l.locked_by !== ct - @info ":$(@__LINE__())" + unlock(l.cond) error("unlock from wrong thread") end + l.reentrancy_count += -1 if l.reentrancy_count == 0 - @info ":$(@__LINE__())" - error("unlock count must equal lock count") - end - @info ":$(@__LINE__())" - l.reentrancy_count -= 1 - @info ":$(@__LINE__())" - @info ":$(@__LINE__()) now count is" task=current_task() count=l.reentrancy_count - if l.reentrancy_count == 0 - @info "count becomes zero" task=current_task() ql=length(l.cond_wait.waitq) l.locked_by = nothing - if !isempty(l.cond_wait.waitq) - @info "waitq not empty" ql=length(l.cond_wait.waitq) - t = popfirst!(l.cond_wait.waitq) - l.locked_by = t - l.reentrancy_count = 1 - schedule(t) + if !isempty(l.cond.waitq) + # Don't pay for the try-catch unless we `notify`. + try + notify(l.cond; all=false) + catch + unlock(l.cond) + rethrow() + end end - GC.enable_finalizers() end - unlock(l.cond_wait) + unlock(l.cond) return nothing end -# mutable struct FLock2 <: AbstractLock -# lock::ReentrantLock -# locked_by::Union{Task,Nothing} # nothing iff the lock is not held -# rentrancy_count::UInt # 0 iff the lock is not held -# tasks::Vector{Task} -# FLock2() = new(ReentrantLock(), nothing, 0, Task[]) -# end +# Performance note: for `@btime begin lock($l); unlock($l); end`. +# 13ns for SpinLock +# 17ns for ReentrantLock +# 33ns for FIFOLock +# 35ns for SimpleFIFOLock +# 57ns for the alternative versions below that use `@lock` (and hence have too much try-finally code) -# @inline function Base.trylock(l::SimpleFIFOLock) -# GC.disable_finalizers(c) +# @inline function Base.lock(l::SimpleFIFOLock) +# GC.disable_finalizers() # ct = current_task() -# lock(l.lock) -# locked_by = l.locked_by -# if locked_by === nothing || locked_by === ct -# l.rentrancy_count += 1 -# l.locked_by = ct -# unlock(l.lock) -# return true +# @lock l.cond begin +# while true +# if l.locked_by === nothing || l.locked_by === ct +# l.reentrancy_count += 1 +# l.locked_by = ct +# return nothing +# end +# wait(l.cond) +# end # end -# unlock(l.lock) -# GC.enable_finalizers(c) -# return false # end - -# @inline function Base.lock(l::SimpleFIFOLock) -# GC.disable_finalizers() + +# @inline function Base.unlock(l::SimpleFIFOLock) # ct = current_task() -# lock(l.lock) -# locked_by = l.locked_by -# if locked_by === nothing || locked_by == ct -# l.reentrancy_count += 1 -# l.locked_by = ct -# unlock(l.lock) -# else -# push!(l.tasks, ct) -# unlock(l.lock) # Little race here since someone could sneak in and yield to me. -# wait() -# l.reentrancy_count += 1 -# l.locked_by = ct +# @lock l.cond begin +# if l.locked_by === nothing +# error("unlocking an unlocked lock") +# end +# if l.locked_by !== ct +# error("unlock from wrong thread") +# end +# @assert l.reentrancy_count > 0 +# l.reentrancy_count += -1 +# if l.reentrancy_count == 0 +# l.locked_by = nothing +# notify(l.cond; all=false) +# end # end # return nothing # end diff --git a/test/runtests.jl b/test/runtests.jl index 891991a..265e83f 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -287,7 +287,6 @@ else for i in 1:16 t = Threads.@spawn begin tasks_in[i] = Threads.atomic_add!(ctr_in, 1) - @info "Locking $i" lock(fl) try tot[1] += 1 From d56426e93bffd010eddd875feee493321e205cb9 Mon Sep 17 00:00:00 2001 From: "Bradley C. Kuszmaul" Date: Wed, 19 Feb 2025 22:22:47 +0000 Subject: [PATCH 3/3] Add documentation --- src/simplefifolock.jl | 44 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/src/simplefifolock.jl b/src/simplefifolock.jl index 958a557..de765a1 100644 --- a/src/simplefifolock.jl +++ b/src/simplefifolock.jl @@ -1,3 +1,19 @@ +""" + SimpleFIFOLock() + +A reentrant lock similar to Base.ReentrantLock, but with strict FIFO ordering. + +This lock supports "barging", in which a thread can jump to the front of the queue of tasks: +``` +lock(fifolock; first=true) +``` + +Calling `lock` inhibits running on finalizers on that thread. + +Implementation note: The implementation uses a Condition. Conditions provide FIFO +notification order, as well as the ability to jump to the front, which makes the +implementation straightforward. +""" mutable struct SimpleFIFOLock <: AbstractLock cond::Threads.Condition reentrancy_count::UInt # 0 iff the lock is not held @@ -5,6 +21,14 @@ mutable struct SimpleFIFOLock <: AbstractLock SimpleFIFOLock() = new(Threads.Condition(), 0, nothing) end +""" + trylock(l::SimpleFIFOLock) + +Try to acquire lock `l`. If successful, return `true`. If the lock is held by another +task, do not wait and return `false`. + +Each successful `trylock` must be matched by an `unlock`. +""" @inline function Base.trylock(l::SimpleFIFOLock) GC.disable_finalizers() ct = current_task() @@ -21,7 +45,15 @@ end return false end -@inline function Base.lock(l::SimpleFIFOLock) +""" + lock(l::SimpleFIFOLock; first=false) + +Acquire lock `l`. The lock is reentrant, so if the calling task has already acquired the +lock then return immediately. + +Each `lock` must be matched by an `unlock`. +""" +@inline function Base.lock(l::SimpleFIFOLock; first=false) GC.disable_finalizers() ct = current_task() lock(l.cond) @@ -34,7 +66,7 @@ end end # Don't pay for the try-catch unless we `wait`. try - wait(l.cond) + wait(l.cond; first) catch unlock(l.cond) rethrow() @@ -42,6 +74,14 @@ end end end +""" + unlock(lock::SimpleFIFOLock) + +Releases ownerhsip of `lock`. + +Note if the has been more than once by the same thread, it will need to be unlocked the same +number of times. +""" @inline function Base.unlock(l::SimpleFIFOLock) ct = current_task() lock(l.cond)