jli  Linuxx86_641.10.3v1.10.30b4590a5507d3f3046e5bafc007cacbbfc9b310bqMaltMB16;N2虨7*/opt/julia/packages/Malt/Z3YQq/src/Malt.jlYAk1 [)*Serializationސݗ1V$ bdSockets#8YsJ DRelocatableFolders,/opt/julia/packages/Malt/Z3YQq/src/shared.jlYA=/opt/julia/packages/Malt/Z3YQq/src/DistributedStdlibWorker.jlYA pWo[\( Distributed=/opt/julia/packages/Malt/Z3YQq/src/DistributedStdlibWorker.jlYA*/opt/julia/packages/Malt/Z3YQq/src/Malt.jlYA,/opt/julia/packages/Malt/Z3YQq/src/shared.jlYA,/opt/julia/packages/Malt/Z3YQq/src/worker.jlYA4 CoremуJ5Basemу]J5MainmуJ5ArgToolsBń x(mуF K5 Artifactsmr-V3|mу K5Base64UlD*_mу> K5CRC32c\y.jmуj K5 FileWatchingXzsy`{,zmуh& K5LibdluVW59˗,mу-" K5LoggingT{VhUXM=mуrU" K5MmapP~:xg,Omу|' K5NetworkOptionsC0YW,mуʠ, K5SHAQ<$!<%mу1 K5 Serialization [)*k1mу-G K5Sockets1V$ bdސݗmуYBY K5UnicodeP>I>Nrmуeszo K5 LinearAlgebraSm7̏mуuux K5 OpenBLAS_jll[(Śb6EcQ FmуDux K5libblastrampoline_jllLSۆ }lxӠmу^} K5MarkdownZPn7z`smу/Ed~ K5Printfg^cX׸QDmу;h K5Random_ɢ?\Ymу? K5TarOi>աmу!t, K5DatesEY8pj2 mуX K5FuturebS;3{I xVMmуsD K5InteractiveUtilsWL ~@'ZmуVg K5LibGit2Z[&RPTv3EКRmу8J K5 LibGit2_jll YXg}]$mуD K5 MbedTLS_jllAX 3ȡ_mу- K5 LibSSH2_jlloTZk)߆ nothing)) new(mod, task, true) end end Base.summary(io::IO, w::InProcessWorker) = write(io, "Malt.InProcessWorker in module $(w.host_module)") const __iNtErNaL_running_procs = Set{Base.Process}() __iNtErNaL_get_running_procs() = filter!(Base.process_running, __iNtErNaL_running_procs) """ Malt.Worker() Create a new `Worker`. A `Worker` struct is a handle to a (separate) Julia process. # Examples ```julia-repl julia> w = Malt.worker() Malt.Worker(0x0000, Process(`…`, ProcessRunning)) ``` """ mutable struct Worker <: AbstractWorker port::UInt16 proc::Base.Process proc_pid::Int32 current_socket::Sockets.TCPSocket # socket_lock::ReentrantLock current_message_id::MsgID expected_replies::Dict{MsgID,Channel{WorkerResult}} function Worker(; env=String[], exeflags=[]) # Spawn process cmd = _get_worker_cmd(; env, exeflags) proc = open(Cmd( cmd; detach=true, windows_hide=true, ), "w+") # Keep internal list __iNtErNaL_get_running_procs() push!(__iNtErNaL_running_procs, proc) # Block until reading the port number of the process (from its stdout) port_str = readline(proc) port = parse(UInt16, port_str) # Connect socket = Sockets.connect(port) _buffer_writes(socket) # There's no reason to keep the worker process alive after the manager loses its handle. w = finalizer(w -> @async(stop(w)), new( port, proc, getpid(proc), socket, MsgID(0), Dict{MsgID,Channel{WorkerResult}}(), ) ) atexit(() -> stop(w)) _exit_loop(w) _receive_loop(w) return w end end Base.summary(io::IO, w::Worker) = write(io, "Malt.Worker on port $(w.port) with PID $(w.proc_pid)") function _exit_loop(worker::Worker) @async for _i in Iterators.countfrom(1) try if !isrunning(worker) # the worker got shut down, which means that we will never receive one of the expected_replies. So let's give all of them a special_worker_terminated reply. for c in values(worker.expected_replies) isready(c) || put!(c, WorkerResult(MsgType.special_worker_terminated, nothing)) end break end sleep(1) catch e @error "Unexpection error inside the exit loop" worker exception=(e,catch_backtrace()) end end end function _receive_loop(worker::Worker) io = worker.current_socket # Here we use: # `for _i in Iterators.countfrom(1)` # instead of # `while true` # as a workaround for https://github.com/JuliaLang/julia/issues/37154 @async for _i in Iterators.countfrom(1) try if !isopen(io) @debug("HOST: io closed.") break end @debug "HOST: Waiting for message" msg_type = try if eof(io) @debug("HOST: io closed.") break end read(io, UInt8) catch e if e isa InterruptException @debug("HOST: Caught interrupt while waiting for incoming data, rethrowing to REPL...") _rethrow_to_repl(e; rethrow_regular=false) continue # and go back to waiting for incoming data else @debug("HOST: Caught exception while waiting for incoming data, breaking", exception = (e, backtrace())) break end end # this next line can't fail msg_id = read(io, MsgID) msg_data, success = try deserialize(io), true catch err err, false finally _discard_until_boundary(io) end if !success msg_type = MsgType.special_serialization_failure end # msg_type will be one of: # MsgType.from_worker_call_result # MsgType.from_worker_call_failure # MsgType.special_serialization_failure c = get(worker.expected_replies, msg_id, nothing) if c isa Channel{WorkerResult} put!(c, WorkerResult(msg_type, msg_data)) else @error "HOST: Received a response, but I didn't ask for anything" msg_type msg_id msg_data end @debug("HOST: Received message", msg_data) catch e if e isa InterruptException @debug "HOST: Interrupted during receive loop." _rethrow_to_repl(e) elseif e isa Base.IOError && !isopen(io) sleep(3) if isrunning(worker) @error "HOST: Connection lost with worker, but the process is still running. Killing process..." exception = (e, catch_backtrace()) kill(worker, Base.SIGKILL) else # This is a clean exit end break else @error "HOST: Unknown error" exception = (e, catch_backtrace()) isopen(io) break end end end end # The entire `src` dir should be relocatable, so that worker.jl can include("MsgType.jl"). const src_path = RelocatableFolders.@path @__DIR__ function _get_worker_cmd(exe=Base.julia_cmd()[1]; env, exeflags) return addenv(`$exe --startup-file=no $exeflags $(joinpath(src_path, "worker.jl"))`, String["OPENBLAS_NUM_THREADS=1", Base.byteenv(env)...]) end ## We use tuples instead of structs for messaging so the worker doesn't need to load additional modules. _new_call_msg(send_result::Bool, f::Function, args, kwargs) = ( f, args, kwargs, !send_result, ) _new_do_msg(f::Function, args, kwargs) = ( f, args, kwargs, true, ) # function _ensure_connected(w::Worker) # # TODO: check if process running? # # TODO: `while` instead of `if`? # if w.current_socket === nothing || !isopen(w.current_socket) # w.current_socket = connect(w.port) # @async _receive_loop(w) # end # return w # end # GENERIC COMMUNICATION PROTOCOL """ Low-level: send a message to a worker. Returns a `msg_id::UInt16`, which can be used to wait for a response with `_wait_for_response`. """ function _send_msg(worker::Worker, msg_type::UInt8, msg_data, expect_reply::Bool=true)::MsgID _assert_is_running(worker) # _ensure_connected(worker) msg_id = (worker.current_message_id += MsgID(1))::MsgID if expect_reply worker.expected_replies[msg_id] = Channel{WorkerResult}(1) end @debug("HOST: sending message", msg_data) _serialize_msg(worker.current_socket, msg_type, msg_id, msg_data) return msg_id end """ Low-level: wait for a response to a previously sent message. Returns the response. Blocking call. """ function _wait_for_response(worker::Worker, msg_id::MsgID) if haskey(worker.expected_replies, msg_id) c = worker.expected_replies[msg_id] @debug("HOST: waiting for response of", msg_id) response = take!(c) delete!(worker.expected_replies, msg_id) return unwrap_worker_result(worker, response) else error("HOST: No response expected for message id $msg_id") end end """ `_wait_for_response ∘ _send_msg` """ function _send_receive(w::Worker, msg_type::UInt8, msg_data) msg_id = _send_msg(w, msg_type, msg_data, true) return _wait_for_response(w, msg_id) end """ `@async(_wait_for_response) ∘ _send_msg` """ function _send_receive_async(w::Worker, msg_type::UInt8, msg_data, output_transformation=identity)::Task # TODO: Unwrap TaskFailedExceptions msg_id = _send_msg(w, msg_type, msg_data, true) return @async output_transformation(_wait_for_response(w, msg_id)) end """ Malt.remote_call(f, w::Worker, args...; kwargs...) Evaluate `f(args...; kwargs...)` in worker `w` asynchronously. Returns a task that acts as a promise; the result value of the task is the result of the computation. The function `f` must already be defined in the namespace of `w`. # Examples ```julia-repl julia> promise = Malt.remote_call(uppercase ∘ *, w, "I ", "declare ", "bankruptcy!"); julia> fetch(promise) "I DECLARE BANKRUPTCY!" ``` """ function remote_call(f, w::Worker, args...; kwargs...) _send_receive_async( w, MsgType.from_host_call_with_response, _new_call_msg(true, f, args, kwargs), ) end function remote_call(f, w::InProcessWorker, args...; kwargs...) w.latest_request_task = @async remote_call_fetch(f, w, args...; kwargs...) end function remote_call_fetch(f, w::InProcessWorker, args...; kwargs...) try f(args...; kwargs...) catch ex throw(RemoteException( w, sprint() do io Base.invokelatest(showerror, io, ex, catch_backtrace()) end )) end end function remote_call_wait(f, w::InProcessWorker, args...; kwargs...) remote_call_fetch(f, w, args...; kwargs...) nothing end """ Malt.remote_call_fetch(f, w::Worker, args...; kwargs...) Shorthand for `fetch(Malt.remote_call(…))`. Blocks and then returns the result of the remote call. """ function remote_call_fetch(f, w::AbstractWorker, args...; kwargs...) fetch(remote_call(f, w, args...; kwargs...)) end function remote_call_fetch(f, w::Worker, args...; kwargs...) _send_receive( w, MsgType.from_host_call_with_response, _new_call_msg(true, f, args, kwargs) ) end """ Malt.remote_call_wait(f, w::Worker, args...; kwargs...) Shorthand for `wait(Malt.remote_call(…))`. Blocks and discards the resulting value. """ function remote_call_wait(f, w::AbstractWorker, args...; kwargs...) wait(remote_call(f, w, args...; kwargs...)) end function remote_call_wait(f, w::Worker, args...; kwargs...) _send_receive( w, MsgType.from_host_call_with_response, _new_call_msg(false, f, args, kwargs) ) end """ Malt.remote_do(f, w::Worker, args...; kwargs...) Start evaluating `f(args...; kwargs...)` in worker `w` asynchronously, and return `nothing`. Unlike `remote_call`, no reference to the remote call is available. This means: - You cannot wait for the call to complete on the worker. - The value returned by `f` is not available. """ function remote_do(f, w::Worker, args...; kwargs...) _send_msg( w, MsgType.from_host_call_without_response, _new_do_msg(f, args, kwargs), false ) nothing end function remote_do(f, ::InProcessWorker, args...; kwargs...) @async f(args...; kwargs...) nothing end ## Eval variants """ Malt.remote_eval(mod::Module=Main, w::Worker, expr) Evaluate expression `expr` under module `mod` on the worker `w`. `Malt.remote_eval` is asynchronous, like `Malt.remote_call`. The module `m` and the type of the result of `expr` must be defined in both the main process and the worker. # Examples ```julia-repl julia> Malt.remote_eval(w, quote x = "x is a global variable" end) julia> Malt.remote_eval_fetch(w, :x) "x is a global variable" ``` """ remote_eval(mod::Module, w::AbstractWorker, expr) = remote_call(Core.eval, w, mod, expr) remote_eval(w::AbstractWorker, expr) = remote_eval(Main, w, expr) """ Shorthand for `fetch(Malt.remote_eval(…))`. Blocks and returns the resulting value. """ remote_eval_fetch(mod::Module, w::AbstractWorker, expr) = remote_call_fetch(Core.eval, w, mod, expr) remote_eval_fetch(w::AbstractWorker, expr) = remote_eval_fetch(Main, w, expr) """ Shorthand for `wait(Malt.remote_eval(…))`. Blocks and discards the resulting value. """ remote_eval_wait(mod::Module, w::AbstractWorker, expr) = remote_call_wait(Core.eval, w, mod, expr) remote_eval_wait(w::AbstractWorker, expr) = remote_eval_wait(Main, w, expr) """ Malt.worker_channel(w::AbstractWorker, expr) Create a channel to communicate with worker `w`. `expr` must be an expression that evaluates to an `AbstractChannel`. `expr` should assign the channel to a (global) variable so the worker has a handle that can be used to send messages back to the manager. """ function worker_channel(w::Worker, expr) RemoteChannel(w, expr) end function worker_channel(w::InProcessWorker, expr) remote_call_fetch(w) do result = Core.eval(w.host_module, expr) @assert result isa AbstractChannel result end end struct RemoteChannel{T} <: AbstractChannel{T} worker::Worker id::UInt64 function RemoteChannel{T}(worker::Worker, expr) where {T} id = (worker.current_message_id += MsgID(1))::MsgID remote_eval_wait(Main, worker, quote Main._channel_cache[$id] = $expr end) new{T}(worker, id) end RemoteChannel(w::Worker, expr) = RemoteChannel{Any}(w, expr) end Base.take!(rc::RemoteChannel) = remote_eval_fetch(Main, rc.worker, :(take!(Main._channel_cache[$(rc.id)])))::eltype(rc) Base.put!(rc::RemoteChannel, v) = remote_eval_wait(Main, rc.worker, :(put!(Main._channel_cache[$(rc.id)], $v))) Base.isready(rc::RemoteChannel) = remote_eval_fetch(Main, rc.worker, :(isready(Main._channel_cache[$(rc.id)])))::Bool Base.wait(rc::RemoteChannel) = remote_eval_wait(Main, rc.worker, :(wait(Main._channel_cache[$(rc.id)])))::Bool ## Signals & Termination """ Malt.isrunning(w::Worker)::Bool Check whether the worker process `w` is running. """ isrunning(w::Worker)::Bool = Base.process_running(w.proc) isrunning(w::InProcessWorker) = w.running _assert_is_running(w::Worker) = isrunning(w) || throw(TerminatedWorkerException()) """ Malt.stop(w::Worker; exit_timeout::Real=15.0, term_timeout::Real=15.0)::Bool Terminate the worker process `w` in the nicest possible way. We first try using `Base.exit`, then SIGTERM, then SIGKILL. Waits for the worker process to be terminated. If `w` is still alive, and now terinated, `stop` returns true. If `w` is already dead, `stop` returns `false`. If `w` failed to terminate, throw an exception. """ function stop(w::Worker; exit_timeout::Real=15.0, term_timeout::Real=15.0) ir = () -> !isrunning(w) if isrunning(w) remote_do(Base.exit, w) if !_poll(ir; timeout_s=exit_timeout) kill(w, Base.SIGTERM) if !_poll(ir; timeout_s=term_timeout) kill(w, Base.SIGKILL) _wait_for_exit(w) end end true else false end end function stop(w::InProcessWorker) w.running = false true end """ kill(w::Malt.Worker, signum=Base.SIGTERM) Terminate the worker process `w` forcefully by sending a `SIGTERM` signal (unless otherwise specified). This is not the recommended way to terminate the process. See `Malt.stop`. """ # https://youtu.be/dyIilW_eBjc Base.kill(w::Worker, signum=Base.SIGTERM) = Base.kill(w.proc, signum) Base.kill(::InProcessWorker, signum=Base.SIGTERM) = nothing function _poll(f::Function; interval::Real=0.01, timeout_s::Real=Inf64) tstart = time() while true f() && return true if time() - tstart >= timeout_s return false end sleep(interval) end end _wait_for_exit(::AbstractWorker; timeout_s::Real=20.0) = nothing function _wait_for_exit(w::Worker; timeout_s::Real=20.0) if !_poll(() -> !isrunning(w); timeout_s) error("HOST: Worker did not exit after $timeout_s seconds") end end """ Malt.interrupt(w::Worker) Send an interrupt signal to the worker process. This will interrupt the latest request (`remote_call*` or `remote_eval*`) that was sent to the worker. """ function interrupt(w::Worker) if !isrunning(w) @warn "Tried to interrupt a worker that has already shut down." summary(w) else if Sys.iswindows() ccall((:GenerateConsoleCtrlEvent,"Kernel32"), Bool, (UInt32, UInt32), UInt32(1), UInt32(getpid(w.proc))) else Base.kill(w.proc, Base.SIGINT) end end nothing end function interrupt(w::InProcessWorker) istaskdone(w.latest_request_task) || schedule(w.latest_request_task, InterruptException(); error=true) nothing end # Based on `Base.task_done_hook` function _rethrow_to_repl(e::InterruptException; rethrow_regular::Bool=false) if isdefined(Base, :active_repl_backend) && isdefined(Base.active_repl_backend, :backend_task) && isdefined(Base.active_repl_backend, :in_eval) && Base.active_repl_backend.backend_task.state === :runnable && (isdefined(Base, :Workqueue) || isempty(Base.Workqueue)) && Base.active_repl_backend.in_eval @debug "HOST: Rethrowing interrupt to REPL" @async Base.schedule(Base.active_repl_backend.backend_task, e; error=true) elseif rethrow_regular @debug "HOST: Don't know what to do with this interrupt, rethrowing" exception = (e, catch_backtrace()) rethrow(e) end end end # module ,/opt/julia/packages/Malt/Z3YQq/src/shared.jlX const MsgType = ( from_host_call_with_response = UInt8(1), from_host_call_without_response = UInt8(2), from_host_fake_interrupt = UInt8(20), #### from_worker_call_result = UInt8(80), from_worker_call_failure = UInt8(81), ### special_serialization_failure = UInt8(100), special_worker_terminated = UInt8(101), ) const MsgID = UInt64 const BUFFER_SIZE = 65536 # Base.SZ_UNBUFFERED_IO # Future-compat version of Base.buffer_writes _buffer_writes(io) = @static if isdefined(Base, :buffer_writes) && hasmethod(Base.buffer_writes, (Base.LibuvStream, Int)) Base.buffer_writes(io, BUFFER_SIZE) end # from Distributed.jl: # # Boundary inserted between messages on the wire, used for recovering # from deserialization errors. Picked arbitrarily. # A size of 10 bytes indicates ~ ~1e24 possible boundaries, so chance of collision # with message contents is negligible. const MSG_BOUNDARY = UInt8[0x79, 0x8e, 0x8e, 0xf5, 0x6e, 0x9b, 0x2e, 0x97, 0xd5, 0x7d] function _discard_until_boundary(io::IO) readuntil(io, MSG_BOUNDARY) end function _serialize_msg(io::IO, msg_type::UInt8, msg_id::MsgID, msg_data::Any) lock(io) try write(io, msg_type) write(io, msg_id) serialize(io, msg_data) write(io, MSG_BOUNDARY) flush(io) finally unlock(io) end return nothing end=/opt/julia/packages/Malt/Z3YQq/src/DistributedStdlibWorker.jl import Distributed const Distributed_expr = quote Base.loaded_modules[Base.PkgId(Base.UUID("8ba89e20-285c-5b6f-9357-94700520ee1b"), "Distributed")] end """ Malt.DistributedStdlibWorker() This implements the same functions as `Malt.Worker` but it uses the Distributed stdlib as a backend. Can be used for backwards compatibility. """ mutable struct DistributedStdlibWorker <: AbstractWorker pid::Int64 isrunning::Bool function DistributedStdlibWorker(; env=String[], exeflags=[]) # Spawn process expr = if VERSION < v"1.9.0-aaa" isempty(env) || @warn "Malt.DistributedStdlibWorker: the `env` kwarg requires Julia 1.9" :($(Distributed_expr).addprocs(1; exeflags=$(exeflags)) |> first) else :($(Distributed_expr).addprocs(1; exeflags=$(exeflags), env=$(env)) |> first) end pid = Distributed.remotecall_eval(Main, 1, expr) # TODO: process preamble from Pluto? # There's no reason to keep the worker process alive after the manager loses its handle. w = finalizer(w -> @async(stop(w)), new(pid, true) ) atexit(() -> stop(w)) return w end end Base.summary(io::IO, w::DistributedStdlibWorker) = write(io, "Malt.DistributedStdlibWorker with pid $(w.pid)") macro transform_exception(worker, ex) :(try $(esc(ex)) catch e if e isa Distributed.RemoteException throw($(RemoteException)($(esc(worker)), sprint(showerror, e.captured))) else rethrow(e) end end) end function remote_call(f, w::DistributedStdlibWorker, args...; kwargs...) @async Distributed.remotecall_fetch(f, w.pid, args...; kwargs...) end function remote_call_fetch(f, w::DistributedStdlibWorker, args...; kwargs...) @transform_exception w Distributed.remotecall_fetch(f, w.pid, args...; kwargs...) end function remote_call_wait(f, w::DistributedStdlibWorker, args...; kwargs...) @transform_exception w Distributed.remotecall_wait(f, w.pid, args...; kwargs...) nothing end function remote_do(f, w::DistributedStdlibWorker, args...; kwargs...) Distributed.remote_do(f, w.pid, args...; kwargs...) end function worker_channel(w::DistributedStdlibWorker, expr) @transform_exception w Core.eval(Main, quote $(Distributed).RemoteChannel(() -> Core.eval(Main, $(QuoteNode(expr))), $(w.pid)) end) end isrunning(w::DistributedStdlibWorker) = w.isrunning function stop(w::DistributedStdlibWorker) w.isrunning = false Distributed.remotecall_eval(Main, 1, quote $(Distributed_expr).rmprocs($(w.pid)) |> wait end) nothing end Base.kill(w::DistributedStdlibWorker, signum=Base.SIGTERM) = error("not implemented for DistributedStdlibWorker") function interrupt(w::DistributedStdlibWorker) if Sys.iswindows() @warn "Malt.interrupt is not supported on Windows for a DistributedStdlibWorker" nothing else Distributed.interrupt(w.pid) end end 8%M