Skip to content

Commit e3f24bb

Browse files
committed
Allow passing a selector to hq worker info
1 parent fe5539e commit e3f24bb

15 files changed

Lines changed: 246 additions & 189 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
* We now ensure that after a successful modifying client operation (submit, cancel, open/close job, queue
2222
modification),
2323
the operation is immediately a part of the written journal.
24+
* `hq worker info` can now be used with a selector, to display information about multiple workers at once.
25+
* Note that this is a breaking change for the JSON output format: it now emits worker information as an array of objects, whereas previously it emitted a single object.
2426

2527
### Changes
2628

crates/hyperqueue/src/bin/hq.rs

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -263,13 +263,21 @@ async fn command_worker_info(
263263
opts: WorkerInfoOpts,
264264
) -> anyhow::Result<()> {
265265
let mut session = get_client_session(gsettings.server_directory()).await?;
266-
let response = get_worker_info(&mut session, opts.worker_id, true).await?;
266+
let mut responses = get_worker_info(&mut session, opts.selector, true).await?;
267+
responses.sort_unstable_by_key(|(id, _)| *id);
268+
269+
let workers: Vec<_> = responses
270+
.into_iter()
271+
.filter_map(|(id, worker)| match worker {
272+
Some(w) => Some(w),
273+
None => {
274+
log::error!("Worker {id} not found");
275+
None
276+
}
277+
})
278+
.collect();
267279

268-
if let Some(worker) = response {
269-
gsettings.printer().print_worker_info(worker);
270-
} else {
271-
log::error!("Worker {} not found", opts.worker_id);
272-
}
280+
gsettings.printer().print_worker_info(workers);
273281
Ok(())
274282
}
275283

@@ -312,11 +320,16 @@ async fn command_worker_address(
312320
gsettings: &GlobalSettings,
313321
opts: WorkerAddressOpts,
314322
) -> anyhow::Result<()> {
323+
use hyperqueue::common::arraydef::IntArray;
324+
use hyperqueue::transfer::messages::IdSelector;
325+
315326
let mut session = get_client_session(gsettings.server_directory()).await?;
316-
let response = get_worker_info(&mut session, opts.worker_id, false).await?;
327+
let selector = IdSelector::Specific(IntArray::from_id(opts.worker_id.as_num()));
328+
let response = get_worker_info(&mut session, selector, false).await?;
317329

318-
match response {
319-
Some(info) => println!("{}", info.configuration.hostname),
330+
match response.into_iter().next() {
331+
Some((_, Some(info))) => println!("{}", info.configuration.hostname),
332+
Some((id, None)) => anyhow::bail!("Worker {} not found", id),
320333
None => anyhow::bail!("Worker {} not found", opts.worker_id),
321334
}
322335

crates/hyperqueue/src/client/commands/worker.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -310,14 +310,14 @@ pub async fn start_hq_worker(
310310

311311
let worker = initialize_worker(gsettings.server_directory(), configuration).await?;
312312

313-
gsettings.printer().print_worker_info(WorkerInfo {
313+
gsettings.printer().print_worker_info(vec![WorkerInfo {
314314
id: worker.id,
315315
configuration: worker.configuration.clone(),
316316
started: Utc::now(),
317317
ended: None,
318318
runtime_info: None,
319319
last_task_started: None,
320-
});
320+
}]);
321321
worker.run().await?;
322322
log::info!("Worker stopping");
323323
Ok(())
@@ -527,13 +527,13 @@ pub async fn get_worker_list(
527527

528528
pub async fn get_worker_info(
529529
session: &mut ClientSession,
530-
worker_id: WorkerId,
530+
selector: IdSelector,
531531
runtime_info: bool,
532-
) -> crate::Result<Option<WorkerInfo>> {
532+
) -> crate::Result<Vec<(WorkerId, Option<WorkerInfo>)>> {
533533
let msg = rpc_call!(
534534
session.connection(),
535535
FromClientMessage::WorkerInfo(WorkerInfoRequest {
536-
worker_id,
536+
selector,
537537
runtime_info,
538538
}),
539539
ToClientMessage::WorkerInfoResponse(r) => r

crates/hyperqueue/src/client/output/cli.rs

Lines changed: 132 additions & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -267,137 +267,142 @@ impl Output for CliOutput {
267267
self.print_horizontal_table(rows, header);
268268
}
269269

270-
fn print_worker_info(&self, worker_info: WorkerInfo) {
271-
let state = worker_status(&worker_info);
272-
let WorkerInfo {
273-
id,
274-
configuration,
275-
started,
276-
ended: _ended,
277-
runtime_info,
278-
last_task_started,
279-
} = worker_info;
280-
281-
let manager_info = configuration.get_manager_info();
282-
let mut rows = vec![
283-
vec!["Worker".cell().bold(true), id.cell()],
284-
vec!["State".cell().bold(true), state],
285-
vec!["Hostname".cell().bold(true), configuration.hostname.cell()],
286-
vec!["Started".cell().bold(true), format_datetime(started).cell()],
287-
vec![
288-
"Data provider".cell().bold(true),
289-
configuration.listen_address.cell(),
290-
],
291-
vec![
292-
"Working directory".cell().bold(true),
293-
configuration.work_dir.display().cell(),
294-
],
295-
vec![
296-
"Heartbeat".cell().bold(true),
297-
format_duration(configuration.heartbeat_interval)
298-
.to_string()
299-
.cell(),
300-
],
301-
vec![
302-
"Idle timeout".cell().bold(true),
303-
configuration
304-
.idle_timeout
305-
.map(|x| format_duration(x).to_string())
306-
.unwrap_or_else(|| "None".to_string())
307-
.cell(),
308-
],
309-
vec![
310-
"Overview interval".cell().bold(true),
311-
configuration
312-
.overview_configuration
313-
.send_interval
314-
.map(|x| format_duration(x).to_string())
315-
.unwrap_or_else(|| "None".to_string())
316-
.cell(),
317-
],
318-
vec![
319-
"Resources".cell().bold(true),
320-
resources_summary(&configuration.resources, true).cell(),
321-
],
322-
vec![
323-
"Time Limit".cell().bold(true),
324-
configuration
325-
.time_limit
326-
.map(|x| format_duration(x).to_string())
327-
.unwrap_or_else(|| "None".to_string())
328-
.cell(),
329-
],
330-
vec![
331-
"Process pid".cell().bold(true),
332-
configuration
333-
.extra
334-
.get(WORKER_EXTRA_PROCESS_PID)
335-
.cloned()
336-
.unwrap_or_else(|| "N/A".to_string())
337-
.cell(),
338-
],
339-
vec!["Group".cell().bold(true), configuration.group.cell()],
340-
vec![
341-
"Downloads".cell().bold(true),
342-
format!(
343-
"{} parallel; max {} fails + {} delay",
344-
configuration.max_parallel_downloads,
345-
configuration.max_download_tries,
346-
format_duration(configuration.wait_between_download_tries)
347-
)
348-
.cell(),
349-
],
350-
vec![
351-
"Manager".cell().bold(true),
352-
manager_info
353-
.as_ref()
354-
.map(|info| info.manager.to_string())
355-
.unwrap_or_else(|| "None".to_string())
356-
.cell(),
357-
],
358-
vec![
359-
"Manager Job ID".cell().bold(true),
360-
manager_info
361-
.as_ref()
362-
.map(|info| info.allocation_id.as_str())
363-
.unwrap_or("N/A")
270+
fn print_worker_info(&self, workers: Vec<WorkerInfo>) {
271+
for (i, worker_info) in workers.into_iter().enumerate() {
272+
if i > 0 {
273+
println!();
274+
}
275+
let state = worker_status(&worker_info);
276+
let WorkerInfo {
277+
id,
278+
configuration,
279+
started,
280+
ended: _ended,
281+
runtime_info,
282+
last_task_started,
283+
} = worker_info;
284+
285+
let manager_info = configuration.get_manager_info();
286+
let mut rows = vec![
287+
vec!["Worker".cell().bold(true), id.cell()],
288+
vec!["State".cell().bold(true), state],
289+
vec!["Hostname".cell().bold(true), configuration.hostname.cell()],
290+
vec!["Started".cell().bold(true), format_datetime(started).cell()],
291+
vec![
292+
"Data provider".cell().bold(true),
293+
configuration.listen_address.cell(),
294+
],
295+
vec![
296+
"Working directory".cell().bold(true),
297+
configuration.work_dir.display().cell(),
298+
],
299+
vec![
300+
"Heartbeat".cell().bold(true),
301+
format_duration(configuration.heartbeat_interval)
302+
.to_string()
303+
.cell(),
304+
],
305+
vec![
306+
"Idle timeout".cell().bold(true),
307+
configuration
308+
.idle_timeout
309+
.map(|x| format_duration(x).to_string())
310+
.unwrap_or_else(|| "None".to_string())
311+
.cell(),
312+
],
313+
vec![
314+
"Overview interval".cell().bold(true),
315+
configuration
316+
.overview_configuration
317+
.send_interval
318+
.map(|x| format_duration(x).to_string())
319+
.unwrap_or_else(|| "None".to_string())
320+
.cell(),
321+
],
322+
vec![
323+
"Resources".cell().bold(true),
324+
resources_summary(&configuration.resources, true).cell(),
325+
],
326+
vec![
327+
"Time Limit".cell().bold(true),
328+
configuration
329+
.time_limit
330+
.map(|x| format_duration(x).to_string())
331+
.unwrap_or_else(|| "None".to_string())
332+
.cell(),
333+
],
334+
vec![
335+
"Process pid".cell().bold(true),
336+
configuration
337+
.extra
338+
.get(WORKER_EXTRA_PROCESS_PID)
339+
.cloned()
340+
.unwrap_or_else(|| "N/A".to_string())
341+
.cell(),
342+
],
343+
vec!["Group".cell().bold(true), configuration.group.cell()],
344+
vec![
345+
"Downloads".cell().bold(true),
346+
format!(
347+
"{} parallel; max {} fails + {} delay",
348+
configuration.max_parallel_downloads,
349+
configuration.max_download_tries,
350+
format_duration(configuration.wait_between_download_tries)
351+
)
364352
.cell(),
365-
],
366-
vec![
367-
"Last task started".cell().bold(true),
368-
last_task_started
369-
.map(|t| format!("{}; Time: {}", t.task_id, format_datetime(t.time)).cell())
370-
.unwrap_or_else(|| "".cell()),
371-
],
372-
];
373-
if let Some(runtime_info) = runtime_info {
374-
let mut s = String::with_capacity(60);
375-
match runtime_info {
376-
WorkerRuntimeInfo::SingleNodeTasks {
377-
running_tasks,
378-
assigned_tasks,
379-
is_reserved,
380-
} => {
381-
write!(s, "assigned tasks: {assigned_tasks}").unwrap();
382-
if running_tasks > 0 {
383-
write!(s, "; running tasks: {running_tasks}").unwrap();
384-
}
385-
if is_reserved {
386-
write!(s, "; reserved for a multi-node task").unwrap();
353+
],
354+
vec![
355+
"Manager".cell().bold(true),
356+
manager_info
357+
.as_ref()
358+
.map(|info| info.manager.to_string())
359+
.unwrap_or_else(|| "None".to_string())
360+
.cell(),
361+
],
362+
vec![
363+
"Manager Job ID".cell().bold(true),
364+
manager_info
365+
.as_ref()
366+
.map(|info| info.allocation_id.as_str())
367+
.unwrap_or("N/A")
368+
.cell(),
369+
],
370+
vec![
371+
"Last task started".cell().bold(true),
372+
last_task_started
373+
.map(|t| format!("{}; Time: {}", t.task_id, format_datetime(t.time)).cell())
374+
.unwrap_or_else(|| "".cell()),
375+
],
376+
];
377+
if let Some(runtime_info) = runtime_info {
378+
let mut s = String::with_capacity(60);
379+
match runtime_info {
380+
WorkerRuntimeInfo::SingleNodeTasks {
381+
running_tasks,
382+
assigned_tasks,
383+
is_reserved,
384+
} => {
385+
write!(s, "assigned tasks: {assigned_tasks}").unwrap();
386+
if running_tasks > 0 {
387+
write!(s, "; running tasks: {running_tasks}").unwrap();
388+
}
389+
if is_reserved {
390+
write!(s, "; reserved for a multi-node task").unwrap();
391+
}
387392
}
388-
}
389-
WorkerRuntimeInfo::MultiNodeTask { main_node } => {
390-
write!(s, "running multinode task; ").unwrap();
391-
if main_node {
392-
write!(s, "main node").unwrap();
393-
} else {
394-
write!(s, "secondary node").unwrap();
393+
WorkerRuntimeInfo::MultiNodeTask { main_node } => {
394+
write!(s, "running multinode task; ").unwrap();
395+
if main_node {
396+
write!(s, "main node").unwrap();
397+
} else {
398+
write!(s, "secondary node").unwrap();
399+
}
395400
}
396-
}
397-
};
398-
rows.push(vec!["Runtime Info".cell().bold(true), s.cell()]);
401+
};
402+
rows.push(vec!["Runtime Info".cell().bold(true), s.cell()]);
403+
}
404+
self.print_vertical_table(rows);
399405
}
400-
self.print_vertical_table(rows);
401406
}
402407

403408
fn print_server_info(&self, server_dir: Option<&Path>, info: &ServerInfo) {

crates/hyperqueue/src/client/output/json.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ impl Output for JsonOutput {
5252
fn print_worker_list(&self, workers: Vec<WorkerInfo>) {
5353
self.print(workers.into_iter().map(format_worker_info).collect());
5454
}
55-
fn print_worker_info(&self, worker_info: WorkerInfo) {
56-
self.print(format_worker_info(worker_info));
55+
fn print_worker_info(&self, workers: Vec<WorkerInfo>) {
56+
self.print(workers.into_iter().map(format_worker_info).collect());
5757
}
5858

5959
fn print_server_info(&self, server_dir: Option<&Path>, info: &ServerInfo) {

crates/hyperqueue/src/client/output/outputs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ pub enum OutputStream {
3636
pub trait Output {
3737
// Workers
3838
fn print_worker_list(&self, workers: Vec<WorkerInfo>);
39-
fn print_worker_info(&self, worker_info: WorkerInfo);
39+
fn print_worker_info(&self, workers: Vec<WorkerInfo>);
4040

4141
// Server
4242
fn print_server_info(&self, server_dir: Option<&Path>, record: &ServerInfo);

crates/hyperqueue/src/client/output/quiet.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ impl Output for Quiet {
5757
println!("{} {}", worker.id, worker_status)
5858
}
5959
}
60-
fn print_worker_info(&self, _worker_info: WorkerInfo) {}
60+
fn print_worker_info(&self, _workers: Vec<WorkerInfo>) {}
6161

6262
// Server
6363
fn print_server_info(&self, server_dir: Option<&Path>, _record: &ServerInfo) {

0 commit comments

Comments
 (0)