Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions devolutions-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ sha2 = "0.10"
serde_json = "1"
serde = { version = "1", features = ["derive"] }
tap = "1.0"
tempfile = "3"
tokio-tungstenite = { version = "0.26", features = ["rustls-tls-native-roots"] }
tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "tls12", "ring"] }
tracing = "0.1"
Expand Down Expand Up @@ -98,8 +99,5 @@ features = [
[target.'cfg(windows)'.build-dependencies]
embed-resource = "3.0"

[dev-dependencies]
tempfile = "3"

[target.'cfg(windows)'.dev-dependencies]
expect-test = "1.5"
16 changes: 4 additions & 12 deletions devolutions-agent/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,10 @@ pub mod dto {
pub server_spki_sha256: Option<String>,
}

#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
/// PowerShell Universal Event Hub compatibility configuration.
///
/// Defaults to disabled.
#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "PascalCase")]
pub struct PsuEventHubConf {
/// Enable PowerShell Universal Event Hub compatibility.
Expand All @@ -544,17 +547,6 @@ pub mod dto {
pub powershell: PsuPowerShellConf,
}

#[allow(clippy::derivable_impls)] // Just to be explicit about default disabled behavior.
impl Default for PsuEventHubConf {
fn default() -> Self {
Self {
enabled: false,
connections: Vec::new(),
powershell: PsuPowerShellConf::default(),
}
}
}

#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct PsuEventHubConnectionConf {
Expand Down
36 changes: 24 additions & 12 deletions devolutions-agent/src/psu_event_hub/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ use std::sync::Arc;
use anyhow::Context as _;
use camino::Utf8PathBuf;
use serde_json::Value;
use tokio::task::JoinSet;
use uuid::Uuid;

use crate::config::dto::{PsuEventHubConnectionConf, PsuPowerShellConf};
use crate::config::dto::PsuEventHubConnectionConf;
use crate::psu_event_hub::models::WebsocketEventResponse;
use crate::psu_event_hub::powershell_worker::PowerShellWorker;
use crate::psu_event_hub::result_store::ResultStore;
Expand All @@ -19,16 +20,21 @@ pub(super) struct EventHubExecutor {
}

impl EventHubExecutor {
pub(super) fn new(connection: &PsuEventHubConnectionConf, power_shell: PsuPowerShellConf) -> Self {
pub(super) fn new(connection: &PsuEventHubConnectionConf, worker: Arc<PowerShellWorker>) -> Self {
Self {
hub: connection.hub.clone(),
script_path: connection.script_path.as_ref().map(normalize_script_path),
worker: Arc::new(PowerShellWorker::new(power_shell)),
worker,
result_store: ResultStore::default(),
}
}

pub(super) fn handle_invocation(&self, target: &str, arguments: &[Value]) -> anyhow::Result<Option<Value>> {
pub(super) fn handle_invocation(
&self,
target: &str,
arguments: &[Value],
execution_tasks: &mut JoinSet<()>,
) -> anyhow::Result<Option<Value>> {
if target == "GetResult" {
let execution_id = required_string_argument(arguments, 0, "event id")?;
let result = self.result_store.take(execution_id);
Expand All @@ -39,41 +45,47 @@ impl EventHubExecutor {

if target == self.hub {
let data = required_string_argument(arguments, 0, "event data")?.to_owned();
let execution_id = self.execute_script(data, true);
let execution_id = self.execute_script(data, true, execution_tasks);
return Ok(Some(Value::String(execution_id)));
}

if target == format!("{}Void", self.hub) {
let data = required_string_argument(arguments, 0, "event data")?.to_owned();
self.execute_script(data, false);
self.execute_script(data, false, execution_tasks);
return Ok(None);
}

if target == format!("{}Module", self.hub) {
let command = required_string_argument(arguments, 0, "command")?.to_owned();
let data = required_string_argument(arguments, 1, "event data")?.to_owned();
let execution_id = self.execute_command(command, data, true);
let execution_id = self.execute_command(command, data, true, execution_tasks);
return Ok(Some(Value::String(execution_id)));
}

if target == format!("{}ModuleVoid", self.hub) {
let command = required_string_argument(arguments, 0, "command")?.to_owned();
let data = required_string_argument(arguments, 1, "event data")?.to_owned();
self.execute_command(command, data, false);
self.execute_command(command, data, false, execution_tasks);
return Ok(None);
}

warn!(%target, hub = %self.hub, "Received unknown PSU Event Hub invocation");
Ok(None)
}

fn execute_command(&self, command: String, data: String, return_result: bool) -> String {
fn execute_command(
&self,
command: String,
data: String,
return_result: bool,
execution_tasks: &mut JoinSet<()>,
) -> String {
let execution_id = Uuid::new_v4().to_string();
let worker = Arc::clone(&self.worker);
let result_store = self.result_store.clone();
let stored_execution_id = execution_id.clone();

tokio::spawn(async move {
execution_tasks.spawn(async move {
match worker.execute_command(command, data, return_result).await {
Ok(response) if return_result => result_store.insert(stored_execution_id, response),
Ok(_) => {}
Expand All @@ -90,7 +102,7 @@ impl EventHubExecutor {
execution_id
}

fn execute_script(&self, data: String, return_result: bool) -> String {
fn execute_script(&self, data: String, return_result: bool, execution_tasks: &mut JoinSet<()>) -> String {
let execution_id = Uuid::new_v4().to_string();
let Some(script_path) = self.script_path.clone() else {
if return_result {
Expand All @@ -106,7 +118,7 @@ impl EventHubExecutor {
let result_store = self.result_store.clone();
let stored_execution_id = execution_id.clone();

tokio::spawn(async move {
execution_tasks.spawn(async move {
match worker.execute_script(script_path, data, return_result).await {
Ok(response) if return_result => result_store.insert(stored_execution_id, response),
Ok(_) => {}
Expand Down
54 changes: 50 additions & 4 deletions devolutions-agent/src/psu_event_hub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ mod powershell_worker;
mod result_store;
mod signalr;

use std::sync::Arc;

use anyhow::Context as _;
use async_trait::async_trait;
use devolutions_gateway_task::{ShutdownSignal, Task};
use tokio::task::JoinSet;

use crate::config::ConfHandle;
use crate::config::{ConfHandle, dto};
use crate::psu_event_hub::executor::EventHubExecutor;
use crate::psu_event_hub::powershell_worker::PowerShellWorker;

Expand Down Expand Up @@ -45,16 +48,27 @@ impl Task for PsuEventHubTask {

let mut join_set = JoinSet::new();

let secret_resolver = PowerShellWorker::new(psu_conf.powershell.clone());
let worker = Arc::new(
PowerShellWorker::new(psu_conf.powershell.clone()).context("failed to initialize PSU PowerShell worker")?,
);

for mut connection in psu_conf.connections {
if connection.hub.trim().is_empty() {
warn!(url = %connection.url, "Skipping PSU Event Hub connection without a hub name");
continue;
}

if let Err(error) = validate_connection(&connection) {
error!(
hub = %connection.hub,
error = format!("{error:#}"),
"Skipping PSU Event Hub connection because configuration is invalid"
);
continue;
}

if let Some(app_token) = connection.app_token.as_deref() {
match secret_resolver.resolve_app_token(app_token).await {
match worker.resolve_app_token(app_token).await {
Ok(resolved) => connection.app_token = Some(resolved),
Err(error) => {
error!(
Expand All @@ -67,7 +81,7 @@ impl Task for PsuEventHubTask {
}
}

let executor = EventHubExecutor::new(&connection, psu_conf.powershell.clone());
let executor = EventHubExecutor::new(&connection, Arc::clone(&worker));
let connection_shutdown_signal = shutdown_signal.clone();

join_set
Expand All @@ -85,3 +99,35 @@ impl Task for PsuEventHubTask {
Ok(())
}
}

fn validate_connection(connection: &dto::PsuEventHubConnectionConf) -> anyhow::Result<()> {
if connection.use_default_credentials && connection.app_token.is_none() {
anyhow::bail!(
"PSU Event Hub UseDefaultCredentials is configured for hub {}, but Windows default credentials are not implemented",
connection.hub
);
}

Ok(())
}

#[cfg(test)]
mod tests {
use url::Url;

use super::*;

#[test]
fn default_credentials_without_app_token_are_rejected() {
let connection = dto::PsuEventHubConnectionConf {
hub: "Hub".to_owned(),
url: Url::parse("http://localhost:5000").expect("parse URL"),
app_token: None,
use_default_credentials: true,
script_path: None,
description: None,
};

assert!(validate_connection(&connection).is_err());
}
}
10 changes: 10 additions & 0 deletions devolutions-agent/src/psu_event_hub/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ impl WebsocketEventResponse {
terminating_error: Some(message.into()),
}
}

pub(super) fn timeout(message: impl Into<String>) -> Self {
Self {
data: None,
job_outputs: Vec::new(),
complete: true,
timeout: true,
terminating_error: Some(message.into()),
}
}
}

impl Default for WebsocketEventResponse {
Expand Down
Loading
Loading