jli  Linuxx86_641.10.3v1.10.30b4590a5507d3f3046e5bafc007cacbbfc9b310b1ѳ Distributedo[\(  pW X6I/opt/julia-1.10.3/share/julia/stdlib/v1.10/Distributed/src/Distributed.jl4Ak1 [)*Serializationސݗ1V$ bdSocketsN/opt/julia-1.10.3/share/julia/stdlib/v1.10/Distributed/src/clusterserialize.jl4AE/opt/julia-1.10.3/share/julia/stdlib/v1.10/Distributed/src/cluster.jl4A\Y_ɢ?RandomF/opt/julia-1.10.3/share/julia/stdlib/v1.10/Distributed/src/messages.jl4AN/opt/julia-1.10.3/share/julia/stdlib/v1.10/Distributed/src/process_messages.jl4AH/opt/julia-1.10.3/share/julia/stdlib/v1.10/Distributed/src/remotecall.jl4AD/opt/julia-1.10.3/share/julia/stdlib/v1.10/Distributed/src/macros.jl4AH/opt/julia-1.10.3/share/julia/stdlib/v1.10/Distributed/src/workerpool.jl4AB/opt/julia-1.10.3/share/julia/stdlib/v1.10/Distributed/src/pmap.jl4AF/opt/julia-1.10.3/share/julia/stdlib/v1.10/Distributed/src/managers.jl4AH/opt/julia-1.10.3/share/julia/stdlib/v1.10/Distributed/src/precompile.jl4A CoremуJ5Basemу]J5MainmуJ5ArgToolsBń x(mуF K5 Artifactsmr-V3|mу K5Base64UlD*_mу> K5CRC32c\y.jmуj K5 FileWatchingXzsy`{,zmуh& K5LibdluVW59˗,mу-" K5LoggingT{VhUXM=mуrU" K5MmapP~:xg,Omу|' K5NetworkOptionsC0YW,mуʠ, K5SHAQ<$!<%mу1 K5 Serialization [)*k1mу-G K5Sockets1V$ bdސݗmуYBY K5UnicodeP>I>Nrmуeszo K5 LinearAlgebraSm7̏mуuux K5 OpenBLAS_jll[(Śb6EcQ FmуDux K5libblastrampoline_jllLSۆ }lxӠmу^} K5MarkdownZPn7z`smу/Ed~ K5Printfg^cX׸QDmу;h K5Random_ɢ?\Ymу? K5TarOi>աmу!t, K5DatesEY8pj2 mуX K5FuturebS;3{I xVMmуsD K5InteractiveUtilsWL ~@'ZmуVg K5LibGit2Z[&RPTv3EКRmу8J K5 LibGit2_jll YXg}]$mуD K5 MbedTLS_jllAX 3ȡ_mу- K5 LibSSH2_jlloTZk)߆ 1 # broadcast top-level (e.g. from Main) import/using from node 1 (only) @sync for p in procs() p == 1 && continue # Extensions are already loaded on workers by their triggers being loaded # so no need to fire the callback upon extension being loaded on master. Base.loading_extension && continue @async_unwrap remotecall_wait(p) do Base.require(mod) nothing end end end end const REF_ID = Threads.Atomic{Int}(1) next_ref_id() = Threads.atomic_add!(REF_ID, 1) struct RRID whence::Int id::Int RRID() = RRID(myid(), next_ref_id()) RRID(whence, id) = new(whence, id) end hash(r::RRID, h::UInt) = hash(r.whence, hash(r.id, h)) ==(r::RRID, s::RRID) = (r.whence==s.whence && r.id==s.id) include("clusterserialize.jl") include("cluster.jl") # cluster setup and management, addprocs include("messages.jl") include("process_messages.jl") # process incoming messages include("remotecall.jl") # the remotecall* api include("macros.jl") # @spawn and friends include("workerpool.jl") include("pmap.jl") include("managers.jl") # LocalManager and SSHManager include("precompile.jl") function __init__() init_parallel() end end N/opt/julia-1.10.3/share/julia/stdlib/v1.10/Distributed/src/clusterserialize.jl!# This file is a part of Julia. License is MIT: https://julialang.org/license using Serialization: serialize_cycle, deserialize_cycle, writetag, serialize_typename, deserialize_typename, TYPENAME_TAG, TASK_TAG, reset_state, serialize_type using Serialization.__deserialized_types__ import Serialization: object_number, lookup_object_number, remember_object mutable struct ClusterSerializer{I<:IO} <: AbstractSerializer io::I counter::Int table::IdDict{Any,Any} pending_refs::Vector{Int} pid::Int # Worker we are connected to. tn_obj_sent::Set{UInt64} # TypeName objects sent glbs_sent::Dict{Symbol, Tuple{UInt64, UInt64}} # (key,value) -> (symbol, (hash_value, objectid)) glbs_in_tnobj::Dict{UInt64, Vector{Symbol}} # Track globals referenced in # anonymous functions. anonfunc_id::UInt64 function ClusterSerializer{I}(io::I) where I<:IO new(io, 0, IdDict(), Int[], worker_id_from_socket(io), Set{UInt64}(), Dict{UInt64, UInt64}(), Dict{UInt64, Vector{Symbol}}(), 0) end end ClusterSerializer(io::IO) = ClusterSerializer{typeof(io)}(io) const object_numbers = WeakKeyDict() const obj_number_salt = Ref(0) function object_number(s::ClusterSerializer, @nospecialize(l)) global obj_number_salt, object_numbers if haskey(object_numbers, l) return object_numbers[l] end # a hash function that always gives the same number to the same # object on the same machine, and is unique over all machines. ln = obj_number_salt[]+(UInt64(myid())<<44) obj_number_salt[] += 1 object_numbers[l] = ln return ln::UInt64 end const known_object_data = Dict{UInt64,Any}() function lookup_object_number(s::ClusterSerializer, n::UInt64) return get(known_object_data, n, nothing) end function remember_object(s::ClusterSerializer, @nospecialize(o), n::UInt64) known_object_data[n] = o if isa(o, Core.TypeName) && !haskey(object_numbers, o) # set up reverse mapping for serialize object_numbers[o] = n end return nothing end function deserialize(s::ClusterSerializer, ::Type{Core.TypeName}) full_body_sent = deserialize(s) number = read(s.io, UInt64) if !full_body_sent tn = lookup_object_number(s, number)::Core.TypeName remember_object(s, tn, number) deserialize_cycle(s, tn) else tn = deserialize_typename(s, number) end # retrieve arrays of global syms sent if any and deserialize them all. foreach(sym->deserialize_global_from_main(s, sym), deserialize(s)) return tn end function serialize(s::ClusterSerializer, t::Core.TypeName) serialize_cycle(s, t) && return writetag(s.io, TYPENAME_TAG) identifier = object_number(s, t) send_whole = !(identifier in s.tn_obj_sent) serialize(s, send_whole) write(s.io, identifier) if send_whole # Track globals referenced in this anonymous function. # This information is used to resend modified globals when we # only send the identifier. prev = s.anonfunc_id s.anonfunc_id = identifier serialize_typename(s, t) s.anonfunc_id = prev push!(s.tn_obj_sent, identifier) finalizer(t) do x cleanup_tname_glbs(s, identifier) end end # Send global refs if required. syms = syms_2b_sent(s, identifier) serialize(s, syms) foreach(sym->serialize_global_from_main(s, sym), syms) nothing end function serialize(s::ClusterSerializer, g::GlobalRef) # Record if required and then invoke the default GlobalRef serializer. sym = g.name if g.mod === Main && isdefined(g.mod, sym) if (binding_module(Main, sym) === Main) && (s.anonfunc_id != 0) && !startswith(string(sym), "#") # Anonymous functions are handled via FULL_GLOBALREF_TAG push!(get!(s.glbs_in_tnobj, s.anonfunc_id, []), sym) end end invoke(serialize, Tuple{AbstractSerializer, GlobalRef}, s, g) end # Send/resend a global binding if # a) has not been sent previously, i.e., we are seeing this binding for the first time, or, # b) hash value has changed or # c) hash value is same but of a different object, i.e. objectid has changed or # d) is a bits type function syms_2b_sent(s::ClusterSerializer, identifier) lst = Symbol[] check_syms = get(s.glbs_in_tnobj, identifier, Symbol[]) for sym in check_syms v = getfield(Main, sym) if isbits(v) push!(lst, sym) else if haskey(s.glbs_sent, sym) # We have sent this binding before, see if it has changed. hval, oid = s.glbs_sent[sym] if hval != hash(sym, hash(v)) || oid != objectid(v) push!(lst, sym) end else push!(lst, sym) end end end return unique(lst) end function serialize_global_from_main(s::ClusterSerializer, sym) v = getfield(Main, sym) if !isbits(v) s.glbs_sent[sym] = (hash(sym, hash(v)), objectid(v)) end serialize(s, isconst(Main, sym)) serialize(s, v) end function deserialize_global_from_main(s::ClusterSerializer, sym) sym_isconst = deserialize(s) v = deserialize(s) if isdefined(Main, sym) && (sym_isconst || isconst(Main, sym)) if isequal(getfield(Main, sym), v) # same value; ok return nothing else @warn "Cannot transfer global variable $sym; it already has a value." return nothing end end if sym_isconst ccall(:jl_set_const, Cvoid, (Any, Any, Any), Main, sym, v) else setglobal!(Main, sym, v) end return nothing end function cleanup_tname_glbs(s::ClusterSerializer, identifier) delete!(s.glbs_in_tnobj, identifier) end # TODO: cleanup from s.tn_obj_sent # Specialized serialize-deserialize implementations for CapturedException to partially # recover from any deserialization errors in `CapturedException.ex` function serialize(s::ClusterSerializer, ex::CapturedException) serialize_type(s, typeof(ex)) serialize(s, string(typeof(ex.ex))) # String type should not result in a deser error serialize(s, ex.processed_bt) # Currently should not result in a deser error serialize(s, ex.ex) # can result in a UndefVarError on the remote node # if a type used in ex.ex is undefined on the remote node. end function original_ex(s::ClusterSerializer, ex_str, remote_stktrace) local pid_str = "" try pid_str = string(" from worker ", worker_id_from_socket(s.io)) catch end stk_str = remote_stktrace ? "Remote" : "Local" ErrorException(string("Error deserializing a remote exception", pid_str, "\n", "Remote(original) exception of type ", ex_str, "\n", stk_str, " stacktrace : ")) end function deserialize(s::ClusterSerializer, t::Type{<:CapturedException}) ex_str = deserialize(s) local bt local capex try bt = deserialize(s) catch e throw(CompositeException([ original_ex(s, ex_str, false), CapturedException(e, catch_backtrace()) ])) end try capex = deserialize(s) catch e throw(CompositeException([ CapturedException(original_ex(s, ex_str, true), bt), CapturedException(e, catch_backtrace()) ])) end return CapturedException(capex, bt) end """ clear!(syms, pids=workers(); mod=Main) Clears global bindings in modules by initializing them to `nothing`. `syms` should be of type [`Symbol`](@ref) or a collection of `Symbol`s . `pids` and `mod` identify the processes and the module in which global variables are to be reinitialized. Only those names found to be defined under `mod` are cleared. An exception is raised if a global constant is requested to be cleared. """ function clear!(syms, pids=workers(); mod=Main) @sync for p in pids @async_unwrap remotecall_wait(clear_impl!, p, syms, mod) end end clear!(sym::Symbol, pid::Int; mod=Main) = clear!([sym], [pid]; mod=mod) clear!(sym::Symbol, pids=workers(); mod=Main) = clear!([sym], pids; mod=mod) clear!(syms, pid::Int; mod=Main) = clear!(syms, [pid]; mod=mod) clear_impl!(syms, mod::Module) = foreach(x->clear_impl!(x,mod), syms) clear_impl!(sym::Symbol, mod::Module) = isdefined(mod, sym) && @eval(mod, global $sym = nothing) E/opt/julia-1.10.3/share/julia/stdlib/v1.10/Distributed/src/cluster.jl# This file is a part of Julia. License is MIT: https://julialang.org/license """ ClusterManager Supertype for cluster managers, which control workers processes as a cluster. Cluster managers implement how workers can be added, removed and communicated with. `SSHManager` and `LocalManager` are subtypes of this. """ abstract type ClusterManager end """ WorkerConfig Type used by [`ClusterManager`](@ref)s to control workers added to their clusters. Some fields are used by all cluster managers to access a host: * `io` -- the connection used to access the worker (a subtype of `IO` or `Nothing`) * `host` -- the host address (either a `String` or `Nothing`) * `port` -- the port on the host used to connect to the worker (either an `Int` or `Nothing`) Some are used by the cluster manager to add workers to an already-initialized host: * `count` -- the number of workers to be launched on the host * `exename` -- the path to the Julia executable on the host, defaults to `"\$(Sys.BINDIR)/julia"` or `"\$(Sys.BINDIR)/julia-debug"` * `exeflags` -- flags to use when launching Julia remotely The `userdata` field is used to store information for each worker by external managers. Some fields are used by `SSHManager` and similar managers: * `tunnel` -- `true` (use tunneling), `false` (do not use tunneling), or [`nothing`](@ref) (use default for the manager) * `multiplex` -- `true` (use SSH multiplexing for tunneling) or `false` * `forward` -- the forwarding option used for `-L` option of ssh * `bind_addr` -- the address on the remote host to bind to * `sshflags` -- flags to use in establishing the SSH connection * `max_parallel` -- the maximum number of workers to connect to in parallel on the host Some fields are used by both `LocalManager`s and `SSHManager`s: * `connect_at` -- determines whether this is a worker-to-worker or driver-to-worker setup call * `process` -- the process which will be connected (usually the manager will assign this during [`addprocs`](@ref)) * `ospid` -- the process ID according to the host OS, used to interrupt worker processes * `environ` -- private dictionary used to store temporary information by Local/SSH managers * `ident` -- worker as identified by the [`ClusterManager`](@ref) * `connect_idents` -- list of worker ids the worker must connect to if using a custom topology * `enable_threaded_blas` -- `true`, `false`, or `nothing`, whether to use threaded BLAS or not on the workers """ mutable struct WorkerConfig # Common fields relevant to all cluster managers io::Union{IO, Nothing} host::Union{String, Nothing} port::Union{Int, Nothing} # Used when launching additional workers at a host count::Union{Int, Symbol, Nothing} exename::Union{String, Cmd, Nothing} exeflags::Union{Cmd, Nothing} # External cluster managers can use this to store information at a per-worker level # Can be a dict if multiple fields need to be stored. userdata::Any # SSHManager / SSH tunnel connections to workers tunnel::Union{Bool, Nothing} multiplex::Union{Bool, Nothing} forward::Union{String, Nothing} bind_addr::Union{String, Nothing} sshflags::Union{Cmd, Nothing} max_parallel::Union{Int, Nothing} # Used by Local/SSH managers connect_at::Any process::Union{Process, Nothing} ospid::Union{Int, Nothing} # Private dictionary used to store temporary information by Local/SSH managers. environ::Union{Dict, Nothing} # Connections to be setup depending on the network topology requested ident::Any # Worker as identified by the Cluster Manager. # List of other worker idents this worker must connect with. Used with topology T_CUSTOM. connect_idents::Union{Array, Nothing} # Run multithreaded blas on worker enable_threaded_blas::Union{Bool, Nothing} function WorkerConfig() wc = new() for n in 1:fieldcount(WorkerConfig) setfield!(wc, n, nothing) end wc end end @enum WorkerState W_CREATED W_CONNECTED W_TERMINATING W_TERMINATED W_UNKNOWN_STATE mutable struct Worker id::Int msg_lock::Threads.ReentrantLock # Lock for del_msgs, add_msgs, and gcflag del_msgs::Array{Any,1} # XXX: Could del_msgs and add_msgs be Channels? add_msgs::Array{Any,1} @atomic gcflag::Bool state::WorkerState c_state::Condition # wait for state changes ct_time::Float64 # creation time conn_func::Any # used to setup connections lazily r_stream::IO w_stream::IO w_serializer::ClusterSerializer # writes can happen from any task hence store the # serializer as part of the Worker object manager::ClusterManager config::WorkerConfig version::Union{VersionNumber, Nothing} # Julia version of the remote process initialized::Event function Worker(id::Int, r_stream::IO, w_stream::IO, manager::ClusterManager; version::Union{VersionNumber, Nothing}=nothing, config::WorkerConfig=WorkerConfig()) w = Worker(id) w.r_stream = r_stream w.w_stream = buffer_writes(w_stream) w.w_serializer = ClusterSerializer(w.w_stream) w.manager = manager w.config = config w.version = version set_worker_state(w, W_CONNECTED) register_worker_streams(w) w end Worker(id::Int) = Worker(id, nothing) function Worker(id::Int, conn_func) @assert id > 0 if haskey(map_pid_wrkr, id) return map_pid_wrkr[id] end w=new(id, Threads.ReentrantLock(), [], [], false, W_CREATED, Condition(), time(), conn_func) w.initialized = Event() register_worker(w) w end Worker() = Worker(get_next_pid()) end function set_worker_state(w, state) w.state = state notify(w.c_state; all=true) end function check_worker_state(w::Worker) if w.state === W_CREATED if !isclusterlazy() if PGRP.topology === :all_to_all # Since higher pids connect with lower pids, the remote worker # may not have connected to us yet. Wait for some time. wait_for_conn(w) else error("peer $(w.id) is not connected to $(myid()). Topology : " * string(PGRP.topology)) end else w.ct_time = time() if myid() > w.id t = @async exec_conn_func(w) else # route request via node 1 t = @async remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid()) end errormonitor(t) wait_for_conn(w) end end end exec_conn_func(id::Int) = exec_conn_func(worker_from_id(id)::Worker) function exec_conn_func(w::Worker) try f = notnothing(w.conn_func) # Will be called if some other task tries to connect at the same time. w.conn_func = () -> wait_for_conn(w) f() catch e w.conn_func = () -> throw(e) rethrow() end nothing end function wait_for_conn(w) if w.state === W_CREATED timeout = worker_timeout() - (time() - w.ct_time) timeout <= 0 && error("peer $(w.id) has not connected to $(myid())") @async (sleep(timeout); notify(w.c_state; all=true)) wait(w.c_state) w.state === W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds") end nothing end ## process group creation ## mutable struct LocalProcess id::Int bind_addr::String bind_port::UInt16 cookie::String LocalProcess() = new(1) end worker_timeout() = parse(Float64, get(ENV, "JULIA_WORKER_TIMEOUT", "60.0")) ## worker creation and setup ## """ start_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true) `start_worker` is an internal function which is the default entry point for worker processes connecting via TCP/IP. It sets up the process as a Julia cluster worker. host:port information is written to stream `out` (defaults to stdout). The function reads the cookie from stdin if required, and listens on a free port (or if specified, the port in the `--bind-to` command line option) and schedules tasks to process incoming TCP connections and requests. It also (optionally) closes stdin and redirects stderr to stdout. It does not return. """ start_worker(cookie::AbstractString=readline(stdin); kwargs...) = start_worker(stdout, cookie; kwargs...) function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true) init_multi() if close_stdin # workers will not use it redirect_stdin(devnull) close(stdin) end stderr_to_stdout && redirect_stderr(stdout) init_worker(cookie) interface = IPv4(LPROC.bind_addr) if LPROC.bind_port == 0 port_hint = 9000 + (getpid() % 1000) (port, sock) = listenany(interface, UInt16(port_hint)) LPROC.bind_port = port else sock = listen(interface, LPROC.bind_port) end errormonitor(@async while isopen(sock) client = accept(sock) process_messages(client, client, true) end) print(out, "julia_worker:") # print header print(out, "$(string(LPROC.bind_port))#") # print port print(out, LPROC.bind_addr) print(out, '\n') flush(out) Sockets.nagle(sock, false) Sockets.quickack(sock, true) if ccall(:jl_running_on_valgrind,Cint,()) != 0 println(out, "PID = $(getpid())") end try # To prevent hanging processes on remote machines, newly launched workers exit if the # master process does not connect in time. check_master_connect() while true; wait(); end catch err print(stderr, "unhandled exception on $(myid()): $(err)\nexiting.\n") end close(sock) exit(0) end function redirect_worker_output(ident, stream) t = @async while !eof(stream) line = readline(stream) if startswith(line, " From worker ") # stdout's of "additional" workers started from an initial worker on a host are not available # on the master directly - they are routed via the initial worker's stdout. println(line) else println(" From worker $(ident):\t$line") end end errormonitor(t) end struct LaunchWorkerError <: Exception msg::String end Base.showerror(io::IO, e::LaunchWorkerError) = print(io, e.msg) # The default TCP transport relies on the worker listening on a free # port available and printing its bind address and port. # The master process uses this to connect to the worker and subsequently # setup a all-to-all network. function read_worker_host_port(io::IO) t0 = time_ns() # Wait at most for JULIA_WORKER_TIMEOUT seconds to read host:port # info from the worker timeout = worker_timeout() * 1e9 # We expect the first line to contain the host:port string. However, as # the worker may be launched via ssh or a cluster manager like SLURM, # ignore any informational / warning lines printed by the launch command. # If we do not find the host:port string in the first 1000 lines, treat it # as an error. ntries = 1000 leader = String[] try while ntries > 0 readtask = @async readline(io) yield() while !istaskdone(readtask) && ((time_ns() - t0) < timeout) sleep(0.05) end !istaskdone(readtask) && break conninfo = fetch(readtask) if isempty(conninfo) && !isopen(io) throw(LaunchWorkerError("Unable to read host:port string from worker. Launch command exited with error?")) end ntries -= 1 bind_addr, port = parse_connection_info(conninfo) if !isempty(bind_addr) return bind_addr, port end # collect unmatched lines push!(leader, conninfo) end close(io) if ntries > 0 throw(LaunchWorkerError("Timed out waiting to read host:port string from worker.")) else throw(LaunchWorkerError("Unexpected output from worker launch command. Host:port string not found.")) end finally for line in leader println("\tFrom worker startup:\t", line) end end end function parse_connection_info(str) m = match(r"^julia_worker:(\d+)#(.*)", str) if m !== nothing (String(m.captures[2]), parse(UInt16, m.captures[1])) else ("", UInt16(0)) end end """ init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager()) Called by cluster managers implementing custom transports. It initializes a newly launched process as a worker. Command line argument `--worker[=]` has the effect of initializing a process as a worker using TCP/IP sockets for transport. `cookie` is a [`cluster_cookie`](@ref). """ function init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager()) myrole!(:worker) # On workers, the default cluster manager connects via TCP sockets. Custom # transports will need to call this function with their own manager. global cluster_manager cluster_manager = manager # Since our pid has yet to be set, ensure no RemoteChannel / Future have been created or addprocs() called. @assert nprocs() <= 1 @assert isempty(PGRP.refs) @assert isempty(client_refs) # System is started in head node mode, cleanup related entries empty!(PGRP.workers) empty!(map_pid_wrkr) cluster_cookie(cookie) nothing end # The main function for adding worker processes. # `manager` is of type ClusterManager. The respective managers are responsible # for launching the workers. All keyword arguments (plus a few default values) # are available as a dictionary to the `launch` methods # # Only one addprocs can be in progress at any time # const worker_lock = ReentrantLock() """ addprocs(manager::ClusterManager; kwargs...) -> List of process identifiers Launches worker processes via the specified cluster manager. For example, Beowulf clusters are supported via a custom cluster manager implemented in the package `ClusterManagers.jl`. The number of seconds a newly launched worker waits for connection establishment from the master can be specified via variable `JULIA_WORKER_TIMEOUT` in the worker process's environment. Relevant only when using TCP/IP as transport. To launch workers without blocking the REPL, or the containing function if launching workers programmatically, execute `addprocs` in its own task. # Examples ```julia # On busy clusters, call `addprocs` asynchronously t = @async addprocs(...) ``` ```julia # Utilize workers as and when they come online if nprocs() > 1 # Ensure at least one new worker is available .... # perform distributed execution end ``` ```julia # Retrieve newly launched worker IDs, or any error messages if istaskdone(t) # Check if `addprocs` has completed to ensure `fetch` doesn't block if nworkers() == N new_pids = fetch(t) else fetch(t) end end ``` """ function addprocs(manager::ClusterManager; kwargs...) init_multi() cluster_mgmt_from_master_check() lock(worker_lock) try addprocs_locked(manager::ClusterManager; kwargs...) finally unlock(worker_lock) end end function addprocs_locked(manager::ClusterManager; kwargs...) params = merge(default_addprocs_params(manager), Dict{Symbol,Any}(kwargs)) topology(Symbol(params[:topology])) if PGRP.topology !== :all_to_all params[:lazy] = false end if PGRP.lazy === nothing || nprocs() == 1 PGRP.lazy = params[:lazy] elseif isclusterlazy() != params[:lazy] throw(ArgumentError(string("Active workers with lazy=", isclusterlazy(), ". Cannot set lazy=", params[:lazy]))) end # References to launched workers, filled when each worker is fully initialized and # has connected to all nodes. launched_q = Int[] # Asynchronously filled by the launch method # The `launch` method should add an object of type WorkerConfig for every # worker launched. It provides information required on how to connect # to it. # FIXME: launched should be a Channel, launch_ntfy should be a Threads.Condition # but both are part of the public interface. This means we currently can't use # `Threads.@spawn` in the code below. launched = WorkerConfig[] launch_ntfy = Condition() # call manager's `launch` is a separate task. This allows the master # process initiate the connection setup process as and when workers come # online t_launch = @async launch(manager, params, launched, launch_ntfy) @sync begin while true if isempty(launched) istaskdone(t_launch) && break @async (sleep(1); notify(launch_ntfy)) wait(launch_ntfy) end if !isempty(launched) wconfig = popfirst!(launched) let wconfig=wconfig @async setup_launched_worker(manager, wconfig, launched_q) end end end end Base.wait(t_launch) # catches any thrown errors from the launch task # Since all worker-to-worker setups may not have completed by the time this # function returns to the caller, send the complete list to all workers. # Useful for nprocs(), nworkers(), etc to return valid values on the workers. all_w = workers() for pid in all_w remote_do(set_valid_processes, pid, all_w) end sort!(launched_q) end function set_valid_processes(plist::Array{Int}) for pid in setdiff(plist, workers()) myid() != pid && Worker(pid) end end """ default_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any} Implemented by cluster managers. The default keyword parameters passed when calling `addprocs(mgr)`. The minimal set of options is available by calling `default_addprocs_params()` """ default_addprocs_params(::ClusterManager) = default_addprocs_params() default_addprocs_params() = Dict{Symbol,Any}( :topology => :all_to_all, :dir => pwd(), :exename => joinpath(Sys.BINDIR, julia_exename()), :exeflags => ``, :env => [], :enable_threaded_blas => false, :lazy => true) function setup_launched_worker(manager, wconfig, launched_q) pid = create_worker(manager, wconfig) push!(launched_q, pid) # When starting workers on remote multi-core hosts, `launch` can (optionally) start only one # process on the remote machine, with a request to start additional workers of the # same type. This is done by setting an appropriate value to `WorkerConfig.cnt`. cnt = something(wconfig.count, 1) if cnt === :auto cnt = wconfig.environ[:cpu_threads] end cnt = cnt - 1 # Removing self from the requested number if cnt > 0 launch_n_additional_processes(manager, pid, wconfig, cnt, launched_q) end end function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launched_q) @sync begin exename = notnothing(fromconfig.exename) exeflags = something(fromconfig.exeflags, ``) cmd = `$exename $exeflags` new_addresses = remotecall_fetch(launch_additional, frompid, cnt, cmd) for address in new_addresses (bind_addr, port) = address wconfig = WorkerConfig() for x in [:host, :tunnel, :multiplex, :sshflags, :exeflags, :exename, :enable_threaded_blas] Base.setproperty!(wconfig, x, Base.getproperty(fromconfig, x)) end wconfig.bind_addr = bind_addr wconfig.port = port let wconfig=wconfig @async begin pid = create_worker(manager, wconfig) remote_do(redirect_output_from_additional_worker, frompid, pid, port) push!(launched_q, pid) end end end end end function create_worker(manager, wconfig) # only node 1 can add new nodes, since nobody else has the full list of address:port @assert LPROC.id == 1 timeout = worker_timeout() # initiate a connect. Does not wait for connection completion in case of TCP. w = Worker() local r_s, w_s try (r_s, w_s) = connect(manager, w.id, wconfig) catch ex try deregister_worker(w.id) kill(manager, w.id, wconfig) finally rethrow(ex) end end w = Worker(w.id, r_s, w_s, manager; config=wconfig) # install a finalizer to perform cleanup if necessary finalizer(w) do w if myid() == 1 manage(w.manager, w.id, w.config, :finalize) end end # set when the new worker has finished connections with all other workers ntfy_oid = RRID() rr_ntfy_join = lookup_ref(ntfy_oid) rr_ntfy_join.waitingfor = myid() # Start a new task to handle inbound messages from connected worker in master. # Also calls `wait_connected` on TCP streams. process_messages(w.r_stream, w.w_stream, false) # send address information of all workers to the new worker. # Cluster managers set the address of each worker in `WorkerConfig.connect_at`. # A new worker uses this to setup an all-to-all network if topology :all_to_all is specified. # Workers with higher pids connect to workers with lower pids. Except process 1 (master) which # initiates connections to all workers. # Connection Setup Protocol: # - Master sends 16-byte cookie followed by 16-byte version string and a JoinPGRP message to all workers # - On each worker # - Worker responds with a 16-byte version followed by a JoinCompleteMsg # - Connects to all workers less than its pid. Sends the cookie, version and an IdentifySocket message # - Workers with incoming connection requests write back their Version and an IdentifySocketAckMsg message # - On master, receiving a JoinCompleteMsg triggers rr_ntfy_join (signifies that worker setup is complete) join_list = [] if PGRP.topology === :all_to_all # need to wait for lower worker pids to have completed connecting, since the numerical value # of pids is relevant to the connection process, i.e., higher pids connect to lower pids and they # require the value of config.connect_at which is set only upon connection completion for jw in PGRP.workers if (jw.id != 1) && (jw.id < w.id) (jw.state === W_CREATED) && wait(jw.c_state) push!(join_list, jw) end end elseif PGRP.topology === :custom # wait for requested workers to be up before connecting to them. filterfunc(x) = (x.id != 1) && isdefined(x, :config) && (notnothing(x.config.ident) in something(wconfig.connect_idents, [])) wlist = filter(filterfunc, PGRP.workers) waittime = 0 while wconfig.connect_idents !== nothing && length(wlist) < length(wconfig.connect_idents) if waittime >= timeout error("peer workers did not connect within $timeout seconds") end sleep(1.0) waittime += 1 wlist = filter(filterfunc, PGRP.workers) end for wl in wlist (wl.state === W_CREATED) && wait(wl.c_state) push!(join_list, wl) end end all_locs = mapany(x -> isa(x, Worker) ? (something(x.config.connect_at, ()), x.id) : ((), x.id, true), join_list) send_connection_hdr(w, true) enable_threaded_blas = something(wconfig.enable_threaded_blas, false) join_message = JoinPGRPMsg(w.id, all_locs, PGRP.topology, enable_threaded_blas, isclusterlazy()) send_msg_now(w, MsgHeader(RRID(0,0), ntfy_oid), join_message) @async manage(w.manager, w.id, w.config, :register) # wait for rr_ntfy_join with timeout timedout = false @async (sleep($timeout); timedout = true; put!(rr_ntfy_join, 1)) wait(rr_ntfy_join) if timedout error("worker did not connect within $timeout seconds") end lock(client_refs) do delete!(PGRP.refs, ntfy_oid) end return w.id end # Called on the first worker on a remote host. Used to optimize launching # of multiple workers on a remote host (to leverage multi-core) additional_io_objs=Dict() function launch_additional(np::Integer, cmd::Cmd) io_objs = Vector{Any}(undef, np) addresses = Vector{Any}(undef, np) for i in 1:np io = open(detach(cmd), "r+") write_cookie(io) io_objs[i] = io.out end for (i,io) in enumerate(io_objs) (host, port) = read_worker_host_port(io) addresses[i] = (host, port) additional_io_objs[port] = io end return addresses end function redirect_output_from_additional_worker(pid, port) io = additional_io_objs[port] redirect_worker_output("$pid", io) delete!(additional_io_objs, port) nothing end function check_master_connect() timeout = worker_timeout() * 1e9 # If we do not have at least process 1 connect to us within timeout # we log an error and exit, unless we're running on valgrind if ccall(:jl_running_on_valgrind,Cint,()) != 0 return end @async begin start = time_ns() while !haskey(map_pid_wrkr, 1) && (time_ns() - start) < timeout sleep(1.0) end if !haskey(map_pid_wrkr, 1) print(stderr, "Master process (id 1) could not connect within $(timeout/1e9) seconds.\nexiting.\n") exit(1) end end end """ cluster_cookie() -> cookie Return the cluster cookie. """ cluster_cookie() = (init_multi(); LPROC.cookie) """ cluster_cookie(cookie) -> cookie Set the passed cookie as the cluster cookie, then returns it. """ function cluster_cookie(cookie) init_multi() # The cookie must be an ASCII string with length <= HDR_COOKIE_LEN @assert isascii(cookie) @assert length(cookie) <= HDR_COOKIE_LEN cookie = rpad(cookie, HDR_COOKIE_LEN) LPROC.cookie = cookie cookie end let next_pid = 2 # 1 is reserved for the client (always) global get_next_pid function get_next_pid() retval = next_pid next_pid += 1 retval end end mutable struct ProcessGroup name::String workers::Array{Any,1} refs::Dict{RRID,Any} # global references topology::Symbol lazy::Union{Bool, Nothing} ProcessGroup(w::Array{Any,1}) = new("pg-default", w, Dict(), :all_to_all, nothing) end const PGRP = ProcessGroup([]) function topology(t) @assert t in [:all_to_all, :master_worker, :custom] if (PGRP.topology==t) || ((myid()==1) && (nprocs()==1)) || (myid() > 1) PGRP.topology = t else error("Workers with Topology $(PGRP.topology) already exist. Requested Topology $(t) cannot be set.") end t end isclusterlazy() = something(PGRP.lazy, false) get_bind_addr(pid::Integer) = get_bind_addr(worker_from_id(pid)) get_bind_addr(w::LocalProcess) = LPROC.bind_addr function get_bind_addr(w::Worker) if w.config.bind_addr === nothing if w.id != myid() w.config.bind_addr = remotecall_fetch(get_bind_addr, w.id, w.id) end end w.config.bind_addr end # globals const LPROC = LocalProcess() const LPROCROLE = Ref{Symbol}(:master) const HDR_VERSION_LEN=16 const HDR_COOKIE_LEN=16 const map_pid_wrkr = Dict{Int, Union{Worker, LocalProcess}}() const map_sock_wrkr = IdDict() const map_del_wrkr = Set{Int}() # whether process is a master or worker in a distributed setup myrole() = LPROCROLE[] function myrole!(proctype::Symbol) LPROCROLE[] = proctype end # cluster management related API """ myid() Get the id of the current process. # Examples ```julia-repl julia> myid() 1 julia> remotecall_fetch(() -> myid(), 4) 4 ``` """ myid() = LPROC.id """ nprocs() Get the number of available processes. # Examples ```julia-repl julia> nprocs() 3 julia> workers() 2-element Array{Int64,1}: 2 3 ``` """ function nprocs() if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy()) n = length(PGRP.workers) # filter out workers in the process of being setup/shutdown. for jw in PGRP.workers if !isa(jw, LocalProcess) && (jw.state !== W_CONNECTED) n = n - 1 end end return n else return length(PGRP.workers) end end """ nworkers() Get the number of available worker processes. This is one less than [`nprocs()`](@ref). Equal to `nprocs()` if `nprocs() == 1`. # Examples ```julia-repl \$ julia -p 2 julia> nprocs() 3 julia> nworkers() 2 ``` """ function nworkers() n = nprocs() n == 1 ? 1 : n-1 end """ procs() Return a list of all process identifiers, including pid 1 (which is not included by [`workers()`](@ref)). # Examples ```julia-repl \$ julia -p 2 julia> procs() 3-element Array{Int64,1}: 1 2 3 ``` """ function procs() if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy()) # filter out workers in the process of being setup/shutdown. return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || (x.state === W_CONNECTED)] else return Int[x.id for x in PGRP.workers] end end function id_in_procs(id) # faster version of `id in procs()` if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy()) for x in PGRP.workers if (x.id::Int) == id && (isa(x, LocalProcess) || (x::Worker).state === W_CONNECTED) return true end end else for x in PGRP.workers if (x.id::Int) == id return true end end end return false end """ procs(pid::Integer) Return a list of all process identifiers on the same physical node. Specifically all workers bound to the same ip-address as `pid` are returned. """ function procs(pid::Integer) if myid() == 1 all_workers = [x for x in PGRP.workers if isa(x, LocalProcess) || (x.state === W_CONNECTED)] if (pid == 1) || (isa(map_pid_wrkr[pid].manager, LocalManager)) Int[x.id for x in filter(w -> (w.id==1) || (isa(w.manager, LocalManager)), all_workers)] else ipatpid = get_bind_addr(pid) Int[x.id for x in filter(w -> get_bind_addr(w) == ipatpid, all_workers)] end else remotecall_fetch(procs, 1, pid) end end """ workers() Return a list of all worker process identifiers. # Examples ```julia-repl \$ julia -p 2 julia> workers() 2-element Array{Int64,1}: 2 3 ``` """ function workers() allp = procs() if length(allp) == 1 allp else filter(x -> x != 1, allp) end end function cluster_mgmt_from_master_check() if myid() != 1 throw(ErrorException("Only process 1 can add and remove workers")) end end """ rmprocs(pids...; waitfor=typemax(Int)) Remove the specified workers. Note that only process 1 can add or remove workers. Argument `waitfor` specifies how long to wait for the workers to shut down: - If unspecified, `rmprocs` will wait until all requested `pids` are removed. - An [`ErrorException`](@ref) is raised if all workers cannot be terminated before the requested `waitfor` seconds. - With a `waitfor` value of 0, the call returns immediately with the workers scheduled for removal in a different task. The scheduled [`Task`](@ref) object is returned. The user should call [`wait`](@ref) on the task before invoking any other parallel calls. # Examples ```julia-repl \$ julia -p 5 julia> t = rmprocs(2, 3, waitfor=0) Task (runnable) @0x0000000107c718d0 julia> wait(t) julia> workers() 3-element Array{Int64,1}: 4 5 6 ``` """ function rmprocs(pids...; waitfor=typemax(Int)) cluster_mgmt_from_master_check() pids = vcat(pids...) if waitfor == 0 t = @async _rmprocs(pids, typemax(Int)) yield() return t else _rmprocs(pids, waitfor) # return a dummy task object that user code can wait on. return @async nothing end end function _rmprocs(pids, waitfor) lock(worker_lock) try rmprocset = Union{LocalProcess, Worker}[] for p in pids if p == 1 @warn "rmprocs: process 1 not removed" else if haskey(map_pid_wrkr, p) w = map_pid_wrkr[p] set_worker_state(w, W_TERMINATING) kill(w.manager, p, w.config) push!(rmprocset, w) end end end start = time_ns() while (time_ns() - start) < waitfor*1e9 all(w -> w.state === W_TERMINATED, rmprocset) && break sleep(min(0.1, waitfor - (time_ns() - start)/1e9)) end unremoved = [wrkr.id for wrkr in filter(w -> w.state !== W_TERMINATED, rmprocset)] if length(unremoved) > 0 estr = string("rmprocs: pids ", unremoved, " not terminated after ", waitfor, " seconds.") throw(ErrorException(estr)) end finally unlock(worker_lock) end end """ ProcessExitedException(worker_id::Int) After a client Julia process has exited, further attempts to reference the dead child will throw this exception. """ struct ProcessExitedException <: Exception worker_id::Int end # No-arg constructor added for compatibility with Julia 1.0 & 1.1, should be deprecated in the future ProcessExitedException() = ProcessExitedException(-1) worker_from_id(i) = worker_from_id(PGRP, i) function worker_from_id(pg::ProcessGroup, i) if !isempty(map_del_wrkr) && in(i, map_del_wrkr) throw(ProcessExitedException(i)) end w = get(map_pid_wrkr, i, nothing) if w === nothing if myid() == 1 error("no process with id $i exists") end w = Worker(i) map_pid_wrkr[i] = w else w = w::Union{Worker, LocalProcess} end w end """ worker_id_from_socket(s) -> pid A low-level API which, given a `IO` connection or a `Worker`, returns the `pid` of the worker it is connected to. This is useful when writing custom [`serialize`](@ref) methods for a type, which optimizes the data written out depending on the receiving process id. """ function worker_id_from_socket(s) w = get(map_sock_wrkr, s, nothing) if isa(w,Worker) if s === w.r_stream || s === w.w_stream return w.id end end if isa(s,IOStream) && fd(s)==-1 # serializing to a local buffer return myid() end return -1 end register_worker(w) = register_worker(PGRP, w) function register_worker(pg, w) push!(pg.workers, w) map_pid_wrkr[w.id] = w end function register_worker_streams(w) map_sock_wrkr[w.r_stream] = w map_sock_wrkr[w.w_stream] = w end deregister_worker(pid) = deregister_worker(PGRP, pid) function deregister_worker(pg, pid) pg.workers = filter(x -> !(x.id == pid), pg.workers) w = pop!(map_pid_wrkr, pid, nothing) if isa(w, Worker) if isdefined(w, :r_stream) pop!(map_sock_wrkr, w.r_stream, nothing) if w.r_stream != w.w_stream pop!(map_sock_wrkr, w.w_stream, nothing) end end if myid() == 1 && (myrole() === :master) && isdefined(w, :config) # Notify the cluster manager of this workers death manage(w.manager, w.id, w.config, :deregister) if PGRP.topology !== :all_to_all || isclusterlazy() for rpid in workers() try remote_do(deregister_worker, rpid, pid) catch end end end end end push!(map_del_wrkr, pid) # delete this worker from our remote reference client sets ids = [] tonotify = [] lock(client_refs) do for (id, rv) in pg.refs if in(pid, rv.clientset) push!(ids, id) end if rv.waitingfor == pid push!(tonotify, (id, rv)) end end for id in ids del_client(pg, id, pid) end # throw exception to tasks waiting for this pid for (id, rv) in tonotify close(rv.c, ProcessExitedException(pid)) delete!(pg.refs, id) end end return end function interrupt(pid::Integer) @assert myid() == 1 w = map_pid_wrkr[pid] if isa(w, Worker) manage(w.manager, w.id, w.config, :interrupt) end return end """ interrupt(pids::Integer...) Interrupt the current executing task on the specified workers. This is equivalent to pressing Ctrl-C on the local machine. If no arguments are given, all workers are interrupted. """ interrupt(pids::Integer...) = interrupt([pids...]) """ interrupt(pids::AbstractVector=workers()) Interrupt the current executing task on the specified workers. This is equivalent to pressing Ctrl-C on the local machine. If no arguments are given, all workers are interrupted. """ function interrupt(pids::AbstractVector=workers()) @assert myid() == 1 @sync begin for pid in pids @async interrupt(pid) end end end wp_bind_addr(p::LocalProcess) = p.bind_addr wp_bind_addr(p) = p.config.bind_addr function check_same_host(pids) if myid() != 1 return remotecall_fetch(check_same_host, 1, pids) else # We checkfirst if all test pids have been started using the local manager, # else we check for the same bind_to addr. This handles the special case # where the local ip address may change - as during a system sleep/awake if all(p -> (p==1) || (isa(map_pid_wrkr[p].manager, LocalManager)), pids) return true else first_bind_addr = notnothing(wp_bind_addr(map_pid_wrkr[pids[1]])) return all(p -> notnothing(wp_bind_addr(map_pid_wrkr[p])) == first_bind_addr, pids[2:end]) end end end function terminate_all_workers() myid() != 1 && return if nprocs() > 1 try rmprocs(workers(); waitfor=5.0) catch _ex @warn "Forcibly interrupting busy workers" exception=_ex # Might be computation bound, interrupt them and try again interrupt(workers()) try rmprocs(workers(); waitfor=5.0) catch _ex2 @error "Unable to terminate all workers" exception=_ex2,catch_backtrace() end end end end # initialize the local proc network address / port function init_bind_addr() opts = JLOptions() if opts.bindto != C_NULL bind_to = split(unsafe_string(opts.bindto), ":") bind_addr = string(parse(IPAddr, bind_to[1])) if length(bind_to) > 1 bind_port = parse(Int,bind_to[2]) else bind_port = 0 end else bind_port = 0 try bind_addr = string(getipaddr()) catch # All networking is unavailable, initialize bind_addr to the loopback address # Will cause an exception to be raised only when used. bind_addr = "127.0.0.1" end end global LPROC LPROC.bind_addr = bind_addr LPROC.bind_port = UInt16(bind_port) end using Random: randstring let inited = false # do initialization that's only needed when there is more than 1 processor global function init_multi() if !inited inited = true push!(Base.package_callbacks, _require_callback) atexit(terminate_all_workers) init_bind_addr() cluster_cookie(randstring(HDR_COOKIE_LEN)) end return nothing end end function init_parallel() start_gc_msgs_task() # start in "head node" mode, if worker, will override later. global PGRP global LPROC LPROC.id = 1 @assert isempty(PGRP.workers) register_worker(LPROC) end write_cookie(io::IO) = print(io.in, string(cluster_cookie(), "\n")) function get_threads_spec(opts) if opts.nthreads > 0 @assert opts.nthreadpools >= 1 @assert opts.nthreads_per_pool != C_NULL thr = "$(unsafe_load(opts.nthreads_per_pool))" if opts.nthreadpools == 2 thr = "$(thr),$(unsafe_load(opts.nthreads_per_pool, 2))" end `--threads=$(thr)` else `` end end function get_gcthreads_spec(opts) if opts.nmarkthreads > 0 || opts.nsweepthreads > 0 `--gcthreads=$(opts.nmarkthreads),$(opts.nsweepthreads)` else `` end end # Starts workers specified by (-n|--procs) and --machine-file command line options function process_opts(opts) # startup worker. # opts.startupfile, opts.load, etc should should not be processed for workers. if opts.worker == 1 # does not return if opts.cookie != C_NULL start_worker(unsafe_string(opts.cookie)) else start_worker() end end # Propagate --threads to workers threads = get_threads_spec(opts) # Propagate --gcthreads to workers gcthreads = get_gcthreads_spec(opts) exeflags = `$threads $gcthreads` # add processors if opts.nprocs > 0 addprocs(opts.nprocs; exeflags=exeflags) end # load processes from machine file if opts.machine_file != C_NULL addprocs(load_machine_file(unsafe_string(opts.machine_file)); exeflags=exeflags) end return nothing end function load_machine_file(path::AbstractString) machines = [] for line in split(read(path, String),'\n'; keepempty=false) s = split(line, '*'; keepempty=false) map!(strip, s, s) if length(s) > 1 cnt = all(isdigit, s[1]) ? parse(Int,s[1]) : Symbol(s[1]) push!(machines,(s[2], cnt)) else push!(machines,line) end end return machines end F/opt/julia-1.10.3/share/julia/stdlib/v1.10/Distributed/src/messages.jl# This file is a part of Julia. License is MIT: https://julialang.org/license abstract type AbstractMsg end ## Wire format description # # Each message has three parts, which are written in order to the worker's stream. # 1) A header of type MsgHeader is serialized to the stream (via `serialize`). # 2) A message of type AbstractMsg is then serialized. # 3) Finally, a fixed boundary of 10 bytes is written. # Message header stored separately from body to be able to send back errors if # a deserialization error occurs when reading the message body. struct MsgHeader response_oid::RRID notify_oid::RRID MsgHeader(respond_oid=RRID(0,0), notify_oid=RRID(0,0)) = new(respond_oid, notify_oid) end # Special oid (0,0) uses to indicate a null ID. # Used instead of Union{Int, Nothing} to decrease wire size of header. null_id(id) = id == RRID(0, 0) struct CallMsg{Mode} <: AbstractMsg f::Any args::Tuple kwargs end struct CallWaitMsg <: AbstractMsg f::Any args::Tuple kwargs end struct RemoteDoMsg <: AbstractMsg f::Any args::Tuple kwargs end struct ResultMsg <: AbstractMsg value::Any end # Worker initialization messages struct IdentifySocketMsg <: AbstractMsg from_pid::Int end struct IdentifySocketAckMsg <: AbstractMsg end struct JoinPGRPMsg <: AbstractMsg self_pid::Int other_workers::Array topology::Symbol enable_threaded_blas::Bool lazy::Bool end struct JoinCompleteMsg <: AbstractMsg cpu_threads::Int ospid::Int end # Avoiding serializing AbstractMsg containers results in a speedup # of approximately 10%. Can be removed once module Serialization # has been suitably improved. const msgtypes = Any[CallWaitMsg, IdentifySocketAckMsg, IdentifySocketMsg, JoinCompleteMsg, JoinPGRPMsg, RemoteDoMsg, ResultMsg, CallMsg{:call}, CallMsg{:call_fetch}] for (idx, tname) in enumerate(msgtypes) exprs = Any[ :(serialize(s, o.$fld)) for fld in fieldnames(tname) ] @eval function serialize_msg(s::AbstractSerializer, o::$tname) write(s.io, UInt8($idx)) $(exprs...) return nothing end end let msg_cases = :(@assert false "Message type index ($idx) expected to be between 1:$($(length(msgtypes)))") for i = length(msgtypes):-1:1 mti = msgtypes[i] msg_cases = :(if idx == $i $(Expr(:call, QuoteNode(mti), fill(:(deserialize(s)), fieldcount(mti))...)) else $msg_cases end) end @eval function deserialize_msg(s::AbstractSerializer) idx = read(s.io, UInt8) return $msg_cases end end function send_msg_unknown(s::IO, header, msg) error("attempt to send to unknown socket") end function send_msg(s::IO, header, msg) id = worker_id_from_socket(s) if id > -1 return send_msg(worker_from_id(id), header, msg) end send_msg_unknown(s, header, msg) end function send_msg_now(s::IO, header, msg::AbstractMsg) id = worker_id_from_socket(s) if id > -1 return send_msg_now(worker_from_id(id), header, msg) end send_msg_unknown(s, header, msg) end function send_msg_now(w::Worker, header, msg) send_msg_(w, header, msg, true) end function send_msg(w::Worker, header, msg) send_msg_(w, header, msg, false) end function flush_gc_msgs(w::Worker) if !isdefined(w, :w_stream) return end add_msgs = nothing del_msgs = nothing @lock w.msg_lock begin if !w.gcflag # No work needed for this worker return end @atomic w.gcflag = false if !isempty(w.add_msgs) add_msgs = w.add_msgs w.add_msgs = Any[] end if !isempty(w.del_msgs) del_msgs = w.del_msgs w.del_msgs = Any[] end end if add_msgs !== nothing remote_do(add_clients, w, add_msgs) end if del_msgs !== nothing remote_do(del_clients, w, del_msgs) end return end # Boundary inserted between messages on the wire, used for recovering # from deserialization errors. Picked arbitrarily. # A size of 10 bytes indicates ~ ~1e24 possible boundaries, so chance of collision # with message contents is negligible. const MSG_BOUNDARY = UInt8[0x79, 0x8e, 0x8e, 0xf5, 0x6e, 0x9b, 0x2e, 0x97, 0xd5, 0x7d] # Faster serialization/deserialization of MsgHeader and RRID function serialize_hdr_raw(io, hdr) write(io, hdr.response_oid.whence, hdr.response_oid.id, hdr.notify_oid.whence, hdr.notify_oid.id) end function deserialize_hdr_raw(io) data = read!(io, Ref{NTuple{4,Int}}())[] return MsgHeader(RRID(data[1], data[2]), RRID(data[3], data[4])) end function send_msg_(w::Worker, header, msg, now::Bool) check_worker_state(w) if myid() != 1 && !isa(msg, IdentifySocketMsg) && !isa(msg, IdentifySocketAckMsg) wait(w.initialized) end io = w.w_stream lock(io) try reset_state(w.w_serializer) serialize_hdr_raw(io, header) invokelatest(serialize_msg, w.w_serializer, msg) # io is wrapped in w_serializer write(io, MSG_BOUNDARY) if !now && w.gcflag flush_gc_msgs(w) else flush(io) end finally unlock(io) end end function flush_gc_msgs() try for w in (PGRP::ProcessGroup).workers if isa(w,Worker) && (w.state == W_CONNECTED) && w.gcflag flush_gc_msgs(w) end end catch e bt = catch_backtrace() @async showerror(stderr, e, bt) end end function send_connection_hdr(w::Worker, cookie=true) # For a connection initiated from the remote side to us, we only send the version, # else when we initiate a connection we first send the cookie followed by our version. # The remote side validates the cookie. if cookie write(w.w_stream, LPROC.cookie) end write(w.w_stream, rpad(VERSION_STRING, HDR_VERSION_LEN)[1:HDR_VERSION_LEN]) end N/opt/julia-1.10.3/share/julia/stdlib/v1.10/Distributed/src/process_messages.jl4# This file is a part of Julia. License is MIT: https://julialang.org/license # data stored by the owner of a remote reference def_rv_channel() = Channel(1) mutable struct RemoteValue c::AbstractChannel clientset::BitSet # Set of workerids that have a reference to this channel. # Keeping ids instead of a count aids in cleaning up upon # a worker exit. waitingfor::Int # processor we need to hear from to fill this, or 0 synctake::Union{ReentrantLock, Nothing} # A lock used to synchronize the # specific case of a local put! / remote take! on an # unbuffered store. github issue #29932 function RemoteValue(c) c_is_buffered = false try c_is_buffered = isbuffered(c) catch end if c_is_buffered return new(c, BitSet(), 0, nothing) else return new(c, BitSet(), 0, ReentrantLock()) end end end wait(rv::RemoteValue) = wait(rv.c) # A wrapper type to handle issue #29932 which requires locking / unlocking of # RemoteValue.synctake outside of lexical scope. struct SyncTake v::Any rv::RemoteValue end ## core messages: do, call, fetch, wait, ref, put! ## struct RemoteException <: Exception pid::Int captured::CapturedException end """ capture_exception(ex::RemoteException, bt) Returns `ex::RemoteException` which has already captured a backtrace (via it's [`CapturedException`](@ref) field `captured`). """ Base.capture_exception(ex::RemoteException, bt) = ex """ RemoteException(captured) Exceptions on remote computations are captured and rethrown locally. A `RemoteException` wraps the `pid` of the worker and a captured exception. A `CapturedException` captures the remote exception and a serializable form of the call stack when the exception was raised. """ RemoteException(captured) = RemoteException(myid(), captured) function showerror(io::IO, re::RemoteException) (re.pid != myid()) && print(io, "On worker ", re.pid, ":\n") showerror(io, re.captured) end function run_work_thunk(thunk::Function, print_error::Bool) local result try result = thunk() catch err ce = CapturedException(err, catch_backtrace()) result = RemoteException(ce) print_error && showerror(stderr, ce) end return result end function run_work_thunk(rv::RemoteValue, thunk) put!(rv, run_work_thunk(thunk, false)) nothing end function schedule_call(rid, thunk) return lock(client_refs) do rv = RemoteValue(def_rv_channel()) (PGRP::ProcessGroup).refs[rid] = rv push!(rv.clientset, rid.whence) errormonitor(@async run_work_thunk(rv, thunk)) return rv end end function deliver_result(sock::IO, msg, oid, value) #print("$(myid()) sending result $oid\n") if msg === :call_fetch || isa(value, RemoteException) val = value else val = :OK end try send_msg_now(sock, MsgHeader(oid), ResultMsg(val)) catch e # terminate connection in case of serialization error # otherwise the reading end would hang @error "Fatal error on process $(myid())" exception=e,catch_backtrace() wid = worker_id_from_socket(sock) close(sock) if myid()==1 rmprocs(wid) elseif wid == 1 exit(1) else remote_do(rmprocs, 1, wid) end end end ## message event handlers ## function process_messages(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool=true) errormonitor(@async process_tcp_streams(r_stream, w_stream, incoming)) end function process_tcp_streams(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool) Sockets.nagle(r_stream, false) Sockets.quickack(r_stream, true) wait_connected(r_stream) if r_stream != w_stream Sockets.nagle(w_stream, false) Sockets.quickack(w_stream, true) wait_connected(w_stream) end message_handler_loop(r_stream, w_stream, incoming) end """ process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true) Called by cluster managers using custom transports. It should be called when the custom transport implementation receives the first message from a remote worker. The custom transport must manage a logical connection to the remote worker and provide two `IO` objects, one for incoming messages and the other for messages addressed to the remote worker. If `incoming` is `true`, the remote peer initiated the connection. Whichever of the pair initiates the connection sends the cluster cookie and its Julia version number to perform the authentication handshake. See also [`cluster_cookie`](@ref). """ function process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true) errormonitor(@async message_handler_loop(r_stream, w_stream, incoming)) end function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool) wpid=0 # the worker r_stream is connected to. boundary = similar(MSG_BOUNDARY) try version = process_hdr(r_stream, incoming) serializer = ClusterSerializer(r_stream) # The first message will associate wpid with r_stream header = deserialize_hdr_raw(r_stream) msg = deserialize_msg(serializer) handle_msg(msg, header, r_stream, w_stream, version) wpid = worker_id_from_socket(r_stream) @assert wpid > 0 readbytes!(r_stream, boundary, length(MSG_BOUNDARY)) while true reset_state(serializer) header = deserialize_hdr_raw(r_stream) # println("header: ", header) try msg = invokelatest(deserialize_msg, serializer) catch e # Deserialization error; discard bytes in stream until boundary found boundary_idx = 1 while true # This may throw an EOF error if the terminal boundary was not written # correctly, triggering the higher-scoped catch block below byte = read(r_stream, UInt8) if byte == MSG_BOUNDARY[boundary_idx] boundary_idx += 1 if boundary_idx > length(MSG_BOUNDARY) break end else boundary_idx = 1 end end # remotecalls only rethrow RemoteExceptions. Any other exception is treated as # data to be returned. Wrap this exception in a RemoteException. remote_err = RemoteException(myid(), CapturedException(e, catch_backtrace())) # println("Deserialization error. ", remote_err) if !null_id(header.response_oid) ref = lookup_ref(header.response_oid) put!(ref, remote_err) end if !null_id(header.notify_oid) deliver_result(w_stream, :call_fetch, header.notify_oid, remote_err) end continue end readbytes!(r_stream, boundary, length(MSG_BOUNDARY)) # println("got msg: ", typeof(msg)) handle_msg(msg, header, r_stream, w_stream, version) end catch e oldstate = W_UNKNOWN_STATE # Check again as it may have been set in a message handler but not propagated to the calling block above if wpid < 1 wpid = worker_id_from_socket(r_stream) end if wpid < 1 println(stderr, e, CapturedException(e, catch_backtrace())) println(stderr, "Process($(myid())) - Unknown remote, closing connection.") elseif !(wpid in map_del_wrkr) werr = worker_from_id(wpid) oldstate = werr.state set_worker_state(werr, W_TERMINATED) # If unhandleable error occurred talking to pid 1, exit if wpid == 1 if isopen(w_stream) @error "Fatal error on process $(myid())" exception=e,catch_backtrace() end exit(1) end # Will treat any exception as death of node and cleanup # since currently we do not have a mechanism for workers to reconnect # to each other on unhandled errors deregister_worker(wpid) end close(r_stream) close(w_stream) if (myid() == 1) && (wpid > 1) if oldstate != W_TERMINATING println(stderr, "Worker $wpid terminated.") rethrow() end end return nothing end end function process_hdr(s, validate_cookie) if validate_cookie cookie = read(s, HDR_COOKIE_LEN) if length(cookie) < HDR_COOKIE_LEN error("Cookie read failed. Connection closed by peer.") end self_cookie = cluster_cookie() for i in 1:HDR_COOKIE_LEN if UInt8(self_cookie[i]) != cookie[i] error("Process($(myid())) - Invalid connection credentials sent by remote.") end end end # When we have incompatible julia versions trying to connect to each other, # and can be detected, raise an appropriate error. # For now, just return the version. version = read(s, HDR_VERSION_LEN) if length(version) < HDR_VERSION_LEN error("Version read failed. Connection closed by peer.") end return VersionNumber(strip(String(version))) end function handle_msg(msg::CallMsg{:call}, header, r_stream, w_stream, version) schedule_call(header.response_oid, ()->invokelatest(msg.f, msg.args...; msg.kwargs...)) end function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, version) errormonitor(@async begin v = run_work_thunk(()->invokelatest(msg.f, msg.args...; msg.kwargs...), false) if isa(v, SyncTake) try deliver_result(w_stream, :call_fetch, header.notify_oid, v.v) finally unlock(v.rv.synctake) end else deliver_result(w_stream, :call_fetch, header.notify_oid, v) end nothing end) end function handle_msg(msg::CallWaitMsg, header, r_stream, w_stream, version) errormonitor(@async begin rv = schedule_call(header.response_oid, ()->invokelatest(msg.f, msg.args...; msg.kwargs...)) deliver_result(w_stream, :call_wait, header.notify_oid, fetch(rv.c)) nothing end) end function handle_msg(msg::RemoteDoMsg, header, r_stream, w_stream, version) errormonitor(@async run_work_thunk(()->invokelatest(msg.f, msg.args...; msg.kwargs...), true)) end function handle_msg(msg::ResultMsg, header, r_stream, w_stream, version) put!(lookup_ref(header.response_oid), msg.value) end function handle_msg(msg::IdentifySocketMsg, header, r_stream, w_stream, version) # register a new peer worker connection w = Worker(msg.from_pid, r_stream, w_stream, cluster_manager; version=version) send_connection_hdr(w, false) send_msg_now(w, MsgHeader(), IdentifySocketAckMsg()) notify(w.initialized) end function handle_msg(msg::IdentifySocketAckMsg, header, r_stream, w_stream, version) w = map_sock_wrkr[r_stream] w.version = version end function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version) LPROC.id = msg.self_pid controller = Worker(1, r_stream, w_stream, cluster_manager; version=version) notify(controller.initialized) register_worker(LPROC) topology(msg.topology) if !msg.enable_threaded_blas Base.disable_library_threading() end lazy = msg.lazy PGRP.lazy = lazy @sync for (connect_at, rpid) in msg.other_workers wconfig = WorkerConfig() wconfig.connect_at = connect_at let rpid=rpid, wconfig=wconfig if lazy # The constructor registers the object with a global registry. Worker(rpid, ()->connect_to_peer(cluster_manager, rpid, wconfig)) else @async connect_to_peer(cluster_manager, rpid, wconfig) end end end send_connection_hdr(controller, false) send_msg_now(controller, MsgHeader(RRID(0,0), header.notify_oid), JoinCompleteMsg(Sys.CPU_THREADS, getpid())) end function connect_to_peer(manager::ClusterManager, rpid::Int, wconfig::WorkerConfig) try (r_s, w_s) = connect(manager, rpid, wconfig) w = Worker(rpid, r_s, w_s, manager; config=wconfig) process_messages(w.r_stream, w.w_stream, false) send_connection_hdr(w, true) send_msg_now(w, MsgHeader(), IdentifySocketMsg(myid())) notify(w.initialized) catch e @error "Error on $(myid()) while connecting to peer $rpid, exiting" exception=e,catch_backtrace() exit(1) end end function handle_msg(msg::JoinCompleteMsg, header, r_stream, w_stream, version) w = map_sock_wrkr[r_stream] environ = something(w.config.environ, Dict()) environ[:cpu_threads] = msg.cpu_threads w.config.environ = environ w.config.ospid = msg.ospid w.version = version ntfy_channel = lookup_ref(header.notify_oid) put!(ntfy_channel, w.id) push!(default_worker_pool(), w.id) end H/opt/julia-1.10.3/share/julia/stdlib/v1.10/Distributed/src/remotecall.jl:a# This file is a part of Julia. License is MIT: https://julialang.org/license import Base: eltype abstract type AbstractRemoteRef end """ client_refs Tracks whether a particular `AbstractRemoteRef` (identified by its RRID) exists on this worker. The `client_refs` lock is also used to synchronize access to `.refs` and associated `clientset` state. """ const client_refs = WeakKeyDict{AbstractRemoteRef, Nothing}() # used as a WeakKeySet """ Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing) A `Future` is a placeholder for a single computation of unknown termination status and time. For multiple potential computations, see `RemoteChannel`. See `remoteref_id` for identifying an `AbstractRemoteRef`. """ mutable struct Future <: AbstractRemoteRef where::Int whence::Int id::Int lock::ReentrantLock @atomic v::Union{Some{Any}, Nothing} Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing) = (r = new(w,rrid.whence,rrid.id,ReentrantLock(),v); return test_existing_ref(r)) Future(t::NTuple{4, Any}) = new(t[1],t[2],t[3],ReentrantLock(),t[4]) # Useful for creating dummy, zeroed-out instances end """ RemoteChannel(pid::Integer=myid()) Make a reference to a `Channel{Any}(1)` on process `pid`. The default `pid` is the current process. RemoteChannel(f::Function, pid::Integer=myid()) Create references to remote channels of a specific size and type. `f` is a function that when executed on `pid` must return an implementation of an `AbstractChannel`. For example, `RemoteChannel(()->Channel{Int}(10), pid)`, will return a reference to a channel of type `Int` and size 10 on `pid`. The default `pid` is the current process. """ mutable struct RemoteChannel{T<:AbstractChannel} <: AbstractRemoteRef where::Int whence::Int id::Int function RemoteChannel{T}(w::Int, rrid::RRID) where T<:AbstractChannel r = new(w, rrid.whence, rrid.id) return test_existing_ref(r) end function RemoteChannel{T}(t::Tuple) where T<:AbstractChannel return new(t[1],t[2],t[3]) end end function test_existing_ref(r::AbstractRemoteRef) found = getkey(client_refs, r, nothing) if found !== nothing @assert r.where > 0 if isa(r, Future) # this is only for copying the reference from Future to RemoteRef (just created) fv_cache = @atomic :acquire found.v rv_cache = @atomic :monotonic r.v if fv_cache === nothing && rv_cache !== nothing # we have recd the value from another source, probably a deserialized ref, send a del_client message send_del_client(r) @lock found.lock begin @atomicreplace found.v nothing => rv_cache end end end return found::typeof(r) end client_refs[r] = nothing finalizer(finalize_ref, r) return r end function finalize_ref(r::AbstractRemoteRef) if r.where > 0 # Handle the case of the finalizer having been called manually if trylock(client_refs.lock) # trylock doesn't call wait which causes yields try delete!(client_refs.ht, r) # direct removal avoiding locks if isa(r, RemoteChannel) send_del_client_no_lock(r) else # send_del_client only if the reference has not been set v_cache = @atomic :monotonic r.v v_cache === nothing && send_del_client_no_lock(r) @atomic :monotonic r.v = nothing end r.where = 0 finally unlock(client_refs.lock) end else finalizer(finalize_ref, r) return nothing end end nothing end """ Future(pid::Integer=myid()) Create a `Future` on process `pid`. The default `pid` is the current process. """ Future(pid::Integer=myid()) = Future(pid, RRID()) Future(w::LocalProcess) = Future(w.id) Future(w::Worker) = Future(w.id) RemoteChannel(pid::Integer=myid()) = RemoteChannel{Channel{Any}}(pid, RRID()) function RemoteChannel(f::Function, pid::Integer=myid()) remotecall_fetch(pid, f, RRID()) do f, rrid rv=lookup_ref(rrid, f) RemoteChannel{typeof(rv.c)}(myid(), rrid) end end Base.eltype(::Type{RemoteChannel{T}}) where {T} = eltype(T) hash(r::AbstractRemoteRef, h::UInt) = hash(r.whence, hash(r.id, h)) ==(r::AbstractRemoteRef, s::AbstractRemoteRef) = (r.whence==s.whence && r.id==s.id) """ remoteref_id(r::AbstractRemoteRef) -> RRID `Future`s and `RemoteChannel`s are identified by fields: * `where` - refers to the node where the underlying object/storage referred to by the reference actually exists. * `whence` - refers to the node the remote reference was created from. Note that this is different from the node where the underlying object referred to actually exists. For example calling `RemoteChannel(2)` from the master process would result in a `where` value of 2 and a `whence` value of 1. * `id` is unique across all references created from the worker specified by `whence`. Taken together, `whence` and `id` uniquely identify a reference across all workers. `remoteref_id` is a low-level API which returns a `RRID` object that wraps `whence` and `id` values of a remote reference. """ remoteref_id(r::AbstractRemoteRef) = RRID(r.whence, r.id) """ channel_from_id(id) -> c A low-level API which returns the backing `AbstractChannel` for an `id` returned by [`remoteref_id`](@ref). The call is valid only on the node where the backing channel exists. """ function channel_from_id(id) rv = lock(client_refs) do return get(PGRP.refs, id, false) end if rv === false throw(ErrorException("Local instance of remote reference not found")) end return rv.c end lookup_ref(rrid::RRID, f=def_rv_channel) = lookup_ref(PGRP, rrid, f) function lookup_ref(pg, rrid, f) return lock(client_refs) do rv = get(pg.refs, rrid, false) if rv === false # first we've heard of this ref rv = RemoteValue(invokelatest(f)) pg.refs[rrid] = rv push!(rv.clientset, rrid.whence) end return rv end::RemoteValue end """ isready(rr::Future) Determine whether a [`Future`](@ref) has a value stored to it. If the argument `Future` is owned by a different node, this call will block to wait for the answer. It is recommended to wait for `rr` in a separate task instead or to use a local [`Channel`](@ref) as a proxy: ```julia p = 1 f = Future(p) errormonitor(@async put!(f, remotecall_fetch(long_computation, p))) isready(f) # will not block ``` """ function isready(rr::Future) v_cache = @atomic rr.v v_cache === nothing || return true rid = remoteref_id(rr) return if rr.where == myid() isready(lookup_ref(rid).c) else remotecall_fetch(rid->isready(lookup_ref(rid).c), rr.where, rid) end end """ isready(rr::RemoteChannel, args...) Determine whether a [`RemoteChannel`](@ref) has a value stored to it. Note that this function can cause race conditions, since by the time you receive its result it may no longer be true. However, it can be safely used on a [`Future`](@ref) since they are assigned only once. """ function isready(rr::RemoteChannel, args...) rid = remoteref_id(rr) return if rr.where == myid() isready(lookup_ref(rid).c, args...) else remotecall_fetch(rid->isready(lookup_ref(rid).c, args...), rr.where, rid) end end del_client(rr::AbstractRemoteRef) = del_client(remoteref_id(rr), myid()) del_client(id, client) = del_client(PGRP, id, client) function del_client(pg, id, client) lock(client_refs) do _del_client(pg, id, client) end nothing end function _del_client(pg, id, client) rv = get(pg.refs, id, false) if rv !== false delete!(rv.clientset, client) if isempty(rv.clientset) delete!(pg.refs, id) #print("$(myid()) collected $id\n") end end nothing end function del_clients(pairs::Vector) for p in pairs del_client(p[1], p[2]) end end # The task below is coalescing the `flush_gc_msgs` call # across multiple producers, see `send_del_client`, # and `send_add_client`. # XXX: Is this worth the additional complexity? # `flush_gc_msgs` has to iterate over all connected workers. const any_gc_flag = Threads.Condition() function start_gc_msgs_task() errormonitor( Threads.@spawn begin while true lock(any_gc_flag) do # this might miss events wait(any_gc_flag) end # Use invokelatest() so that custom message transport streams # for workers can be defined in a newer world age than the Task # which runs the loop here. invokelatest(flush_gc_msgs) # handles throws internally end end ) end # Function can be called within a finalizer function send_del_client(rr) if rr.where == myid() del_client(rr) elseif id_in_procs(rr.where) # process only if a valid worker process_worker(rr) end end function send_del_client_no_lock(rr) # for gc context to avoid yields if rr.where == myid() _del_client(PGRP, remoteref_id(rr), myid()) elseif id_in_procs(rr.where) # process only if a valid worker process_worker(rr) end end function publish_del_msg!(w::Worker, msg) lock(w.msg_lock) do push!(w.del_msgs, msg) @atomic w.gcflag = true end lock(any_gc_flag) do notify(any_gc_flag) end end function process_worker(rr) w = worker_from_id(rr.where)::Worker msg = (remoteref_id(rr), myid()) # Needs to acquire a lock on the del_msg queue T = Threads.@spawn begin publish_del_msg!($w, $msg) end Base.errormonitor(T) return end function add_client(id, client) lock(client_refs) do rv = lookup_ref(id) push!(rv.clientset, client) end nothing end function add_clients(pairs::Vector) for p in pairs add_client(p[1], p[2]...) end end function send_add_client(rr::AbstractRemoteRef, i) if rr.where == myid() add_client(remoteref_id(rr), i) elseif (i != rr.where) && id_in_procs(rr.where) # don't need to send add_client if the message is already going # to the processor that owns the remote ref. it will add_client # itself inside deserialize(). w = worker_from_id(rr.where) lock(w.msg_lock) do push!(w.add_msgs, (remoteref_id(rr), i)) @atomic w.gcflag = true end lock(any_gc_flag) do notify(any_gc_flag) end end end channel_type(rr::RemoteChannel{T}) where {T} = T function serialize(s::ClusterSerializer, f::Future) v_cache = @atomic f.v if v_cache === nothing p = worker_id_from_socket(s.io) (p !== f.where) && send_add_client(f, p) end invoke(serialize, Tuple{ClusterSerializer, Any}, s, f) end function serialize(s::ClusterSerializer, rr::RemoteChannel) p = worker_id_from_socket(s.io) (p !== rr.where) && send_add_client(rr, p) invoke(serialize, Tuple{ClusterSerializer, Any}, s, rr) end function deserialize(s::ClusterSerializer, t::Type{<:Future}) fc = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, t) # deserialized copy f2 = Future(fc.where, RRID(fc.whence, fc.id), fc.v) # ctor adds to client_refs table # 1) send_add_client() is not executed when the ref is being serialized # to where it exists, hence do it here. # 2) If we have received a 'fetch'ed Future or if the Future ctor found an # already 'fetch'ed instance in client_refs (Issue #25847), we should not # track it in the backing RemoteValue store. f2v_cache = @atomic f2.v if f2.where == myid() && f2v_cache === nothing add_client(remoteref_id(f2), myid()) end f2 end function deserialize(s::ClusterSerializer, t::Type{<:RemoteChannel}) rr = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, t) if rr.where == myid() # send_add_client() is not executed when the ref is being # serialized to where it exists add_client(remoteref_id(rr), myid()) end # call ctor to make sure this rr gets added to the client_refs table RemoteChannel{channel_type(rr)}(rr.where, RRID(rr.whence, rr.id)) end # Future and RemoteChannel are serializable only in a running cluster. # Serialize zeroed-out values to non ClusterSerializer objects function serialize(s::AbstractSerializer, ::Future) zero_fut = Future((0,0,0,nothing)) invoke(serialize, Tuple{AbstractSerializer, Any}, s, zero_fut) end function serialize(s::AbstractSerializer, ::RemoteChannel) zero_rc = RemoteChannel{Channel{Any}}((0,0,0)) invoke(serialize, Tuple{AbstractSerializer, Any}, s, zero_rc) end # make a thunk to call f on args in a way that simulates what would happen if # the function were sent elsewhere function local_remotecall_thunk(f, args, kwargs) return ()->invokelatest(f, args...; kwargs...) end function remotecall(f, w::LocalProcess, args...; kwargs...) rr = Future(w) schedule_call(remoteref_id(rr), local_remotecall_thunk(f, args, kwargs)) return rr end function remotecall(f, w::Worker, args...; kwargs...) rr = Future(w) send_msg(w, MsgHeader(remoteref_id(rr)), CallMsg{:call}(f, args, kwargs)) return rr end """ remotecall(f, id::Integer, args...; kwargs...) -> Future Call a function `f` asynchronously on the given arguments on the specified process. Return a [`Future`](@ref). Keyword arguments, if any, are passed through to `f`. """ remotecall(f, id::Integer, args...; kwargs...) = remotecall(f, worker_from_id(id), args...; kwargs...) function remotecall_fetch(f, w::LocalProcess, args...; kwargs...) v=run_work_thunk(local_remotecall_thunk(f,args, kwargs), false) return isa(v, RemoteException) ? throw(v) : v end function remotecall_fetch(f, w::Worker, args...; kwargs...) # can be weak, because the program will have no way to refer to the Ref # itself, it only gets the result. oid = RRID() rv = lookup_ref(oid) rv.waitingfor = w.id send_msg(w, MsgHeader(RRID(0,0), oid), CallMsg{:call_fetch}(f, args, kwargs)) v = take!(rv) lock(client_refs) do delete!(PGRP.refs, oid) end return isa(v, RemoteException) ? throw(v) : v end """ remotecall_fetch(f, id::Integer, args...; kwargs...) Perform `fetch(remotecall(...))` in one message. Keyword arguments, if any, are passed through to `f`. Any remote exceptions are captured in a [`RemoteException`](@ref) and thrown. See also [`fetch`](@ref) and [`remotecall`](@ref). # Examples ```julia-repl \$ julia -p 2 julia> remotecall_fetch(sqrt, 2, 4) 2.0 julia> remotecall_fetch(sqrt, 2, -4) ERROR: On worker 2: DomainError with -4.0: sqrt was called with a negative real argument but will only return a complex result if called with a complex argument. Try sqrt(Complex(x)). ... ``` """ remotecall_fetch(f, id::Integer, args...; kwargs...) = remotecall_fetch(f, worker_from_id(id), args...; kwargs...) remotecall_wait(f, w::LocalProcess, args...; kwargs...) = wait(remotecall(f, w, args...; kwargs...)) function remotecall_wait(f, w::Worker, args...; kwargs...) prid = RRID() rv = lookup_ref(prid) rv.waitingfor = w.id rr = Future(w) send_msg(w, MsgHeader(remoteref_id(rr), prid), CallWaitMsg(f, args, kwargs)) v = fetch(rv.c) lock(client_refs) do delete!(PGRP.refs, prid) end isa(v, RemoteException) && throw(v) return rr end """ remotecall_wait(f, id::Integer, args...; kwargs...) Perform a faster `wait(remotecall(...))` in one message on the `Worker` specified by worker id `id`. Keyword arguments, if any, are passed through to `f`. See also [`wait`](@ref) and [`remotecall`](@ref). """ remotecall_wait(f, id::Integer, args...; kwargs...) = remotecall_wait(f, worker_from_id(id), args...; kwargs...) function remote_do(f, w::LocalProcess, args...; kwargs...) # the LocalProcess version just performs in local memory what a worker # does when it gets a :do message. # same for other messages on LocalProcess. thk = local_remotecall_thunk(f, args, kwargs) schedule(Task(thk)) nothing end function remote_do(f, w::Worker, args...; kwargs...) send_msg(w, MsgHeader(), RemoteDoMsg(f, args, kwargs)) nothing end """ remote_do(f, id::Integer, args...; kwargs...) -> nothing Executes `f` on worker `id` asynchronously. Unlike [`remotecall`](@ref), it does not store the result of computation, nor is there a way to wait for its completion. A successful invocation indicates that the request has been accepted for execution on the remote node. While consecutive `remotecall`s to the same worker are serialized in the order they are invoked, the order of executions on the remote worker is undetermined. For example, `remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2)` will serialize the call to `f1`, followed by `f2` and `f3` in that order. However, it is not guaranteed that `f1` is executed before `f3` on worker 2. Any exceptions thrown by `f` are printed to [`stderr`](@ref) on the remote worker. Keyword arguments, if any, are passed through to `f`. """ remote_do(f, id::Integer, args...; kwargs...) = remote_do(f, worker_from_id(id), args...; kwargs...) # have the owner of rr call f on it function call_on_owner(f, rr::AbstractRemoteRef, args...) rid = remoteref_id(rr) if rr.where == myid() f(rid, args...) else remotecall_fetch(f, rr.where, rid, args...) end end function wait_ref(rid, caller, args...) v = fetch_ref(rid, args...) if isa(v, RemoteException) if myid() == caller throw(v) else return v end end nothing end """ wait(r::Future) Wait for a value to become available for the specified [`Future`](@ref). """ wait(r::Future) = (v_cache = @atomic r.v; v_cache !== nothing && return r; call_on_owner(wait_ref, r, myid()); r) """ wait(r::RemoteChannel, args...) Wait for a value to become available on the specified [`RemoteChannel`](@ref). """ wait(r::RemoteChannel, args...) = (call_on_owner(wait_ref, r, myid(), args...); r) """ fetch(x::Future) Wait for and get the value of a [`Future`](@ref). The fetched value is cached locally. Further calls to `fetch` on the same reference return the cached value. If the remote value is an exception, throws a [`RemoteException`](@ref) which captures the remote exception and backtrace. """ function fetch(r::Future) v_cache = @atomic r.v v_cache !== nothing && return something(v_cache) if r.where == myid() rv, v_cache = @lock r.lock begin v_cache = @atomic :monotonic r.v rv = v_cache === nothing ? lookup_ref(remoteref_id(r)) : nothing rv, v_cache end if v_cache !== nothing return something(v_cache) else v_local = fetch(rv.c) end else v_local = call_on_owner(fetch_ref, r) end v_cache = @atomic r.v if v_cache === nothing # call_on_owner case v_old, status = @lock r.lock begin @atomicreplace r.v nothing => Some(v_local) end # status == true - when value obtained through call_on_owner # status == false - any other situation: atomicreplace fails, because by the time the lock is obtained cache will be populated # why? local put! performs caching and putting into channel under r.lock # for local put! use the cached value, for call_on_owner cases just take the v_local as it was just cached in r.v # remote calls getting the value from `call_on_owner` used to return the value directly without wrapping it in `Some(x)` # so we're doing the same thing here if status send_del_client(r) return v_local else # this `v_cache` is returned at the end of the function v_cache = v_old end end send_del_client(r) something(v_cache) end fetch_ref(rid, args...) = fetch(lookup_ref(rid).c, args...) """ fetch(c::RemoteChannel) Wait for and get a value from a [`RemoteChannel`](@ref). Exceptions raised are the same as for a [`Future`](@ref). Does not remove the item fetched. """ fetch(r::RemoteChannel, args...) = call_on_owner(fetch_ref, r, args...)::eltype(r) isready(rv::RemoteValue, args...) = isready(rv.c, args...) """ put!(rr::Future, v) Store a value to a [`Future`](@ref) `rr`. `Future`s are write-once remote references. A `put!` on an already set `Future` throws an `Exception`. All asynchronous remote calls return `Future`s and set the value to the return value of the call upon completion. """ function put!(r::Future, v) if r.where == myid() rid = remoteref_id(r) rv = lookup_ref(rid) isready(rv) && error("Future can be set only once") @lock r.lock begin put!(rv, v) # this notifies the tasks waiting on the channel in fetch set_future_cache(r, v) # set the cache before leaving the lock, so that the notified tasks already see it cached end del_client(rid, myid()) else @lock r.lock begin # same idea as above if there were any local tasks fetching on this Future call_on_owner(put_future, r, v, myid()) set_future_cache(r, v) end end r end function set_future_cache(r::Future, v) _, ok = @atomicreplace r.v nothing => Some(v) ok || error("internal consistency error detected for Future") end function put_future(rid, v, caller) rv = lookup_ref(rid) isready(rv) && error("Future can be set only once") put!(rv, v) # The caller has the value and hence can be removed from the remote store. del_client(rid, caller) nothing end put!(rv::RemoteValue, args...) = put!(rv.c, args...) function put_ref(rid, caller, args...) rv = lookup_ref(rid) put!(rv, args...) if myid() == caller && rv.synctake !== nothing # Wait till a "taken" value is serialized out - github issue #29932 lock(rv.synctake) unlock(rv.synctake) end nothing end """ put!(rr::RemoteChannel, args...) Store a set of values to the [`RemoteChannel`](@ref). If the channel is full, blocks until space is available. Return the first argument. """ put!(rr::RemoteChannel, args...) = (call_on_owner(put_ref, rr, myid(), args...); rr) # take! is not supported on Future take!(rv::RemoteValue, args...) = take!(rv.c, args...) function take_ref(rid, caller, args...) rv = lookup_ref(rid) synctake = false if myid() != caller && rv.synctake !== nothing # special handling for local put! / remote take! on unbuffered channel # github issue #29932 synctake = true lock(rv.synctake) end v = try take!(rv, args...) catch e # avoid unmatched unlock when exception occurs # github issue #33972 synctake && unlock(rv.synctake) rethrow(e) end isa(v, RemoteException) && (myid() == caller) && throw(v) if synctake return SyncTake(v, rv) else return v end end """ take!(rr::RemoteChannel, args...) Fetch value(s) from a [`RemoteChannel`](@ref) `rr`, removing the value(s) in the process. """ take!(rr::RemoteChannel, args...) = call_on_owner(take_ref, rr, myid(), args...)::eltype(rr) # close and isopen are not supported on Future close_ref(rid) = (close(lookup_ref(rid).c); nothing) close(rr::RemoteChannel) = call_on_owner(close_ref, rr) isopen_ref(rid) = isopen(lookup_ref(rid).c) isopen(rr::RemoteChannel) = call_on_owner(isopen_ref, rr) getindex(r::RemoteChannel) = fetch(r) getindex(r::Future) = fetch(r) getindex(r::Future, args...) = getindex(fetch(r), args...) function getindex(r::RemoteChannel, args...) if r.where == myid() return getindex(fetch(r), args...) end return remotecall_fetch(getindex, r.where, r, args...) end function iterate(c::RemoteChannel, state=nothing) if isopen(c) || isready(c) try return (take!(c), nothing) catch e if isa(e, InvalidStateException) || (isa(e, RemoteException) && isa(e.captured.ex, InvalidStateException) && e.captured.ex.state === :closed) return nothing end rethrow() end else return nothing end end IteratorSize(::Type{<:RemoteChannel}) = SizeUnknown() D/opt/julia-1.10.3/share/julia/stdlib/v1.10/Distributed/src/macros.jlj## This file is a part of Julia. License is MIT: https://julialang.org/license let nextidx = Threads.Atomic{Int}(0) global nextproc function nextproc() idx = Threads.atomic_add!(nextidx, 1) return workers()[(idx % nworkers()) + 1] end end spawnat(p, thunk) = remotecall(thunk, p) spawn_somewhere(thunk) = spawnat(nextproc(),thunk) """ @spawn expr Create a closure around an expression and run it on an automatically-chosen process, returning a [`Future`](@ref) to the result. This macro is deprecated; `@spawnat :any expr` should be used instead. # Examples ```julia-repl julia> addprocs(3); julia> f = @spawn myid() Future(2, 1, 5, nothing) julia> fetch(f) 2 julia> f = @spawn myid() Future(3, 1, 7, nothing) julia> fetch(f) 3 ``` !!! compat "Julia 1.3" As of Julia 1.3 this macro is deprecated. Use `@spawnat :any` instead. """ macro spawn(expr) thunk = esc(:(()->($expr))) var = esc(Base.sync_varname) quote local ref = spawn_somewhere($thunk) if $(Expr(:islocal, var)) put!($var, ref) end ref end end """ @spawnat p expr Create a closure around an expression and run the closure asynchronously on process `p`. Return a [`Future`](@ref) to the result. If `p` is the quoted literal symbol `:any`, then the system will pick a processor to use automatically. # Examples ```julia-repl julia> addprocs(3); julia> f = @spawnat 2 myid() Future(2, 1, 3, nothing) julia> fetch(f) 2 julia> f = @spawnat :any myid() Future(3, 1, 7, nothing) julia> fetch(f) 3 ``` !!! compat "Julia 1.3" The `:any` argument is available as of Julia 1.3. """ macro spawnat(p, expr) thunk = esc(:(()->($expr))) var = esc(Base.sync_varname) if p === QuoteNode(:any) spawncall = :(spawn_somewhere($thunk)) else spawncall = :(spawnat($(esc(p)), $thunk)) end quote local ref = $spawncall if $(Expr(:islocal, var)) put!($var, ref) end ref end end """ @fetch expr Equivalent to `fetch(@spawnat :any expr)`. See [`fetch`](@ref) and [`@spawnat`](@ref). # Examples ```julia-repl julia> addprocs(3); julia> @fetch myid() 2 julia> @fetch myid() 3 julia> @fetch myid() 4 julia> @fetch myid() 2 ``` """ macro fetch(expr) thunk = esc(:(()->($expr))) :(remotecall_fetch($thunk, nextproc())) end """ @fetchfrom Equivalent to `fetch(@spawnat p expr)`. See [`fetch`](@ref) and [`@spawnat`](@ref). # Examples ```julia-repl julia> addprocs(3); julia> @fetchfrom 2 myid() 2 julia> @fetchfrom 4 myid() 4 ``` """ macro fetchfrom(p, expr) thunk = esc(:(()->($expr))) :(remotecall_fetch($thunk, $(esc(p)))) end # extract a list of modules to import from an expression extract_imports!(imports, x) = imports function extract_imports!(imports, ex::Expr) if Meta.isexpr(ex, (:import, :using)) push!(imports, ex) elseif Meta.isexpr(ex, :let) extract_imports!(imports, ex.args[2]) elseif Meta.isexpr(ex, (:toplevel, :block)) for arg in ex.args extract_imports!(imports, arg) end end return imports end extract_imports(x) = extract_imports!(Any[], x) """ @everywhere [procs()] expr Execute an expression under `Main` on all `procs`. Errors on any of the processes are collected into a [`CompositeException`](@ref) and thrown. For example: @everywhere bar = 1 will define `Main.bar` on all current processes. Any processes added later (say with [`addprocs()`](@ref)) will not have the expression defined. Unlike [`@spawnat`](@ref), `@everywhere` does not capture any local variables. Instead, local variables can be broadcast using interpolation: foo = 1 @everywhere bar = \$foo The optional argument `procs` allows specifying a subset of all processes to have execute the expression. Similar to calling `remotecall_eval(Main, procs, expr)`, but with two extra features: - `using` and `import` statements run on the calling process first, to ensure packages are precompiled. - The current source file path used by `include` is propagated to other processes. """ macro everywhere(ex) procs = GlobalRef(@__MODULE__, :procs) return esc(:($(Distributed).@everywhere $procs() $ex)) end macro everywhere(procs, ex) imps = extract_imports(ex) return quote $(isempty(imps) ? nothing : Expr(:toplevel, imps...)) # run imports locally first let ex = Expr(:toplevel, :(task_local_storage()[:SOURCE_PATH] = $(get(task_local_storage(), :SOURCE_PATH, nothing))), $(esc(Expr(:quote, ex)))), procs = $(esc(procs)) remotecall_eval(Main, procs, ex) end end end """ remotecall_eval(m::Module, procs, expression) Execute an expression under module `m` on the processes specified in `procs`. Errors on any of the processes are collected into a [`CompositeException`](@ref) and thrown. See also [`@everywhere`](@ref). """ function remotecall_eval(m::Module, procs, ex) @sync begin run_locally = 0 for pid in procs if pid == myid() run_locally += 1 else @async_unwrap remotecall_wait(Core.eval, pid, m, ex) end end yield() # ensure that the remotecalls have had a chance to start # execute locally last as we do not want local execution to block serialization # of the request to remote nodes. for _ in 1:run_locally @async Core.eval(m, ex) end end nothing end # optimized version of remotecall_eval for a single pid # and which also fetches the return value function remotecall_eval(m::Module, pid::Int, ex) return remotecall_fetch(Core.eval, pid, m, ex) end # Statically split range [firstIndex,lastIndex] into equal sized chunks for np processors function splitrange(firstIndex::Int, lastIndex::Int, np::Int) each, extras = divrem(lastIndex-firstIndex+1, np) nchunks = each > 0 ? np : extras chunks = Vector{UnitRange{Int}}(undef, nchunks) lo = firstIndex for i in 1:nchunks hi = lo + each - 1 if extras > 0 hi += 1 extras -= 1 end chunks[i] = lo:hi lo = hi+1 end return chunks end function preduce(reducer, f, R) chunks = splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers()) all_w = workers()[1:length(chunks)] w_exec = Task[] for (idx,pid) in enumerate(all_w) t = Task(()->remotecall_fetch(f, pid, reducer, R, first(chunks[idx]), last(chunks[idx]))) schedule(t) push!(w_exec, t) end reduce(reducer, Any[fetch(t) for t in w_exec]) end function pfor(f, R) t = @async @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers()) @spawnat :any f(R, first(c), last(c)) end errormonitor(t) end function make_preduce_body(var, body) quote function (reducer, R, lo::Int, hi::Int) $(esc(var)) = R[lo] ac = $(esc(body)) if lo != hi for $(esc(var)) in R[(lo+1):hi] ac = reducer(ac, $(esc(body))) end end ac end end end function make_pfor_body(var, body) quote function (R, lo::Int, hi::Int) for $(esc(var)) in R[lo:hi] $(esc(body)) end end end end """ @distributed A distributed memory, parallel for loop of the form : @distributed [reducer] for var = range body end The specified range is partitioned and locally executed across all workers. In case an optional reducer function is specified, `@distributed` performs local reductions on each worker with a final reduction on the calling process. Note that without a reducer function, `@distributed` executes asynchronously, i.e. it spawns independent tasks on all available workers and returns immediately without waiting for completion. To wait for completion, prefix the call with [`@sync`](@ref), like : @sync @distributed for var = range body end """ macro distributed(args...) na = length(args) if na==1 loop = args[1] elseif na==2 reducer = args[1] loop = args[2] else throw(ArgumentError("wrong number of arguments to @distributed")) end if !isa(loop,Expr) || loop.head !== :for error("malformed @distributed loop") end var = loop.args[1].args[1] r = loop.args[1].args[2] body = loop.args[2] if Meta.isexpr(body, :block) && body.args[end] isa LineNumberNode resize!(body.args, length(body.args) - 1) end if na==1 syncvar = esc(Base.sync_varname) return quote local ref = pfor($(make_pfor_body(var, body)), $(esc(r))) if $(Expr(:islocal, syncvar)) put!($syncvar, ref) end ref end else return :(preduce($(esc(reducer)), $(make_preduce_body(var, body)), $(esc(r)))) end end H/opt/julia-1.10.3/share/julia/stdlib/v1.10/Distributed/src/workerpool.jlL-# This file is a part of Julia. License is MIT: https://julialang.org/license """ AbstractWorkerPool Supertype for worker pools such as [`WorkerPool`](@ref) and [`CachingPool`](@ref). An `AbstractWorkerPool` should implement: - [`push!`](@ref) - add a new worker to the overall pool (available + busy) - [`put!`](@ref) - put back a worker to the available pool - [`take!`](@ref) - take a worker from the available pool (to be used for remote function execution) - [`length`](@ref) - number of workers available in the overall pool - [`isready`](@ref) - return false if a `take!` on the pool would block, else true The default implementations of the above (on a `AbstractWorkerPool`) require fields - `channel::Channel{Int}` - `workers::Set{Int}` where `channel` contains free worker pids and `workers` is the set of all workers associated with this pool. """ abstract type AbstractWorkerPool end mutable struct WorkerPool <: AbstractWorkerPool channel::Channel{Int} workers::Set{Int} ref::RemoteChannel WorkerPool(c::Channel, ref::RemoteChannel) = new(c, Set{Int}(), ref) end function WorkerPool() wp = WorkerPool(Channel{Int}(typemax(Int)), RemoteChannel()) put!(wp.ref, WeakRef(wp)) wp end """ WorkerPool(workers::Union{Vector{Int},AbstractRange{Int}}) Create a `WorkerPool` from a vector or range of worker ids. # Examples ```julia-repl \$ julia -p 3 julia> WorkerPool([2, 3]) WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6)) julia> WorkerPool(2:4) WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7)) ``` """ function WorkerPool(workers::Union{Vector{Int},AbstractRange{Int}}) pool = WorkerPool() foreach(w->push!(pool, w), workers) return pool end # On workers where this pool has been serialized to, instantiate with a dummy local channel. WorkerPool(ref::RemoteChannel) = WorkerPool(Channel{Int}(1), ref) function serialize(S::AbstractSerializer, pool::WorkerPool) # Allow accessing a worker pool from other processors. When serialized, # initialize the `ref` to point to self and only send the ref. # Other workers will forward all put!, take!, calls to the process owning # the ref (and hence the pool). Serialization.serialize_type(S, typeof(pool)) serialize(S, pool.ref) end deserialize(S::AbstractSerializer, t::Type{T}) where {T<:WorkerPool} = T(deserialize(S)) wp_local_push!(pool::AbstractWorkerPool, w::Int) = (push!(pool.workers, w); put!(pool.channel, w); pool) wp_local_length(pool::AbstractWorkerPool) = length(pool.workers) wp_local_isready(pool::AbstractWorkerPool) = isready(pool.channel) function wp_local_put!(pool::AbstractWorkerPool, w::Int) # In case of default_worker_pool, the master is implicitly considered a worker, i.e., # it is not present in pool.workers. # Confirm the that the worker is part of a pool before making it available. w in pool.workers && put!(pool.channel, w) w end function wp_local_workers(pool::AbstractWorkerPool) if length(pool) == 0 && pool === default_worker_pool() return [1] else return collect(pool.workers) end end function wp_local_nworkers(pool::AbstractWorkerPool) if length(pool) == 0 && pool === default_worker_pool() return 1 else return length(pool.workers) end end function wp_local_take!(pool::AbstractWorkerPool) # Find an active worker worker = 0 while true if length(pool) == 0 if pool === default_worker_pool() # No workers, the master process is used as a worker worker = 1 break else throw(ErrorException("No active worker available in pool")) end end worker = take!(pool.channel) if id_in_procs(worker) break else delete!(pool.workers, worker) # Remove invalid worker from pool end end return worker end function remotecall_pool(rc_f, f, pool::AbstractWorkerPool, args...; kwargs...) worker = take!(pool) try rc_f(f, worker, args...; kwargs...) finally put!(pool, worker) end end # Check if pool is local or remote and forward calls if required. # NOTE: remotecall_fetch does it automatically, but this will be more efficient as # it avoids the overhead associated with a local remotecall. for (func, rt) = ((:length, Int), (:isready, Bool), (:workers, Vector{Int}), (:nworkers, Int), (:take!, Int)) func_local = Symbol(string("wp_local_", func)) @eval begin function ($func)(pool::WorkerPool) if pool.ref.where != myid() return remotecall_fetch(ref->($func_local)(fetch(ref).value), pool.ref.where, pool.ref)::$rt else return ($func_local)(pool) end end # default impl ($func)(pool::AbstractWorkerPool) = ($func_local)(pool) end end for func = (:push!, :put!) func_local = Symbol(string("wp_local_", func)) @eval begin function ($func)(pool::WorkerPool, w::Int) if pool.ref.where != myid() return remotecall_fetch((ref, w)->($func_local)(fetch(ref).value, w), pool.ref.where, pool.ref, w) else return ($func_local)(pool, w) end end # default impl ($func)(pool::AbstractWorkerPool, w::Int) = ($func_local)(pool, w) end end """ remotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future [`WorkerPool`](@ref) variant of `remotecall(f, pid, ....)`. Wait for and take a free worker from `pool` and perform a `remotecall` on it. # Examples ```julia-repl \$ julia -p 3 julia> wp = WorkerPool([2, 3]); julia> A = rand(3000); julia> f = remotecall(maximum, wp, A) Future(2, 1, 6, nothing) ``` In this example, the task ran on pid 2, called from pid 1. """ remotecall(f, pool::AbstractWorkerPool, args...; kwargs...) = remotecall_pool(remotecall, f, pool, args...; kwargs...) """ remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future [`WorkerPool`](@ref) variant of `remotecall_wait(f, pid, ....)`. Wait for and take a free worker from `pool` and perform a `remotecall_wait` on it. # Examples ```julia-repl \$ julia -p 3 julia> wp = WorkerPool([2, 3]); julia> A = rand(3000); julia> f = remotecall_wait(maximum, wp, A) Future(3, 1, 9, nothing) julia> fetch(f) 0.9995177101692958 ``` """ remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) = remotecall_pool(remotecall_wait, f, pool, args...; kwargs...) """ remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> result [`WorkerPool`](@ref) variant of `remotecall_fetch(f, pid, ....)`. Waits for and takes a free worker from `pool` and performs a `remotecall_fetch` on it. # Examples ```julia-repl \$ julia -p 3 julia> wp = WorkerPool([2, 3]); julia> A = rand(3000); julia> remotecall_fetch(maximum, wp, A) 0.9995177101692958 ``` """ remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) = remotecall_pool(remotecall_fetch, f, pool, args...; kwargs...) """ remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nothing [`WorkerPool`](@ref) variant of `remote_do(f, pid, ....)`. Wait for and take a free worker from `pool` and perform a `remote_do` on it. """ remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) = remotecall_pool(remote_do, f, pool, args...; kwargs...) const _default_worker_pool = Ref{Union{AbstractWorkerPool, Nothing}}(nothing) """ default_worker_pool() [`AbstractWorkerPool`](@ref) containing idle [`workers`](@ref) - used by `remote(f)` and [`pmap`](@ref) (by default). Unless one is explicitly set via `default_worker_pool!(pool)`, the default worker pool is initialized to a [`WorkerPool`](@ref). # Examples ```julia-repl \$ julia -p 3 julia> default_worker_pool() WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4)) ``` """ function default_worker_pool() # On workers retrieve the default worker pool from the master when accessed # for the first time if _default_worker_pool[] === nothing if myid() == 1 _default_worker_pool[] = WorkerPool() else _default_worker_pool[] = remotecall_fetch(()->default_worker_pool(), 1) end end return _default_worker_pool[] end """ default_worker_pool!(pool::AbstractWorkerPool) Set a [`AbstractWorkerPool`](@ref) to be used by `remote(f)` and [`pmap`](@ref) (by default). """ function default_worker_pool!(pool::AbstractWorkerPool) _default_worker_pool[] = pool end """ remote([p::AbstractWorkerPool], f) -> Function Return an anonymous function that executes function `f` on an available worker (drawn from [`WorkerPool`](@ref) `p` if provided) using [`remotecall_fetch`](@ref). """ remote(f) = (args...; kwargs...)->remotecall_fetch(f, default_worker_pool(), args...; kwargs...) remote(p::AbstractWorkerPool, f) = (args...; kwargs...)->remotecall_fetch(f, p, args...; kwargs...) mutable struct CachingPool <: AbstractWorkerPool channel::Channel{Int} workers::Set{Int} # Mapping between a tuple (worker_id, f) and a RemoteChannel map_obj2ref::IdDict{Tuple{Int, Function}, RemoteChannel} function CachingPool() wp = new(Channel{Int}(typemax(Int)), Set{Int}(), IdDict{Tuple{Int, Function}, RemoteChannel}()) finalizer(clear!, wp) wp end end serialize(s::AbstractSerializer, cp::CachingPool) = throw(ErrorException("CachingPool objects are not serializable.")) """ CachingPool(workers::Vector{Int}) An implementation of an `AbstractWorkerPool`. [`remote`](@ref), [`remotecall_fetch`](@ref), [`pmap`](@ref) (and other remote calls which execute functions remotely) benefit from caching the serialized/deserialized functions on the worker nodes, especially closures (which may capture large amounts of data). The remote cache is maintained for the lifetime of the returned `CachingPool` object. To clear the cache earlier, use `clear!(pool)`. For global variables, only the bindings are captured in a closure, not the data. `let` blocks can be used to capture global data. # Examples ```julia const foo = rand(10^8); wp = CachingPool(workers()) let foo = foo pmap(i -> sum(foo) + i, wp, 1:100); end ``` The above would transfer `foo` only once to each worker. """ function CachingPool(workers::Vector{Int}) pool = CachingPool() for w in workers push!(pool, w) end return pool end """ clear!(pool::CachingPool) -> pool Removes all cached functions from all participating workers. """ function clear!(pool::CachingPool) for (_,rr) in pool.map_obj2ref finalize(rr) end empty!(pool.map_obj2ref) pool end exec_from_cache(rr::RemoteChannel, args...; kwargs...) = fetch(rr)(args...; kwargs...) function exec_from_cache(f_ref::Tuple{Function, RemoteChannel}, args...; kwargs...) put!(f_ref[2], f_ref[1]) # Cache locally f_ref[1](args...; kwargs...) end function remotecall_pool(rc_f, f, pool::CachingPool, args...; kwargs...) worker = take!(pool) f_ref = get(pool.map_obj2ref, (worker, f), (f, RemoteChannel(worker))) isa(f_ref, Tuple) && (pool.map_obj2ref[(worker, f)] = f_ref[2]) # Add to tracker try rc_f(exec_from_cache, worker, f_ref, args...; kwargs...) finally put!(pool, worker) end end B/opt/julia-1.10.3/share/julia/stdlib/v1.10/Distributed/src/pmap.jlV(# This file is a part of Julia. License is MIT: https://julialang.org/license struct BatchProcessingError <: Exception data ex end """ pgenerate([::AbstractWorkerPool], f, c...) -> iterator Apply `f` to each element of `c` in parallel using available workers and tasks. For multiple collection arguments, apply `f` elementwise. Results are returned in order as they become available. Note that `f` must be made available to all worker processes; see [Code Availability and Loading Packages](@ref code-availability) for details. """ function pgenerate(p::AbstractWorkerPool, f, c) if length(p) == 0 return AsyncGenerator(f, c; ntasks=()->nworkers(p)) end batches = batchsplit(c, min_batch_count = length(p) * 3) return Iterators.flatten(AsyncGenerator(remote(p, b -> asyncmap(f, b)), batches)) end pgenerate(p::AbstractWorkerPool, f, c1, c...) = pgenerate(p, a->f(a...), zip(c1, c...)) pgenerate(f, c) = pgenerate(default_worker_pool(), f, c) pgenerate(f, c1, c...) = pgenerate(a->f(a...), zip(c1, c...)) """ pmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collection Transform collection `c` by applying `f` to each element using available workers and tasks. For multiple collection arguments, apply `f` elementwise. Note that `f` must be made available to all worker processes; see [Code Availability and Loading Packages](@ref code-availability) for details. If a worker pool is not specified, all available workers, i.e., the default worker pool is used. By default, `pmap` distributes the computation over all specified workers. To use only the local process and distribute over tasks, specify `distributed=false`. This is equivalent to using [`asyncmap`](@ref). For example, `pmap(f, c; distributed=false)` is equivalent to `asyncmap(f,c; ntasks=()->nworkers())` `pmap` can also use a mix of processes and tasks via the `batch_size` argument. For batch sizes greater than 1, the collection is processed in multiple batches, each of length `batch_size` or less. A batch is sent as a single request to a free worker, where a local [`asyncmap`](@ref) processes elements from the batch using multiple concurrent tasks. Any error stops `pmap` from processing the remainder of the collection. To override this behavior you can specify an error handling function via argument `on_error` which takes in a single argument, i.e., the exception. The function can stop the processing by rethrowing the error, or, to continue, return any value which is then returned inline with the results to the caller. Consider the following two examples. The first one returns the exception object inline, the second a 0 in place of any exception: ```julia-repl julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity) 4-element Array{Any,1}: 1 ErrorException("foo") 3 ErrorException("foo") julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0) 4-element Array{Int64,1}: 1 0 3 0 ``` Errors can also be handled by retrying failed computations. Keyword arguments `retry_delays` and `retry_check` are passed through to [`retry`](@ref) as keyword arguments `delays` and `check` respectively. If batching is specified, and an entire batch fails, all items in the batch are retried. Note that if both `on_error` and `retry_delays` are specified, the `on_error` hook is called before retrying. If `on_error` does not throw (or rethrow) an exception, the element will not be retried. Example: On errors, retry `f` on an element a maximum of 3 times without any delay between retries. ```julia pmap(f, c; retry_delays = zeros(3)) ``` Example: Retry `f` only if the exception is not of type [`InexactError`](@ref), with exponentially increasing delays up to 3 times. Return a `NaN` in place for all `InexactError` occurrences. ```julia pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3)) ``` """ function pmap(f, p::AbstractWorkerPool, c; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) f_orig = f # Don't do remote calls if there are no workers. if (length(p) == 0) || (length(p) == 1 && fetch(p.channel) == myid()) distributed = false end # Don't do batching if not doing remote calls. if !distributed batch_size = 1 end # If not batching, do simple remote call. if batch_size == 1 if on_error !== nothing f = wrap_on_error(f, on_error) end if distributed f = remote(p, f) end if length(retry_delays) > 0 f = wrap_retry(f, retry_delays, retry_check) end return asyncmap(f, c; ntasks=()->nworkers(p)) else # During batch processing, We need to ensure that if on_error is set, it is called # for each element in error, and that we return as many elements as the original list. # retry, if set, has to be called element wise and we will do a best-effort # to ensure that we do not call mapped function on the same element more than length(retry_delays). # This guarantee is not possible in case of worker death / network errors, wherein # we will retry the entire batch on a new worker. handle_errors = ((on_error !== nothing) || (length(retry_delays) > 0)) # Unlike the non-batch case, in batch mode, we trap all errors and the on_error hook (if present) # is processed later in non-batch mode. if handle_errors f = wrap_on_error(f, (x,e)->BatchProcessingError(x,e); capture_data=true) end f = wrap_batch(f, p, handle_errors) results = asyncmap(f, c; ntasks=()->nworkers(p), batch_size=batch_size) # process errors if any. if handle_errors process_batch_errors!(p, f_orig, results, on_error, retry_delays, retry_check) end return results end end pmap(f, p::AbstractWorkerPool, c1, c...; kwargs...) = pmap(a->f(a...), p, zip(c1, c...); kwargs...) pmap(f, c; kwargs...) = pmap(f, default_worker_pool(), c; kwargs...) pmap(f, c1, c...; kwargs...) = pmap(a->f(a...), zip(c1, c...); kwargs...) function wrap_on_error(f, on_error; capture_data=false) return x -> begin try f(x) catch e if capture_data on_error(x, e) else on_error(e) end end end end function wrap_retry(f, retry_delays, retry_check) retry(delays=retry_delays, check=retry_check) do x try f(x) catch e rethrow(extract_exception(e)) end end end function wrap_batch(f, p, handle_errors) f = asyncmap_batch(f) return batch -> begin try remotecall_fetch(f, p, batch) catch e if handle_errors return Any[BatchProcessingError(b, e) for b in batch] else rethrow() end end end end asyncmap_batch(f) = batch -> asyncmap(x->f(x...), batch) extract_exception(e) = isa(e, RemoteException) ? e.captured.ex : e function process_batch_errors!(p, f, results, on_error, retry_delays, retry_check) # Handle all the ones in error in another pmap, with batch size set to 1 reprocess = Tuple{Int,BatchProcessingError}[] for (idx, v) in enumerate(results) if isa(v, BatchProcessingError) push!(reprocess, (idx,v)) end end if length(reprocess) > 0 errors = [x[2] for x in reprocess] exceptions = Any[x.ex for x in errors] state = iterate(retry_delays) state !== nothing && (state = state[2]) error_processed = let state=state if (length(retry_delays)::Int > 0) && (retry_check === nothing || all([retry_check(state,ex)[2] for ex in exceptions])) # BatchProcessingError.data is a tuple of original args pmap(x->f(x...), p, Any[x.data for x in errors]; on_error = on_error, retry_delays = collect(retry_delays)[2:end::Int], retry_check = retry_check) elseif on_error !== nothing map(on_error, exceptions) else throw(CompositeException(exceptions)) end end for (idx, v) in enumerate(error_processed) results[reprocess[idx][1]] = v end end nothing end """ head_and_tail(c, n) -> head, tail Return `head`: the first `n` elements of `c`; and `tail`: an iterator over the remaining elements. ```jldoctest julia> b, c = Distributed.head_and_tail(1:10, 3) ([1, 2, 3], Base.Iterators.Rest{UnitRange{Int64}, Int64}(1:10, 3)) julia> collect(c) 7-element Vector{Int64}: 4 5 6 7 8 9 10 ``` """ function head_and_tail(c, n) head = Vector{eltype(c)}(undef, n) n == 0 && return (head, c) i = 1 y = iterate(c) y === nothing && return (resize!(head, 0), ()) head[i] = y[1] while i < n y = iterate(c, y[2]) y === nothing && return (resize!(head, i), ()) i += 1 head[i] = y[1] end return head, Iterators.rest(c, y[2]) end """ batchsplit(c; min_batch_count=1, max_batch_size=100) -> iterator Split a collection into at least `min_batch_count` batches. Equivalent to `partition(c, max_batch_size)` when `length(c) >> max_batch_size`. """ function batchsplit(c; min_batch_count=1, max_batch_size=100) if min_batch_count < 1 throw(ArgumentError("min_batch_count must be ≥ 1, got $min_batch_count")) end if max_batch_size < 1 throw(ArgumentError("max_batch_size must be ≥ 1, got $max_batch_size")) end # Split collection into batches, then peek at the first few batches batches = Iterators.partition(c, max_batch_size) head, tail = head_and_tail(batches, min_batch_count) # If there are not enough batches, use a smaller batch size if length(head) < min_batch_count batch_size = max(1, div(sum(length, head), min_batch_count)) return Iterators.partition(collect(Iterators.flatten(head)), batch_size) end return Iterators.flatten((head, tail)) end F/opt/julia-1.10.3/share/julia/stdlib/v1.10/Distributed/src/managers.jlm# This file is a part of Julia. License is MIT: https://julialang.org/license # Built-in SSH and Local Managers struct SSHManager <: ClusterManager machines::Dict function SSHManager(machines) # machines => array of machine elements # machine => address or (address, cnt) # address => string of form `[user@]host[:port] bind_addr[:bind_port]` # cnt => :auto or number # :auto launches NUM_CORES number of workers at address # number launches the specified number of workers at address mhist = Dict() for m in machines if isa(m, Tuple) host=m[1] cnt=m[2] else host=m cnt=1 end current_cnt = get(mhist, host, 0) if isa(cnt, Number) mhist[host] = isa(current_cnt, Number) ? current_cnt + Int(cnt) : Int(cnt) else mhist[host] = cnt end end new(mhist) end end function check_addprocs_args(manager, kwargs) valid_kw_names = keys(default_addprocs_params(manager)) for keyname in keys(kwargs) !(keyname in valid_kw_names) && throw(ArgumentError("Invalid keyword argument $(keyname)")) end end # SSHManager # start and connect to processes via SSH, optionally through an SSH tunnel. # the tunnel is only used from the head (process 1); the nodes are assumed # to be mutually reachable without a tunnel, as is often the case in a cluster. # Default value of kw arg max_parallel is the default value of MaxStartups in sshd_config # A machine is either a or a tuple of (, count) """ addprocs(machines; tunnel=false, sshflags=\`\`, max_parallel=10, kwargs...) -> List of process identifiers Add worker processes on remote machines via SSH. Configuration is done with keyword arguments (see below). In particular, the `exename` keyword can be used to specify the path to the `julia` binary on the remote machine(s). `machines` is a vector of "machine specifications" which are given as strings of the form `[user@]host[:port] [bind_addr[:port]]`. `user` defaults to current user and `port` to the standard SSH port. If `[bind_addr[:port]]` is specified, other workers will connect to this worker at the specified `bind_addr` and `port`. It is possible to launch multiple processes on a remote host by using a tuple in the `machines` vector or the form `(machine_spec, count)`, where `count` is the number of workers to be launched on the specified host. Passing `:auto` as the worker count will launch as many workers as the number of CPU threads on the remote host. **Examples**: ```julia addprocs([ "remote1", # one worker on 'remote1' logging in with the current username "user@remote2", # one worker on 'remote2' logging in with the 'user' username "user@remote3:2222", # specifying SSH port to '2222' for 'remote3' ("user@remote4", 4), # launch 4 workers on 'remote4' ("user@remote5", :auto), # launch as many workers as CPU threads on 'remote5' ]) ``` **Keyword arguments**: * `tunnel`: if `true` then SSH tunneling will be used to connect to the worker from the master process. Default is `false`. * `multiplex`: if `true` then SSH multiplexing is used for SSH tunneling. Default is `false`. * `ssh`: the name or path of the SSH client executable used to start the workers. Default is `"ssh"`. * `sshflags`: specifies additional ssh options, e.g. ``` sshflags=\`-i /home/foo/bar.pem\` ``` * `max_parallel`: specifies the maximum number of workers connected to in parallel at a host. Defaults to 10. * `shell`: specifies the type of shell to which ssh connects on the workers. + `shell=:posix`: a POSIX-compatible Unix/Linux shell (sh, ksh, bash, dash, zsh, etc.). The default. + `shell=:csh`: a Unix C shell (csh, tcsh). + `shell=:wincmd`: Microsoft Windows `cmd.exe`. * `dir`: specifies the working directory on the workers. Defaults to the host's current directory (as found by `pwd()`) * `enable_threaded_blas`: if `true` then BLAS will run on multiple threads in added processes. Default is `false`. * `exename`: name of the `julia` executable. Defaults to `"\$(Sys.BINDIR)/julia"` or `"\$(Sys.BINDIR)/julia-debug"` as the case may be. It is recommended that a common Julia version is used on all remote machines because serialization and code distribution might fail otherwise. * `exeflags`: additional flags passed to the worker processes. * `topology`: Specifies how the workers connect to each other. Sending a message between unconnected workers results in an error. + `topology=:all_to_all`: All processes are connected to each other. The default. + `topology=:master_worker`: Only the driver process, i.e. `pid` 1 connects to the workers. The workers do not connect to each other. + `topology=:custom`: The `launch` method of the cluster manager specifies the connection topology via fields `ident` and `connect_idents` in `WorkerConfig`. A worker with a cluster manager identity `ident` will connect to all workers specified in `connect_idents`. * `lazy`: Applicable only with `topology=:all_to_all`. If `true`, worker-worker connections are setup lazily, i.e. they are setup at the first instance of a remote call between workers. Default is true. * `env`: provide an array of string pairs such as `env=["JULIA_DEPOT_PATH"=>"/depot"]` to request that environment variables are set on the remote machine. By default only the environment variable `JULIA_WORKER_TIMEOUT` is passed automatically from the local to the remote environment. * `cmdline_cookie`: pass the authentication cookie via the `--worker` commandline option. The (more secure) default behaviour of passing the cookie via ssh stdio may hang with Windows workers that use older (pre-ConPTY) Julia or Windows versions, in which case `cmdline_cookie=true` offers a work-around. !!! compat "Julia 1.6" The keyword arguments `ssh`, `shell`, `env` and `cmdline_cookie` were added in Julia 1.6. Environment variables: If the master process fails to establish a connection with a newly launched worker within 60.0 seconds, the worker treats it as a fatal situation and terminates. This timeout can be controlled via environment variable `JULIA_WORKER_TIMEOUT`. The value of `JULIA_WORKER_TIMEOUT` on the master process specifies the number of seconds a newly launched worker waits for connection establishment. """ function addprocs(machines::AbstractVector; kwargs...) manager = SSHManager(machines) check_addprocs_args(manager, kwargs) addprocs(manager; kwargs...) end default_addprocs_params(::SSHManager) = merge(default_addprocs_params(), Dict{Symbol,Any}( :ssh => "ssh", :sshflags => ``, :shell => :posix, :cmdline_cookie => false, :env => [], :tunnel => false, :multiplex => false, :max_parallel => 10)) function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy::Condition) # Launch one worker on each unique host in parallel. Additional workers are launched later. # Wait for all launches to complete. @sync for (i, (machine, cnt)) in enumerate(manager.machines) let machine=machine, cnt=cnt @async try launch_on_machine(manager, $machine, $cnt, params, launched, launch_ntfy) catch e print(stderr, "exception launching on machine $(machine) : $(e)\n") end end end notify(launch_ntfy) end Base.show(io::IO, manager::SSHManager) = print(io, "SSHManager(machines=", manager.machines, ")") function parse_machine(machine::AbstractString) hoststr = "" portnum = nothing if machine[begin] == '[' # ipv6 bracket notation (RFC 2732) ipv6_end = findlast(']', machine) if ipv6_end === nothing throw(ArgumentError("invalid machine definition format string: invalid port format \"$machine\"")) end hoststr = machine[begin+1 : prevind(machine,ipv6_end)] machine_def = split(machine[ipv6_end : end] , ':') else # ipv4 machine_def = split(machine, ':') hoststr = machine_def[1] end if length(machine_def) > 2 throw(ArgumentError("invalid machine definition format string: invalid port format \"$machine_def\"")) end if length(machine_def) == 2 portstr = machine_def[2] portnum = tryparse(Int, portstr) if portnum === nothing msg = "invalid machine definition format string: invalid port format \"$machine_def\"" throw(ArgumentError(msg)) end if portnum < 1 || portnum > 65535 msg = "invalid machine definition format string: invalid port number \"$machine_def\"" throw(ArgumentError(msg)) end end (hoststr, portnum) end function launch_on_machine(manager::SSHManager, machine::AbstractString, cnt, params::Dict, launched::Array, launch_ntfy::Condition) shell = params[:shell] ssh = params[:ssh] dir = params[:dir] exename = params[:exename] exeflags = params[:exeflags] tunnel = params[:tunnel] multiplex = params[:multiplex] cmdline_cookie = params[:cmdline_cookie] env = Dict{String,String}(params[:env]) # machine could be of the format [user@]host[:port] bind_addr[:bind_port] # machine format string is split on whitespace machine_bind = split(machine) if isempty(machine_bind) throw(ArgumentError("invalid machine definition format string: \"$machine\$")) end if length(machine_bind) > 1 exeflags = `--bind-to $(machine_bind[2]) $exeflags` end if cmdline_cookie exeflags = `$exeflags --worker=$(cluster_cookie())` else exeflags = `$exeflags --worker` end host, portnum = parse_machine(machine_bind[1]) portopt = portnum === nothing ? `` : `-p $portnum` sshflags = `$(params[:sshflags]) $portopt` if tunnel # First it checks if ssh multiplexing has been already enabled and the master process is running. # If it's already running, later ssh sessions also use the same ssh multiplexing session even if # `multiplex` is not explicitly specified; otherwise the tunneling session launched later won't # go to background and hang. This is because of OpenSSH implementation. if success(`$ssh $sshflags -O check $host`) multiplex = true elseif multiplex # automatically create an SSH multiplexing session at the next SSH connection controlpath = "~/.ssh/julia-%r@%h:%p" sshflags = `$sshflags -o ControlMaster=auto -o ControlPath=$controlpath -o ControlPersist=no` end end # Build up the ssh command # pass on some environment variables by default for var in ["JULIA_WORKER_TIMEOUT"] if !haskey(env, var) && haskey(ENV, var) env[var] = ENV[var] end end # Julia process with passed in command line flag arguments if shell === :posix # ssh connects to a POSIX shell cmds = "exec $(shell_escape_posixly(exename)) $(shell_escape_posixly(exeflags))" # set environment variables for (var, val) in env occursin(r"^[a-zA-Z_][a-zA-Z_0-9]*\z", var) || throw(ArgumentError("invalid env key $var")) cmds = "export $(var)=$(shell_escape_posixly(val))\n$cmds" end # change working directory cmds = "cd -- $(shell_escape_posixly(dir))\n$cmds" # shell login (-l) with string command (-c) to launch julia process remotecmd = shell_escape_posixly(`sh -l -c $cmds`) elseif shell === :csh # ssh connects to (t)csh remotecmd = "exec $(shell_escape_csh(exename)) $(shell_escape_csh(exeflags))" # set environment variables for (var, val) in env occursin(r"^[a-zA-Z_][a-zA-Z_0-9]*\z", var) || throw(ArgumentError("invalid env key $var")) remotecmd = "setenv $(var) $(shell_escape_csh(val))\n$remotecmd" end # change working directory if dir !== nothing && dir != "" remotecmd = "cd $(shell_escape_csh(dir))\n$remotecmd" end elseif shell === :wincmd # ssh connects to Windows cmd.exe any(c -> c == '"', exename) && throw(ArgumentError("invalid exename")) remotecmd = shell_escape_wincmd(escape_microsoft_c_args(exename, exeflags...)) # change working directory if dir !== nothing && dir != "" any(c -> c == '"', dir) && throw(ArgumentError("invalid dir")) remotecmd = "pushd \"$(dir)\" && $remotecmd" end # set environment variables for (var, val) in env occursin(r"^[a-zA-Z0-9_()[\]{}\$\\/#',;\.@!?*+-]+\z", var) || throw(ArgumentError("invalid env key $var")) remotecmd = "set $(var)=$(shell_escape_wincmd(val))&& $remotecmd" end else throw(ArgumentError("invalid shell")) end # remote launch with ssh with given ssh flags / host / port information # -T → disable pseudo-terminal allocation # -a → disable forwarding of auth agent connection # -x → disable X11 forwarding # -o ClearAllForwardings → option if forwarding connections and # forwarded connections are causing collisions cmd = `$ssh -T -a -x -o ClearAllForwardings=yes $sshflags $host $remotecmd` # launch the remote Julia process # detach launches the command in a new process group, allowing it to outlive # the initial julia process (Ctrl-C and teardown methods are handled through messages) # for the launched processes. io = open(detach(cmd), "r+") cmdline_cookie || write_cookie(io) wconfig = WorkerConfig() wconfig.io = io.out wconfig.host = host wconfig.tunnel = tunnel wconfig.multiplex = multiplex wconfig.sshflags = sshflags wconfig.exeflags = exeflags wconfig.exename = exename wconfig.count = cnt wconfig.max_parallel = params[:max_parallel] wconfig.enable_threaded_blas = params[:enable_threaded_blas] push!(launched, wconfig) notify(launch_ntfy) end function manage(manager::SSHManager, id::Integer, config::WorkerConfig, op::Symbol) id = Int(id) if op === :interrupt ospid = config.ospid if ospid !== nothing host = notnothing(config.host) sshflags = notnothing(config.sshflags) if !success(`ssh -T -a -x -o ClearAllForwardings=yes -n $sshflags $host "kill -2 $ospid"`) @error "Error sending a Ctrl-C to julia worker $id on $host" end else # This state can happen immediately after an addprocs @error "Worker $id cannot be presently interrupted." end end end let tunnel_port = 9201 global next_tunnel_port function next_tunnel_port() retval = tunnel_port if tunnel_port > 32000 tunnel_port = 9201 else tunnel_port += 1 end retval end end """ ssh_tunnel(user, host, bind_addr, port, sshflags, multiplex) -> localport Establish an SSH tunnel to a remote worker. Return a port number `localport` such that `localhost:localport` connects to `host:port`. """ function ssh_tunnel(user, host, bind_addr, port, sshflags, multiplex) port = Int(port) cnt = ntries = 100 # the connection is forwarded to `port` on the remote server over the local port `localport` while cnt > 0 localport = next_tunnel_port() if multiplex # It assumes that an ssh multiplexing session has been already started by the remote worker. cmd = `ssh $sshflags -O forward -L $localport:$bind_addr:$port $user@$host` else # if we cannot do port forwarding, fail immediately # the -f option backgrounds the ssh session # `sleep 60` command specifies that an allotted time of 60 seconds is allowed to start the # remote julia process and establish the network connections specified by the process topology. # If no connections are made within 60 seconds, ssh will exit and an error will be printed on the # process that launched the remote process. ssh = `ssh -T -a -x -o ExitOnForwardFailure=yes` cmd = detach(`$ssh -f $sshflags $user@$host -L $localport:$bind_addr:$port sleep 60`) end if success(cmd) return localport end cnt -= 1 end throw(ErrorException( string("unable to create SSH tunnel after ", ntries, " tries. No free port?"))) end # LocalManager struct LocalManager <: ClusterManager np::Int restrict::Bool # Restrict binding to 127.0.0.1 only end """ addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> List of process identifiers Launch `np` workers on the local host using the in-built `LocalManager`. Local workers inherit the current package environment (i.e., active project, [`LOAD_PATH`](@ref), and [`DEPOT_PATH`](@ref)) from the main process. **Keyword arguments**: - `restrict::Bool`: if `true` (default) binding is restricted to `127.0.0.1`. - `dir`, `exename`, `exeflags`, `env`, `topology`, `lazy`, `enable_threaded_blas`: same effect as for `SSHManager`, see documentation for [`addprocs(machines::AbstractVector)`](@ref). !!! compat "Julia 1.9" The inheriting of the package environment and the `env` keyword argument were added in Julia 1.9. """ function addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) manager = LocalManager(np, restrict) check_addprocs_args(manager, kwargs) addprocs(manager; kwargs...) end Base.show(io::IO, manager::LocalManager) = print(io, "LocalManager()") function launch(manager::LocalManager, params::Dict, launched::Array, c::Condition) dir = params[:dir] exename = params[:exename] exeflags = params[:exeflags] bind_to = manager.restrict ? `127.0.0.1` : `$(LPROC.bind_addr)` env = Dict{String,String}(params[:env]) # TODO: Maybe this belongs in base/initdefs.jl as a package_environment() function # together with load_path() etc. Might be useful to have when spawning julia # processes outside of Distributed.jl too. # JULIA_(LOAD|DEPOT)_PATH are used to populate (LOAD|DEPOT)_PATH on startup, # but since (LOAD|DEPOT)_PATH might have changed they are re-serialized here. # Users can opt-out of this by passing `env = ...` to addprocs(...). pathsep = Sys.iswindows() ? ";" : ":" if get(env, "JULIA_LOAD_PATH", nothing) === nothing env["JULIA_LOAD_PATH"] = join(LOAD_PATH, pathsep) end if get(env, "JULIA_DEPOT_PATH", nothing) === nothing env["JULIA_DEPOT_PATH"] = join(DEPOT_PATH, pathsep) end # If we haven't explicitly asked for threaded BLAS, prevent OpenBLAS from starting # up with multiple threads, thereby sucking up a bunch of wasted memory on Windows. if !params[:enable_threaded_blas] && get(env, "OPENBLAS_NUM_THREADS", nothing) === nothing env["OPENBLAS_NUM_THREADS"] = "1" end # Set the active project on workers using JULIA_PROJECT. # Users can opt-out of this by (i) passing `env = ...` or (ii) passing # `--project=...` as `exeflags` to addprocs(...). project = Base.ACTIVE_PROJECT[] if project !== nothing && get(env, "JULIA_PROJECT", nothing) === nothing env["JULIA_PROJECT"] = project end for i in 1:manager.np cmd = `$(julia_cmd(exename)) $exeflags --bind-to $bind_to --worker` io = open(detach(setenv(addenv(cmd, env), dir=dir)), "r+") write_cookie(io) wconfig = WorkerConfig() wconfig.process = io wconfig.io = io.out wconfig.enable_threaded_blas = params[:enable_threaded_blas] push!(launched, wconfig) end notify(c) end function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol) if op === :interrupt kill(config.process, 2) end end """ launch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition) Implemented by cluster managers. For every Julia worker launched by this function, it should append a `WorkerConfig` entry to `launched` and notify `launch_ntfy`. The function MUST exit once all workers, requested by `manager` have been launched. `params` is a dictionary of all keyword arguments [`addprocs`](@ref) was called with. """ launch """ manage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol) Implemented by cluster managers. It is called on the master process, during a worker's lifetime, with appropriate `op` values: - with `:register`/`:deregister` when a worker is added / removed from the Julia worker pool. - with `:interrupt` when `interrupt(workers)` is called. The `ClusterManager` should signal the appropriate worker with an interrupt signal. - with `:finalize` for cleanup purposes. """ manage # DefaultClusterManager for the default TCP transport - used by both SSHManager and LocalManager struct DefaultClusterManager <: ClusterManager end const tunnel_hosts_map = Dict{String, Semaphore}() """ connect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO) Implemented by cluster managers using custom transports. It should establish a logical connection to worker with id `pid`, specified by `config` and return a pair of `IO` objects. Messages from `pid` to current process will be read off `instrm`, while messages to be sent to `pid` will be written to `outstrm`. The custom transport implementation must ensure that messages are delivered and received completely and in order. `connect(manager::ClusterManager.....)` sets up TCP/IP socket connections in-between workers. """ function connect(manager::ClusterManager, pid::Int, config::WorkerConfig) if config.connect_at !== nothing # this is a worker-to-worker setup call. return connect_w2w(pid, config) end # master connecting to workers if config.io !== nothing (bind_addr, port::Int) = read_worker_host_port(config.io) pubhost = something(config.host, bind_addr) config.host = pubhost config.port = port else pubhost = notnothing(config.host) port = notnothing(config.port) bind_addr = something(config.bind_addr, pubhost) end tunnel = something(config.tunnel, false) s = split(pubhost,'@') user = "" if length(s) > 1 user = s[1] pubhost = s[2] else if haskey(ENV, "USER") user = ENV["USER"] elseif tunnel error("USER must be specified either in the environment ", "or as part of the hostname when tunnel option is used") end end if tunnel if !haskey(tunnel_hosts_map, pubhost) tunnel_hosts_map[pubhost] = Semaphore(something(config.max_parallel, typemax(Int))) end sem = tunnel_hosts_map[pubhost] sshflags = notnothing(config.sshflags) multiplex = something(config.multiplex, false) acquire(sem) try (s, bind_addr, forward) = connect_to_worker_with_tunnel(pubhost, bind_addr, port, user, sshflags, multiplex) config.forward = forward finally release(sem) end else (s, bind_addr) = connect_to_worker(bind_addr, port) end config.bind_addr = bind_addr # write out a subset of the connect_at required for further worker-worker connection setups config.connect_at = (bind_addr, port) if config.io !== nothing let pid = pid redirect_worker_output(pid, notnothing(config.io)) end end (s, s) end function connect_w2w(pid::Int, config::WorkerConfig) (rhost, rport) = notnothing(config.connect_at)::Tuple{String, Int} config.host = rhost config.port = rport (s, bind_addr) = connect_to_worker(rhost, rport) (s,s) end const client_port = Ref{UInt16}(0) function socket_reuse_port(iptype) if ccall(:jl_has_so_reuseport, Int32, ()) == 1 sock = TCPSocket(delay = false) # Some systems (e.g. Linux) require the port to be bound before setting REUSEPORT bind_early = Sys.islinux() bind_early && bind_client_port(sock, iptype) rc = ccall(:jl_tcp_reuseport, Int32, (Ptr{Cvoid},), sock.handle) if rc < 0 close(sock) # This is an issue only on systems with lots of client connections, hence delay the warning nworkers() > 128 && @warn "Error trying to reuse client port number, falling back to regular socket" maxlog=1 # provide a clean new socket return TCPSocket() end bind_early || bind_client_port(sock, iptype) return sock else return TCPSocket() end end function bind_client_port(sock::TCPSocket, iptype) bind_host = iptype(0) if Sockets.bind(sock, bind_host, client_port[]) _addr, port = getsockname(sock) client_port[] = port end return sock end function connect_to_worker(host::AbstractString, port::Integer) # Avoid calling getaddrinfo if possible - involves a DNS lookup # host may be a stringified ipv4 / ipv6 address or a dns name bind_addr = nothing try bind_addr = parse(IPAddr,host) catch bind_addr = getaddrinfo(host) end iptype = typeof(bind_addr) sock = socket_reuse_port(iptype) connect(sock, bind_addr, UInt16(port)) (sock, string(bind_addr)) end function connect_to_worker_with_tunnel(host::AbstractString, bind_addr::AbstractString, port::Integer, tunnel_user::AbstractString, sshflags, multiplex) localport = ssh_tunnel(tunnel_user, host, bind_addr, UInt16(port), sshflags, multiplex) s = connect("localhost", localport) forward = "$localport:$bind_addr:$port" (s, bind_addr, forward) end function cancel_ssh_tunnel(config::WorkerConfig) host = notnothing(config.host) sshflags = notnothing(config.sshflags) tunnel = something(config.tunnel, false) multiplex = something(config.multiplex, false) if tunnel && multiplex forward = notnothing(config.forward) run(`ssh $sshflags -O cancel -L $forward $host`) end end """ kill(manager::ClusterManager, pid::Int, config::WorkerConfig) Implemented by cluster managers. It is called on the master process, by [`rmprocs`](@ref). It should cause the remote worker specified by `pid` to exit. `kill(manager::ClusterManager.....)` executes a remote `exit()` on `pid`. """ function kill(manager::ClusterManager, pid::Int, config::WorkerConfig) remote_do(exit, pid) nothing end function kill(manager::SSHManager, pid::Int, config::WorkerConfig) remote_do(exit, pid) cancel_ssh_tunnel(config) nothing end function kill(manager::LocalManager, pid::Int, config::WorkerConfig; exit_timeout = 15, term_timeout = 15) # First, try sending `exit()` to the remote over the usual control channels remote_do(exit, pid) timer_task = @async begin sleep(exit_timeout) # Check to see if our child exited, and if not, send an actual kill signal if !process_exited(config.process) @warn("Failed to gracefully kill worker $(pid), sending SIGTERM") kill(config.process, Base.SIGTERM) sleep(term_timeout) if !process_exited(config.process) @warn("Worker $(pid) ignored SIGTERM, sending SIGKILL") kill(config.process, Base.SIGKILL) end end end errormonitor(timer_task) return nothing end H/opt/julia-1.10.3/share/julia/stdlib/v1.10/Distributed/src/precompile.jl9precompile(Tuple{typeof(Distributed.remotecall),Function,Int,Module,Vararg{Any, 100}}) precompile(Tuple{typeof(Distributed.procs)}) precompile(Tuple{typeof(Distributed.finalize_ref), Distributed.Future}) # This is disabled because it doesn't give much benefit # and the code in Distributed is poorly typed causing many invalidations # TODO: Maybe reenable now that Distributed is not in sysimage. #= precompile_script *= """ using Distributed addprocs(2) pmap(x->iseven(x) ? 1 : 0, 1:4) @distributed (+) for i = 1:100 Int(rand(Bool)) end """ =# Nq.x8