From 0966a8284279ece9c689fe7aa7dd8ba4f7e216ab Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 23 May 2026 12:39:47 +0000 Subject: [PATCH 1/2] Initial plan From 02e1df3ba44f77e1e2db35cb5f5ce7fa1cddadeb Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 23 May 2026 12:48:36 +0000 Subject: [PATCH 2/2] Add retry, timeout, catch, parallel, and fractional sleep DSL primitives Agent-Logs-Url: https://github.com/microsoft/pg_durable/sessions/431bfefe-5a7f-4e93-b68d-29151ec2fa50 Co-authored-by: pinodeca <32303022+pinodeca@users.noreply.github.com> --- USER_GUIDE.md | 8 +- sql/pg_durable--0.1.1--0.2.0.sql | 7 + src/dsl.rs | 216 ++++++++++++++++- src/lib.rs | 7 + src/orchestrations/execute_function_graph.rs | 241 ++++++++++++++++++- src/types.rs | 33 +++ 6 files changed, 501 insertions(+), 11 deletions(-) diff --git a/USER_GUIDE.md b/USER_GUIDE.md index 6d2021f..d48aa90 100644 --- a/USER_GUIDE.md +++ b/USER_GUIDE.md @@ -170,12 +170,18 @@ df.sql('SELECT 1') ~> df.sql('SELECT 2') | Function | Description | Example | |----------|-------------|---------| -| `df.sleep(seconds)` | Pause for N seconds | `df.sleep(60)` | +| `df.sleep(seconds)` | Pause for N seconds (supports fractional seconds) | `df.sleep(0.2)` | | `df.wait_for_schedule(cron)` | Wait until cron matches | `df.wait_for_schedule('0 * * * *')` | | `df.http(url, method, body, headers, timeout)` | Make HTTP request | `df.http('https://api.example.com', 'POST', '{"key": "value"}')` | | `df.join(a, b)` | Execute in parallel, wait for all | `df.join('SELECT 1', 'SELECT 2')` | | `df.join3(a, b, c)` | Three in parallel | `df.join3(a, b, c)` | +| `df.parallel(futures[])` | Set-style parallel join | `df.parallel(ARRAY['SELECT 1','SELECT 2','SELECT 3'])` | +| `df.join_all(futures[])` | Alias for `df.parallel` | `df.join_all(ARRAY[a,b,c])` | | `df.race(a, b)` | Execute in parallel, first wins | `df.race(fast_query, slow_query)` | +| `df.timeout(fut, seconds)` | Fail if a future exceeds a deadline | `df.timeout(df.sql('SELECT pg_sleep(10)'), 0.5)` | +| `df.retry(fut, ...)` | Retry with declarative backoff policy | `df.retry(df.sql('SELECT flaky()'), 'on_429', 10, 200, 30000, 2.0, 0.2)` | +| `df.catch(fut, handler)` | Run fallback branch on failure | `df.catch(df.sql('SELECT risky()'), df.sql('SELECT recover()'))` | +| `df.on_error(fut, handler)` | Alias for `df.catch` | `df.on_error(fut, handler)` | | `df.if(cond, then, else)` | Conditional branch | `df.if('SELECT true', a, b)` | | `df.loop(body)` | Repeat forever | `df.loop(body)` | | `df.loop(body, cond)` | Repeat while condition is true | `df.loop(body, 'SELECT count(*) > 0 FROM q')` | diff --git a/sql/pg_durable--0.1.1--0.2.0.sql b/sql/pg_durable--0.1.1--0.2.0.sql index 777de15..4985408 100644 --- a/sql/pg_durable--0.1.1--0.2.0.sql +++ b/sql/pg_durable--0.1.1--0.2.0.sql @@ -270,6 +270,7 @@ DECLARE 'df.seq(text, text)', 'df.as(text, text)', 'df.sleep(bigint)', + 'df.sleep(double precision)', 'df.wait_for_schedule(text)', 'df.loop(text, text)', 'df.break(text)', @@ -277,7 +278,13 @@ DECLARE 'df.if_rows(text, text, text)', 'df.join(text, text)', 'df.join3(text, text, text)', + 'df.parallel(text[])', + 'df.join_all(text[])', 'df.race(text, text)', + 'df.timeout(text, double precision)', + 'df.retry(text, text, integer, bigint, bigint, double precision, double precision, text)', + 'df.catch(text, text)', + 'df.on_error(text, text)', 'df.wait_for_signal(text, integer)', 'df.signal(text, text, text)', 'df.start(text, text, text)', diff --git a/src/dsl.rs b/src/dsl.rs index e617988..482578c 100644 --- a/src/dsl.rs +++ b/src/dsl.rs @@ -245,7 +245,27 @@ pub fn sleep(seconds: i64) -> String { } Durofut { node_type: "SLEEP".to_string(), - query: Some(seconds.to_string()), + query: Some(serde_json::json!({ "milliseconds": seconds * 1000 }).to_string()), + ..Default::default() + } + .to_json() +} + +/// Creates a sleep node that pauses for fractional seconds. +#[pg_extern(name = "sleep", schema = "df")] +pub fn sleep_fractional(seconds: f64) -> String { + if !seconds.is_finite() || seconds < 0.0 { + pgrx::error!("Sleep duration must be a finite, non-negative number"); + } + + let millis = (seconds * 1000.0).round(); + if millis > i64::MAX as f64 { + pgrx::error!("Sleep duration is too large"); + } + + Durofut { + node_type: "SLEEP".to_string(), + query: Some(serde_json::json!({ "milliseconds": millis as i64 }).to_string()), ..Default::default() } .to_json() @@ -439,6 +459,42 @@ pub fn join3(a: &str, b: &str, c: &str) -> String { .to_json() } +/// Creates a set-style parallel join node from an array of futures. +#[pg_extern(schema = "df")] +pub fn parallel(futures: Vec<&str>) -> String { + if futures.len() < 2 { + pgrx::error!("df.parallel requires at least 2 futures"); + } + + let parsed: Vec = futures.into_iter().map(Durofut::ensure).collect(); + + let mut iter = parsed.into_iter(); + let left = iter.next().expect("validated length >= 2"); + let right = iter.next().expect("validated length >= 2"); + let extras: Vec = iter.collect(); + + let query = if extras.is_empty() { + None + } else { + Some(serde_json::json!({ "extra_nodes": extras }).to_string()) + }; + + Durofut { + node_type: "JOIN".to_string(), + left_node: Some(Box::new(left)), + right_node: Some(Box::new(right)), + query, + ..Default::default() + } + .to_json() +} + +/// Alias for df.parallel(). +#[pg_extern(name = "join_all", schema = "df")] +pub fn join_all(futures: Vec<&str>) -> String { + parallel(futures) +} + /// Creates a race node - runs branches in parallel, first to complete wins. /// Arguments can be either Durofut JSON or plain SQL strings (auto-wrapped). #[pg_extern(schema = "df")] @@ -455,6 +511,113 @@ pub fn race(a: &str, b: &str) -> String { .to_json() } +/// Creates a timeout wrapper around a future. +/// If the wrapped future does not complete within the given duration, execution fails. +#[pg_extern(schema = "df")] +pub fn timeout(fut: &str, timeout_seconds: f64) -> String { + if !timeout_seconds.is_finite() || timeout_seconds <= 0.0 { + pgrx::error!("Timeout must be a finite, positive number"); + } + + let timeout_fut = sleep_fractional(timeout_seconds); + let mut race_fut = Durofut::ensure(&race(fut, &timeout_fut)); + race_fut.query = Some( + serde_json::json!({ + "timeout_wrapper": true, + "timeout_ms": (timeout_seconds * 1000.0).round() as i64 + }) + .to_string(), + ); + race_fut.to_json() +} + +fn build_retry_node( + fut: &str, + policy: &str, + max_attempts: i32, + initial_backoff_ms: i64, + max_backoff_ms: i64, + backoff_multiplier: f64, + jitter: f64, + on_error: Option<&str>, +) -> String { + if max_attempts <= 0 { + pgrx::error!("max_attempts must be positive"); + } + if initial_backoff_ms < 0 || max_backoff_ms < 0 { + pgrx::error!("Backoff durations must be non-negative"); + } + if max_backoff_ms < initial_backoff_ms { + pgrx::error!("max_backoff_ms must be >= initial_backoff_ms"); + } + if !backoff_multiplier.is_finite() || backoff_multiplier < 1.0 { + pgrx::error!("backoff_multiplier must be finite and >= 1.0"); + } + if !jitter.is_finite() || !(0.0..=1.0).contains(&jitter) { + pgrx::error!("jitter must be between 0.0 and 1.0"); + } + + let mut config = serde_json::json!({ + "retry": { + "policy": policy, + "max_attempts": max_attempts, + "initial_backoff_ms": initial_backoff_ms, + "max_backoff_ms": max_backoff_ms, + "backoff_multiplier": backoff_multiplier, + "jitter": jitter + } + }); + + if let Some(handler) = on_error { + config["on_error_node"] = serde_json::to_value(Durofut::ensure(handler)) + .unwrap_or_else(|e| pgrx::error!("Failed to serialize on_error handler: {}", e)); + } + + Durofut { + node_type: "LOOP".to_string(), + left_node: Some(Box::new(Durofut::ensure(fut))), + query: Some(config.to_string()), + ..Default::default() + } + .to_json() +} + +/// Retries a future with declarative backoff policy. +#[pg_extern(schema = "df")] +pub fn retry( + fut: &str, + policy: default!(&str, "'transient'"), + max_attempts: default!(i32, "3"), + initial_backoff_ms: default!(i64, "1000"), + max_backoff_ms: default!(i64, "30000"), + backoff_multiplier: default!(f64, "2.0"), + jitter: default!(f64, "0.0"), + on_error: default!(Option<&str>, "NULL"), +) -> String { + build_retry_node( + fut, + policy, + max_attempts, + initial_backoff_ms, + max_backoff_ms, + backoff_multiplier, + jitter, + on_error, + ) +} + +/// Executes a fallback branch when the primary future fails. +#[pg_extern(name = "catch", schema = "df")] +pub fn catch_fn(fut: &str, on_error: &str) -> String { + build_retry_node(fut, "all", 1, 0, 0, 1.0, 0.0, Some(on_error)) +} + +/// Alias for df.catch(). +#[pg_extern(schema = "df")] +pub fn on_error(fut: &str, handler: &str) -> String { + catch_fn(fut, handler) +} + /// Creates an HTTP request node. /// Makes an HTTP request to the specified URL and returns the response. /// @@ -1048,7 +1211,7 @@ pub fn wait_for_completion( #[cfg(test)] mod tests { - use super::parse_semver; + use super::*; #[test] fn test_parse_semver_basic() { @@ -1080,4 +1243,53 @@ mod tests { assert!(parse_semver("0.3.0").unwrap() >= (0, 2, 0)); assert!(parse_semver("1.0.0").unwrap() >= (0, 2, 0)); } + + #[test] + fn test_sleep_fractional_stores_milliseconds() { + let fut = Durofut::from_json(&sleep_fractional(0.125)); + assert_eq!(fut.node_type, "SLEEP"); + let cfg: serde_json::Value = + serde_json::from_str(fut.query.as_deref().expect("sleep config")).unwrap(); + assert_eq!(cfg["milliseconds"], 125); + } + + #[test] + fn test_retry_embeds_policy_and_on_error() { + let retry_json = retry( + "SELECT 1", + "on_429", + 5, + 200, + 30_000, + 2.0, + 0.2, + Some("SELECT 'fallback'"), + ); + let fut = Durofut::from_json(&retry_json); + assert_eq!(fut.node_type, "LOOP"); + let cfg: serde_json::Value = + serde_json::from_str(fut.query.as_deref().expect("retry config")).unwrap(); + assert_eq!(cfg["retry"]["policy"], "on_429"); + assert_eq!(cfg["retry"]["max_attempts"], 5); + assert_eq!(cfg["on_error_node"]["node_type"], "SQL"); + } + + #[test] + fn test_timeout_marks_race_as_timeout_wrapper() { + let fut = Durofut::from_json(&timeout("SELECT 1", 0.25)); + assert_eq!(fut.node_type, "RACE"); + let cfg: serde_json::Value = + serde_json::from_str(fut.query.as_deref().expect("timeout config")).unwrap(); + assert_eq!(cfg["timeout_wrapper"], true); + assert_eq!(cfg["timeout_ms"], 250); + } + + #[test] + fn test_parallel_builds_join_with_extras() { + let fut = Durofut::from_json(¶llel(vec!["SELECT 1", "SELECT 2", "SELECT 3"])); + assert_eq!(fut.node_type, "JOIN"); + let cfg: serde_json::Value = + serde_json::from_str(fut.query.as_deref().expect("parallel config")).unwrap(); + assert_eq!(cfg["extra_nodes"].as_array().map(|v| v.len()), Some(1)); + } } diff --git a/src/lib.rs b/src/lib.rs index 85b2e28..f39f20e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -370,6 +370,7 @@ DECLARE 'df.seq(text, text)', 'df.as(text, text)', 'df.sleep(bigint)', + 'df.sleep(double precision)', 'df.wait_for_schedule(text)', 'df.loop(text, text)', 'df.break(text)', @@ -377,7 +378,13 @@ DECLARE 'df.if_rows(text, text, text)', 'df.join(text, text)', 'df.join3(text, text, text)', + 'df.parallel(text[])', + 'df.join_all(text[])', 'df.race(text, text)', + 'df.timeout(text, double precision)', + 'df.retry(text, text, integer, bigint, bigint, double precision, double precision, text)', + 'df.catch(text, text)', + 'df.on_error(text, text)', 'df.wait_for_signal(text, integer)', 'df.signal(text, text, text)', 'df.start(text, text, text)', diff --git a/src/orchestrations/execute_function_graph.rs b/src/orchestrations/execute_function_graph.rs index e295c35..3627271 100644 --- a/src/orchestrations/execute_function_graph.rs +++ b/src/orchestrations/execute_function_graph.rs @@ -372,19 +372,194 @@ async fn execute_sleep_node( node: &FunctionNode, node_id: &str, ) -> Result { - let seconds_str = node + let duration_str = node .query .as_ref() .ok_or_else(|| format!("SLEEP node {node_id} has no duration"))?; - let seconds: u64 = seconds_str - .parse() - .map_err(|_| format!("Invalid sleep duration: {seconds_str}"))?; + // Backward compatibility: + // - v0.2.1 and earlier stored plain integer seconds in query text. + // - Newer versions store {"milliseconds": } JSON for sub-second sleeps. + let millis: u64 = match serde_json::from_str::(duration_str) { + Ok(v) => v + .get("milliseconds") + .and_then(|m| m.as_u64()) + .ok_or_else(|| format!("Invalid sleep config for node {node_id}: {duration_str}"))?, + Err(_) => { + let seconds: u64 = duration_str + .parse() + .map_err(|_| format!("Invalid sleep duration: {duration_str}"))?; + seconds.saturating_mul(1000) + } + }; + + ctx.trace_info(format!("Sleeping for {millis} ms")); + ctx.schedule_timer(Duration::from_millis(millis)).await; + + Ok(format!(r#"{{"slept": true, "milliseconds": {millis}}}"#)) +} + +#[derive(Debug, Clone, serde::Deserialize)] +struct RetryConfig { + #[serde(default = "default_retry_policy")] + policy: String, + #[serde(default = "default_retry_attempts")] + max_attempts: u32, + #[serde(default)] + initial_backoff_ms: u64, + #[serde(default)] + max_backoff_ms: u64, + #[serde(default = "default_retry_multiplier")] + backoff_multiplier: f64, + #[serde(default)] + jitter: f64, +} + +fn default_retry_policy() -> String { + "transient".to_string() +} + +fn default_retry_attempts() -> u32 { + 3 +} + +fn default_retry_multiplier() -> f64 { + 2.0 +} + +fn parse_retry_config(config: &serde_json::Value) -> Result, String> { + let Some(retry_raw) = config.get("retry") else { + return Ok(None); + }; - ctx.trace_info(format!("Sleeping for {seconds} seconds")); - ctx.schedule_timer(Duration::from_secs(seconds)).await; + let retry_cfg: RetryConfig = serde_json::from_value(retry_raw.clone()) + .map_err(|e| format!("Invalid retry config: {e}"))?; + + if retry_cfg.max_attempts == 0 { + return Err("retry.max_attempts must be positive".to_string()); + } + if retry_cfg.max_backoff_ms < retry_cfg.initial_backoff_ms { + return Err("retry.max_backoff_ms must be >= retry.initial_backoff_ms".to_string()); + } + if !retry_cfg.backoff_multiplier.is_finite() || retry_cfg.backoff_multiplier < 1.0 { + return Err("retry.backoff_multiplier must be finite and >= 1.0".to_string()); + } + if !retry_cfg.jitter.is_finite() || !(0.0..=1.0).contains(&retry_cfg.jitter) { + return Err("retry.jitter must be between 0.0 and 1.0".to_string()); + } - Ok(format!(r#"{{"slept": true, "seconds": {seconds}}}"#)) + Ok(Some(retry_cfg)) +} + +fn deterministic_jitter_multiplier(node_id: &str, attempt: u32, jitter: f64) -> f64 { + if jitter <= 0.0 { + return 1.0; + } + + // Deterministic FNV-1a hash (replay-safe, no runtime randomness). + let mut hash: u64 = 1469598103934665603; + for b in node_id.as_bytes() { + hash ^= *b as u64; + hash = hash.wrapping_mul(1099511628211); + } + hash ^= attempt as u64; + hash = hash.wrapping_mul(1099511628211); + + let unit = (hash % 10_000) as f64 / 10_000.0; // [0, 1) + 1.0 - jitter + (2.0 * jitter * unit) // [1-jitter, 1+jitter) +} + +fn compute_backoff_ms(cfg: &RetryConfig, node_id: &str, attempt: u32) -> u64 { + if cfg.initial_backoff_ms == 0 { + return 0; + } + + let pow = cfg + .backoff_multiplier + .powi((attempt.saturating_sub(1)) as i32); + let base = (cfg.initial_backoff_ms as f64) * pow; + let with_jitter = base * deterministic_jitter_multiplier(node_id, attempt, cfg.jitter); + let capped = with_jitter.min(cfg.max_backoff_ms as f64).max(0.0); + capped.round() as u64 +} + +fn is_retryable_error(policy: &str, err: &str) -> bool { + let msg = err.to_ascii_lowercase(); + let has = |needle: &str| msg.contains(needle); + + match policy.to_ascii_lowercase().as_str() { + "all" => true, + "on_429" | "rate_limited" => has("429") || has("rate limit") || has("too many requests"), + "transient" => { + has("429") + || has("rate limit") + || has("too many requests") + || has("timeout") + || has("temporar") + || has("deadlock") + || has("could not serialize") + || has("connection reset") + || has("connection refused") + || has("lock timeout") + } + _ => false, + } +} + +async fn execute_retry_loop_node( + ctx: &OrchestrationContext, + graph: &FunctionGraph, + node_id: &str, + body_id: &str, + results: &mut HashMap, + exec_ctx: &ExecutionContext, + config: &serde_json::Value, + retry_cfg: RetryConfig, +) -> Result { + let on_error_node_id = config.get("on_error_node").and_then(|v| v.as_str()); + + let mut attempt: u32 = 1; + loop { + match Box::pin(execute_function_node_with_vars( + ctx, graph, body_id, results, exec_ctx, + )) + .await + { + Ok(result) => return Ok(result), + Err(err) => { + let retryable = is_retryable_error(&retry_cfg.policy, &err); + let exhausted = attempt >= retry_cfg.max_attempts; + ctx.trace_info(format!( + "Retry node {node_id} attempt {attempt} failed (retryable={retryable}, exhausted={exhausted}): {err}" + )); + + if !retryable || exhausted { + if let Some(handler_id) = on_error_node_id { + let err_payload = serde_json::json!({ + "message": err, + "attempt": attempt, + "retryable": retryable + }); + results.insert(RETRY_ERROR_KEY.to_string(), err_payload.to_string()); + return Box::pin(execute_function_node_with_vars( + ctx, graph, handler_id, results, exec_ctx, + )) + .await; + } + return Err(err); + } + + let delay_ms = compute_backoff_ms(&retry_cfg, node_id, attempt); + if delay_ms > 0 { + ctx.trace_info(format!( + "Retry node {node_id} scheduling backoff: {delay_ms}ms before next attempt" + )); + ctx.schedule_timer(Duration::from_millis(delay_ms)).await; + } + attempt = attempt.saturating_add(1); + } + } + } } async fn execute_wait_schedule_node( @@ -417,6 +592,7 @@ async fn execute_wait_schedule_node( /// Sentinel key used to signal a break from within a loop const BREAK_SENTINEL: &str = "__break__"; +const RETRY_ERROR_KEY: &str = "__error__"; /// Minimum wall-clock duration that every loop iteration must take before /// `continue_as_new` is called. If the body (plus any while-condition @@ -457,6 +633,17 @@ async fn execute_loop_node( .as_ref() .ok_or_else(|| format!("LOOP node {node_id} has no body"))?; + if let Some(ref config_str) = node.query { + if let Ok(config) = serde_json::from_str::(config_str) { + if let Some(retry_cfg) = parse_retry_config(&config)? { + return execute_retry_loop_node( + ctx, graph, node_id, body_id, results, exec_ctx, &config, retry_cfg, + ) + .await; + } + } + } + // Capture the iteration start time so we can rate-limit `continue_as_new` // below. `utc_now()` is duroxide's deterministic clock (recorded in // history and replayed verbatim), so this remains replay-safe. @@ -829,7 +1016,45 @@ async fn execute_race_node( }) .to_string(); - // Schedule sub-orchestrations + // timeout wrapper: race the target branch against a durable timer + if let Some(config_str) = &node.query { + if let Ok(config) = serde_json::from_str::(config_str) { + if config + .get("timeout_wrapper") + .and_then(|v| v.as_bool()) + .unwrap_or(false) + { + let timeout_ms = config + .get("timeout_ms") + .and_then(|v| v.as_u64()) + .ok_or_else(|| "RACE timeout wrapper missing timeout_ms".to_string())?; + + let left_fut = ctx.schedule_sub_orchestration(SUBTREE_NAME, left_input); + let timeout_fut = ctx.schedule_timer(Duration::from_millis(timeout_ms)); + + let raw = match ctx.select2(left_fut, timeout_fut).await { + duroxide::Either2::First(Ok(r)) => { + ctx.trace_info("TIMEOUT wrapper completed before deadline"); + r + } + duroxide::Either2::First(Err(e)) => { + return Err(format!("TIMEOUT wrapped branch failed: {e}")); + } + duroxide::Either2::Second(()) => { + return Err(format!("Operation timed out after {}ms", timeout_ms)); + } + }; + + let result = parse_subtree_envelope(&raw, "TIMEOUT branch", results)?; + if let Some(name) = &node.result_name { + results.insert(name.clone(), result.clone()); + } + return Ok(result); + } + } + } + + // Standard race behavior: schedule both sub-orchestrations let left_fut = ctx.schedule_sub_orchestration(SUBTREE_NAME, left_input); let right_fut = ctx.schedule_sub_orchestration(SUBTREE_NAME, right_input); diff --git a/src/types.rs b/src/types.rs index 43a762d..7d6695f 100644 --- a/src/types.rs +++ b/src/types.rs @@ -953,6 +953,22 @@ impl Durofut { } } + // LOOP nodes may also embed retry fallback handlers + if self.node_type == "LOOP" { + if let Some(handler) = config.get("on_error_node") { + let handler_node = + serde_json::from_value::(handler.clone()).map_err(|e| { + format!( + "on_error_node in {} must be a valid Durofut object, got {}: {}", + self.node_type, + summarize_json_type(handler), + e + ) + })?; + f(&handler_node)?; + } + } + // JOIN nodes: extra_nodes array if self.node_type == "JOIN" { if let Some(extras) = config.get("extra_nodes").and_then(|e| e.as_array()) { @@ -1007,6 +1023,23 @@ impl Durofut { } } + // LOOP nodes may also embed retry fallback handlers + if self.node_type == "LOOP" { + if let Some(handler) = config.get("on_error_node") { + let handler_node = + serde_json::from_value::(handler.clone()).map_err(|e| { + format!( + "on_error_node in {} must be a valid Durofut object, got {}: {}", + self.node_type, + summarize_json_type(handler), + e + ) + })?; + let handler_id = f(&handler_node)?; + config["on_error_node"] = serde_json::json!(handler_id); + } + } + // JOIN nodes: extra_nodes array if self.node_type == "JOIN" { if let Some(extras) = config.get("extra_nodes").and_then(|e| e.as_array()) {