Skip to content

Commit e71cd09

Browse files
committed
Add support for worker statuses
1 parent 3ce41da commit e71cd09

4 files changed

Lines changed: 97 additions & 10 deletions

File tree

docs/src/_changelog.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ This documents notable changes in DistributedNext.jl. The format is based on
1212
### Added
1313
- Implemented callback support for workers being added/removed etc ([#17]).
1414
- Added a package extension to support Revise.jl ([#17]).
15+
- Added support for setting worker statuses with [`setstatus`](@ref) and
16+
[`getstatus`](@ref) ([#17]).
1517

1618
## [v1.1.1] - 2026-03-09
1719

docs/src/index.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ DistributedNext.rmprocs
1414
DistributedNext.interrupt
1515
DistributedNext.myid
1616
DistributedNext.pmap
17+
DistributedNext.getstatus
18+
DistributedNext.setstatus!
1719
DistributedNext.RemoteException
1820
DistributedNext.ProcessExitedException
1921
DistributedNext.Future

src/cluster.jl

Lines changed: 66 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -893,6 +893,8 @@ const LPROC = LocalProcess()
893893
const LPROCROLE = Ref{Symbol}(:master)
894894
const HDR_VERSION_LEN=16
895895
const HDR_COOKIE_LEN=16
896+
const map_pid_statuses = Dict{Int, Any}()
897+
const map_pid_statuses_lock = ReentrantLock()
896898
const map_pid_wrkr = Dict{Int, Union{Worker, LocalProcess}}()
897899
const map_sock_wrkr = IdDict()
898900
const map_del_wrkr = Set{Int}()
@@ -1035,15 +1037,16 @@ for any reason (i.e. not only because of [`rmprocs()`](@ref) but also the worker
10351037
segfaulting etc). Chooses and returns a unique key for the callback if `key` is
10361038
not specified.
10371039
1038-
The callback will be called with the worker ID and the final
1039-
`Distributed.WorkerState` of the worker, e.g. `f(w::Int, state)`. `state` is an
1040+
The callback will be called with the worker ID, the final
1041+
`Distributed.WorkerState` of the worker, and the last status of the worker as
1042+
set by [`setstatus!`](@ref), e.g. `f(w::Int, state, status)`. `state` is an
10401043
enum, a value of `WorkerState_terminated` means a graceful exit and a value of
10411044
`WorkerState_exterminated` means the worker died unexpectedly.
10421045
10431046
If the callback throws an exception it will be caught and printed.
10441047
"""
10451048
add_worker_exited_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, worker_exited_callbacks;
1046-
arg_types=Tuple{Int, WorkerState})
1049+
arg_types=Tuple{Int, WorkerState, Any})
10471050

10481051
"""
10491052
remove_worker_exited_callback(key)
@@ -1231,6 +1234,59 @@ Identical to [`workers()`](@ref) except that the current worker is filtered out.
12311234
"""
12321235
other_workers() = filter(!=(myid()), workers())
12331236

1237+
"""
1238+
setstatus!(x, pid::Int=myid())
1239+
1240+
Set the status for worker `pid` to `x`. `x` may be any serializable object but
1241+
it's recommended to keep it small enough to cheaply send over a network. The
1242+
status will be passed to the worker-exited callbacks (see
1243+
[`add_worker_exited_callback`](@ref)) when the worker exits.
1244+
1245+
This can be handy if you want a way to know what a worker is doing at any given
1246+
time, or (in combination with a worker-exited callback) for knowing what a
1247+
worker was last doing before it died.
1248+
1249+
# Examples
1250+
```julia-repl
1251+
julia> DistributedNext.setstatus!("working on dataset 42")
1252+
"working on dataset 42"
1253+
1254+
julia> DistributedNext.getstatus()
1255+
"working on dataset 42"
1256+
```
1257+
"""
1258+
function setstatus!(x, pid::Int=myid())
1259+
if pid procs()
1260+
throw(ArgumentError("Worker $(pid) does not exist, cannot set its status"))
1261+
end
1262+
1263+
if myid() == 1
1264+
@lock map_pid_statuses_lock map_pid_statuses[pid] = x
1265+
else
1266+
remotecall_fetch(setstatus!, 1, x, myid())
1267+
end
1268+
end
1269+
1270+
_getstatus(pid) = @lock map_pid_statuses_lock get!(map_pid_statuses, pid, nothing)
1271+
1272+
"""
1273+
getstatus(pid::Int=myid())
1274+
1275+
Get the status for worker `pid`. If one was never explicitly set with
1276+
[`setstatus!`](@ref) this will return `nothing`.
1277+
"""
1278+
function getstatus(pid::Int=myid())
1279+
if pid procs()
1280+
throw(ArgumentError("Worker $(pid) does not exist, cannot get its status"))
1281+
end
1282+
1283+
if myid() == 1
1284+
_getstatus(pid)
1285+
else
1286+
remotecall_fetch(getstatus, 1, pid)
1287+
end
1288+
end
1289+
12341290
function cluster_mgmt_from_master_check()
12351291
if myid() != 1
12361292
throw(ErrorException("Only process 1 can add and remove workers"))
@@ -1450,15 +1506,20 @@ function deregister_worker(pg, pid)
14501506
end
14511507
end
14521508

1453-
# Call callbacks on the master
14541509
if myid() == 1
1510+
status = _getstatus(pid)
1511+
1512+
# Call callbacks on the master
14551513
for (name, callback) in worker_exited_callbacks
14561514
try
1457-
callback(pid, w.state)
1515+
callback(pid, w.state, status)
14581516
catch ex
14591517
@error "Error when running worker-exited callback '$(name)'" exception=(ex, catch_backtrace())
14601518
end
14611519
end
1520+
1521+
# Delete its status
1522+
@lock map_pid_statuses_lock delete!(map_pid_statuses, pid)
14621523
end
14631524

14641525
return

test/distributed_exec.jl

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import Revise
44
using DistributedNext, Random, Serialization, Sockets
55
import DistributedNext
6-
import DistributedNext: launch, manage
6+
import DistributedNext: launch, manage, getstatus, setstatus!
77

88

99
@test cluster_cookie() isa String
@@ -1964,6 +1964,24 @@ include("splitrange.jl")
19641964
end
19651965
end
19661966

1967+
@testset "Worker statuses" begin
1968+
rmprocs(other_workers())
1969+
1970+
# Test with the local worker
1971+
@test isnothing(getstatus())
1972+
setstatus!("foo")
1973+
@test getstatus() == "foo"
1974+
@test_throws ArgumentError getstatus(2)
1975+
1976+
# Test with a remote worker
1977+
pid = only(addprocs(1))
1978+
@test isnothing(getstatus(pid))
1979+
remotecall_wait(setstatus!, pid, "bar", pid)
1980+
@test remotecall_fetch(getstatus, pid) == "bar"
1981+
1982+
rmprocs(pid)
1983+
end
1984+
19671985
@testset "Worker state callbacks" begin
19681986
rmprocs(other_workers())
19691987

@@ -1978,7 +1996,7 @@ end
19781996
starting_key = DistributedNext.add_worker_starting_callback((manager, kwargs) -> push!(starting_managers, manager))
19791997
started_key = DistributedNext.add_worker_started_callback(pid -> (push!(started_workers, pid); error("foo")))
19801998
exiting_key = DistributedNext.add_worker_exiting_callback(pid -> push!(exiting_workers, pid))
1981-
exited_key = DistributedNext.add_worker_exited_callback((pid, state) -> push!(exited_workers, (pid, state)))
1999+
exited_key = DistributedNext.add_worker_exited_callback((pid, state, status) -> push!(exited_workers, (pid, state, status)))
19822000

19832001
# Test that the worker-started exception bubbles up
19842002
@test_throws TaskFailedException addprocs(1)
@@ -1988,7 +2006,7 @@ end
19882006
@test started_workers == [pid]
19892007
rmprocs(workers())
19902008
@test exiting_workers == [pid]
1991-
@test exited_workers == [(pid, DistributedNext.WorkerState_terminated)]
2009+
@test exited_workers == [(pid, DistributedNext.WorkerState_terminated, nothing)]
19922010

19932011
# Trying to reset an existing callback should fail
19942012
@test_throws ArgumentError DistributedNext.add_worker_started_callback(Returns(nothing); key=started_key)
@@ -2021,16 +2039,20 @@ end
20212039
@test length(exiting_workers) == 1
20222040
@test length(exited_workers) == 1
20232041

2024-
# Test that workers that were killed forcefully are detected as such
2042+
# Test that workers that were killed forcefully are detected as such, and
2043+
# that statuses are passed properly.
20252044
exit_state = nothing
2026-
exited_key = DistributedNext.add_worker_exited_callback((pid, state) -> exit_state = state)
2045+
last_status = nothing
2046+
exited_key = DistributedNext.add_worker_exited_callback((pid, state, status) -> (exit_state = state; last_status = status))
20272047
pid = only(addprocs(1))
2048+
setstatus!("foo", pid)
20282049

20292050
redirect_stderr(devnull) do
20302051
remote_do(exit, pid)
20312052
timedwait(() -> !isnothing(exit_state), 10)
20322053
end
20332054
@test exit_state == DistributedNext.WorkerState_exterminated
2055+
@test last_status == "foo"
20342056
DistributedNext.remove_worker_exited_callback(exited_key)
20352057
end
20362058

0 commit comments

Comments
 (0)