diff --git a/coman/src/app/ids.rs b/coman/src/app/ids.rs index 6f69f30..26ae939 100644 --- a/coman/src/app/ids.rs +++ b/coman/src/app/ids.rs @@ -13,5 +13,6 @@ pub enum Id { LoginPopup, DownloadPopup, SystemSelectPopup, + JobFilterPopup, FileView, } diff --git a/coman/src/app/messages.rs b/coman/src/app/messages.rs index 1bc763f..8c3af1c 100644 --- a/coman/src/app/messages.rs +++ b/coman/src/app/messages.rs @@ -2,7 +2,7 @@ use std::path::PathBuf; use crate::{ app::user_events::UserEvent, - cscs::api_client::types::{JobDetail, JobId, System}, + cscs::api_client::types::{JobDetail, JobId, JobStatus, System}, }; #[derive(Debug, PartialEq)] @@ -11,6 +11,7 @@ pub enum MenuMsg { Opened, Closed, CscsLogin, + CscsShowFilterPopup, CscsSwitchSystem, Event(UserEvent), } @@ -45,6 +46,13 @@ pub enum SystemSelectMsg { SystemSelected(String), } +#[derive(Debug, PartialEq)] +pub enum JobFilterPopupMsg { + Opened, + Closed, + FilterSelected(Vec), +} + #[derive(Debug, PartialEq)] pub enum CscsMsg { Login(String, String), @@ -86,6 +94,7 @@ pub enum Msg { LoginPopup(LoginPopupMsg), DownloadPopup(DownloadPopupMsg), SystemSelectPopup(SystemSelectMsg), + JobFilterPopup(JobFilterPopupMsg), Error(String), Info(String), Cscs(CscsMsg), diff --git a/coman/src/app/model.rs b/coman/src/app/model.rs index 7977e94..3dccb2f 100644 --- a/coman/src/app/model.rs +++ b/coman/src/app/model.rs @@ -14,18 +14,20 @@ use crate::{ app::{ ids::Id, messages::{ - CscsMsg, DownloadPopupMsg, ErrorPopupMsg, InfoPopupMsg, JobMsg, LoginPopupMsg, MenuMsg, Msg, StatusMsg, - SystemSelectMsg, View, + CscsMsg, DownloadPopupMsg, ErrorPopupMsg, InfoPopupMsg, JobFilterPopupMsg, JobMsg, LoginPopupMsg, MenuMsg, + Msg, StatusMsg, SystemSelectMsg, View, }, user_events::{CscsEvent, StatusEvent, UserEvent}, }, components::{ context_menu::ContextMenu, download_popup::DownloadTargetInput, error_popup::ErrorPopup, info_popup::InfoPopup, - login_popup::LoginPopup, resource_usage::ResourceUsage, system_select_popup::SystemSelectPopup, - workload_details::WorkloadDetails, workload_list::WorkloadList, workload_log::WorkloadLog, + job_status_filter_popup::JobStatusFilterPopup, login_popup::LoginPopup, resource_usage::ResourceUsage, + system_select_popup::SystemSelectPopup, workload_details::WorkloadDetails, workload_list::WorkloadList, + workload_log::WorkloadLog, }, config::Config, cscs::{ + api_client::types::JobStatus, handlers::{cscs_login, cscs_system_set, get_available_compute_platforms}, ports::{BackgroundTask, JobLogAction, JobResourceUsageAction}, }, @@ -59,6 +61,9 @@ where /// sending None stops watching pub job_log_tx: mpsc::Sender, + /// Set to filter workload list + pub job_filter_tx: mpsc::Sender>, + /// Triggers watching job logs /// sending None stops watching pub job_resource_usage_tx: mpsc::Sender, @@ -81,6 +86,7 @@ where error_tx: mpsc::Sender, select_system_tx: mpsc::Sender<()>, job_log_tx: mpsc::Sender, + job_filter_tx: mpsc::Sender>, job_resource_usage_tx: mpsc::Sender, user_event_tx: mpsc::Sender, background_task_tx: mpsc::Sender, @@ -94,6 +100,7 @@ where error_tx, select_system_tx, job_log_tx, + job_filter_tx, job_resource_usage_tx, user_event_tx, background_task_tx, @@ -149,6 +156,10 @@ where let popup = draw_area_in_absolute_fixed_height(f.area(), 10, 3); f.render_widget(Clear, popup); app.view(&Id::DownloadPopup, f, popup); + } else if app.mounted(&Id::JobFilterPopup) { + let popup = draw_area_in_absolute_fixed_height(f.area(), 10, 3); + f.render_widget(Clear, popup); + app.view(&Id::JobFilterPopup, f, popup); } }) .is_ok() @@ -281,6 +292,34 @@ where } } } + fn handle_job_filter_popup_msg(&mut self, msg: JobFilterPopupMsg) -> Option { + match msg { + JobFilterPopupMsg::Opened => { + if self.app.mounted(&Id::JobFilterPopup) { + assert!(self.app.umount(&Id::JobFilterPopup).is_ok()); + } + assert!( + self.app + .mount(Id::JobFilterPopup, Box::new(JobStatusFilterPopup::new()), vec![]) + .is_ok() + ); + assert!(self.app.active(&Id::JobFilterPopup).is_ok()); + None + } + JobFilterPopupMsg::FilterSelected(filter) => { + assert!(self.app.umount(&Id::JobFilterPopup).is_ok()); + let filter_tx = self.job_filter_tx.clone(); + tokio::spawn(async move { + filter_tx.send(filter).await.unwrap(); + }); + None + } + JobFilterPopupMsg::Closed => { + assert!(self.app.umount(&Id::JobFilterPopup).is_ok()); + None + } + } + } fn handle_menu_msg(&mut self, msg: MenuMsg) -> Option { match msg { MenuMsg::Opened => { @@ -304,6 +343,10 @@ where assert!(self.app.umount(&Id::Menu).is_ok()); Some(Msg::Cscs(CscsMsg::SelectSystem)) } + MenuMsg::CscsShowFilterPopup => { + assert!(self.app.umount(&Id::Menu).is_ok()); + Some(Msg::JobFilterPopup(JobFilterPopupMsg::Opened)) + } MenuMsg::Event(event) => { assert!(self.app.umount(&Id::Menu).is_ok()); Some(Msg::CreateEvent(event)) @@ -505,6 +548,7 @@ where Msg::ErrorPopup(popup_msg) => self.handle_error_popup_msg(popup_msg), Msg::InfoPopup(popup_msg) => self.handle_info_popup_msg(popup_msg), Msg::DownloadPopup(popup_msg) => self.handle_download_popup_msg(popup_msg), + Msg::JobFilterPopup(popup_msg) => self.handle_job_filter_popup_msg(popup_msg), Msg::Cscs(CscsMsg::Login(client_id, client_secret)) => { let event_tx = self.user_event_tx.clone(); let error_tx = self.error_tx.clone(); diff --git a/coman/src/cli/app.rs b/coman/src/cli/app.rs index 839b918..cb3d0cf 100644 --- a/coman/src/cli/app.rs +++ b/coman/src/cli/app.rs @@ -15,7 +15,7 @@ use crate::{ cscs::{ api_client::{ client::{EdfSpec as EdfSpecEnum, ScriptSpec as ScriptSpecEnum}, - types::PathType, + types::{JobStatus, PathType}, }, handlers::{cscs_file_list, cscs_job_list, file_system_roots}, }, @@ -239,7 +239,10 @@ impl FromStr for JobIdOrName { #[derive(Subcommand, Debug)] pub enum CscsJobCommands { #[clap(alias("ls"), about = "List all jobs [aliases: ls]")] - List, + List { + #[clap(short,long,help="filter by job status (separated by ',', [running, pending, finished, cancelled, failed, timeout, requeued])", value_delimiter=',', num_args=1.., value_enum)] + status: Option>, + }, #[clap(alias("g"), about = "Get metadata for a specific job [aliases: g]")] Get { #[arg(help="id or name of the job (name uses newest job of that name)", add = ArgValueCompleter::new(job_id_or_name_completer))] @@ -330,7 +333,7 @@ fn job_id_or_name_completer(current: &std::ffi::OsStr) -> Vec { tokio::spawn(async move { - let jobs = cscs_job_list(None, None).await.unwrap(); + let jobs = cscs_job_list(None, None, None).await.unwrap(); let partial_id = id.to_string(); let ids: Vec<_> = jobs .iter() @@ -345,7 +348,7 @@ fn job_id_or_name_completer(current: &std::ffi::OsStr) -> Vec { tokio::spawn(async move { - let jobs = cscs_job_list(None, None).await.unwrap(); + let jobs = cscs_job_list(None, None, None).await.unwrap(); let names: Vec<_> = jobs .into_iter() .map(|j| j.name) diff --git a/coman/src/components/context_menu.rs b/coman/src/components/context_menu.rs index 579c4c9..f8251aa 100644 --- a/coman/src/components/context_menu.rs +++ b/coman/src/components/context_menu.rs @@ -20,21 +20,23 @@ pub struct ContextMenu { impl ContextMenu { fn workload_options() -> Table { TableBuilder::default() + .add_col(TextSpan::from("Cancel Job").fg(Color::Cyan)) + .add_row() + .add_col(TextSpan::from("Filter by Status").fg(Color::Cyan)) + .add_row() .add_col(TextSpan::from("Login to CSCS").fg(Color::Cyan)) .add_row() .add_col(TextSpan::from("Switch System").fg(Color::Cyan)) .add_row() - .add_col(TextSpan::from("Cancel Job").fg(Color::Cyan)) - .add_row() .add_col(TextSpan::from("Quit").fg(Color::Cyan)) .add_row() .build() } fn workload_actions(index: usize) -> Option { match index { - 0 => Some(Msg::Menu(MenuMsg::CscsLogin)), - 1 => Some(Msg::Menu(MenuMsg::CscsSwitchSystem)), - 2 => Some(Msg::Menu(MenuMsg::Event(UserEvent::Job(JobEvent::Cancel)))), + 0 => Some(Msg::Menu(MenuMsg::Event(UserEvent::Job(JobEvent::Cancel)))), + 1 => Some(Msg::Menu(MenuMsg::CscsShowFilterPopup)), + 2 => Some(Msg::Menu(MenuMsg::CscsSwitchSystem)), 3 => Some(Msg::AppClose), _ => Some(Msg::Menu(MenuMsg::Closed)), } diff --git a/coman/src/components/job_status_filter_popup.rs b/coman/src/components/job_status_filter_popup.rs new file mode 100644 index 0000000..e57913e --- /dev/null +++ b/coman/src/components/job_status_filter_popup.rs @@ -0,0 +1,102 @@ +use strum::{VariantArray, VariantNames}; +use tui_realm_stdlib::Checkbox; +use tuirealm::{ + Attribute, Component, Event, MockComponent, State, + command::{Cmd, CmdResult, Direction}, + event::{Key, KeyEvent}, + props::{Alignment, BorderType, Borders, Color, PropValue}, +}; + +use crate::{ + app::{ + messages::{JobFilterPopupMsg, Msg}, + user_events::UserEvent, + }, + cscs::api_client::types::JobStatus, +}; + +#[derive(MockComponent)] +pub struct JobStatusFilterPopup { + component: Checkbox, + values: Vec, + max_idx: usize, +} + +impl JobStatusFilterPopup { + pub fn new() -> Self { + let mut variants: Vec<_> = ::VARIANTS + .iter() + .map(|v| v.to_owned()) + .collect(); + variants.insert(0, "All"); + let max_idx = variants.len(); + let values: Vec<_> = (0..max_idx).collect(); + Self { + component: Checkbox::default() + .borders(Borders::default().modifiers(BorderType::Thick).color(Color::Green)) + .title("Select Status to show", Alignment::Left) + .rewind(true) + .choices(variants) + .values(&values), + values, + max_idx, + } + } +} + +impl Component for JobStatusFilterPopup { + fn on(&mut self, ev: Event) -> Option { + let _ = match ev { + Event::Keyboard(KeyEvent { code: Key::Right, .. }) => self.perform(Cmd::Move(Direction::Right)), + Event::Keyboard(KeyEvent { code: Key::Left, .. }) => self.perform(Cmd::Move(Direction::Left)), + Event::Keyboard(KeyEvent { + code: Key::Char(' '), .. + }) => { + if let CmdResult::Changed(state) = self.perform(Cmd::Toggle) + && let State::Vec(state) = state + { + let new_state: Vec<_> = state.into_iter().map(|s| s.unwrap_usize()).collect(); + if new_state.contains(&0) && !self.values.contains(&0) { + // 'All' selected, activate all + self.values = (0..self.max_idx).collect(); + } else if !new_state.contains(&0) && self.values.contains(&0) { + // 'All' deselected, deactivate all + self.values = vec![]; + } else { + // check if all values except 'All' are selected + self.values = new_state; + if (1..self.max_idx).all(|s| self.values.contains(&s)) { + self.values.push(0); + } else if self.values.contains(&0) { + self.values.retain(|v| *v != 0); + } + } + self.attr( + Attribute::Value, + tuirealm::AttrValue::Payload(tuirealm::props::PropPayload::Vec( + self.values.iter().map(|v| PropValue::Usize(*v)).collect(), + )), + ); + } + CmdResult::None + } + Event::Keyboard(KeyEvent { code: Key::Esc, .. }) => { + return Some(Msg::JobFilterPopup(JobFilterPopupMsg::Closed)); + } + Event::Keyboard(KeyEvent { code: Key::Enter, .. }) => { + let selected_statuses = self + .values + .iter() + .filter(|v| **v > 0) + .map(|v| ::VARIANTS[*v - 1].clone()) + .collect(); + + return Some(Msg::JobFilterPopup(JobFilterPopupMsg::FilterSelected( + selected_statuses, + ))); + } + _ => CmdResult::None, + }; + Some(Msg::None) + } +} diff --git a/coman/src/components/mod.rs b/coman/src/components/mod.rs index e0cec75..4c557df 100644 --- a/coman/src/components/mod.rs +++ b/coman/src/components/mod.rs @@ -4,6 +4,7 @@ pub(crate) mod error_popup; pub(crate) mod file_tree; pub(crate) mod global_listener; pub(crate) mod info_popup; +pub(crate) mod job_status_filter_popup; pub(crate) mod login_popup; pub(crate) mod resource_usage; pub(crate) mod status_bar; diff --git a/coman/src/cscs/api_client/client.rs b/coman/src/cscs/api_client/client.rs index 6cfd8b1..2ca8066 100644 --- a/coman/src/cscs/api_client/client.rs +++ b/coman/src/cscs/api_client/client.rs @@ -20,7 +20,7 @@ use reqwest::Url; use crate::{ config::{ComputePlatform, Config}, - cscs::api_client::types::{FileStat, Job, JobDetail, JobId, PathEntry, S3Upload, System, UserInfo}, + cscs::api_client::types::{FileStat, Job, JobDetail, JobId, JobStatus, PathEntry, S3Upload, System, UserInfo}, trace_dbg, util::types::DockerImageUrl, }; @@ -110,14 +110,23 @@ impl CscsApi { .wrap_err("couldn't list CSCS systems")?; Ok(result.systems.into_iter().map(|s| s.into()).collect()) } - pub async fn list_jobs(&self, system_name: &str, all_users: Option) -> Result> { + pub async fn list_jobs( + &self, + status: Option>, + system_name: &str, + all_users: Option, + ) -> Result> { let result = get_compute_system_jobs(&self.client, system_name, all_users) .await .wrap_err("couldn't fetch cscs jobs")?; - Ok(result + let mut result: Vec = result .jobs .map(|jobs| jobs.into_iter().map(|j| j.into()).collect()) - .unwrap_or(vec![])) + .unwrap_or(vec![]); + if let Some(filter) = status { + result.retain(|j| filter.contains(&j.status)); + } + Ok(result) } pub async fn get_job(&self, system_name: &str, job_id: JobId) -> Result> { let jobs = get_compute_system_job(&self.client, system_name, job_id.clone().into_string()) @@ -263,6 +272,7 @@ mod tests { use injectorpp::interface::injector::*; use super::*; + use crate::cscs::api_client::types::JobStatus as ApiJobStatus; fn get_client() -> CscsApi { CscsApi { @@ -409,8 +419,24 @@ mod tests { }), Result )); - let result = client.list_jobs("daint", None).await; + let result = client.list_jobs(None, "daint", None).await; assert_eq!(result.unwrap().len(), 5); + let result = client + .list_jobs(Some(vec![ApiJobStatus::Running, ApiJobStatus::Pending]), "daint", None) + .await; + assert_eq!(result.unwrap().len(), 2); + let result = client + .list_jobs( + Some(vec![ + ApiJobStatus::Failed, + ApiJobStatus::Finished, + ApiJobStatus::Cancelled, + ]), + "daint", + None, + ) + .await; + assert_eq!(result.unwrap().len(), 3); } } diff --git a/coman/src/cscs/api_client/types.rs b/coman/src/cscs/api_client/types.rs index 728e6fd..b0b80b9 100644 --- a/coman/src/cscs/api_client/types.rs +++ b/coman/src/cscs/api_client/types.rs @@ -7,6 +7,7 @@ use firecrest_client::types::{ }; use reqwest::Url; use strum::Display; +use strum_macros::{VariantArray, VariantNames}; #[derive(Debug, Eq, Clone, PartialEq, PartialOrd, Ord, tabled::Tabled)] pub struct JobId(String); @@ -180,7 +181,7 @@ impl From for FileStat { } } -#[derive(Debug, Eq, Clone, PartialEq, PartialOrd, Ord, Display)] +#[derive(Debug, Eq, Clone, PartialEq, PartialOrd, Ord, Display, VariantArray, VariantNames)] pub enum JobStatus { Pending, Running, @@ -192,7 +193,7 @@ pub enum JobStatus { } impl From for JobStatus { fn from(value: String) -> Self { - match value.split_whitespace().next().unwrap() { + match value.split_whitespace().next().unwrap_or("").to_uppercase().as_str() { "RUNNING" => JobStatus::Running, "FAILED" => JobStatus::Failed, "COMPLETED" => JobStatus::Finished, diff --git a/coman/src/cscs/cli.rs b/coman/src/cscs/cli.rs index 62ebffe..8531145 100644 --- a/coman/src/cscs/cli.rs +++ b/coman/src/cscs/cli.rs @@ -71,8 +71,12 @@ pub(crate) async fn cli_cscs_login() -> Result<()> { } Ok(()) } -pub(crate) async fn cli_cscs_job_list(system: Option, platform: Option) -> Result<()> { - match cscs_job_list(system, platform).await { +pub(crate) async fn cli_cscs_job_list( + status: Option>, + system: Option, + platform: Option, +) -> Result<()> { + match cscs_job_list(status, system, platform).await { Ok(jobs) => { let mut table = tabled::Table::new(jobs); table.with(tabled::settings::Style::modern()); @@ -90,7 +94,7 @@ async fn maybe_job_id_from_name( ) -> Result { match j { JobIdOrName::Id(id) => Ok(id.into()), - JobIdOrName::Name(name) => match cscs_job_list(system, platform).await { + JobIdOrName::Name(name) => match cscs_job_list(None, system, platform).await { Ok(jobs) => { let job = jobs .iter() diff --git a/coman/src/cscs/handlers.rs b/coman/src/cscs/handlers.rs index bb37c7c..a9d4244 100644 --- a/coman/src/cscs/handlers.rs +++ b/coman/src/cscs/handlers.rs @@ -136,13 +136,17 @@ pub async fn cscs_system_set(system_name: String, global: bool) -> Result<()> { config.set("cscs.current_system", system_name, global) } -pub async fn cscs_job_list(system: Option, platform: Option) -> Result> { +pub async fn cscs_job_list( + status: Option>, + system: Option, + platform: Option, +) -> Result> { match get_access_token().await { Ok(access_token) => { let api_client = CscsApi::new(access_token.0, platform).unwrap(); let config = Config::new().unwrap(); api_client - .list_jobs(&system.unwrap_or(config.values.cscs.current_system), None) + .list_jobs(status, &system.unwrap_or(config.values.cscs.current_system), None) .await } Err(e) => Err(e), @@ -367,7 +371,7 @@ async fn garbage_collect_ssh(api_client: &CscsApi, current_system: &str) -> Resu if !data_dir.exists() { return Ok(()); } - let jobs = api_client.list_jobs(current_system, None).await?; + let jobs = api_client.list_jobs(None, current_system, None).await?; let job_entries: HashSet<_> = jobs .iter() .filter(|j| j.status == JobStatus::Pending || j.status == JobStatus::Running || j.status == JobStatus::Requeued) diff --git a/coman/src/cscs/ports.rs b/coman/src/cscs/ports.rs index dff4c28..4d1362b 100644 --- a/coman/src/cscs/ports.rs +++ b/coman/src/cscs/ports.rs @@ -6,6 +6,7 @@ use color_eyre::{ }; use futures::StreamExt; use openidconnect::core::CoreDeviceAuthorizationResponse; +use strum::VariantArray; use tokio::{fs::File, io::AsyncWriteExt, sync::mpsc, time::Instant}; use tuirealm::{ Event, @@ -102,18 +103,32 @@ impl PollAsync for AsyncDeviceFlowPort { } /// This port periodically fetches jobs from CSCS -pub(crate) struct AsyncFetchWorkloadsPort {} +pub(crate) struct AsyncFetchWorkloadsPort { + filter: Vec, + receiver: mpsc::Receiver>, +} impl AsyncFetchWorkloadsPort { - pub fn new() -> Self { - Self {} + pub fn new(receiver: mpsc::Receiver>) -> Self { + Self { + filter: JobStatus::VARIANTS.to_vec(), + receiver, + } } } #[tuirealm::async_trait] impl PollAsync for AsyncFetchWorkloadsPort { async fn poll(&mut self) -> ListenerResult>> { - match cscs_job_list(None, None).await { + if self.receiver.is_closed() { + return Ok(Some(Event::None)); + } + if !self.receiver.is_empty() + && let Some(val) = self.receiver.recv().await + { + self.filter = val; + } + match cscs_job_list(Some(self.filter.clone()), None, None).await { Ok(jobs) => Ok(Some(Event::User(UserEvent::Cscs(CscsEvent::GotWorkloadData(jobs))))), Err(e) => { let _ = trace_dbg!(e); diff --git a/coman/src/main.rs b/coman/src/main.rs index 6dae3f5..2bc8473 100644 --- a/coman/src/main.rs +++ b/coman/src/main.rs @@ -107,7 +107,7 @@ async fn main() -> Result<()> { } => match cscs_command { CscsCommands::Login => cli_cscs_login().await?, CscsCommands::Job { command } => match command { - CscsJobCommands::List => cli_cscs_job_list(system, platform).await?, + CscsJobCommands::List { status } => cli_cscs_job_list(status, system, platform).await?, CscsJobCommands::Get { job } => cli_cscs_job_detail(job, system, platform).await?, CscsJobCommands::Log { job, stderr } => cli_cscs_job_log(job, stderr, system, platform).await?, CscsJobCommands::Submit { @@ -192,6 +192,7 @@ fn run_tui(tick_rate: f64) -> Result<()> { let (select_system_tx, select_system_rx) = mpsc::channel(100); let (job_log_tx, job_log_rx) = mpsc::channel(100); + let (job_filter_tx, job_filter_rx) = mpsc::channel(100); let (job_resource_usage_tx, job_resource_usage_rx) = mpsc::channel(100); let (background_task_tx, background_task_rx) = mpsc::channel(100); let (user_event_tx, user_event_rx) = mpsc::channel(100); @@ -206,7 +207,11 @@ fn run_tui(tick_rate: f64) -> Result<()> { .tick_interval(Duration::from_millis((1000.0 / tick_rate) as u64)) .async_crossterm_input_listener(Duration::default(), 3) .add_async_port(Box::new(AsyncErrorPort::new(error_rx)), Duration::default(), 1) - .add_async_port(Box::new(AsyncFetchWorkloadsPort::new()), Duration::from_secs(2), 1) + .add_async_port( + Box::new(AsyncFetchWorkloadsPort::new(job_filter_rx)), + Duration::from_secs(2), + 1, + ) .add_async_port( Box::new(AsyncSelectSystemPort::new(select_system_rx)), Duration::default(), @@ -339,6 +344,7 @@ fn run_tui(tick_rate: f64) -> Result<()> { error_tx, select_system_tx, job_log_tx, + job_filter_tx, job_resource_usage_tx, user_event_tx, background_task_tx, @@ -382,5 +388,6 @@ fn popup_exclusion_clause() -> SubClause { SubClause::IsMounted(Id::LoginPopup), SubClause::IsMounted(Id::DownloadPopup), SubClause::IsMounted(Id::SystemSelectPopup), + SubClause::IsMounted(Id::JobFilterPopup), ]))) }