Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion USER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,18 @@ df.sql('SELECT 1') ~> df.sql('SELECT 2')

| Function | Description | Example |
|----------|-------------|---------|
| `df.sleep(seconds)` | Pause for N seconds | `df.sleep(60)` |
| `df.sleep(seconds)` | Pause for N seconds (supports fractional seconds) | `df.sleep(0.2)` |
| `df.wait_for_schedule(cron)` | Wait until cron matches | `df.wait_for_schedule('0 * * * *')` |
| `df.http(url, method, body, headers, timeout)` | Make HTTP request | `df.http('https://api.example.com', 'POST', '{"key": "value"}')` |
| `df.join(a, b)` | Execute in parallel, wait for all | `df.join('SELECT 1', 'SELECT 2')` |
| `df.join3(a, b, c)` | Three in parallel | `df.join3(a, b, c)` |
| `df.parallel(futures[])` | Set-style parallel join | `df.parallel(ARRAY['SELECT 1','SELECT 2','SELECT 3'])` |
| `df.join_all(futures[])` | Alias for `df.parallel` | `df.join_all(ARRAY[a,b,c])` |
| `df.race(a, b)` | Execute in parallel, first wins | `df.race(fast_query, slow_query)` |
| `df.timeout(fut, seconds)` | Fail if a future exceeds a deadline | `df.timeout(df.sql('SELECT pg_sleep(10)'), 0.5)` |
| `df.retry(fut, ...)` | Retry with declarative backoff policy | `df.retry(df.sql('SELECT flaky()'), 'on_429', 10, 200, 30000, 2.0, 0.2)` |
| `df.catch(fut, handler)` | Run fallback branch on failure | `df.catch(df.sql('SELECT risky()'), df.sql('SELECT recover()'))` |
| `df.on_error(fut, handler)` | Alias for `df.catch` | `df.on_error(fut, handler)` |
| `df.if(cond, then, else)` | Conditional branch | `df.if('SELECT true', a, b)` |
| `df.loop(body)` | Repeat forever | `df.loop(body)` |
| `df.loop(body, cond)` | Repeat while condition is true | `df.loop(body, 'SELECT count(*) > 0 FROM q')` |
Expand Down
7 changes: 7 additions & 0 deletions sql/pg_durable--0.1.1--0.2.0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -270,14 +270,21 @@ DECLARE
'df.seq(text, text)',
'df.as(text, text)',
'df.sleep(bigint)',
'df.sleep(double precision)',
'df.wait_for_schedule(text)',
'df.loop(text, text)',
'df.break(text)',
'df.if(text, text, text)',
'df.if_rows(text, text, text)',
'df.join(text, text)',
'df.join3(text, text, text)',
'df.parallel(text[])',
'df.join_all(text[])',
'df.race(text, text)',
'df.timeout(text, double precision)',
'df.retry(text, text, integer, bigint, bigint, double precision, double precision, text)',
'df.catch(text, text)',
'df.on_error(text, text)',
'df.wait_for_signal(text, integer)',
'df.signal(text, text, text)',
'df.start(text, text, text)',
Expand Down
216 changes: 214 additions & 2 deletions src/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,27 @@ pub fn sleep(seconds: i64) -> String {
}
Durofut {
node_type: "SLEEP".to_string(),
query: Some(seconds.to_string()),
query: Some(serde_json::json!({ "milliseconds": seconds * 1000 }).to_string()),
..Default::default()
}
.to_json()
}

/// Creates a sleep node that pauses for fractional seconds.
#[pg_extern(name = "sleep", schema = "df")]
pub fn sleep_fractional(seconds: f64) -> String {
if !seconds.is_finite() || seconds < 0.0 {
pgrx::error!("Sleep duration must be a finite, non-negative number");
}

let millis = (seconds * 1000.0).round();
if millis > i64::MAX as f64 {
pgrx::error!("Sleep duration is too large");
}

Durofut {
node_type: "SLEEP".to_string(),
query: Some(serde_json::json!({ "milliseconds": millis as i64 }).to_string()),
..Default::default()
}
.to_json()
Expand Down Expand Up @@ -439,6 +459,42 @@ pub fn join3(a: &str, b: &str, c: &str) -> String {
.to_json()
}

/// Creates a set-style parallel join node from an array of futures.
#[pg_extern(schema = "df")]
pub fn parallel(futures: Vec<&str>) -> String {
if futures.len() < 2 {
pgrx::error!("df.parallel requires at least 2 futures");
}

let parsed: Vec<Durofut> = futures.into_iter().map(Durofut::ensure).collect();

let mut iter = parsed.into_iter();
let left = iter.next().expect("validated length >= 2");
let right = iter.next().expect("validated length >= 2");
let extras: Vec<Durofut> = iter.collect();

let query = if extras.is_empty() {
None
} else {
Some(serde_json::json!({ "extra_nodes": extras }).to_string())
};

Durofut {
node_type: "JOIN".to_string(),
left_node: Some(Box::new(left)),
right_node: Some(Box::new(right)),
query,
..Default::default()
}
.to_json()
}

/// Alias for df.parallel().
#[pg_extern(name = "join_all", schema = "df")]
pub fn join_all(futures: Vec<&str>) -> String {
parallel(futures)
}

/// Creates a race node - runs branches in parallel, first to complete wins.
/// Arguments can be either Durofut JSON or plain SQL strings (auto-wrapped).
#[pg_extern(schema = "df")]
Expand All @@ -455,6 +511,113 @@ pub fn race(a: &str, b: &str) -> String {
.to_json()
}

/// Creates a timeout wrapper around a future.
/// If the wrapped future does not complete within the given duration, execution fails.
#[pg_extern(schema = "df")]
pub fn timeout(fut: &str, timeout_seconds: f64) -> String {
if !timeout_seconds.is_finite() || timeout_seconds <= 0.0 {
pgrx::error!("Timeout must be a finite, positive number");
}

let timeout_fut = sleep_fractional(timeout_seconds);
let mut race_fut = Durofut::ensure(&race(fut, &timeout_fut));
race_fut.query = Some(
serde_json::json!({
"timeout_wrapper": true,
"timeout_ms": (timeout_seconds * 1000.0).round() as i64
})
.to_string(),
);
race_fut.to_json()
}

fn build_retry_node(
fut: &str,
policy: &str,
max_attempts: i32,
initial_backoff_ms: i64,
max_backoff_ms: i64,
backoff_multiplier: f64,
jitter: f64,
on_error: Option<&str>,
) -> String {
if max_attempts <= 0 {
pgrx::error!("max_attempts must be positive");
}
if initial_backoff_ms < 0 || max_backoff_ms < 0 {
pgrx::error!("Backoff durations must be non-negative");
}
if max_backoff_ms < initial_backoff_ms {
pgrx::error!("max_backoff_ms must be >= initial_backoff_ms");
}
if !backoff_multiplier.is_finite() || backoff_multiplier < 1.0 {
pgrx::error!("backoff_multiplier must be finite and >= 1.0");
}
if !jitter.is_finite() || !(0.0..=1.0).contains(&jitter) {
pgrx::error!("jitter must be between 0.0 and 1.0");
}

let mut config = serde_json::json!({
"retry": {
"policy": policy,
"max_attempts": max_attempts,
"initial_backoff_ms": initial_backoff_ms,
"max_backoff_ms": max_backoff_ms,
"backoff_multiplier": backoff_multiplier,
"jitter": jitter
}
});

if let Some(handler) = on_error {
config["on_error_node"] = serde_json::to_value(Durofut::ensure(handler))
.unwrap_or_else(|e| pgrx::error!("Failed to serialize on_error handler: {}", e));
}

Durofut {
node_type: "LOOP".to_string(),
left_node: Some(Box::new(Durofut::ensure(fut))),
query: Some(config.to_string()),
..Default::default()
}
.to_json()
}

/// Retries a future with declarative backoff policy.
#[pg_extern(schema = "df")]
pub fn retry(
fut: &str,
policy: default!(&str, "'transient'"),
max_attempts: default!(i32, "3"),
initial_backoff_ms: default!(i64, "1000"),
max_backoff_ms: default!(i64, "30000"),
backoff_multiplier: default!(f64, "2.0"),
jitter: default!(f64, "0.0"),
on_error: default!(Option<&str>, "NULL"),
) -> String {
build_retry_node(
fut,
policy,
max_attempts,
initial_backoff_ms,
max_backoff_ms,
backoff_multiplier,
jitter,
on_error,
)
}

/// Executes a fallback branch when the primary future fails.
#[pg_extern(name = "catch", schema = "df")]
pub fn catch_fn(fut: &str, on_error: &str) -> String {
build_retry_node(fut, "all", 1, 0, 0, 1.0, 0.0, Some(on_error))
}

/// Alias for df.catch().
#[pg_extern(schema = "df")]
pub fn on_error(fut: &str, handler: &str) -> String {
catch_fn(fut, handler)
}

/// Creates an HTTP request node.
/// Makes an HTTP request to the specified URL and returns the response.
///
Expand Down Expand Up @@ -1048,7 +1211,7 @@ pub fn wait_for_completion(

#[cfg(test)]
mod tests {
use super::parse_semver;
use super::*;

#[test]
fn test_parse_semver_basic() {
Expand Down Expand Up @@ -1080,4 +1243,53 @@ mod tests {
assert!(parse_semver("0.3.0").unwrap() >= (0, 2, 0));
assert!(parse_semver("1.0.0").unwrap() >= (0, 2, 0));
}

#[test]
fn test_sleep_fractional_stores_milliseconds() {
let fut = Durofut::from_json(&sleep_fractional(0.125));
assert_eq!(fut.node_type, "SLEEP");
let cfg: serde_json::Value =
serde_json::from_str(fut.query.as_deref().expect("sleep config")).unwrap();
assert_eq!(cfg["milliseconds"], 125);
}

#[test]
fn test_retry_embeds_policy_and_on_error() {
let retry_json = retry(
"SELECT 1",
"on_429",
5,
200,
30_000,
2.0,
0.2,
Some("SELECT 'fallback'"),
);
let fut = Durofut::from_json(&retry_json);
assert_eq!(fut.node_type, "LOOP");
let cfg: serde_json::Value =
serde_json::from_str(fut.query.as_deref().expect("retry config")).unwrap();
assert_eq!(cfg["retry"]["policy"], "on_429");
assert_eq!(cfg["retry"]["max_attempts"], 5);
assert_eq!(cfg["on_error_node"]["node_type"], "SQL");
}

#[test]
fn test_timeout_marks_race_as_timeout_wrapper() {
let fut = Durofut::from_json(&timeout("SELECT 1", 0.25));
assert_eq!(fut.node_type, "RACE");
let cfg: serde_json::Value =
serde_json::from_str(fut.query.as_deref().expect("timeout config")).unwrap();
assert_eq!(cfg["timeout_wrapper"], true);
assert_eq!(cfg["timeout_ms"], 250);
}

#[test]
fn test_parallel_builds_join_with_extras() {
let fut = Durofut::from_json(&parallel(vec!["SELECT 1", "SELECT 2", "SELECT 3"]));
assert_eq!(fut.node_type, "JOIN");
let cfg: serde_json::Value =
serde_json::from_str(fut.query.as_deref().expect("parallel config")).unwrap();
assert_eq!(cfg["extra_nodes"].as_array().map(|v| v.len()), Some(1));
}
}
7 changes: 7 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,14 +370,21 @@ DECLARE
'df.seq(text, text)',
'df.as(text, text)',
'df.sleep(bigint)',
'df.sleep(double precision)',
'df.wait_for_schedule(text)',
'df.loop(text, text)',
'df.break(text)',
'df.if(text, text, text)',
'df.if_rows(text, text, text)',
'df.join(text, text)',
'df.join3(text, text, text)',
'df.parallel(text[])',
'df.join_all(text[])',
'df.race(text, text)',
'df.timeout(text, double precision)',
'df.retry(text, text, integer, bigint, bigint, double precision, double precision, text)',
'df.catch(text, text)',
'df.on_error(text, text)',
'df.wait_for_signal(text, integer)',
'df.signal(text, text, text)',
'df.start(text, text, text)',
Expand Down
Loading
Loading