diff --git a/src/DistributedNext.jl b/src/DistributedNext.jl index cbd16b5..45a8b1f 100644 --- a/src/DistributedNext.jl +++ b/src/DistributedNext.jl @@ -106,6 +106,17 @@ function _require_callback(mod::Base.PkgId) end end +# This is a minimal copy of Base.Lockable we use for backwards compatibility with 1.10 +struct Lockable{T, L <: Base.AbstractLock} + value::T + lock::L +end +Lockable(value) = Lockable(value, ReentrantLock()) +Base.getindex(l::Lockable) = (Base.assert_havelock(l.lock); l.value) +Base.lock(l::Lockable) = lock(l.lock) +Base.trylock(l::Lockable) = trylock(l.lock) +Base.unlock(l::Lockable) = unlock(l.lock) + const REF_ID = Threads.Atomic{Int}(1) next_ref_id() = Threads.atomic_add!(REF_ID, 1) diff --git a/src/cluster.jl b/src/cluster.jl index d3a903a..4f7c995 100644 --- a/src/cluster.jl +++ b/src/cluster.jl @@ -139,8 +139,8 @@ mutable struct Worker Worker(id::Int) = Worker(id, nothing) function Worker(id::Int, conn_func) @assert id > 0 - if haskey(map_pid_wrkr, id) - return map_pid_wrkr[id] + @lock map_pid_wrkr if haskey(map_pid_wrkr[], id) + return map_pid_wrkr[][id] end w=new(id, Threads.ReentrantLock(), [], [], false, W_CREATED, Threads.Condition(), time(), conn_func) w.initialized = Event() @@ -407,7 +407,7 @@ function init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClus # System is started in head node mode, cleanup related entries empty!(PGRP.workers) - empty!(map_pid_wrkr) + @lock map_pid_wrkr empty!(map_pid_wrkr[]) cluster_cookie(cookie) nothing @@ -793,7 +793,7 @@ function check_master_connect() errormonitor( Threads.@spawn begin timeout = worker_timeout() - if timedwait(() -> haskey(map_pid_wrkr, 1), timeout) === :timed_out + if timedwait(() -> @lock(map_pid_wrkr, haskey(map_pid_wrkr[], 1)), timeout) === :timed_out print(stderr, "Master process (id 1) could not connect within $(timeout) seconds.\nexiting.\n") exit(1) end @@ -826,24 +826,19 @@ function cluster_cookie(cookie) cookie end - -let next_pid = 2 # 1 is reserved for the client (always) - global get_next_pid - function get_next_pid() - retval = next_pid - next_pid += 1 - retval - end -end +# 1 is reserved for the client (always) +const next_pid = Threads.Atomic{Int}(2) +# Note that atomic_add!() returns the old value, which is what we want +get_next_pid() = Threads.atomic_add!(next_pid, 1) mutable struct ProcessGroup name::String - workers::Array{Any,1} + workers::Vector{Union{Worker, LocalProcess}} refs::Dict{RRID,Any} # global references topology::Symbol lazy::Union{Bool, Nothing} - ProcessGroup(w::Array{Any,1}) = new("pg-default", w, Dict(), :all_to_all, nothing) + ProcessGroup(w::Vector) = new("pg-default", w, Dict(), :all_to_all, nothing) end const PGRP = ProcessGroup([]) @@ -873,11 +868,11 @@ end # globals const LPROC = LocalProcess() const LPROCROLE = Ref{Symbol}(:master) -const HDR_VERSION_LEN=16 -const HDR_COOKIE_LEN=16 -const map_pid_wrkr = Dict{Int, Union{Worker, LocalProcess}}() -const map_sock_wrkr = IdDict() -const map_del_wrkr = Set{Int}() +const HDR_VERSION_LEN = 16 +const HDR_COOKIE_LEN = 16 +const map_pid_wrkr = Lockable(Dict{Int, Union{Worker, LocalProcess}}()) +const map_sock_wrkr = Lockable(IdDict()) +const map_del_wrkr = Lockable(Set{Int}()) # whether process is a master or worker in a distributed setup myrole() = LPROCROLE[] @@ -1018,7 +1013,7 @@ See also [`other_procs()`](@ref). function procs(pid::Integer) if myid() == 1 all_workers = [x for x in PGRP.workers if isa(x, LocalProcess) || ((@atomic x.state) === W_CONNECTED)] - if (pid == 1) || (isa(map_pid_wrkr[pid].manager, LocalManager)) + if (pid == 1) || (isa(@lock(map_pid_wrkr, map_pid_wrkr[][pid].manager), LocalManager)) Int[x.id for x in filter(w -> (w.id==1) || (isa(w.manager, LocalManager)), all_workers)] else ipatpid = get_bind_addr(pid) @@ -1124,8 +1119,8 @@ function _rmprocs(pids, waitfor) if p == 1 @warn "rmprocs: process 1 not removed" else - if haskey(map_pid_wrkr, p) - w = map_pid_wrkr[p] + w = @lock map_pid_wrkr get(map_pid_wrkr[], p, nothing) + if !isnothing(w) set_worker_state(w, W_TERMINATING) kill(w.manager, p, w.config) push!(rmprocset, w) @@ -1165,16 +1160,17 @@ ProcessExitedException() = ProcessExitedException(-1) worker_from_id(i) = worker_from_id(PGRP, i) function worker_from_id(pg::ProcessGroup, i) - if !isempty(map_del_wrkr) && in(i, map_del_wrkr) + @lock map_del_wrkr if !isempty(map_del_wrkr[]) && in(i, map_del_wrkr[]) throw(ProcessExitedException(i)) end - w = get(map_pid_wrkr, i, nothing) + + w = @lock map_pid_wrkr get(map_pid_wrkr[], i, nothing) if w === nothing if myid() == 1 error("no process with id $i exists") end w = Worker(i) - map_pid_wrkr[i] = w + @lock map_pid_wrkr map_pid_wrkr[][i] = w else w = w::Union{Worker, LocalProcess} end @@ -1190,7 +1186,7 @@ This is useful when writing custom [`serialize`](@ref) methods for a type, which optimizes the data written out depending on the receiving process id. """ function worker_id_from_socket(s) - w = get(map_sock_wrkr, s, nothing) + w = @lock map_sock_wrkr get(map_sock_wrkr[], s, nothing) if isa(w,Worker) if s === w.r_stream || s === w.w_stream return w.id @@ -1207,23 +1203,28 @@ end register_worker(w) = register_worker(PGRP, w) function register_worker(pg, w) push!(pg.workers, w) - map_pid_wrkr[w.id] = w + @lock map_pid_wrkr map_pid_wrkr[][w.id] = w end function register_worker_streams(w) - map_sock_wrkr[w.r_stream] = w - map_sock_wrkr[w.w_stream] = w + @lock map_sock_wrkr begin + map_sock_wrkr[][w.r_stream] = w + map_sock_wrkr[][w.w_stream] = w + end end deregister_worker(pid) = deregister_worker(PGRP, pid) function deregister_worker(pg, pid) pg.workers = filter(x -> !(x.id == pid), pg.workers) - w = pop!(map_pid_wrkr, pid, nothing) + + w = @lock map_pid_wrkr pop!(map_pid_wrkr[], pid, nothing) if isa(w, Worker) if isdefined(w, :r_stream) - pop!(map_sock_wrkr, w.r_stream, nothing) - if w.r_stream != w.w_stream - pop!(map_sock_wrkr, w.w_stream, nothing) + @lock map_sock_wrkr begin + pop!(map_sock_wrkr[], w.r_stream, nothing) + if w.r_stream != w.w_stream + pop!(map_sock_wrkr[], w.w_stream, nothing) + end end end @@ -1240,7 +1241,7 @@ function deregister_worker(pg, pid) end end end - push!(map_del_wrkr, pid) + @lock map_del_wrkr push!(map_del_wrkr[], pid) # delete this worker from our remote reference client sets ids = [] @@ -1270,7 +1271,7 @@ end function interrupt(pid::Integer) @assert myid() == 1 - w = map_pid_wrkr[pid] + w = @lock map_pid_wrkr map_pid_wrkr[][pid] if isa(w, Worker) manage(w.manager, w.id, w.config, :interrupt) end @@ -1310,11 +1311,11 @@ function check_same_host(pids) # We checkfirst if all test pids have been started using the local manager, # else we check for the same bind_to addr. This handles the special case # where the local ip address may change - as during a system sleep/awake - if all(p -> (p==1) || (isa(map_pid_wrkr[p].manager, LocalManager)), pids) + @lock map_pid_wrkr if all(p -> (p==1) || (isa(map_pid_wrkr[][p].manager, LocalManager)), pids) return true else - first_bind_addr = notnothing(wp_bind_addr(map_pid_wrkr[pids[1]])) - return all(p -> notnothing(wp_bind_addr(map_pid_wrkr[p])) == first_bind_addr, pids[2:end]) + first_bind_addr = notnothing(wp_bind_addr(map_pid_wrkr[][pids[1]])) + return all(p -> notnothing(wp_bind_addr(map_pid_wrkr[][p])) == first_bind_addr, pids[2:end]) end end end diff --git a/src/macros.jl b/src/macros.jl index ea98c60..b227e12 100644 --- a/src/macros.jl +++ b/src/macros.jl @@ -1,11 +1,9 @@ # This file is a part of Julia. License is MIT: https://julialang.org/license -let nextidx = Threads.Atomic{Int}(0) - global nextproc - function nextproc() - idx = Threads.atomic_add!(nextidx, 1) - return workers()[(idx % nworkers()) + 1] - end +const nextidx = Threads.Atomic{Int}(0) +function nextproc() + idx = Threads.atomic_add!(nextidx, 1) + return workers()[(idx % nworkers()) + 1] end spawnat(p, thunk) = remotecall(thunk, p) diff --git a/src/managers.jl b/src/managers.jl index 2ce1a3b..1da53c0 100644 --- a/src/managers.jl +++ b/src/managers.jl @@ -414,18 +414,9 @@ function manage(manager::SSHManager, id::Integer, config::WorkerConfig, op::Symb end end -let tunnel_port = 9201 - global next_tunnel_port - function next_tunnel_port() - retval = tunnel_port - if tunnel_port > 32000 - tunnel_port = 9201 - else - tunnel_port += 1 - end - retval - end -end +const tunnel_counter = Threads.Atomic{Int}(1) +# This is defined such that the port numbers start at 9201 and wrap around at 32,000 +next_tunnel_port() = (Threads.atomic_add!(tunnel_counter, 1) % 22_800) + 9200 """ diff --git a/src/process_messages.jl b/src/process_messages.jl index 211c225..d6fdbb1 100644 --- a/src/process_messages.jl +++ b/src/process_messages.jl @@ -220,7 +220,7 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool) if wpid < 1 println(stderr, e, CapturedException(e, catch_backtrace())) println(stderr, "Process($(myid())) - Unknown remote, closing connection.") - elseif !(wpid in map_del_wrkr) + elseif @lock(map_del_wrkr, !(wpid in map_del_wrkr[])) werr = worker_from_id(wpid) oldstate = @atomic werr.state set_worker_state(werr, W_TERMINATED) @@ -325,7 +325,7 @@ function handle_msg(msg::IdentifySocketMsg, header, r_stream, w_stream, version) end function handle_msg(msg::IdentifySocketAckMsg, header, r_stream, w_stream, version) - w = map_sock_wrkr[r_stream] + w = @lock map_sock_wrkr map_sock_wrkr[][r_stream] w.version = version end @@ -378,7 +378,7 @@ function connect_to_peer(manager::ClusterManager, rpid::Int, wconfig::WorkerConf end function handle_msg(msg::JoinCompleteMsg, header, r_stream, w_stream, version) - w = map_sock_wrkr[r_stream] + w = @lock map_sock_wrkr map_sock_wrkr[][r_stream] environ = something(w.config.environ, Dict()) environ[:cpu_threads] = msg.cpu_threads w.config.environ = environ