diff --git a/CHANGELOG.md b/CHANGELOG.md index cdbef3e..6866b7b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ Pre-1.0 note: while `pg_durable` is in major version `0`, minor releases may inc ## Unreleased +- Fix: `df.instance_nodes` no longer shows losing-branch nodes as `running` or `pending` after a `df.race` completes. The orchestrator now marks all non-terminal nodes in the losing branch as `cancelled` once the winning branch is decided. Schema change: `nodes_status_chk` is widened to include `'cancelled'`; upgrade DDL is in `sql/pg_durable--0.2.2--0.2.3.sql`. - Fix: `df.signal()` now propagates the event to running sub-orchestrations spawned by `df.race` / `df.join` / `df.join3`, so `df.wait_for_signal` inside a parallel branch wakes as expected. Known limitation: signals raised before the target sub-orchestration is in the `Running` state are not yet redelivered when it starts; a proper fix requires unmatched-event forwarding in duroxide (#154). - Fix: `df.start()`, RLS policies on `df.instances` / `df.nodes` / `df.vars`, and `df.vars` reads/writes no longer fail with `role "..." does not exist` when `current_user` requires quoting (mixed case, spaces, embedded quotes). All `current_user::regrole` casts are now wrapped with `quote_ident()` so the role lookup preserves the original identifier casing (#161, #162). Schema upgrade DDL is in `sql/pg_durable--0.2.1--0.2.2.sql`. diff --git a/Cargo.lock b/Cargo.lock index ca1c17a..55e3e3d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1892,7 +1892,7 @@ dependencies = [ [[package]] name = "pg_durable" -version = "0.2.2" +version = "0.2.3" dependencies = [ "bigdecimal", "chrono", diff --git a/Cargo.toml b/Cargo.toml index cb769e6..317e864 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pg_durable" -version = "0.2.2" +version = "0.2.3" edition = "2021" license = "MIT" repository = "https://github.com/Azure/pg_durable" diff --git a/docs/upgrade-testing.md b/docs/upgrade-testing.md index 567b831..c48d2e6 100644 --- a/docs/upgrade-testing.md +++ b/docs/upgrade-testing.md @@ -192,6 +192,14 @@ No additional fixture is needed for subsequent minors — intermediate versions Each schema-changing PR should add a section here documenting what changed, what the upgrade script handles, and any backward compatibility considerations. +### v0.2.2 → v0.2.3 + +#### Race loser node cancellation (fixes df.instance_nodes ghost nodes) +- **DDL change:** `nodes_status_chk` on `df.nodes` is dropped and recreated to include `'cancelled'` in the allowed status set. The existing `nodes_result_status_chk` (`result IS NULL OR status IN ('completed', 'failed')`) is unchanged — cancelled nodes carry no result. Upgrade DDL is in `sql/pg_durable--0.2.2--0.2.3.sql`. +- **Scenario A considerations:** Schema comparison must verify that `nodes_status_chk` now allows `'cancelled'` on both fresh installs and upgraded databases. +- **Scenario B1 considerations:** The new `.so` continues to work against v0.2.2 schemas (the constraint is `NOT VALID`, so pre-existing rows are unaffected). The new `cancel_subtree_nodes` activity that writes `'cancelled'` will fail the constraint only on schemas that have not been upgraded; for those deployments the activity will error and the orchestrator will log a warning, but existing workflows are otherwise unaffected. The ghost-node symptom remains until the schema is upgraded. +- **Scenario B2 considerations:** No data migration needed. All existing `df.nodes` rows have status in `('pending', 'running', 'completed', 'failed')` and continue to satisfy the widened constraint. + ### v0.2.1 → v0.2.2 #### #162 quote_ident-wrapped `current_user::regrole` (fixes #161) diff --git a/sql/pg_durable--0.2.2--0.2.3.sql b/sql/pg_durable--0.2.2--0.2.3.sql new file mode 100644 index 0000000..2005e93 --- /dev/null +++ b/sql/pg_durable--0.2.2--0.2.3.sql @@ -0,0 +1,16 @@ +-- pg_durable upgrade: 0.2.2 → 0.2.3 +-- +-- Fix: df.instance_nodes leaves race-loser nodes as 'running' or 'pending' after +-- a race completes. The orchestrator now marks all non-terminal nodes in the +-- losing branch of a RACE as 'cancelled' once the winning branch finishes. +-- +-- To support the new 'cancelled' node status, the nodes_status_chk constraint is +-- widened to include it. The nodes_result_status_chk constraint is unchanged: +-- cancelled nodes carry no result, so result IS NULL already satisfies it. + +ALTER TABLE df.nodes + DROP CONSTRAINT IF EXISTS nodes_status_chk; + +ALTER TABLE df.nodes + ADD CONSTRAINT nodes_status_chk + CHECK (status IN ('pending', 'running', 'completed', 'failed', 'cancelled')) NOT VALID; diff --git a/src/activities/cancel_subtree_nodes.rs b/src/activities/cancel_subtree_nodes.rs new file mode 100644 index 0000000..c8d3a41 --- /dev/null +++ b/src/activities/cancel_subtree_nodes.rs @@ -0,0 +1,60 @@ +//! CancelSubtreeNodes activity - marks all non-terminal nodes in a list as 'cancelled' +//! +//! Used after a RACE winner is determined to clean up the losing branch's node records. + +use duroxide::ActivityContext; +use sqlx::PgPool; +use std::sync::Arc; + +/// Activity name for registration and scheduling +pub const NAME: &str = "pg_durable::activity::cancel-subtree-nodes"; + +/// Mark all non-terminal nodes in `node_ids` as 'cancelled'. +/// +/// Nodes that are already in a terminal state (`completed`, `failed`, `cancelled`) +/// are left untouched; only `pending` and `running` nodes are updated. +pub async fn execute( + ctx: ActivityContext, + pool: Arc, + input_json: String, +) -> Result { + let input: serde_json::Value = serde_json::from_str(&input_json) + .map_err(|e| format!("Failed to parse cancel-subtree-nodes input: {e}"))?; + + let node_ids: Vec = input["node_ids"] + .as_array() + .ok_or("Missing node_ids array")? + .iter() + .filter_map(|v| v.as_str().map(|s| s.to_string())) + .collect(); + + if node_ids.is_empty() { + return Ok("No nodes to cancel".to_string()); + } + + ctx.trace_info(format!("Cancelling {} losing-branch nodes", node_ids.len())); + + // Bulk-update only nodes that are still in a non-terminal state so we never + // overwrite a 'completed', 'failed', or already-'cancelled' node — any of these + // are terminal and must not be disturbed. + let result = sqlx::query( + "UPDATE df.nodes + SET status = 'cancelled', updated_at = now() + WHERE id = ANY($1) AND status NOT IN ('completed', 'failed', 'cancelled')", + ) + .bind(&node_ids[..]) + .execute(pool.as_ref()) + .await; + + match result { + Ok(r) => { + ctx.trace_info(format!("Cancelled {} node(s)", r.rows_affected())); + Ok(format!("Cancelled {} node(s)", r.rows_affected())) + } + Err(e) => { + let err_msg = format!("Failed to cancel subtree nodes: {e}"); + ctx.trace_info(&err_msg); + Err(err_msg) + } + } +} diff --git a/src/activities/mod.rs b/src/activities/mod.rs index ae1a7db..41d1a70 100644 --- a/src/activities/mod.rs +++ b/src/activities/mod.rs @@ -3,6 +3,7 @@ //! Each activity is in its own file with a co-located NAME constant. //! This enables IDE navigation (F12 jumps to implementation). +pub mod cancel_subtree_nodes; pub mod execute_http; pub mod execute_sql; pub mod load_function_graph; diff --git a/src/lib.rs b/src/lib.rs index be84c0d..d4a8b97 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -240,7 +240,7 @@ ALTER TABLE df.nodes ADD CONSTRAINT nodes_result_name_chk CHECK (result_name IS NULL OR result_name ~ '^[A-Za-z_][A-Za-z0-9_]*$') NOT VALID, ADD CONSTRAINT nodes_status_chk - CHECK (status IN ('pending', 'running', 'completed', 'failed')) NOT VALID, + CHECK (status IN ('pending', 'running', 'completed', 'failed', 'cancelled')) NOT VALID, ADD CONSTRAINT nodes_result_status_chk CHECK (result IS NULL OR status IN ('completed', 'failed')) NOT VALID, ADD CONSTRAINT nodes_structure_chk diff --git a/src/orchestrations/execute_function_graph.rs b/src/orchestrations/execute_function_graph.rs index 67183d6..f5d190d 100644 --- a/src/orchestrations/execute_function_graph.rs +++ b/src/orchestrations/execute_function_graph.rs @@ -806,6 +806,57 @@ async fn execute_join_node( Ok(result) } +/// Collect all node IDs in the subtree rooted at `root_id` by depth-first traversal. +/// +/// Handles the full node structure: +/// - `left_node` / `right_node` for THEN, IF, JOIN, RACE, LOOP +/// - `condition_node` embedded in `query` JSON for IF and LOOP nodes +/// - `extra_nodes` array embedded in `query` JSON for JOIN nodes (join3, etc.) +fn collect_subtree_node_ids(graph: &FunctionGraph, root_id: &str) -> Vec { + let mut ids: Vec = Vec::new(); + let mut stack = vec![root_id.to_string()]; + + while let Some(node_id) = stack.pop() { + // Guard against visiting the same node twice (not expected in valid graphs, + // but prevents any potential infinite loop). + if ids.contains(&node_id) { + continue; + } + let Some(node) = graph.nodes.get(&node_id) else { + continue; + }; + ids.push(node_id.clone()); + + // Follow structural children + if let Some(left) = &node.left_node { + stack.push(left.clone()); + } + if let Some(right) = &node.right_node { + stack.push(right.clone()); + } + + // Follow children embedded in the query config JSON + if let Some(config_str) = &node.query { + if let Ok(config) = serde_json::from_str::(config_str) { + // IF and LOOP: condition_node is a plain node ID string + if let Some(cond_id) = config["condition_node"].as_str() { + stack.push(cond_id.to_string()); + } + // JOIN (join3, etc.): extra_nodes is an array of node ID strings + if let Some(extras) = config["extra_nodes"].as_array() { + for extra in extras { + if let Some(id) = extra.as_str() { + stack.push(id.to_string()); + } + } + } + } + } + } + + ids +} + async fn execute_race_node( ctx: &OrchestrationContext, graph: &FunctionGraph, @@ -856,18 +907,48 @@ async fn execute_race_node( // Use ctx.select2() - first to complete wins // select2 now returns Either2 instead of (winner_idx, DurableOutput) - let raw = match ctx.select2(left_fut, right_fut).await { + let (raw, loser_root_id) = match ctx.select2(left_fut, right_fut).await { duroxide::Either2::First(Ok(r)) => { ctx.trace_info("RACE completed - left branch won"); - Ok(r) + (Ok(r), right_id.clone()) } - duroxide::Either2::First(Err(e)) => Err(format!("RACE left branch failed: {e}")), + duroxide::Either2::First(Err(e)) => (Err(format!("RACE left branch failed: {e}")), right_id.clone()), duroxide::Either2::Second(Ok(r)) => { ctx.trace_info("RACE completed - right branch won"); - Ok(r) + (Ok(r), left_id.clone()) } - duroxide::Either2::Second(Err(e)) => Err(format!("RACE right branch failed: {e}")), - }?; + duroxide::Either2::Second(Err(e)) => (Err(format!("RACE right branch failed: {e}")), left_id.clone()), + }; + + // Cancel all non-terminal nodes in the losing branch so that df.instance_nodes + // does not show ghost running/pending work after the race has been decided. + let loser_node_ids = collect_subtree_node_ids(graph, &loser_root_id); + if !loser_node_ids.is_empty() { + ctx.trace_info(format!( + "Cancelling {} losing-branch node(s) (root: {})", + loser_node_ids.len(), + loser_root_id + )); + let cancel_input = serde_json::json!({ "node_ids": loser_node_ids }); + // Best-effort: a failure here does not affect the race result but will + // leave losing-branch nodes in a non-terminal state. Log so operators + // can observe the problem without failing the workflow. + match ctx + .schedule_activity( + activities::cancel_subtree_nodes::NAME, + cancel_input.to_string(), + ) + .await + { + Ok(_) => {} + Err(e) => ctx.trace_info(format!( + "Warning: failed to cancel losing-branch nodes (root: {}): {e}", + loser_root_id + )), + } + } + + let raw = raw?; // Parse the subtree output envelope produced by execute_subtree and merge any named // results from the winning branch into the parent results map. diff --git a/src/registry.rs b/src/registry.rs index bb0a57e..7ddccc3 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -15,6 +15,7 @@ pub fn create_activity_registry(pool: Arc, semaphore: Arc) -> let graph_pool = pool.clone(); let status_pool = pool.clone(); let node_status_pool = pool.clone(); + let cancel_subtree_pool = pool.clone(); let http_pool = pool.clone(); ActivityRegistry::builder() @@ -34,6 +35,10 @@ pub fn create_activity_registry(pool: Arc, semaphore: Arc) -> let pool = node_status_pool.clone(); async move { activities::update_node_status::execute(ctx, pool, input_json).await } }) + .register(activities::cancel_subtree_nodes::NAME, move |ctx: ActivityContext, input_json: String| { + let pool = cancel_subtree_pool.clone(); + async move { activities::cancel_subtree_nodes::execute(ctx, pool, input_json).await } + }) .register(activities::execute_http::NAME, move |ctx: ActivityContext, config_json: String| { let pool = http_pool.clone(); async move { activities::execute_http::execute(ctx, pool, config_json).await } diff --git a/tests/e2e/sql/24_race_loser_cancelled.sql b/tests/e2e/sql/24_race_loser_cancelled.sql new file mode 100644 index 0000000..34e9cb4 --- /dev/null +++ b/tests/e2e/sql/24_race_loser_cancelled.sql @@ -0,0 +1,130 @@ +-- Tests: df.instance_nodes marks losing-branch nodes as 'cancelled' after race completes +-- +-- Repro for: df.instance_nodes leaves race-loser nodes running or pending after +-- race completion. +-- +-- Setup: a race where one branch completes instantly and the other waits with a +-- long sleep. After the race completes, every losing-branch node must have +-- status = 'cancelled' in df.nodes; none should remain 'running' or 'pending'. + +SET SESSION AUTHORIZATION df_e2e_user; + +-- === Scenario 1: fast SQL wins, long sleep loses === + +CREATE TEMP TABLE _race_loser_cancelled_state (instance_id TEXT); + +INSERT INTO _race_loser_cancelled_state +SELECT df.start( + df.race( + 'SELECT ''fast'' AS winner', + df.sleep(60) + ), + 'test-race-loser-cancelled' +); + +DO $$ +DECLARE + inst_id TEXT; + v_status TEXT; + ghost_count INT; + attempts INT := 0; +BEGIN + SELECT instance_id INTO inst_id FROM _race_loser_cancelled_state; + RAISE NOTICE 'Testing race loser cancellation for instance: %', inst_id; + + SELECT df.wait_for_completion(inst_id, 30) INTO v_status; + + IF v_status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [race-loser-cancelled]: expected completed, got %', v_status; + END IF; + + -- Poll until all nodes reach a terminal state (or time out after ~10 s). + -- The cancel activity is scheduled asynchronously so there may be a short + -- lag between the instance completing and the losing-branch nodes being + -- written to 'cancelled'. + LOOP + SELECT COUNT(*) INTO ghost_count + FROM df.instance_nodes(inst_id) + WHERE status IN ('running', 'pending'); + + EXIT WHEN ghost_count = 0 OR attempts >= 50; + PERFORM pg_sleep(0.2); + attempts := attempts + 1; + END LOOP; + + IF ghost_count > 0 THEN + RAISE EXCEPTION + 'TEST FAILED [race-loser-cancelled]: % node(s) still running/pending after race completed', + ghost_count; + END IF; + + -- Verify the losing branch root node (the sleep) is specifically 'cancelled' + IF NOT EXISTS ( + SELECT 1 + FROM df.instance_nodes(inst_id) + WHERE node_type = 'SLEEP' AND status = 'cancelled' + ) THEN + RAISE EXCEPTION 'TEST FAILED [race-loser-cancelled]: SLEEP node not marked as cancelled'; + END IF; + + RAISE NOTICE 'TEST PASSED: race_loser_cancelled (scenario 1)'; +END $$; + +DROP TABLE _race_loser_cancelled_state; + +-- === Scenario 2: losing branch is a multi-node sequence (THEN + SQL) === + +CREATE TEMP TABLE _race_loser_seq_state (instance_id TEXT); + +INSERT INTO _race_loser_seq_state +SELECT df.start( + df.race( + 'SELECT ''fast'' AS winner', + df.seq( + df.sleep(60), + 'SELECT ''slow-follow-up''' + ) + ), + 'test-race-loser-seq' +); + +DO $$ +DECLARE + inst_id TEXT; + v_status TEXT; + ghost_count INT; + attempts INT := 0; +BEGIN + SELECT instance_id INTO inst_id FROM _race_loser_seq_state; + RAISE NOTICE 'Testing race loser cancellation (multi-node) for instance: %', inst_id; + + SELECT df.wait_for_completion(inst_id, 30) INTO v_status; + + IF v_status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [race-loser-seq]: expected completed, got %', v_status; + END IF; + + -- Poll until all nodes reach a terminal state (or time out after ~10 s). + LOOP + SELECT COUNT(*) INTO ghost_count + FROM df.instance_nodes(inst_id) + WHERE status IN ('running', 'pending'); + + EXIT WHEN ghost_count = 0 OR attempts >= 50; + PERFORM pg_sleep(0.2); + attempts := attempts + 1; + END LOOP; + + IF ghost_count > 0 THEN + RAISE EXCEPTION + 'TEST FAILED [race-loser-seq]: % node(s) still running/pending after race completed', + ghost_count; + END IF; + + RAISE NOTICE 'TEST PASSED: race_loser_cancelled (scenario 2: multi-node loser)'; +END $$; + +DROP TABLE _race_loser_seq_state; + +RESET SESSION AUTHORIZATION; +SELECT 'TEST PASSED: race loser nodes cancelled' AS result;