jli  Linuxx86_641.10.3v1.10.30b4590a5507d3f3046e5bafc007cacbbfc9b310bV"ConcurrentUtilitiesDYQJk𻈹R>#\"6H/opt/julia/packages/ConcurrentUtilities/J6iMP/src/ConcurrentUtilities.jldAE/opt/julia/packages/ConcurrentUtilities/J6iMP/src/try_with_timeout.jldA</opt/julia/packages/ConcurrentUtilities/J6iMP/src/workers.jldAސݗ1V$ bdSocketsWorkersk1 [)*SerializationWorkers=/opt/julia/packages/ConcurrentUtilities/J6iMP/src/lockable.jldA:/opt/julia/packages/ConcurrentUtilities/J6iMP/src/spawn.jldAA/opt/julia/packages/ConcurrentUtilities/J6iMP/src/synchronizer.jldA;/opt/julia/packages/ConcurrentUtilities/J6iMP/src/rwlock.jldA:/opt/julia/packages/ConcurrentUtilities/J6iMP/src/pools.jldA* CoremуJ5Basemу]J5MainmуJ5ArgToolsBń x(mуF K5 Artifactsmr-V3|mу K5Base64UlD*_mу> K5CRC32c\y.jmуj K5 FileWatchingXzsy`{,zmуh& K5LibdluVW59˗,mу-" K5LoggingT{VhUXM=mуrU" K5MmapP~:xg,Omу|' K5NetworkOptionsC0YW,mуʠ, K5SHAQ<$!<%mу1 K5 Serialization [)*k1mу-G K5Sockets1V$ bdސݗmуYBY K5UnicodeP>I>Nrmуeszo K5 LinearAlgebraSm7̏mуuux K5 OpenBLAS_jll[(Śb6EcQ FmуDux K5libblastrampoline_jllLSۆ }lxӠmу^} K5MarkdownZPn7z`smу/Ed~ K5Printfg^cX׸QDmу;h K5Random_ɢ?\Ymу? K5TarOi>աmу!t, K5DatesEY8pj2 mуX K5FuturebS;3{I xVMmуsD K5InteractiveUtilsWL ~@'ZmуVg K5LibGit2Z[&RPTv3EКRmу8J K5 LibGit2_jll YXg}]$mуD K5 MbedTLS_jllAX 3ȡ_mу- K5 LibSSH2_jlloTZk)߆= v"1.9.2" esc(:(Threads.@spawn Threads.threadpool() $expr)) else esc(:(Threads.@spawn $expr)) end end include("try_with_timeout.jl") include("workers.jl") using .Workers isdefined(Base, :Lockable) ? (using Base: Lockable) : include("lockable.jl") # https://github.com/JuliaLang/julia/pull/52898 include("spawn.jl") include("synchronizer.jl") include("rwlock.jl") include("pools.jl") using .Pools function clear_current_task() current_task().storage = nothing current_task().code = nothing return end """ @wkspawn [:default|:interactive] expr Create a `Task` and `schedule` it to run on any available thread in the specified threadpool (`:default` if unspecified). The task is allocated to a thread once one becomes available. To wait for the task to finish, call `wait` on the result of this macro, or call `fetch` to wait and then obtain its return value. Values can be interpolated into `@wkspawn` via `\$`, which copies the value directly into the constructed underlying closure. This allows you to insert the _value_ of a variable, isolating the asynchronous code from changes to the variable's value in the current task. """ macro wkspawn(args...) e = args[end] expr = quote ret = $e $(clear_current_task)() ret end @static if isdefined(Base.Threads, :maxthreadid) q = esc(:(Threads.@spawn $(args[1:end-1]...) $expr)) else q = esc(:(Threads.@spawn $expr)) end return q end end # module E/opt/julia/packages/ConcurrentUtilities/J6iMP/src/try_with_timeout.jl9 """ TimeoutException Thrown from `try_with_timeout` when the timeout is reached. """ struct TimeoutException <: Exception timeout::Float64 end function Base.showerror(io::IO, te::TimeoutException) print(io, "TimeoutException: try_with_timeout timed out after $(te.timeout) seconds") end """ TimedOut Helper object passed to user-provided `f` in `try_with_timeout` that allows checking if the calling context reached a time out. Call `x[]`, which returns a `Bool`, to check if the timeout was reached. """ struct TimedOut{T} ch::Channel{T} end Base.getindex(x::TimedOut) = !isopen(x.ch) """ try_with_timeout(f, timeout, T=Any) -> T Run `f` in a new task, and return its result. If `f` does not complete within `timeout` seconds, throw a `TimeoutException`. If `f` throws an exception, rethrow it. If `f` completes successfully, return its result. `f` should be of the form `f(x::TimedOut)`, where `x` is a `TimedOut` object. This allows the calling function to check whether the timeout has been reached by checking `x[]` and if `true`, the timeout was reached and the function can cancel/abort gracefully. The 3rd argument `T` is optional (default `Any`) and allows passing an expected return type that `f` should return; this allows avoiding a dynamic dispatch from non-inferability of using `try_with_timeout` with `f`. # Examples ```julia julia> try_with_timeout(_ -> 1, 1) 1 julia> try_with_timeout(_ -> sleep(3), 1) ERROR: TimeoutException: try_with_timeout timed out after 1.0 seconds Stacktrace: [1] try_with_timeout(::var"#1#2", ::Int64) at ./REPL[1]:1 [2] top-level scope at REPL[2]:1 julia> try_with_timeout(_ -> error("hey"), 1) ERROR: hey Stacktrace: [1] error(::String) at ./error.jl:33 [2] (::var"#1#2")(::TimedOut{Any}) at ./REPL[1]:1 [3] try_with_timeout(::var"#1#2", ::Int64) at ./REPL[1]:1 [4] top-level scope at REPL[3]:1 julia> try_with_timeout(_ -> 1, 1, Int) 1 # usage with `TimedOut` julia> try_with_timeout(1) do timedout while !timedout[] # do iterative computation that may take too long end end julia> try_with_timeout(1) do timedout sleep(3) timedout[] && abort_gracefully() end ``` """ function try_with_timeout(f, timeout, ::Type{T}=Any) where {T} ch = Channel{T}(0) x = TimedOut(ch) timer = Timer(tm -> !isready(ch) && close(ch, TimeoutException(timeout)), timeout) @samethreadpool_spawn begin try put!(ch, $f(x)::T) catch e close(ch, CapturedException(e, catch_backtrace())) finally close(timer) end end return take!(ch) end </opt/julia/packages/ConcurrentUtilities/J6iMP/src/workers.jl.module Workers using Sockets, Serialization export Worker, remote_eval, remote_fetch, terminate!, WorkerTerminatedException import ..try_with_timeout # RPC framework struct Request mod::Symbol # module in which we should eval expr::Expr # expression to eval id::UInt64 # unique id for this request # if true, worker should terminate immediately after receiving this Request # ignoring other fields shutdown::Bool end # worker executes Request and returns a serialized Response object *if* Request wasn't a shutdown struct Response result error::Union{Nothing, Exception} id::UInt64 # matches a corresponding Request.id end # simple FutureResult that coordinator can wait on until a Response comes back for a Request struct FutureResult id::UInt64 # matches a corresponding Request.id value::Channel{Any} # size 1 end Base.fetch(f::FutureResult) = fetch(f.value) mutable struct Worker lock::ReentrantLock # protects the .futures field; no other fields are modified after construction pid::Int process::Base.Process server::Sockets.PipeServer pipe::Base.PipeEndpoint messages::Task output::Task process_watch::Task worksubmission::Task workqueue::Channel{Tuple{Request, FutureResult}} futures::Dict{UInt64, FutureResult} # Request.id -> FutureResult @static if VERSION < v"1.7" terminated::Threads.Atomic{Bool} else @atomic terminated::Bool end end # used to close FutureResult.value channels when a worker terminates struct WorkerTerminatedException <: Exception worker::Worker end # atomics compat macro atomiccas(ex, cmp, val) @static if VERSION < v"1.7" return esc(quote _ret = Threads.atomic_cas!($ex, $cmp, $val) (; success=(_ret === $cmp)) end) else return esc(:(@atomicreplace $ex $cmp => $val)) end end macro atomicget(ex) @static if VERSION < v"1.7" return esc(Expr(:ref, ex)) else return esc(:(@atomic :acquire $ex)) end end terminated(w::Worker) = @atomicget(w.terminated) # performs all the "closing" tasks of worker fields # but does not *wait* for a final close state # so typically callers should call wait(w) after this function terminate!(w::Worker, from::Symbol=:manual) if @atomiccas(w.terminated, false, true).success # we won getting to close down the worker @debug "terminating worker $(w.pid) from $from" wte = WorkerTerminatedException(w) Base.@lock w.lock begin for (_, fut) in w.futures close(fut.value, wte) end empty!(w.futures) end signal = Base.SIGTERM while true kill(w.process, signal) signal = signal == Base.SIGTERM ? Base.SIGINT : Base.SIGKILL process_exited(w.process) && break sleep(0.1) process_exited(w.process) && break end close(w.pipe) close(w.server) end return end # Base.Process has a nifty .exitnotify Condition # so we might as well get notified when the process exits # as one of our ways of detecting the worker has gone away function watch_and_terminate!(w::Worker) wait(w.process) terminate!(w, :watch_and_terminate) true end # gracefully terminate a worker by sending a shutdown message # and waiting for the other tasks to perform worker shutdown function Base.close(w::Worker) if !terminated(w) && isopen(w.pipe) req = Request(Symbol(), :(), rand(UInt64), true) Base.@lock w.lock begin serialize(w.pipe, req) end end wait(w) return end # wait until our spawned tasks have all finished Base.wait(w::Worker) = fetch(w.process_watch) && fetch(w.messages) && fetch(w.output) Base.show(io::IO, w::Worker) = print(io, "Worker(pid=$(w.pid)", terminated(w) ? ", terminated=true, termsignal=$(w.process.termsignal)" : "", ")") # used in testing to ensure all created workers are # eventually cleaned up properly const GLOBAL_CALLBACK_PER_WORKER = Ref{Any}() function Worker(; env::AbstractDict=ENV, dir::String=pwd(), threads::String="auto", exeflags=`--threads=$threads`, connect_timeout::Int=60, worker_redirect_io::IO=stdout, worker_redirect_fn=(io, pid, line)->println(io, " Worker $pid: $line") ) # below copied from Distributed.launch env = Dict{String, String}(env) pathsep = Sys.iswindows() ? ";" : ":" if get(env, "JULIA_LOAD_PATH", nothing) === nothing env["JULIA_LOAD_PATH"] = join(LOAD_PATH, pathsep) end if get(env, "JULIA_DEPOT_PATH", nothing) === nothing env["JULIA_DEPOT_PATH"] = join(DEPOT_PATH, pathsep) end # Set the active project on workers using JULIA_PROJECT. # Users can opt-out of this by (i) passing `env = ...` or (ii) passing # `--project=...` as `exeflags` to addprocs(...). project = Base.ACTIVE_PROJECT[] if project !== nothing && get(env, "JULIA_PROJECT", nothing) === nothing env["JULIA_PROJECT"] = project end # end copied from Distributed.launch ## start the worker process file = tempname() server = Sockets.listen(file) color = get(worker_redirect_io, :color, false) ? "yes" : "no" # respect color of target io exec = "include(\"$(@__DIR__)/ConcurrentUtilities.jl\"); using ConcurrentUtilities: Workers; Workers.startworker(\"$file\")" cmd = `$(Base.julia_cmd()) $exeflags --startup-file=no --color=$color -e $exec` proc = open(detach(setenv(addenv(cmd, env), dir=dir)), "r+") pid = Libc.getpid(proc) ## connect to the worker process with timeout try pipe = try_with_timeout(x -> Sockets.accept(server), connect_timeout) # create worker @static if VERSION < v"1.7" w = Worker(ReentrantLock(), pid, proc, server, pipe, Task(nothing), Task(nothing), Task(nothing), Task(nothing), Channel{Tuple{Request, FutureResult}}(), Dict{UInt64, FutureResult}(), Threads.Atomic{Bool}(false)) else w = Worker(ReentrantLock(), pid, proc, server, pipe, Task(nothing), Task(nothing), Task(nothing), Task(nothing), Channel{Tuple{Request, FutureResult}}(), Dict{UInt64, FutureResult}(), false) end ## start a task to watch for worker process termination w.process_watch = Threads.@spawn watch_and_terminate!(w) ## start a task to redirect worker output w.output = Threads.@spawn redirect_worker_output(worker_redirect_io, w, worker_redirect_fn, proc) ## start a task to listen for worker messages w.messages = Threads.@spawn process_responses(w) ## start a task to process eval requests and send to worker w.worksubmission = Threads.@spawn process_work(w) # add a finalizer finalizer(x -> @async(terminate!(x, :finalizer)), w) # @async to allow a task switch if isassigned(GLOBAL_CALLBACK_PER_WORKER) GLOBAL_CALLBACK_PER_WORKER[](w) end return w catch # cleanup in case connect fails/times out kill(proc, Base.SIGKILL) @isdefined(sock) && close(sock) @isdefined(w) && terminate!(w, :Worker_catch) rethrow() end end function redirect_worker_output(io::IO, w::Worker, fn, proc) try while !process_exited(proc) && !@atomicget(w.terminated) line = readline(proc) if !isempty(line) fn(io, w.pid, line) flush(io) end end catch e # @error "Error redirecting worker output $(w.pid)" exception=(e, catch_backtrace()) terminate!(w, :redirect_worker_output) e isa EOFError || e isa Base.IOError || rethrow() end true end function process_responses(w::Worker) lock = w.lock reqs = w.futures try while isopen(w.pipe) && !@atomicget(w.terminated) # get the next Response from the worker r = deserialize(w.pipe) @assert r isa Response "Received invalid response from worker $(w.pid): $(r)" # println("Received response $(r) from worker $(w.pid)") Base.@lock lock begin # look up the FutureResult for this request fut = pop!(reqs, r.id) @assert !isready(fut.value) "Received duplicate response for request $(r.id) from worker $(w.pid)" if r.error !== nothing # this allows rethrowing the exception from the worker to the caller close(fut.value, r.error) else put!(fut.value, r.result) end end end catch e # @error "Error processing responses from worker $(w.pid)" exception=(e, catch_backtrace()) terminate!(w, :process_responses) e isa EOFError || e isa Base.IOError || rethrow() end true end function process_work(w::Worker) try for (req, fut) in w.workqueue # println("Sending request $(req) to worker $(w.pid)") Base.@lock w.lock begin w.futures[req.id] = fut end serialize(w.pipe, req) end catch e # @error "Error processing work for worker $(w.pid)" exception=(e, catch_backtrace()) terminate!(w, :process_work) # e isa EOFError || e isa Base.IOError || rethrow() end end remote_eval(w::Worker, expr) = remote_eval(w, Main, expr.head == :block ? Expr(:toplevel, expr.args...) : expr) function remote_eval(w::Worker, mod, expr) terminated(w) && throw(WorkerTerminatedException(w)) # we only send the Symbol module name to the worker req = Request(nameof(mod), expr, rand(UInt64), false) fut = FutureResult(req.id, Channel(1)) put!(w.workqueue, (req, fut)) return fut end # convenience call to eval and fetch in one step remote_fetch(w::Worker, args...) = fetch(remote_eval(w, args...)) # compat for `Threads.@spawn :interactive expr` @static if hasmethod(getfield(Threads, Symbol("@spawn")), Tuple{LineNumberNode, Module, Symbol, Expr}) macro _spawn_interactive(ex) esc(:(Threads.@spawn :interactive $ex)) end else macro _spawn_interactive(ex) esc(:(@async $ex)) end end function startworker(file) # don't need stdin (copied from Distributed.start_worker) redirect_stdin(devnull) close(stdin) redirect_stderr(stdout) # redirect stderr so coordinator reads everything from stdout pipe = Sockets.connect(file) try wait(@_spawn_interactive serve_requests(pipe)) catch e @error "Error serving requests from coordinator" exception=(e, catch_backtrace()) finally close(pipe) exit(0) end end # we need to lookup the module to eval in for this request # so we loop through loaded modules until we find it function getmodule(nm::Symbol) # fast-path Main/Base/Core nm == :Main && return Main nm == :Base && return Base nm == :Core && return Core for mod in Base.loaded_modules_array() if nameof(mod) == nm return mod end end error("module $nm not found") end function execute(r::Request) # @show r.mod, r.expr return Core.eval(getmodule(r.mod), r.expr) end function serve_requests(io) iolock = ReentrantLock() while true req = deserialize(io) @assert req isa Request req.shutdown && break # println("received request: $(req)") Threads.@spawn begin r = $req local resp try result = execute(r) resp = Response(result, nothing, r.id) catch e resp = Response(nothing, CapturedException(e, catch_backtrace()), r.id) finally Base.@lock iolock begin # println("sending response: $(resp)") serialize(io, resp) end end end yield() end end end # module Workers =/opt/julia/packages/ConcurrentUtilities/J6iMP/src/lockable.jl """ Lockable(value, lock = ReentrantLock()) Creates a `Lockable` object that wraps `value` and associates it with the provided `lock`. """ struct Lockable{T, L <: Base.AbstractLock} value::T lock::L end Lockable(value) = Lockable(value, ReentrantLock()) Base.getindex(l::Lockable) = l.value """ lock(f::Function, l::Lockable) Acquire the lock associated with `l`, execute `f` with the lock held, and release the lock when `f` returns. `f` will receive one positional argument: the value wrapped by `l`. If the lock is already locked by a different task/thread, wait for it to become available. When this function returns, the `lock` has been released, so the caller should not attempt to `unlock` it. """ function Base.lock(f, l::Lockable) lock(l.lock) do f(l.value) end end # implement the rest of the Lock interface on Lockable Base.islocked(l::Lockable) = islocked(l.lock) Base.lock(l::Lockable) = lock(l.lock) Base.trylock(l::Lockable) = trylock(l.lock) Base.unlock(l::Lockable) = unlock(l.lock):/opt/julia/packages/ConcurrentUtilities/J6iMP/src/spawn.jl const WORK_QUEUE = Channel{Task}(0) const WORKER_TASKS = Task[] """ ConcurrentUtilities.@spawn expr ConcurrentUtilities.@spawn passthroughstorage::Bool expr Similar to `Threads.@spawn`, schedule and execute a task (given by `expr`) that will be run on a "background worker" (see [`ConcurrentUtilities.init`]((@ref))). In the 2-argument invocation, `passthroughstorage` controls whether the task-local storage of the `current_task()` should be "passed through" to the spawned task. """ macro spawn(thunk) esc(quote tsk = @task $thunk tsk.storage = current_task().storage put!(ConcurrentUtilities.WORK_QUEUE, tsk) tsk end) end """ ConcurrentUtilities.@spawn expr ConcurrentUtilities.@spawn passthroughstorage::Bool expr Similar to `Threads.@spawn`, schedule and execute a task (given by `expr`) that will be run on a "background worker" (see [`ConcurrentUtilities.init`]((@ref))). In the 2-argument invocation, `passthroughstorage` controls whether the task-local storage of the `current_task()` should be "passed through" to the spawned task. """ macro spawn(passthroughstorage, thunk) esc(quote tsk = @task $thunk if $passthroughstorage tsk.storage = current_task().storage end put!(ConcurrentUtilities.WORK_QUEUE, tsk) tsk end) end """ ConcurrentUtilities.init(nworkers=Threads.nthreads() - 1) Initialize background workers that will execute tasks spawned via [`ConcurrentUtilities.@spawn`](@ref). If `nworkers == 1`, a single worker will be started on thread 1 where tasks will be executed in contention with other thread 1 work. Background worker tasks can be inspected by looking at `ConcurrentUtilities.WORKER_TASKS`. """ function init(nworkers=Threads.nthreads()-1) maxthreadid = nworkers + 1 tids = Threads.nthreads() == 1 ? (1:1) : 2:maxthreadid resize!(WORKER_TASKS, max(nworkers, 1)) @static if VERSION < v"1.8.0" Threads.@threads for tid in 1:maxthreadid if tid in tids WORKER_TASKS[tid == 1 ? 1 : (tid - 1)] = Base.@async begin for task in WORK_QUEUE schedule(task) wait(task) end end end end else Threads.@threads :static for tid in 1:maxthreadid if tid in tids WORKER_TASKS[tid == 1 ? 1 : (tid - 1)] = Base.@async begin for task in WORK_QUEUE schedule(task) wait(task) end end end end end return endA/opt/julia/packages/ConcurrentUtilities/J6iMP/src/synchronizer.jl """ OrderedSynchronizer(i=1) A threadsafe synchronizer that allows ensuring concurrent work is done in a specific order. The `OrderedSynchronizer` is initialized with an integer `i` that represents the current "order" of the synchronizer. Work is "scheduled" by calling `put!(f, x, i)`, where `f` is a function that will be called like `f()` when the synchronizer is at order `i`, and will otherwise wait until other calls to `put!` have finished to bring the synchronizer's state to `i`. Once `f()` is called, the synchronizer's state is incremented by 1 and any waiting `put!` calls check to see if it's their turn to execute. A synchronizer's state can be reset to a specific value (1 by default) by calling `reset!(x, i)`. """ mutable struct OrderedSynchronizer coordinating_task::Task cond::Threads.Condition i::Int @static if VERSION < v"1.7" closed::Threads.Atomic{Bool} else @atomic closed::Bool end end @static if VERSION < v"1.7" OrderedSynchronizer(i=1) = OrderedSynchronizer(current_task(), Threads.Condition(), i, Threads.Atomic{Bool}(false)) else OrderedSynchronizer(i=1) = OrderedSynchronizer(current_task(), Threads.Condition(), i, false) end """ reset!(x::OrderedSynchronizer, i=1) Reset the state of `x` to `i`. """ function reset!(x::OrderedSynchronizer, i=1) Base.@lock x.cond begin x.i = i @static if VERSION < v"1.7" x.closed[] = false else @atomic :monotonic x.closed = false end end end function Base.close(x::OrderedSynchronizer, excp::Exception=closed_exception()) Base.@lock x.cond begin @static if VERSION < v"1.7" x.closed[] = true else @atomic :monotonic x.closed = true end Base.notify_error(x.cond, excp) end return end @static if VERSION < v"1.7" Base.isopen(x::OrderedSynchronizer) = !x.closed[] else Base.isopen(x::OrderedSynchronizer) = !(@atomic :monotonic x.closed) end closed_exception() = InvalidStateException("OrderedSynchronizer is closed.", :closed) function check_closed(x::OrderedSynchronizer) if !isopen(x) # if the monotonic load succeed, now do an acquire fence @static if VERSION < v"1.7" !x.closed[] && Base.concurrency_violation() else !(@atomic :acquire x.closed) && Base.concurrency_violation() end throw(closed_exception()) end end """ put!(f::Function, x::OrderedSynchronizer, i::Int, incr::Int=1) Schedule `f` to be called when `x` is at order `i`. Note that `put!` will block until `f` is executed. The typical usage involves something like: ```julia x = OrderedSynchronizer() @sync for i = 1:N Threads.@spawn begin # do some concurrent work # once work is done, schedule synchronization put!(x, \$i) do # report back result of concurrent work # won't be executed until all `i-1` calls to `put!` have already finished end end end ``` The `incr` argument controls how much the synchronizer's state is incremented after `f` is called. By default, `incr` is 1. """ function Base.put!(f, x::OrderedSynchronizer, i, incr=1) check_closed(x) Base.@lock x.cond begin # wait until we're ready to execute f while x.i != i check_closed(x) wait(x.cond) end check_closed(x) try f() catch e Base.throwto(x.coordinating_task, e) end x.i += incr notify(x.cond) end end ;/opt/julia/packages/ConcurrentUtilities/J6iMP/src/rwlock.jl@static if VERSION >= v"1.8" """ ReadWriteLock() A threadsafe lock that allows multiple readers or a single writer. The read side is acquired/released via `readlock(rw)` and `readunlock(rw)`, while the write side is acquired/released via `lock(rw)` and `unlock(rw)`. While a writer is active, all readers will block. Once the writer is finished, all pending readers will be allowed to acquire/release before the next writer. """ mutable struct ReadWriteLock writelock::ReentrantLock readwait::Threads.Condition writeready::Threads.Event # `readercount` keeps track of how many readers are active or pending # if > 0, then that's the number of active readers # if < 0, then there is a writer active or pending, and # `readercount + MaxReaders` is the number of active or pending readers @atomic readercount::Int @atomic readerwait::Int end function ReadWriteLock() @static if VERSION < v"1.8" throw(ArgumentError("ReadWriteLock requires Julia v1.8 or greater")) else return ReadWriteLock(ReentrantLock(), Threads.Condition(), Threads.Event(true), 0, 0) end end const MaxReaders = 1 << 30 function readlock(rw::ReadWriteLock) # first step is to increment readercount atomically if (@atomic :acquire_release rw.readercount += 1) < 0 # if we observe from our atomic operation that readercount is < 0, # a writer was active or pending, so we need initiate the "slowpath" # by acquiring the readwait lock Base.@lock rw.readwait begin # check our condition again, if it holds, then we get in line # to be notified once the writer is done (the writer also acquires # the readwait lock, so we'll be waiting until it releases it) if rw.readercount < 0 # writer still active wait(rw.readwait) end end end return end function readunlock(rw::ReadWriteLock) # similar to `readlock`, the first step is to decrement `readercount` atomically if (@atomic :acquire_release rw.readercount -= 1) < 0 # if we observe from our atomic operation that readercount is < 0, # there's a pending write, so check if we're the last reader if (@atomic :acquire_release rw.readerwait -= 1) == 0 # we observed that there was a pending write AND we just # observed that we were the last reader, so it's our # responsibility to notify the writer that it can proceed notify(rw.writeready) end end return end function Base.lock(rw::ReadWriteLock) lock(rw.writelock) # only a single writer allowed at a time # first, we subtract MaxReaders from readercount # to make readercount negative; this will prevent any further readers # from locking, while maintaining our actual reader count so we # can track when we're able to write r = (@atomic :acquire_release rw.readercount -= MaxReaders) + MaxReaders # we also _add_ MaxReaders so that `r` is the number of active readers # if r == 0, that means there were no readers, # so we can proceed directly with the write lock (the 1st check below in the if) # if there _are_ active readers, then we need to wait until they all unlock # by incrementing `readerwait` by `r`, we're atomically setting the # of read # unlocks we expect to happen before we can proceed with the write lock # (readers decrement `readerwait` if they observe `readercount` is negative) # if, by chance, the last reader manages to unlock before we increment `readerwait`, # then `readerwait` will actually be negative and we'll increment it back to 0 # in that case, we can proceed directly with the write lock (the 2nd check below) if r != 0 && (@atomic :acquire_release rw.readerwait += r) != 0 # otherwise, there are still pending readers, so we need to wait for them to finish # we do this by waiting on the `writeready` event, which will be # notified when the last reader unlocks; if the last reader happens # to be racing with us, then `writeready` will already be set and # we'll proceed immediately wait(rw.writeready) end return end Base.islocked(rw::ReadWriteLock) = islocked(rw.writelock) function Base.unlock(rw::ReadWriteLock) r = (@atomic :acquire_release rw.readercount += MaxReaders) if r > 0 # wake up waiting readers Base.@lock rw.readwait notify(rw.readwait) end unlock(rw.writelock) return end end:/opt/julia/packages/ConcurrentUtilities/J6iMP/src/pools.jlmodule Pools export Pool, acquire, release, drain! import Base: acquire, release """ Pool{T}(limit::Int=4096) Pool{K, T}(limit::Int=4096) A threadsafe object for managing a pool of objects of type `T`, optionally keyed by objects of type `K`. Objects can be requested by calling `acquire(f, pool, [key])`, where `f` is a function that returns a new object of type `T`. The `key` argument is optional and can be used to lookup objects that match a certain criteria (a `Dict` is used internally, so matching is `isequal`). The `limit` argument will limit the number of objects that can be in use at any given time. If the limit has been reached, `acquire` will block until an object is released via `release`. - `release(pool, obj)` will return the object to the pool for reuse. - `release(pool)` will decrement the number in use but not return any object for reuse. - `drain!` can be used to remove objects that have been returned to the pool for reuse; it does *not* release any objects that are in use. See also `acquire`, `release`, `Pools.limit`, `Pools.in_use`, `Pools.in_pool`, `drain!`. The key and object types can be inspected with `keytype` and `valtype` respectively. """ mutable struct Pool{K, T} lock::Threads.Condition limit::Int cur::Int keyedvalues::Dict{K, Vector{T}} values::Vector{T} function Pool{K, T}(limit::Int=4096) where {K, T} T === Nothing && throw(ArgumentError("Pool type can not be `Nothing`")) x = new(Threads.Condition(), limit, 0) if K === Nothing x.values = T[] safesizehint!(x.values, limit) else x.keyedvalues = Dict{K, Vector{T}}() end return x end end Pool{T}(limit::Int=4096) where {T} = Pool{Nothing, T}(limit) safesizehint!(x, n) = sizehint!(x, min(4096, n)) # determines whether we'll look up object caches in .keyedvalues or .values iskeyed(::Pool{K}) where {K} = K !== Nothing """ keytype(::Pool) Return the type of the keys for the pool. If the pool is not keyed, this will return `Nothing`. """ Base.keytype(::Type{<:Pool{K}}) where {K} = K Base.keytype(p::Pool) = keytype(typeof(p)) """ valtype(::Pool) Return the type of the objects that can be stored in the pool. """ Base.valtype(::Type{<:Pool{<:Any, T}}) where {T} = T Base.valtype(p::Pool) = valtype(typeof(p)) """ Pools.limit(pool::Pool) -> Int Return the maximum number of objects permitted to be in use at the same time. See `Pools.in_use(pool)` for the number of objects currently in use. """ limit(pool::Pool) = Base.@lock pool.lock pool.limit """ Pools.in_use(pool::Pool) -> Int Return the number of objects currently in use. Less than or equal to `Pools.limit(pool)`. """ in_use(pool::Pool) = Base.@lock pool.lock pool.cur """ Pools.in_pool(pool::Pool) -> Int Return the number of objects in the pool available for reuse. """ in_pool(pool::Pool) = Base.@lock pool.lock mapreduce(length, +, values(pool.keyedvalues); init=0) in_pool(pool::Pool{Nothing}) = Base.@lock pool.lock length(pool.values) """ drain!(pool) Remove all objects from the pool for reuse, but do not release any active acquires. """ function drain!(pool::Pool{K}) where {K} Base.@lock pool.lock begin if iskeyed(pool) for objs in values(pool.keyedvalues) empty!(objs) end else empty!(pool.values) end end end # in VERSION >= v"1.7", we can replace `TRUE` with `Returns(true)` TRUE(x) = true @noinline keyerror(key, K) = throw(ArgumentError("invalid key `$key` provided for pool key type $K")) @noinline releaseerror() = throw(ArgumentError("cannot release when no objects are in use")) # NOTE: assumes you have the lock! function releasepermit(pool::Pool) pool.cur > 0 || releaseerror() pool.cur -= 1 notify(pool.lock; all=false) return end """ acquire(f, pool::Pool{K, T}, [key::K]; forcenew::Bool=false, isvalid::Function) -> T Get an object from a `pool`, optionally keyed by the provided `key`. The provided function `f` must create a new object instance of type `T`. Each `acquire` call MUST be matched by exactly one `release` call. The `forcenew` keyword argument can be used to force the creation of a new object, ignoring any existing objects in the pool. The `isvalid` keyword argument can be used to specify a function that will be called to determine if an object is still valid for reuse. By default, all objects are considered valid. If there are no objects available for reuse, `f` will be called to create a new object. If the pool is already at its usage limit, `acquire` will block until an object is returned to the pool via `release`. """ function Base.acquire(f, pool::Pool{K, T}, key=nothing; forcenew::Bool=false, isvalid::Function=TRUE) where {K, T} key isa K || keyerror(key, K) Base.@lock pool.lock begin # first get a permit while pool.cur >= pool.limit wait(pool.lock) end pool.cur += 1 # now see if we can get an object from the pool for reuse if !forcenew objs = iskeyed(pool) ? get!(() -> safesizehint!(T[], pool.limit), pool.keyedvalues, key) : pool.values while !isempty(objs) obj = pop!(objs) isvalid(obj) && return obj end end end try # if there weren't any objects to reuse or we were forcenew, we'll create a new one return f() catch # if we error creating a new object, it's critical we return the permit to the pool Base.@lock pool.lock releasepermit(pool) rethrow() end end """ release(pool::Pool{K, T}, key::K, obj::Union{T, Nothing}=nothing) release(pool::Pool{K, T}, obj::T) release(pool::Pool{K, T}) Release an object from usage by a `pool`, optionally keyed by the provided `key`. If `obj` is provided, it will be returned to the pool for reuse. Otherwise, if `nothing` is returned, or `release(pool)` is called, the usage count will be decremented without an object being returned to the pool for reuse. """ function Base.release(pool::Pool{K, T}, key, obj::Union{T, Nothing}=nothing) where {K, T} key isa K || keyerror(key, K) Base.@lock pool.lock begin # return the permit releasepermit(pool) # if we're given an object, we'll put it back in the pool if obj !== nothing # if an invalid key is provided, we let the KeyError propagate objs = iskeyed(pool) ? pool.keyedvalues[key] : pool.values push!(objs, obj) end end return end Base.release(pool::Pool{K, T}, obj::T) where {K, T} = release(pool, nothing, obj) Base.release(pool::Pool{K, T}) where {K, T} = release(pool, nothing, nothing) end # module F%