From 3fe2b35ac5ac0a2f5e1d7b8b5c6b885244dfaa58 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Mon, 30 Mar 2026 11:05:16 -0700 Subject: [PATCH 01/10] feat: 53-bit unique id --- integration/pgdog.toml | 2 +- pgdog-config/src/general.rs | 10 ++ pgdog-config/src/sharding.rs | 35 +++++++ pgdog/src/main.rs | 4 + pgdog/src/unique_id.rs | 175 ++++++++++++++++++++++++++++++++++- 5 files changed, 220 insertions(+), 6 deletions(-) diff --git a/integration/pgdog.toml b/integration/pgdog.toml index 74ce6a3b4..e6ae18fef 100644 --- a/integration/pgdog.toml +++ b/integration/pgdog.toml @@ -24,7 +24,7 @@ query_parser_engine = "pg_query_raw" system_catalogs = "omnisharded_sticky" reload_schema_on_ddl = false #idle_healthcheck_delay = 50000000 - +unique_id_function = "standard" [memory] net_buffer = 8096 diff --git a/pgdog-config/src/general.rs b/pgdog-config/src/general.rs index b6e1b55ae..964f3009e 100644 --- a/pgdog-config/src/general.rs +++ b/pgdog-config/src/general.rs @@ -8,6 +8,7 @@ use std::str::FromStr; use std::time::Duration; use crate::pooling::ConnectionRecovery; +use crate::UniqueIdFunction; use crate::{ CopyFormat, CutoverTimeoutAction, LoadSchema, QueryParserEngine, QueryParserLevel, SystemCatalogsBehavior, @@ -545,6 +546,10 @@ pub struct General { #[serde(default)] pub unique_id_min: u64, + /// Unique ID generation function. + #[serde(default)] + pub unique_id_function: UniqueIdFunction, + /// Changes how system catalog tables (like `pg_database`, `pg_class`, etc.) are treated by the query router. 
/// /// _Default:_ `omnisharded_sticky` @@ -718,6 +723,7 @@ impl Default for General { cutover_timeout: Self::cutover_timeout(), cutover_timeout_action: Self::cutover_timeout_action(), cutover_save_config: bool::default(), + unique_id_function: Self::unique_id_function(), } } } @@ -817,6 +823,10 @@ impl General { Self::env_or_default("PGDOG_BAN_REPLICA_LAG_BYTES", i64::MAX as u64) } + fn unique_id_function() -> UniqueIdFunction { + Self::env_enum_or_default("PGDOG_UNIQUE_ID_FUNCTION") + } + fn cutover_replication_lag_threshold() -> u64 { Self::env_or_default("PGDOG_CUTOVER_REPLICATION_LAG_THRESHOLD", 0) // 0 bytes diff --git a/pgdog-config/src/sharding.rs b/pgdog-config/src/sharding.rs index 7ee23b52d..26a3b1c40 100644 --- a/pgdog-config/src/sharding.rs +++ b/pgdog-config/src/sharding.rs @@ -516,6 +516,41 @@ impl FromStr for CutoverTimeoutAction { } } +#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq, Hash, Default, JsonSchema)] +#[serde(rename_all = "snake_case", deny_unknown_fields)] +pub enum UniqueIdFunction { + /// Standard 64-bit function using the entire 64-bit range. + #[default] + Standard, + /// Compact function using the leftest 53-bit range, making it + /// JavaScript-safe, so you can pass it as an integer directly + /// to the frontend apps. + /// + /// The year is 2026 and JavaScript continues to be a pain in the ass. 
+ /// + Compact, +} + +impl FromStr for UniqueIdFunction { + type Err = (); + fn from_str(s: &str) -> Result { + match s { + "standard" => Ok(Self::Standard), + "compact" => Ok(Self::Compact), + _ => Err(()), + } + } +} + +impl Display for UniqueIdFunction { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Compact => write!(f, "compact"), + Self::Standard => write!(f, "standard"), + } + } +} + #[cfg(test)] mod test { use super::*; diff --git a/pgdog/src/main.rs b/pgdog/src/main.rs index cf74815b2..e993fb7e9 100644 --- a/pgdog/src/main.rs +++ b/pgdog/src/main.rs @@ -106,6 +106,10 @@ fn main() -> Result<(), Box> { config.config.general.workers, config.config.memory.stack_size / 1024 / 1024 ); + info!( + "using \"{}\" unique 64-bit ID generator", + config.config.general.unique_id_function + ); runtime.block_on(async move { pgdog(args.command).await })?; diff --git a/pgdog/src/unique_id.rs b/pgdog/src/unique_id.rs index 47d11a71e..d9e6a1898 100644 --- a/pgdog/src/unique_id.rs +++ b/pgdog/src/unique_id.rs @@ -13,6 +13,7 @@ use std::time::{Duration, SystemTime}; use once_cell::sync::OnceCell; use parking_lot::Mutex; +use pgdog_config::UniqueIdFunction; use thiserror::Error; use crate::config::config; @@ -31,6 +32,14 @@ const TIMESTAMP_SHIFT: u8 = (SEQUENCE_BITS + NODE_BITS) as u8; // 22 const MAX_OFFSET: u64 = i64::MAX as u64 - ((MAX_TIMESTAMP << TIMESTAMP_SHIFT) | (MAX_NODE_ID << NODE_SHIFT) | MAX_SEQUENCE); +// Compact layout: 41 timestamp + 6 node + 6 sequence = 53 bits (JS-safe) +const COMPACT_NODE_BITS: u64 = 6; // Max 63 nodes +const COMPACT_SEQUENCE_BITS: u64 = 6; +const COMPACT_MAX_NODE_ID: u64 = (1 << COMPACT_NODE_BITS) - 1; // 63 +const COMPACT_MAX_SEQUENCE: u64 = (1 << COMPACT_SEQUENCE_BITS) - 1; // 63 +const COMPACT_NODE_SHIFT: u8 = COMPACT_SEQUENCE_BITS as u8; // 6 +const COMPACT_TIMESTAMP_SHIFT: u8 = (COMPACT_SEQUENCE_BITS + COMPACT_NODE_BITS) as u8; // 12 + static UNIQUE_ID: OnceCell = OnceCell::new(); 
#[derive(Debug, Default)] @@ -39,6 +48,12 @@ struct State { sequence: u64, } +#[derive(Debug, Default)] +struct CompactState { + last_timestamp_ms: u64, + sequence: u64, +} + impl State { // Generate next unique ID in a distributed sequence. // The `node_id` argument must be globally unique. @@ -72,6 +87,35 @@ impl State { } } +impl CompactState { + fn next_id(&mut self, node_id: u64, id_offset: u64) -> u64 { + let mut now = wait_until(self.last_timestamp_ms); + + if now == self.last_timestamp_ms { + self.sequence = (self.sequence + 1) & COMPACT_MAX_SEQUENCE; + if self.sequence == 0 { + now = wait_until(now + 1); + } + } else { + self.sequence = 0; + } + + self.last_timestamp_ms = now; + + let elapsed = self.last_timestamp_ms - PGDOG_EPOCH; + assert!( + elapsed <= MAX_TIMESTAMP, + "unique_id_compact timestamp overflow: {elapsed} > {MAX_TIMESTAMP}" + ); + let timestamp_part = (elapsed & MAX_TIMESTAMP) << COMPACT_TIMESTAMP_SHIFT; + let node_part = node_id << COMPACT_NODE_SHIFT; + let sequence_part = self.sequence; + + let base_id = timestamp_part | node_part | sequence_part; + base_id + id_offset + } +} + // Get current time in ms. fn now_ms() -> u64 { SystemTime::now() @@ -100,6 +144,9 @@ pub enum Error { #[error("node ID exceeding maximum (1023): {0}")] NodeIdTooLarge(u64), + #[error("node ID exceeding compact maximum (63): {0}")] + CompactNodeIdTooLarge(u64), + #[error("id_offset too large, would overflow i64: {0}")] OffsetTooLarge(u64), } @@ -108,12 +155,14 @@ pub enum Error { pub struct UniqueId { node_id: u64, id_offset: u64, - inner: Mutex, + standard: Mutex, + compact: Mutex, + function: UniqueIdFunction, } impl UniqueId { /// Initialize the UniqueId generator. 
- fn new() -> Result { + fn new(function: UniqueIdFunction) -> Result { let node_id = node_id().map_err(|_| Error::InvalidNodeId(instance_id().to_string()))?; let min_id = config().config.general.unique_id_min; @@ -122,6 +171,11 @@ impl UniqueId { return Err(Error::NodeIdTooLarge(node_id)); } + // Compact (JS-safe) IDs only have 6 node bits. + if node_id > COMPACT_MAX_NODE_ID { + return Err(Error::CompactNodeIdTooLarge(node_id)); + } + if min_id > MAX_OFFSET { return Err(Error::OffsetTooLarge(min_id)); } @@ -129,18 +183,30 @@ impl UniqueId { Ok(Self { node_id, id_offset: min_id, - inner: Mutex::new(State::default()), + standard: Mutex::new(State::default()), + compact: Mutex::new(CompactState::default()), + function, }) } /// Get (and initialize, if necessary) the unique ID generator. pub fn generator() -> Result<&'static UniqueId, Error> { - UNIQUE_ID.get_or_try_init(Self::new) + UNIQUE_ID.get_or_try_init(|| { + let config = config(); + Self::new(config.config.general.unique_id_function) + }) } /// Generate a globally unique, monotonically increasing identifier. 
pub fn next_id(&self) -> i64 { - self.inner.lock().next_id(self.node_id, self.id_offset) as i64 + match self.function { + UniqueIdFunction::Compact => { + self.compact.lock().next_id(self.node_id, self.id_offset) as i64 + } + UniqueIdFunction::Standard => { + self.standard.lock().next_id(self.node_id, self.id_offset) as i64 + } + } } } @@ -253,6 +319,105 @@ mod test { } } + #[test] + fn test_compact_unique_ids() { + let num_ids = 10_000; + let mut ids = HashSet::new(); + let mut state = CompactState::default(); + let node_id = 1u64; + + for _ in 0..num_ids { + ids.insert(state.next_id(node_id, 0)); + } + + assert_eq!(ids.len(), num_ids); + } + + #[test] + fn test_compact_monotonically_increasing() { + let mut state = CompactState::default(); + let node_id = 1u64; + + let mut prev_id = 0u64; + for _ in 0..10_000 { + let id = state.next_id(node_id, 0); + assert!(id > prev_id, "ID {id} not greater than previous {prev_id}"); + prev_id = id; + } + } + + #[test] + fn test_compact_js_safe() { + let mut state = CompactState::default(); + let node_id = COMPACT_MAX_NODE_ID; + const JS_MAX_SAFE_INTEGER: u64 = (1 << 53) - 1; + + for _ in 0..10_000 { + let id = state.next_id(node_id, 0); + let signed = id as i64; + assert!(signed > 0, "ID should be positive, got {signed}"); + assert!( + id <= JS_MAX_SAFE_INTEGER, + "ID {id} exceeds JS MAX_SAFE_INTEGER {JS_MAX_SAFE_INTEGER}" + ); + } + } + + #[test] + fn test_compact_bit_layout() { + // 41 timestamp + 6 node + 6 sequence = 53 bits + assert_eq!( + TIMESTAMP_BITS + COMPACT_NODE_BITS + COMPACT_SEQUENCE_BITS, + 53 + ); + assert_eq!(COMPACT_TIMESTAMP_SHIFT, 12); + assert_eq!(COMPACT_NODE_SHIFT, 6); + } + + #[test] + fn test_compact_max_values_fit() { + let max_elapsed = MAX_TIMESTAMP; + let max_node = COMPACT_MAX_NODE_ID; + let max_seq = COMPACT_MAX_SEQUENCE; + + let id = + (max_elapsed << COMPACT_TIMESTAMP_SHIFT) | (max_node << COMPACT_NODE_SHIFT) | max_seq; + let signed = id as i64; + const JS_MAX_SAFE_INTEGER: i64 = (1 << 53) - 1; 
+ + assert!( + signed > 0, + "Max compact ID should be positive, got {signed}" + ); + assert!( + signed <= JS_MAX_SAFE_INTEGER, + "Max compact ID {signed} exceeds JS MAX_SAFE_INTEGER {JS_MAX_SAFE_INTEGER}" + ); + } + + #[test] + fn test_compact_extract_components() { + let node: u64 = 42; + let mut state = CompactState::default(); + + let id = state.next_id(node, 0); + + let extracted_seq = id & COMPACT_MAX_SEQUENCE; + let extracted_node = (id >> COMPACT_NODE_SHIFT) & COMPACT_MAX_NODE_ID; + let extracted_elapsed = id >> COMPACT_TIMESTAMP_SHIFT; + + assert_eq!(extracted_node, node); + assert_eq!(extracted_seq, 0); + assert!(extracted_elapsed > 0); + + let id2 = state.next_id(node, 0); + let extracted_seq2 = id2 & COMPACT_MAX_SEQUENCE; + let extracted_node2 = (id2 >> COMPACT_NODE_SHIFT) & COMPACT_MAX_NODE_ID; + + assert_eq!(extracted_node2, node); + assert!(matches!(extracted_seq2, 1 | 0)); + } + #[test] fn test_id_offset_monotonic() { let offset: u64 = 1_000_000_000; From d09792deb10b9d7bbccb291dc488951df5b9b09e Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Mon, 30 Mar 2026 11:08:11 -0700 Subject: [PATCH 02/10] json schema --- .schema/pgdog.schema.json | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/.schema/pgdog.schema.json b/.schema/pgdog.schema.json index 87166b219..247eab443 100644 --- a/.schema/pgdog.schema.json +++ b/.schema/pgdog.schema.json @@ -102,6 +102,7 @@ "tls_verify": "prefer", "two_phase_commit": false, "two_phase_commit_auto": null, + "unique_id_function": "standard", "unique_id_min": 0, "workers": 2 } @@ -1025,6 +1026,11 @@ ], "default": null }, + "unique_id_function": { + "description": "Unique ID generation function.", + "$ref": "#/$defs/UniqueIdFunction", + "default": "standard" + }, "unique_id_min": { "description": "Minimum ID for unique ID generator.", "type": "integer", @@ -1817,6 +1823,20 @@ } ] }, + "UniqueIdFunction": { + "oneOf": [ + { + "description": "Standard 64-bit function using the entire 64-bit range.", + 
"type": "string", + "const": "standard" + }, + { + "description": "Compact function using the leftest 53-bit range, making it\nJavaScript-safe, so you can pass it as an integer directly\nto the frontend apps.\n\nThe year is 2026 and JavaScript continues to be a pain in the ass.", + "type": "string", + "const": "compact" + } + ] + }, "Vector": { "type": "object", "properties": { From b0c1d5c1be69dcecae07cff3103259e6dd5803d1 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Tue, 31 Mar 2026 14:25:43 -0700 Subject: [PATCH 03/10] wip: passthrough id generation --- pgdog-config/src/rewrite.rs | 4 + pgdog/src/backend/schema/mod.rs | 270 ++++++++++++++++++ pgdog/src/backend/schema/setup.sql | 86 ++++++ .../parser/rewrite/statement/auto_id.rs | 128 +++++++-- 4 files changed, 471 insertions(+), 17 deletions(-) diff --git a/pgdog-config/src/rewrite.rs b/pgdog-config/src/rewrite.rs index 76a9ecec9..71cbff087 100644 --- a/pgdog-config/src/rewrite.rs +++ b/pgdog-config/src/rewrite.rs @@ -19,6 +19,8 @@ pub enum RewriteMode { Error, /// Automatically rewrite the query and execute it. Rewrite, + /// Rewrite only for omnisharded tables. 
+ RewriteOmni, } impl fmt::Display for RewriteMode { @@ -27,6 +29,7 @@ impl fmt::Display for RewriteMode { RewriteMode::Error => "error", RewriteMode::Rewrite => "rewrite", RewriteMode::Ignore => "ignore", + RewriteMode::RewriteOmni => "rewrite_omni", }; f.write_str(value) } @@ -40,6 +43,7 @@ impl FromStr for RewriteMode { "error" => Ok(RewriteMode::Error), "rewrite" => Ok(RewriteMode::Rewrite), "ignore" => Ok(RewriteMode::Ignore), + "rewrite_omni" => Ok(RewriteMode::RewriteOmni), _ => Err(()), } } diff --git a/pgdog/src/backend/schema/mod.rs b/pgdog/src/backend/schema/mod.rs index 4c1e849b0..8d3fd6ad8 100644 --- a/pgdog/src/backend/schema/mod.rs +++ b/pgdog/src/backend/schema/mod.rs @@ -6,6 +6,13 @@ pub mod sync; pub use pgdog_stats::{ Relation as StatsRelation, Relations as StatsRelations, Schema as StatsSchema, SchemaInner, }; +use pg_query::{ + protobuf::{ + ColumnDef, ConstrType, Constraint, CreateStmt, OnCommitAction, RangeVar, + String as PgString, TypeName, + }, + Node, NodeEnum, +}; use serde::{Deserialize, Serialize}; use std::ops::DerefMut; use std::{collections::HashMap, ops::Deref}; @@ -223,6 +230,142 @@ impl Schema { pub fn search_path(&self) -> &[String] { &self.inner.search_path } + + /// Generate a `CREATE TABLE` statement for a relation in the schema. 
+ pub fn to_sql(&self, schema: &str, table: &str) -> Option { + let relation = self.inner.get(schema, table)?; + + let mut table_elts = Vec::with_capacity(relation.columns.len()); + let mut pk_columns = Vec::new(); + + for column in relation.columns.values() { + let mut constraints = Vec::new(); + + if !column.is_nullable { + constraints.push(Node { + node: Some(NodeEnum::Constraint(Box::new(Constraint { + contype: ConstrType::ConstrNotnull.into(), + ..Default::default() + }))), + }); + } + + if !column.column_default.is_empty() { + if let Some(expr) = Self::parse_default_expr(&column.column_default) { + constraints.push(Node { + node: Some(NodeEnum::Constraint(Box::new(Constraint { + contype: ConstrType::ConstrDefault.into(), + raw_expr: Some(Box::new(expr)), + ..Default::default() + }))), + }); + } + } + + if column.is_primary_key { + pk_columns.push(Node { + node: Some(NodeEnum::String(PgString { + sval: column.column_name.clone(), + })), + }); + } + + table_elts.push(Node { + node: Some(NodeEnum::ColumnDef(Box::new(ColumnDef { + colname: column.column_name.clone(), + type_name: Some(Self::pg_type_name(&column.data_type)), + is_local: true, + constraints, + ..Default::default() + }))), + }); + } + + if !pk_columns.is_empty() { + table_elts.push(Node { + node: Some(NodeEnum::Constraint(Box::new(Constraint { + contype: ConstrType::ConstrPrimary.into(), + keys: pk_columns, + ..Default::default() + }))), + }); + } + + let create_stmt = CreateStmt { + relation: Some(RangeVar { + schemaname: schema.to_owned(), + relname: table.to_owned(), + inh: true, + relpersistence: "p".to_owned(), + ..Default::default() + }), + table_elts, + oncommit: OnCommitAction::OncommitNoop.into(), + ..Default::default() + }; + + NodeEnum::CreateStmt(create_stmt).deparse().ok() + } + + /// Parse a column default expression into an AST node. 
+ fn parse_default_expr(default: &str) -> Option { + let parsed = pg_query::parse(&format!("SELECT {default}")).ok()?; + let stmt = parsed.protobuf.stmts.first()?; + let node = stmt.stmt.as_ref()?; + let NodeEnum::SelectStmt(ref select) = node.node.as_ref()? else { + return None; + }; + let target = select.target_list.first()?; + let NodeEnum::ResTarget(ref res) = target.node.as_ref()? else { + return None; + }; + res.val.as_ref().map(|v| (**v).clone()) + } + + /// Map an information_schema data type name to a pg_catalog [`TypeName`]. + fn pg_type_name(data_type: &str) -> TypeName { + // Types that deparse correctly with pg_catalog qualification. + let pg_catalog_name = match data_type { + "bigint" => Some("int8"), + "integer" => Some("int4"), + "smallint" => Some("int2"), + "boolean" => Some("bool"), + "character varying" => Some("varchar"), + "double precision" => Some("float8"), + "real" => Some("float4"), + "timestamp without time zone" => Some("timestamp"), + "timestamp with time zone" => Some("timestamptz"), + "character" => Some("bpchar"), + _ => None, + }; + + let names = if let Some(pg_name) = pg_catalog_name { + vec![ + Node { + node: Some(NodeEnum::String(PgString { + sval: "pg_catalog".to_owned(), + })), + }, + Node { + node: Some(NodeEnum::String(PgString { + sval: pg_name.to_owned(), + })), + }, + ] + } else { + vec![Node { + node: Some(NodeEnum::String(PgString { + sval: data_type.to_owned(), + })), + }] + }; + + TypeName { + names, + typemod: -1, + ..Default::default() + } + } } #[cfg(test)] @@ -274,6 +417,87 @@ mod test { assert!(debug.first().unwrap().contains("PgDog Debug")); } + #[tokio::test] + async fn test_install_next_id_seq() { + use crate::backend::server::test::test_server; + + let mut conn = test_server().await; + + // Use a dedicated schema to avoid conflicts with test_schema + // which drops the pgdog schema. 
+ conn.execute_checked("CREATE SCHEMA IF NOT EXISTS pgdog_test") + .await + .unwrap(); + + // Install pgdog schema (CREATE OR REPLACE is idempotent). + Schema::setup(&mut conn).await.unwrap(); + + // Ensure shard config exists. + let count = conn + .fetch_all::("SELECT COUNT(*) FROM pgdog.config") + .await + .unwrap(); + if count.first().copied() == Some(0) { + conn.execute_checked( + "INSERT INTO pgdog.config (shard, shards) VALUES (0, 1)", + ) + .await + .unwrap(); + } + + // Clean up from previous runs and create a test table with BIGSERIAL primary key. + conn.execute_checked("DROP TABLE IF EXISTS pgdog_test.ids") + .await + .unwrap(); + conn.execute_checked( + "CREATE TABLE pgdog_test.ids (id BIGSERIAL PRIMARY KEY, value TEXT)", + ) + .await + .unwrap(); + + // Install the sharded sequence via install_next_id_seq. + let result = conn + .fetch_all::( + "SELECT pgdog.install_next_id_seq('pgdog_test', 'ids', 'id')", + ) + .await + .unwrap(); + assert!( + result.first().unwrap().contains("installed"), + "{}", + result.first().unwrap() + ); + + // Insert rows and collect generated IDs. + conn.execute_checked("INSERT INTO pgdog_test.ids (value) VALUES ('a')") + .await + .unwrap(); + conn.execute_checked("INSERT INTO pgdog_test.ids (value) VALUES ('b')") + .await + .unwrap(); + conn.execute_checked("INSERT INTO pgdog_test.ids (value) VALUES ('c')") + .await + .unwrap(); + + let ids = conn + .fetch_all::("SELECT id FROM pgdog_test.ids ORDER BY id") + .await + .unwrap(); + + assert_eq!(ids.len(), 3); + + // All IDs should be unique. + let mut unique = ids.clone(); + unique.sort(); + unique.dedup(); + assert_eq!(unique.len(), 3, "IDs are not unique: {:?}", ids); + + // Clean up. 
+ conn.execute_checked("DROP SCHEMA pgdog_test CASCADE") + .await + .unwrap(); + } + #[test] fn test_resolve_search_path_default() { let schema = Schema::from_parts(vec!["$user".into(), "public".into()], HashMap::new()); @@ -415,4 +639,50 @@ mod test { assert!(result.is_some()); assert_eq!(result.unwrap().schema(), "custom"); } + + #[test] + fn test_to_sql() { + use crate::backend::schema::columns::Column; + + fn col(name: &str, table: &str, data_type: &str, ordinal: i32, pk: bool, nullable: bool) -> Column { + pgdog_stats::Column { + table_catalog: String::new(), + table_schema: "public".into(), + table_name: table.into(), + column_name: name.into(), + column_default: String::new(), + is_nullable: nullable, + data_type: data_type.into(), + ordinal_position: ordinal, + is_primary_key: pk, + foreign_keys: vec![], + } + .into() + } + + let columns = IndexMap::from([ + ("id".to_owned(), col("id", "users", "bigint", 1, true, false)), + ("name".to_owned(), col("name", "users", "text", 2, false, false)), + ("email".to_owned(), col("email", "users", "character varying", 3, false, true)), + ]); + + let relations: HashMap<(String, String), Relation> = + HashMap::from([(("public".into(), "users".into()), Relation::test_table("public", "users", columns))]); + let schema = Schema::from_parts(vec!["public".into()], relations); + + let sql = schema.to_sql("public", "users").unwrap(); + assert!(sql.contains("CREATE TABLE"), "{sql}"); + assert!(sql.contains("public"), "{sql}"); + assert!(sql.contains("users"), "{sql}"); + assert!(sql.contains("id"), "{sql}"); + assert!(sql.contains("name"), "{sql}"); + assert!(sql.contains("email"), "{sql}"); + assert!(sql.contains("PRIMARY KEY"), "{sql}"); + } + + #[test] + fn test_to_sql_not_found() { + let schema = Schema::from_parts(vec!["public".into()], HashMap::new()); + assert!(schema.to_sql("public", "nonexistent").is_none()); + } } diff --git a/pgdog/src/backend/schema/setup.sql b/pgdog/src/backend/schema/setup.sql index 
538365c9b..de3e81d56 100644 --- a/pgdog/src/backend/schema/setup.sql +++ b/pgdog/src/backend/schema/setup.sql @@ -235,6 +235,92 @@ BEGIN END; $body$ LANGUAGE plpgsql; +-- Install the sharded sequence on a table and column, +-- automatically determining the sequence from the column's default value. +CREATE OR REPLACE FUNCTION pgdog.install_next_id_seq( + schema_name TEXT, + table_name TEXT, + column_name TEXT, + lock_timeout TEXT DEFAULT '1s' +) RETURNS TEXT AS $body$ +DECLARE max_id BIGINT; +DECLARE current_id BIGINT; +DECLARE seq_name TEXT; +DECLARE col_default TEXT; +DECLARE shard INTEGER; +DECLARE shards INTEGER; +BEGIN + -- Check inputs + EXECUTE format('SELECT "%s" FROM "%s"."%s" LIMIT 1', column_name, schema_name, table_name); + + -- Get shard configuration. + SELECT + pgdog.config.shard, + pgdog.config.shards + INTO shard, shards + FROM pgdog.config; + + IF shards IS NULL OR shard IS NULL THEN + RAISE EXCEPTION 'pgdog.config not set'; + END IF; + + -- Extract the sequence name from the column's default value. + SELECT pg_get_expr(d.adbin, d.adrelid) + INTO col_default + FROM pg_attrdef d + JOIN pg_attribute a ON a.attrelid = d.adrelid AND a.attnum = d.adnum + WHERE a.attrelid = format('"%s"."%s"', schema_name, table_name)::regclass + AND a.attname = column_name; + + IF col_default IS NULL THEN + RAISE EXCEPTION 'column "%" on table "%"."%" has no default value', column_name, schema_name, table_name; + END IF; + + -- Extract sequence name from nextval('sequence_name'::regclass). + SELECT substring(col_default FROM 'nextval\(''([^'']+)''') + INTO seq_name; + + IF seq_name IS NULL THEN + RAISE EXCEPTION 'could not extract sequence name from default: %', col_default; + END IF; + + PERFORM pgdog.check_table(schema_name, table_name); + + IF NOT pgdog.check_column(schema_name, table_name, column_name) THEN + RAISE WARNING 'column is not indexed, this can be very slow'; + END IF; + + -- Lock table to prevent more writes. 
+ EXECUTE format('LOCK TABLE "%s"."%s" IN ACCESS EXCLUSIVE MODE', schema_name, table_name); + + -- Get the max column value. + EXECUTE format('SELECT MAX("%s") FROM "%s"."%s"', column_name, schema_name, table_name) INTO max_id; + + -- Get current sequence value. + EXECUTE format('SELECT last_value FROM %s', seq_name) INTO current_id; + + -- Install the function as the source of IDs. + EXECUTE format( + 'ALTER TABLE "%s"."%s" ALTER COLUMN "%s" SET DEFAULT pgdog.next_id_seq(''%s''::regclass)', + schema_name, + table_name, + column_name, + seq_name + ); + + -- Update the sequence value if it's too low. + IF current_id < max_id THEN + PERFORM setval(seq_name::regclass, max_id); + END IF; + + RETURN format('pgdog.next_id_seq(''%s'') installed on table "%s"."%s"', + seq_name, + schema_name, + table_name + ); +END; +$body$ LANGUAGE plpgsql; + -- Install trigger protecting the sharded column from bad inserts/updates. CREATE OR REPLACE FUNCTION pgdog.install_trigger( schema_name text, diff --git a/pgdog/src/frontend/router/parser/rewrite/statement/auto_id.rs b/pgdog/src/frontend/router/parser/rewrite/statement/auto_id.rs index 3d50b2e49..2397fcfeb 100644 --- a/pgdog/src/frontend/router/parser/rewrite/statement/auto_id.rs +++ b/pgdog/src/frontend/router/parser/rewrite/statement/auto_id.rs @@ -6,7 +6,7 @@ use pg_query::{Node, NodeEnum}; use pgdog_config::RewriteMode; use super::{Error, RewritePlan, StatementRewrite}; -use crate::frontend::router::parser::Table; +use crate::frontend::router::parser::{StatementParser, Table}; impl StatementRewrite<'_> { /// Handle BIGINT primary key columns in INSERT statements based on config. 
@@ -26,7 +26,7 @@ impl StatementRewrite<'_> { return Ok(()); } - let Some(table) = self.get_insert_table() else { + let Some((table, is_sharded)) = self.get_insert_table() else { return Ok(()); }; @@ -75,31 +75,35 @@ impl StatementRewrite<'_> { return Ok(()); } - match mode { - RewriteMode::Error => { - return Err(Error::MissingPrimaryKey); - } - RewriteMode::Rewrite => { - for column in missing_columns { - self.inject_column_with_unique_id(column)?; - plan.auto_id_injected += 1; - } - self.rewritten = true; + if mode == RewriteMode::Error { + return Err(Error::MissingPrimaryKey); + } + + let rewrite = + mode == RewriteMode::Rewrite || mode == RewriteMode::RewriteOmni && !is_sharded; + + if rewrite { + for column in missing_columns { + self.inject_column_with_unique_id(column)?; + plan.auto_id_injected += 1; } - RewriteMode::Ignore => unreachable!(), + self.rewritten = true; } Ok(()) } /// Get the table from an INSERT statement. - fn get_insert_table(&self) -> Option> { + fn get_insert_table(&self) -> Option<(Table<'_>, bool)> { let stmt = self.stmt.stmts.first()?; let node = stmt.stmt.as_ref()?; if let NodeEnum::InsertStmt(insert) = node.node.as_ref()? 
{ let relation = insert.relation.as_ref()?; - return Some(Table::from(relation)); + let is_sharded = StatementParser::from_insert(insert, None, self.schema, None) + .is_sharded(self.db_schema, self.user, self.search_path); + + return Some((Table::from(relation), is_sharded)); } None @@ -243,13 +247,13 @@ fn is_bigint_type(data_type: &str) -> bool { #[cfg(test)] mod tests { use indexmap::IndexMap; - use pgdog_config::Rewrite; + use pgdog_config::{Rewrite, ShardedTable, SystemCatalogsBehavior}; use std::collections::HashMap; use super::*; use crate::backend::schema::columns::StatsColumn as SchemaColumn; use crate::backend::schema::{Relation, Schema}; - use crate::backend::ShardingSchema; + use crate::backend::{ShardedTables, ShardingSchema}; use crate::frontend::router::parser::StatementRewriteContext; use crate::frontend::PreparedStatements; @@ -532,4 +536,94 @@ mod tests { // DEFAULT should NOT be replaced in error mode assert!(sql.to_uppercase().contains("DEFAULT")); } + + fn sharding_schema_with_sharded_users(mode: RewriteMode) -> ShardingSchema { + ShardingSchema { + shards: 3, + tables: ShardedTables::new( + vec![ShardedTable { + column: "id".into(), + name: Some("users".into()), + ..Default::default() + }], + vec![], + false, + SystemCatalogsBehavior::default(), + ), + rewrite: Rewrite { + primary_key: mode, + ..Default::default() + }, + ..Default::default() + } + } + + fn rewrite_sql_with_sharding_schema( + sql: &str, + db_schema: &Schema, + schema: &ShardingSchema, + ) -> Result<(String, RewritePlan), Error> { + unsafe { + std::env::set_var("NODE_ID", "pgdog-1"); + } + let mut ast = pg_query::parse(sql).unwrap().protobuf; + let mut prepared = PreparedStatements::default(); + let mut rewriter = StatementRewrite::new(StatementRewriteContext { + stmt: &mut ast, + extended: false, + prepared: false, + prepared_statements: &mut prepared, + schema, + db_schema, + user: "", + search_path: None, + }); + let plan = rewriter.maybe_rewrite()?; + let result = if 
plan.stmt.is_some() { + plan.stmt.clone().unwrap() + } else { + ast.deparse().unwrap() + }; + Ok((result, plan)) + } + + #[test] + fn test_rewrite_omni_skips_sharded_table() { + let db_schema = make_schema_with_bigint_pk(); + let schema = sharding_schema_with_sharded_users(RewriteMode::RewriteOmni); + let (sql, plan) = rewrite_sql_with_sharding_schema( + "INSERT INTO users (name) VALUES ('test')", + &db_schema, + &schema, + ) + .unwrap(); + + // users is sharded, so RewriteOmni should NOT inject auto id + assert_eq!(plan.auto_id_injected, 0); + assert!(!sql.contains("::bigint")); + } + + #[test] + fn test_rewrite_omni_injects_for_non_sharded_table() { + let db_schema = make_schema_with_bigint_pk(); + // No sharded tables configured, so "users" is not sharded + let schema = ShardingSchema { + shards: 3, + rewrite: Rewrite { + primary_key: RewriteMode::RewriteOmni, + ..Default::default() + }, + ..Default::default() + }; + let (sql, plan) = rewrite_sql_with_sharding_schema( + "INSERT INTO users (name) VALUES ('test')", + &db_schema, + &schema, + ) + .unwrap(); + + // users is NOT sharded, so RewriteOmni should inject auto id + assert_eq!(plan.auto_id_injected, 1); + assert!(sql.contains("::bigint")); + } } From 73a9c4f3b88d1ce1241a1aaa1446a985a56273e1 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Wed, 1 Apr 2026 12:04:23 -0700 Subject: [PATCH 04/10] ok this works --- integration/copy_data/init.sql | 2 +- integration/copy_data/pgdog.toml | 8 + integration/copy_data/setup.sql | 6 + integration/schema_sync/pgdog.toml | 4 + pgdog-config/src/rewrite.rs | 2 +- pgdog/src/admin/setup_schema.rs | 4 +- pgdog/src/backend/databases.rs | 17 +++ .../replication/logical/orchestrator.rs | 5 +- pgdog/src/backend/schema/mod.rs | 138 +++++++++-------- pgdog/src/backend/schema/setup.sql | 139 +++++++----------- pgdog/src/backend/schema/sync/config.rs | 4 +- pgdog/src/backend/server.rs | 7 + 12 files changed, 178 insertions(+), 158 deletions(-) diff --git 
a/integration/copy_data/init.sql b/integration/copy_data/init.sql index 9fdd1fb33..5e8dfb34f 100644 --- a/integration/copy_data/init.sql +++ b/integration/copy_data/init.sql @@ -3,6 +3,6 @@ DROP SCHEMA IF EXISTS copy_data CASCADE; \c pgdog2 DROP SCHEMA IF EXISTS copy_data CASCADE; \c pgdog --- DROP SCHEMA IF EXISTS copy_data CASCADE; +DROP SCHEMA IF EXISTS copy_data CASCADE; SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots; \i setup.sql diff --git a/integration/copy_data/pgdog.toml b/integration/copy_data/pgdog.toml index 36e856411..086a7bbca 100644 --- a/integration/copy_data/pgdog.toml +++ b/integration/copy_data/pgdog.toml @@ -23,5 +23,13 @@ database = "destination" column = "tenant_id" data_type = "bigint" +[[sharded_tables]] +database = "source" +column = "tenant_id" +data_type = "bigint" + [admin] password = "pgdog" + +[rewrite] +primary_key = "rewrite_omni" diff --git a/integration/copy_data/setup.sql b/integration/copy_data/setup.sql index fa6114e5e..be6e6fe32 100644 --- a/integration/copy_data/setup.sql +++ b/integration/copy_data/setup.sql @@ -45,6 +45,12 @@ CREATE TABLE copy_data.with_identity( tenant_id BIGINT NOT NULL ); +CREATE TABLE copy_data.settings ( + id BIGSERIAL PRIMARY KEY, + setting_name TEXT NOT NULL UNIQUE, + setting_value TEXT NOT NULL +); + DROP PUBLICATION IF EXISTS pgdog; CREATE PUBLICATION pgdog FOR TABLES IN SCHEMA copy_data; diff --git a/integration/schema_sync/pgdog.toml b/integration/schema_sync/pgdog.toml index 3b9cfaede..8e72f31de 100644 --- a/integration/schema_sync/pgdog.toml +++ b/integration/schema_sync/pgdog.toml @@ -14,3 +14,7 @@ database_name = "pgdog1" name = "destination" host = "127.0.0.1" database_name = "pgdog2" + +[[sharded_tables]] +database = "destination" +column = "user_id" diff --git a/pgdog-config/src/rewrite.rs b/pgdog-config/src/rewrite.rs index 71cbff087..d55c82c1d 100644 --- a/pgdog-config/src/rewrite.rs +++ b/pgdog-config/src/rewrite.rs @@ -9,7 +9,7 @@ use std::str::FromStr; #[derive( 
Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, JsonSchema, )] -#[serde(rename_all = "lowercase")] +#[serde(rename_all = "snake_case")] #[derive(Default)] pub enum RewriteMode { /// Forward the query unchanged. diff --git a/pgdog/src/admin/setup_schema.rs b/pgdog/src/admin/setup_schema.rs index b80cc4a8b..f921c7f00 100644 --- a/pgdog/src/admin/setup_schema.rs +++ b/pgdog/src/admin/setup_schema.rs @@ -17,8 +17,8 @@ impl Command for SetupSchema { async fn execute(&self) -> Result, Error> { let databases = databases(); - for cluster in databases.all().values() { - Schema::install(cluster) + for cluster in databases.schema_owners() { + Schema::install(&cluster) .await .map_err(|e| Error::Backend(Box::new(e)))?; } diff --git a/pgdog/src/backend/databases.rs b/pgdog/src/backend/databases.rs index 682a9aa8a..a23f83db4 100644 --- a/pgdog/src/backend/databases.rs +++ b/pgdog/src/backend/databases.rs @@ -348,6 +348,23 @@ impl Databases { Err(Error::NoSchemaOwner(database.to_owned())) } + /// Get all schema owners for all databases, + /// one per database. + /// + /// N.B.: Subsequent entry will override previous entry. 
+ /// + pub fn schema_owners(&self) -> Vec { + let mut schema_owners = HashMap::new(); + + for cluster in self.databases.values() { + if cluster.schema_admin() { + schema_owners.insert(cluster.name().to_string(), cluster.clone()); + } + } + + schema_owners.into_values().collect() + } + pub fn mirrors(&self, user: impl ToUser) -> Result, Error> { let user = user.to_user(); if self.databases.contains_key(&user) { diff --git a/pgdog/src/backend/replication/logical/orchestrator.rs b/pgdog/src/backend/replication/logical/orchestrator.rs index 656ac3b8b..2d9eec451 100644 --- a/pgdog/src/backend/replication/logical/orchestrator.rs +++ b/pgdog/src/backend/replication/logical/orchestrator.rs @@ -3,7 +3,7 @@ use crate::{ databases::{cancel_all, cutover}, maintenance_mode, schema::sync::{pg_dump::PgDumpOutput, PgDump}, - Cluster, + Cluster, Schema, }, util::{format_bytes, human_duration, random_string}, }; @@ -109,6 +109,9 @@ impl Orchestrator { self.refresh_publisher(); + // Install our stuff. + Schema::install(&self.destination).await?; + Ok(()) } diff --git a/pgdog/src/backend/schema/mod.rs b/pgdog/src/backend/schema/mod.rs index 8d3fd6ad8..9b3479f1c 100644 --- a/pgdog/src/backend/schema/mod.rs +++ b/pgdog/src/backend/schema/mod.rs @@ -3,9 +3,6 @@ pub mod columns; pub mod relation; pub mod sync; -pub use pgdog_stats::{ - Relation as StatsRelation, Relations as StatsRelations, Schema as StatsSchema, SchemaInner, -}; use pg_query::{ protobuf::{ ColumnDef, ConstrType, Constraint, CreateStmt, OnCommitAction, RangeVar, @@ -13,16 +10,20 @@ use pg_query::{ }, Node, NodeEnum, }; +pub use pgdog_stats::{ + Relation as StatsRelation, Relations as StatsRelations, Schema as StatsSchema, SchemaInner, +}; use serde::{Deserialize, Serialize}; use std::ops::DerefMut; use std::{collections::HashMap, ops::Deref}; -use tracing::debug; +use tracing::info; pub use relation::Relation; use super::{pool::Request, Cluster, Error, Server}; use crate::frontend::router::parser::Table; use 
crate::net::parameter::ParameterValue; +use sync::ShardConfig; static SETUP: &str = include_str!("setup.sql"); @@ -112,57 +113,52 @@ impl Schema { let shards = cluster.shards(); let sharded_tables = cluster.sharded_tables(); - if shards.len() < 2 || sharded_tables.is_empty() { + if sharded_tables.is_empty() { return Ok(()); } - for (shard_number, shard) in shards.iter().enumerate() { + // Sync configuration. + ShardConfig::sync_all(cluster).await?; + + for shard in shards { let mut server = shard.primary(&Request::default()).await?; Self::setup(&mut server).await?; let schema = Self::load(&mut server).await?; - debug!("[{}] {:#?}", server.addr(), schema); - - for table in sharded_tables { - for schema_table in schema - .tables() - .iter() - .filter(|table| table.schema() != "pgdog") - { - let column_match = schema_table.columns().values().find(|column| { - column.column_name == table.column && column.data_type == "bigint" - }); - if let Some(column_match) = column_match { - if table.name.is_none() - || table.name == Some(column_match.table_name.clone()) - { - if table.primary { - let query = format!( - "SELECT pgdog.install_next_id('{}', '{}', '{}', {}, {})", - schema_table.schema(), - schema_table.name, - column_match.column_name, - shards.len(), - shard_number - ); - - server.execute(&query).await?; - } - - let query = format!( - "SELECT pgdog.install_trigger('{}', '{}', '{}', {}, {})", - schema_table.schema(), - schema_table.name, - column_match.column_name, - shards.len(), - shard_number - ); - - server.execute(&query).await?; - } - } + let tables = schema + .tables() + .iter() + .cloned() + .filter(|table| !matches!(table.schema.as_str(), "pgdog" | "pgdog_shadow")) + .collect::>(); + + info!( + "[schema] checking {} tables for sharded sequences [{}]", + tables.len(), + server.addr(), + ); + + for table in tables { + for column in table.columns().iter().filter(|column| { + column.1.is_primary_key // Only primary keys. 
+ && matches!(column.1.data_type.as_str(), "bigint" | "int8") // Only BIGINT. + && column.1.column_default.contains("nextval") // Only the ones that rely on a sequence. + }) { + info!( + "[schema] creating sharded sequence for \"{}\".\"{}\".\"{}\"", + column.1.table_schema, column.1.table_name, column.1.column_name, + ); + + let query = format!( + "SELECT pgdog.install_shadow_table('{}', '{}', '{}')", + column.1.table_schema, column.1.table_name, column.1.column_name, + ); + + server.execute_checked(&query).await?; } } + + info!("[schema] sharded sequence check done [{}]", server.addr()); } Ok(()) @@ -438,28 +434,22 @@ mod test { .await .unwrap(); if count.first().copied() == Some(0) { - conn.execute_checked( - "INSERT INTO pgdog.config (shard, shards) VALUES (0, 1)", - ) - .await - .unwrap(); + conn.execute_checked("INSERT INTO pgdog.config (shard, shards) VALUES (0, 1)") + .await + .unwrap(); } // Clean up from previous runs and create a test table with BIGSERIAL primary key. conn.execute_checked("DROP TABLE IF EXISTS pgdog_test.ids") .await .unwrap(); - conn.execute_checked( - "CREATE TABLE pgdog_test.ids (id BIGSERIAL PRIMARY KEY, value TEXT)", - ) - .await - .unwrap(); + conn.execute_checked("CREATE TABLE pgdog_test.ids (id BIGSERIAL PRIMARY KEY, value TEXT)") + .await + .unwrap(); // Install the sharded sequence via install_next_id_seq. 
let result = conn - .fetch_all::( - "SELECT pgdog.install_next_id_seq('pgdog_test', 'ids', 'id')", - ) + .fetch_all::("SELECT pgdog.install_next_id_seq('pgdog_test', 'ids', 'id')") .await .unwrap(); assert!( @@ -644,7 +634,14 @@ mod test { fn test_to_sql() { use crate::backend::schema::columns::Column; - fn col(name: &str, table: &str, data_type: &str, ordinal: i32, pk: bool, nullable: bool) -> Column { + fn col( + name: &str, + table: &str, + data_type: &str, + ordinal: i32, + pk: bool, + nullable: bool, + ) -> Column { pgdog_stats::Column { table_catalog: String::new(), table_schema: "public".into(), @@ -661,13 +658,24 @@ mod test { } let columns = IndexMap::from([ - ("id".to_owned(), col("id", "users", "bigint", 1, true, false)), - ("name".to_owned(), col("name", "users", "text", 2, false, false)), - ("email".to_owned(), col("email", "users", "character varying", 3, false, true)), + ( + "id".to_owned(), + col("id", "users", "bigint", 1, true, false), + ), + ( + "name".to_owned(), + col("name", "users", "text", 2, false, false), + ), + ( + "email".to_owned(), + col("email", "users", "character varying", 3, false, true), + ), ]); - let relations: HashMap<(String, String), Relation> = - HashMap::from([(("public".into(), "users".into()), Relation::test_table("public", "users", columns))]); + let relations: HashMap<(String, String), Relation> = HashMap::from([( + ("public".into(), "users".into()), + Relation::test_table("public", "users", columns), + )]); let schema = Schema::from_parts(vec!["public".into()], relations); let sql = schema.to_sql("public", "users").unwrap(); diff --git a/pgdog/src/backend/schema/setup.sql b/pgdog/src/backend/schema/setup.sql index de3e81d56..ed767f885 100644 --- a/pgdog/src/backend/schema/setup.sql +++ b/pgdog/src/backend/schema/setup.sql @@ -1,7 +1,9 @@ -- Schema where we are placing all of our code. 
CREATE SCHEMA IF NOT EXISTS pgdog; +CREATE SCHEMA IF NOT EXISTS pgdog_shadow; GRANT USAGE ON SCHEMA pgdog TO PUBLIC; +GRANT USAGE ON SCHEMA pgdog_shadow TO PUBLIC; -- Settings table. CREATE TABLE IF NOT EXISTS pgdog.config ( @@ -50,7 +52,10 @@ GRANT USAGE ON SEQUENCE pgdog.validator_bigint_id_seq TO PUBLIC; -- Generate a primary key from a sequence that will -- match the shard number this is ran on. -CREATE OR REPLACE FUNCTION pgdog.next_id_seq(sequence_name regclass) RETURNS BIGINT AS $body$ +CREATE OR REPLACE FUNCTION pgdog.next_id_seq( + sequence_name regclass, + table_name regclass default 'pgdog.validator_bigint'::regclass +) RETURNS BIGINT AS $body$ DECLARE next_value BIGINT; DECLARE seq_oid oid; DECLARE table_oid oid; @@ -58,7 +63,7 @@ DECLARE shards INTEGER; DECLARE shard INTEGER; BEGIN SELECT sequence_name INTO seq_oid; - SELECT 'pgdog.validator_bigint'::regclass INTO table_oid; + SELECT table_name INTO table_oid; SELECT pgdog.config.shard, pgdog.config.shards @@ -235,6 +240,52 @@ BEGIN END; $body$ LANGUAGE plpgsql; +-- +-- Create "shadow" table used for primary key generation using the internal sequence. +-- +-- This will create the table and the sequence. +-- +CREATE OR REPLACE FUNCTION pgdog.install_shadow_table( + schema_name TEXT, + table_name TEXT, + column_name TEXT, + lock_timeout TEXT DEFAULT '1s' +) RETURNS text AS $body$ +DECLARE shadow_table_name TEXT; +DECLARE shadow_seq_name TEXT; +BEGIN + SELECT schema_name || '_' || table_name INTO shadow_table_name; + SELECT schema_name || '_' || table_name || '_' || column_name || '_seq' INTO shadow_seq_name; + + PERFORM format('SET LOCAL lock_timeout TO ''%s''', lock_timeout); + + EXECUTE format( + 'CREATE TABLE IF NOT EXISTS pgdog_shadow."%s" (LIKE "%s"."%s") PARTITION BY HASH("%s")', + shadow_table_name, + schema_name, + table_name, + column_name + ); + + -- Create sequence. 
+ EXECUTE format('CREATE SEQUENCE IF NOT EXISTS pgdog_shadow."%s"', shadow_seq_name); + + -- Make the sequence owned by the shadow table. + EXECUTE format('ALTER SEQUENCE pgdog_shadow."%s" OWNED BY pgdog_shadow.%s.%s', shadow_seq_name, shadow_table_name, column_name); + + -- Set it as the default for the target table, allowing automatic ID generation. + EXECUTE format('ALTER TABLE "%s"."%s" ALTER COLUMN "%s" SET DEFAULT pgdog.next_id_seq(''pgdog_shadow.%s''::regclass, ''pgdog_shadow.%s'')', + schema_name, + table_name, + column_name, + shadow_seq_name, + shadow_table_name + ); + + RETURN format('"pgdog_shadow"."%s"', shadow_table_name); +END; +$body$ LANGUAGE plpgsql; + -- Install the sharded sequence on a table and column, -- automatically determining the sequence from the column's default value. CREATE OR REPLACE FUNCTION pgdog.install_next_id_seq( @@ -403,87 +454,3 @@ BEGIN RETURN format('installed on shard %s', shard); END; $body$ LANGUAGE plpgsql; - --- Globally unique 64-bit ID generator (Snowflake-like). --- Bit allocation: 41 timestamp + 10 node + 12 sequence = 63 bits (keeps sign bit clear) --- The sequence stores (elapsed_ms << 12) | sequence_within_ms, allowing --- automatic reset of the sequence counter when the millisecond changes. 
--- CREATE SEQUENCE IF NOT EXISTS pgdog.unique_id_seq; - --- CREATE OR REPLACE FUNCTION pgdog.unique_id(id_offset BIGINT DEFAULT 0) RETURNS BIGINT AS $body$ --- DECLARE --- sequence_bits CONSTANT INTEGER := 12; --- node_bits CONSTANT INTEGER := 10; --- max_node_id CONSTANT INTEGER := (1 << node_bits) - 1; -- 1023 --- max_sequence CONSTANT INTEGER := (1 << sequence_bits) - 1; -- 4095 --- max_timestamp CONSTANT BIGINT := (1::bigint << 41) - 1; --- pgdog_epoch CONSTANT BIGINT := 1764184395000; -- Wednesday, November 26, 2025 11:13:15 AM GMT-08:00 --- node_shift CONSTANT INTEGER := sequence_bits; -- 12 --- timestamp_shift CONSTANT INTEGER := sequence_bits + node_bits; -- 22 - --- node_id INTEGER; --- now_ms BIGINT; --- elapsed BIGINT; --- min_combined BIGINT; --- combined_seq BIGINT; --- seq INTEGER; --- timestamp_part BIGINT; --- node_part BIGINT; --- base_id BIGINT; --- BEGIN --- -- Get node_id from pgdog.config.shard --- SELECT pgdog.config.shard INTO node_id FROM pgdog.config; - --- IF node_id IS NULL THEN --- RAISE EXCEPTION 'pgdog.config.shard not set'; --- END IF; - --- IF node_id < 0 OR node_id > max_node_id THEN --- RAISE EXCEPTION 'shard must be between 0 and %', max_node_id; --- END IF; - --- LOOP --- -- Get next combined sequence value --- combined_seq := nextval('pgdog.unique_id_seq'); - --- -- Get current time in milliseconds since Unix epoch --- now_ms := (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint; --- elapsed := now_ms - pgdog_epoch; - --- IF elapsed < 0 THEN --- RAISE EXCEPTION 'Clock is before PgDog epoch (November 26, 2025)'; --- END IF; - --- -- Minimum valid combined value for current millisecond --- min_combined := elapsed << 12; - --- -- If sequence is at or ahead of current time, we're good --- IF combined_seq >= min_combined THEN --- EXIT; --- END IF; - --- -- Sequence is behind current time, advance it --- PERFORM setval('pgdog.unique_id_seq', min_combined, false); --- END LOOP; - --- -- Decompose the combined sequence value --- 
seq := (combined_seq & max_sequence)::integer; --- elapsed := combined_seq >> 12; - --- IF elapsed > max_timestamp THEN --- RAISE EXCEPTION 'Timestamp overflow: % > %', elapsed, max_timestamp; --- END IF; - --- -- Compose the ID: timestamp | node | sequence --- timestamp_part := elapsed << timestamp_shift; --- node_part := node_id::bigint << node_shift; --- base_id := timestamp_part | node_part | seq; - --- RETURN base_id + id_offset; --- END; --- $body$ LANGUAGE plpgsql; - --- GRANT USAGE ON SEQUENCE pgdog.unique_id_seq TO PUBLIC; - --- -- Allow functions to be executed by anyone. --- GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA pgdog TO PUBLIC; diff --git a/pgdog/src/backend/schema/sync/config.rs b/pgdog/src/backend/schema/sync/config.rs index 50d09f796..ca8eea85b 100644 --- a/pgdog/src/backend/schema/sync/config.rs +++ b/pgdog/src/backend/schema/sync/config.rs @@ -37,7 +37,7 @@ impl ShardConfig { pub async fn sync_all(cluster: &Cluster) -> Result<(), Error> { let shards = cluster.shards().len(); - info!("setting up schema on {} shards", shards); + info!("setting up shard config on {} shards", shards); let shards: Vec<_> = cluster .shards() @@ -56,7 +56,7 @@ impl ShardConfig { shard.sync().await?; } - info!("schema setup complete for {} shards", shards.len()); + info!("shard config complete for {} shards", shards.len()); Ok(()) } diff --git a/pgdog/src/backend/server.rs b/pgdog/src/backend/server.rs index ee5e03d01..d1a0a39af 100644 --- a/pgdog/src/backend/server.rs +++ b/pgdog/src/backend/server.rs @@ -730,6 +730,13 @@ impl Server { query: impl Into, ) -> Result, Error> { let messages = self.execute(query).await?; + let notices = messages.iter().filter(|m| m.code() == 'N'); + + for notice in notices { + let notice = NoticeResponse::from_bytes(notice.to_bytes()?)?; + warn!("{} [{}]", notice.message.message, self.addr()); + } + let error = messages.iter().find(|m| m.code() == 'E'); if let Some(error) = error { let error = ErrorResponse::from_bytes(error.to_bytes()?)?; 
From b6372dc96674950d00638c74117bbc0b43daf1d7 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Wed, 1 Apr 2026 13:15:05 -0700 Subject: [PATCH 05/10] ok --- integration/rust/src/setup.rs | 12 +- integration/rust/tests/integration/mod.rs | 1 + .../rust/tests/integration/rewrite_omni.rs | 131 ++++++++++++++++++ integration/schema_sync/dev.sh | 25 +++- pgdog/src/admin/mod.rs | 2 + pgdog/src/admin/parser.rs | 4 + pgdog/src/admin/reset_prepared.rs | 26 ++++ .../parser/rewrite/statement/auto_id.rs | 18 +-- 8 files changed, 203 insertions(+), 16 deletions(-) create mode 100644 integration/rust/tests/integration/rewrite_omni.rs create mode 100644 pgdog/src/admin/reset_prepared.rs diff --git a/integration/rust/src/setup.rs b/integration/rust/src/setup.rs index 7268f8f0b..34d96de22 100644 --- a/integration/rust/src/setup.rs +++ b/integration/rust/src/setup.rs @@ -45,9 +45,19 @@ pub async fn connections_sqlx() -> Vec> { } pub async fn connection_sqlx_direct() -> Pool { + connection_sqlx_direct_db("pgdog").await +} + +pub async fn connection_sqlx_direct_db(name: &str) -> Pool { PgPoolOptions::new() .max_connections(1) - .connect("postgres://pgdog:pgdog@127.0.0.1:5432/pgdog?application_name=sqlx_direct") + .connect( + format!( + "postgres://pgdog:pgdog@127.0.0.1:5432/{}?application_name=sqlx_direct", + name + ) + .as_str(), + ) .await .unwrap() } diff --git a/integration/rust/tests/integration/mod.rs b/integration/rust/tests/integration/mod.rs index cd9baa510..69781b02c 100644 --- a/integration/rust/tests/integration/mod.rs +++ b/integration/rust/tests/integration/mod.rs @@ -22,6 +22,7 @@ pub mod prepared; pub mod reload; pub mod reset; pub mod rewrite; +pub mod rewrite_omni; pub mod savepoint; pub mod set_in_transaction; pub mod set_sharding_key; diff --git a/integration/rust/tests/integration/rewrite_omni.rs b/integration/rust/tests/integration/rewrite_omni.rs new file mode 100644 index 000000000..111dc8ad9 --- /dev/null +++ b/integration/rust/tests/integration/rewrite_omni.rs 
@@ -0,0 +1,131 @@ +use rust::setup::*; +use sqlx::Executor; + +#[tokio::test] +async fn test_omni_only_pk_rewrite() { + let shard_0 = connection_sqlx_direct_db("shard_0").await; + let shard_1 = connection_sqlx_direct_db("shard_1").await; + + for (shard, pool) in [&shard_0, &shard_1].iter().enumerate() { + pool.execute( + "DROP TABLE IF EXISTS public.test_omni_rewrite_pk_omni, test_omni_rewrite_pk_sharded CASCADE", + ) + .await + .unwrap(); + + pool.execute("CREATE TABLE IF NOT EXISTS public.test_omni_rewrite_pk_omni(id BIGSERIAL PRIMARY KEY, value TEXT NOT NULL)").await.unwrap(); + pool.execute("CREATE TABLE IF NOT EXISTS public.test_omni_rewrite_pk_sharded(id BIGSERIAL PRIMARY KEY, customer_id BIGINT NOT NULL, value TEXT NOT NULL)").await.unwrap(); + + pool.execute( + "SELECT pgdog.install_shadow_table('public', 'test_omni_rewrite_pk_sharded', 'id')", + ) + .await + .unwrap(); + + // Configure sharding. + let mut t = pool.begin().await.unwrap(); + t.execute("DELETE FROM pgdog.config").await.unwrap(); + t.execute( + format!( + "INSERT INTO pgdog.config (shard, shards) VALUES ({}, 2)", + shard + ) + .as_str(), + ) + .await + .unwrap(); + t.commit().await.unwrap(); + } + + let sharded = connections_sqlx().await.pop().unwrap(); + let admin = admin_sqlx().await; + + // This will reload the schema as well. 
+ admin + .execute("SET rewrite_primary_key TO 'rewrite_omni'") + .await + .unwrap(); + + let starting_id: (i64,) = sqlx::query_as("SELECT pgdog.unique_id()") + .fetch_one(&sharded) + .await + .unwrap(); + + for run in 0..25 { + let omni_id: (i64,) = sqlx::query_as( + "INSERT INTO public.test_omni_rewrite_pk_omni (value) VALUES ($1) RETURNING id", + ) + .bind(format!("test_{}", run)) + .fetch_one(&sharded) + .await + .unwrap(); + + assert!( + omni_id.0 > starting_id.0, + "omni ID should be unique_id, but got {}", + omni_id.0 + ); + + let sharded_id: (i64,) = sqlx::query_as( + "INSERT INTO public.test_omni_rewrite_pk_sharded (customer_id, value) VALUES ($1, $2) RETURNING id", + ) + .bind(run as i64) + .bind(format!("test_{}", run)) + .fetch_one(&sharded) + .await + .unwrap(); + + assert!( + sharded_id.0 < omni_id.0, + "sharded ID should not be unique_id, but got {}", + sharded_id.0 + ); + } + + sharded.close().await; + let sharded = connections_sqlx().await.pop().unwrap(); + + // Re-enable sharded unique ID. + admin + .execute("SET rewrite_primary_key TO 'rewrite'") + .await + .unwrap(); + + // The rewrite is cached in prepared statements + // and the query cache, so we need to be careful to reset it + // _after_ the client disconnected. In production, changing this setting + // definitely requires a restart. 
+ admin.execute("RESET QUERY_CACHE").await.unwrap(); + admin.execute("RESET PREPARED").await.unwrap(); + + for run in 25..50 { + let omni_id: (i64,) = sqlx::query_as( + "INSERT INTO public.test_omni_rewrite_pk_omni (value) VALUES ($1) RETURNING id", + ) + .bind(format!("test_{}", run)) + .fetch_one(&sharded) + .await + .unwrap(); + + assert!( + omni_id.0 > starting_id.0, + "omni ID should be unique_id, but got {}", + omni_id.0 + ); + + let sharded_id: (i64,) = sqlx::query_as( + "INSERT INTO public.test_omni_rewrite_pk_sharded (customer_id, value) VALUES ($1, $2) RETURNING id", + ) + .bind(run as i64) + .bind(format!("test_{}", run)) + .fetch_one(&sharded) + .await + .unwrap(); + + assert!( + sharded_id.0 > omni_id.0, + "sharded ID should be unique_id, but got {}", + sharded_id.0 + ); + } +} diff --git a/integration/schema_sync/dev.sh b/integration/schema_sync/dev.sh index e0f8d037a..e147c50d2 100644 --- a/integration/schema_sync/dev.sh +++ b/integration/schema_sync/dev.sh @@ -81,12 +81,25 @@ EOF diff source.sql destination.sql > diff.txt || true -# Extract column name and type from diff lines, ignoring everything else -# This normalizes across different PG versions and constraint syntaxes -ACTUAL_CONVERSIONS=$(grep '^[<>]' diff.txt | \ - grep -E '\b(integer|bigint)\b' | \ - sed -E 's/.*[[:space:]]([a-z_]+)[[:space:]]+(integer|bigint).*/\1 \2/' | \ - sort -u) +# Extract integer -> bigint conversions from the diff +# 1. Get removed (< integer) and added (> bigint) column lines +# 2. 
Only keep columns that appear as integer in source AND bigint in destination +REMOVED_INT=$(grep '^<' diff.txt | \ + sed -E 's/.*[[:space:]]([a-z_]+)[[:space:]]+integer\b.*/\1/' | \ + grep -E '^[a-z_]+$' | sort -u) + +ADDED_BIGINT=$(grep '^>' diff.txt | \ + sed -E 's/.*[[:space:]]([a-z_]+)[[:space:]]+bigint\b.*/\1/' | \ + grep -E '^[a-z_]+$' | sort -u) + +# Columns that changed from integer to bigint +CONVERTED=$(comm -12 <(echo "$REMOVED_INT") <(echo "$ADDED_BIGINT")) + +# Build the expected format: column_name integer \n column_name bigint +ACTUAL_CONVERSIONS=$(echo "$CONVERTED" | while read col; do + echo "$col integer" + echo "$col bigint" +done | sort -u) EXPECTED_SORTED=$(echo "$EXPECTED_CONVERSIONS" | sort -u) diff --git a/pgdog/src/admin/mod.rs b/pgdog/src/admin/mod.rs index c4a2ed2dc..c1fa866a2 100644 --- a/pgdog/src/admin/mod.rs +++ b/pgdog/src/admin/mod.rs @@ -18,6 +18,7 @@ pub mod probe; pub mod reconnect; pub mod reload; pub mod replicate; +pub mod reset_prepared; pub mod reset_query_cache; pub mod reshard; pub mod schema_sync; @@ -60,6 +61,7 @@ pub use probe::*; pub use reconnect::*; pub use reload::*; pub use replicate::*; +pub use reset_prepared::*; pub use reset_query_cache::*; pub use reshard::*; pub use schema_sync::*; diff --git a/pgdog/src/admin/parser.rs b/pgdog/src/admin/parser.rs index c808682c5..a26779e7a 100644 --- a/pgdog/src/admin/parser.rs +++ b/pgdog/src/admin/parser.rs @@ -15,6 +15,7 @@ pub enum ParseResult { ShowServers(ShowServers), ShowPeers(ShowPeers), ShowQueryCache(ShowQueryCache), + ResetPrepared(ResetPrepared), ResetQueryCache(ResetQueryCache), ShowStats(ShowStats), ShowTransactions(ShowTransactions), @@ -60,6 +61,7 @@ impl ParseResult { ShowServers(show_servers) => show_servers.execute().await, ShowPeers(show_peers) => show_peers.execute().await, ShowQueryCache(show_query_cache) => show_query_cache.execute().await, + ResetPrepared(cmd) => cmd.execute().await, ResetQueryCache(reset_query_cache) => 
reset_query_cache.execute().await, ShowStats(show_stats) => show_stats.execute().await, ShowTransactions(show_transactions) => show_transactions.execute().await, @@ -105,6 +107,7 @@ impl ParseResult { ShowServers(show_servers) => show_servers.name(), ShowPeers(show_peers) => show_peers.name(), ShowQueryCache(show_query_cache) => show_query_cache.name(), + ResetPrepared(cmd) => cmd.name(), ResetQueryCache(reset_query_cache) => reset_query_cache.name(), ShowStats(show_stats) => show_stats.name(), ShowTransactions(show_transactions) => show_transactions.name(), @@ -194,6 +197,7 @@ impl Parser { } }, "reset" => match iter.next().ok_or(Error::Syntax)?.trim() { + "prepared" => ParseResult::ResetPrepared(ResetPrepared::parse(&sql)?), "query_cache" => ParseResult::ResetQueryCache(ResetQueryCache::parse(&sql)?), command => { debug!("unknown admin show command: '{}'", command); diff --git a/pgdog/src/admin/reset_prepared.rs b/pgdog/src/admin/reset_prepared.rs new file mode 100644 index 000000000..a04707626 --- /dev/null +++ b/pgdog/src/admin/reset_prepared.rs @@ -0,0 +1,26 @@ +//! RESET PREPARED. 
+use crate::config::config; +use crate::frontend::prepared_statements::PreparedStatements; + +use super::prelude::*; + +pub struct ResetPrepared; + +#[async_trait] +impl Command for ResetPrepared { + fn name(&self) -> String { + "RESET PREPARED".into() + } + + fn parse(_: &str) -> Result { + Ok(Self) + } + + async fn execute(&self) -> Result, Error> { + let config = config(); + PreparedStatements::global() + .write() + .close_unused(config.config.general.prepared_statements_limit); + Ok(vec![]) + } +} diff --git a/pgdog/src/frontend/router/parser/rewrite/statement/auto_id.rs b/pgdog/src/frontend/router/parser/rewrite/statement/auto_id.rs index 2397fcfeb..4f4eb0702 100644 --- a/pgdog/src/frontend/router/parser/rewrite/statement/auto_id.rs +++ b/pgdog/src/frontend/router/parser/rewrite/statement/auto_id.rs @@ -62,15 +62,6 @@ impl StatementRewrite<'_> { .copied() .collect(); - // Replace DEFAULT values with unique_id() for present columns (only in rewrite mode) - if mode == RewriteMode::Rewrite { - let replaced = self.replace_set_to_default_at_positions(&present_pk_positions); - if replaced > 0 { - plan.auto_id_injected += replaced as u16; - self.rewritten = true; - } - } - if missing_columns.is_empty() { return Ok(()); } @@ -82,6 +73,15 @@ impl StatementRewrite<'_> { let rewrite = mode == RewriteMode::Rewrite || mode == RewriteMode::RewriteOmni && !is_sharded; + // Replace DEFAULT values with unique_id() for present columns (only in rewrite mode) + if rewrite { + let replaced = self.replace_set_to_default_at_positions(&present_pk_positions); + if replaced > 0 { + plan.auto_id_injected += replaced as u16; + self.rewritten = true; + } + } + if rewrite { for column in missing_columns { self.inject_column_with_unique_id(column)?; From d5150a56dfdf267de07124621e5b422a08ef179c Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Wed, 1 Apr 2026 13:19:28 -0700 Subject: [PATCH 06/10] fix checks --- pgdog/src/unique_id.rs | 45 +++++++++++++++++++++++++++++++----------- 1 file 
changed, 34 insertions(+), 11 deletions(-) diff --git a/pgdog/src/unique_id.rs b/pgdog/src/unique_id.rs index d9e6a1898..51194f249 100644 --- a/pgdog/src/unique_id.rs +++ b/pgdog/src/unique_id.rs @@ -39,6 +39,10 @@ const COMPACT_MAX_NODE_ID: u64 = (1 << COMPACT_NODE_BITS) - 1; // 63 const COMPACT_MAX_SEQUENCE: u64 = (1 << COMPACT_SEQUENCE_BITS) - 1; // 63 const COMPACT_NODE_SHIFT: u8 = COMPACT_SEQUENCE_BITS as u8; // 6 const COMPACT_TIMESTAMP_SHIFT: u8 = (COMPACT_SEQUENCE_BITS + COMPACT_NODE_BITS) as u8; // 12 +const COMPACT_MAX_OFFSET: u64 = ((1u64 << 53) - 1) + - ((MAX_TIMESTAMP << COMPACT_TIMESTAMP_SHIFT) + | (COMPACT_MAX_NODE_ID << COMPACT_NODE_SHIFT) + | COMPACT_MAX_SEQUENCE); static UNIQUE_ID: OnceCell = OnceCell::new(); @@ -167,17 +171,23 @@ impl UniqueId { let min_id = config().config.general.unique_id_min; - if node_id > MAX_NODE_ID { - return Err(Error::NodeIdTooLarge(node_id)); - } - - // Compact (JS-safe) IDs only have 6 node bits. - if node_id > COMPACT_MAX_NODE_ID { - return Err(Error::CompactNodeIdTooLarge(node_id)); - } - - if min_id > MAX_OFFSET { - return Err(Error::OffsetTooLarge(min_id)); + match function { + UniqueIdFunction::Standard => { + if node_id > MAX_NODE_ID { + return Err(Error::NodeIdTooLarge(node_id)); + } + if min_id > MAX_OFFSET { + return Err(Error::OffsetTooLarge(min_id)); + } + } + UniqueIdFunction::Compact => { + if node_id > COMPACT_MAX_NODE_ID { + return Err(Error::CompactNodeIdTooLarge(node_id)); + } + if min_id > COMPACT_MAX_OFFSET { + return Err(Error::OffsetTooLarge(min_id)); + } + } } Ok(Self { @@ -440,4 +450,17 @@ mod test { let result = max_base_id + MAX_OFFSET; assert!(result <= i64::MAX as u64, "MAX_OFFSET would overflow i64"); } + + #[test] + fn test_compact_max_offset() { + const JS_MAX_SAFE_INTEGER: u64 = (1 << 53) - 1; + let max_base_id = (MAX_TIMESTAMP << COMPACT_TIMESTAMP_SHIFT) + | (COMPACT_MAX_NODE_ID << COMPACT_NODE_SHIFT) + | COMPACT_MAX_SEQUENCE; + let result = max_base_id + COMPACT_MAX_OFFSET; + assert!( 
+ result <= JS_MAX_SAFE_INTEGER, + "COMPACT_MAX_OFFSET would overflow JS MAX_SAFE_INTEGER" + ); + } } From 5a6d12567ded65ca7940f4ed9b4e6b6150204387 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Wed, 1 Apr 2026 13:23:38 -0700 Subject: [PATCH 07/10] schema --- .schema/pgdog.schema.json | 9 +++++++-- pgdog/src/unique_id.rs | 19 +++++++++++++++---- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/.schema/pgdog.schema.json b/.schema/pgdog.schema.json index 247eab443..46ba10b83 100644 --- a/.schema/pgdog.schema.json +++ b/.schema/pgdog.schema.json @@ -63,7 +63,7 @@ "log_connections": true, "log_disconnections": true, "log_format": "text", - "log_level": "info", + "log_level": "trace", "lsn_check_delay": 9223372036854775807, "lsn_check_interval": 5000, "lsn_check_timeout": 5000, @@ -780,7 +780,7 @@ "log_level": { "description": "Log filter directives using the same syntax as the `RUST_LOG` environment variable.\n\n_Default:_ `info`\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#log_level", "type": "string", - "default": "info" + "default": "trace" }, "lsn_check_delay": { "description": "For how long to delay checking for replication delay.\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#lsn_check_delay", @@ -1501,6 +1501,11 @@ "description": "Automatically rewrite the query and execute it.", "type": "string", "const": "rewrite" + }, + { + "description": "Rewrite only for omnisharded tables.", + "type": "string", + "const": "rewrite_omni" } ] }, diff --git a/pgdog/src/unique_id.rs b/pgdog/src/unique_id.rs index 51194f249..7f00c2fe4 100644 --- a/pgdog/src/unique_id.rs +++ b/pgdog/src/unique_id.rs @@ -444,11 +444,18 @@ mod test { #[test] fn test_max_offset() { - // Verify MAX_OFFSET calculation is correct let max_base_id = (MAX_TIMESTAMP << TIMESTAMP_SHIFT) | (MAX_NODE_ID << NODE_SHIFT) | MAX_SEQUENCE; let result = max_base_id + MAX_OFFSET; - assert!(result <= i64::MAX as u64, "MAX_OFFSET would overflow i64"); + assert_eq!( + result, 
+ i64::MAX as u64, + "MAX_OFFSET should exactly reach i64::MAX" + ); + assert!( + max_base_id.checked_add(MAX_OFFSET + 1).unwrap() > i64::MAX as u64, + "MAX_OFFSET + 1 should overflow i64" + ); } #[test] @@ -458,9 +465,13 @@ mod test { | (COMPACT_MAX_NODE_ID << COMPACT_NODE_SHIFT) | COMPACT_MAX_SEQUENCE; let result = max_base_id + COMPACT_MAX_OFFSET; + assert_eq!( + result, JS_MAX_SAFE_INTEGER, + "COMPACT_MAX_OFFSET should exactly reach JS MAX_SAFE_INTEGER" + ); assert!( - result <= JS_MAX_SAFE_INTEGER, - "COMPACT_MAX_OFFSET would overflow JS MAX_SAFE_INTEGER" + max_base_id.checked_add(COMPACT_MAX_OFFSET + 1).unwrap() > JS_MAX_SAFE_INTEGER, + "COMPACT_MAX_OFFSET + 1 should overflow JS MAX_SAFE_INTEGER" ); } } From 78dcd6d3b433a87d92803b8e70995fd0e966ffb4 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Wed, 1 Apr 2026 13:26:10 -0700 Subject: [PATCH 08/10] woops --- .schema/pgdog.schema.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.schema/pgdog.schema.json b/.schema/pgdog.schema.json index 46ba10b83..756adf06f 100644 --- a/.schema/pgdog.schema.json +++ b/.schema/pgdog.schema.json @@ -63,7 +63,7 @@ "log_connections": true, "log_disconnections": true, "log_format": "text", - "log_level": "trace", + "log_level": "info", "lsn_check_delay": 9223372036854775807, "lsn_check_interval": 5000, "lsn_check_timeout": 5000, @@ -780,7 +780,7 @@ "log_level": { "description": "Log filter directives using the same syntax as the `RUST_LOG` environment variable.\n\n_Default:_ `info`\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#log_level", "type": "string", - "default": "trace" + "default": "info" }, "lsn_check_delay": { "description": "For how long to delay checking for replication delay.\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#lsn_check_delay", From fc4414741a92faed59deb476342bff573b4872f7 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Wed, 1 Apr 2026 13:35:47 -0700 Subject: [PATCH 09/10] rewrite default before early 
return --- .../router/parser/rewrite/statement/auto_id.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pgdog/src/frontend/router/parser/rewrite/statement/auto_id.rs b/pgdog/src/frontend/router/parser/rewrite/statement/auto_id.rs index 4f4eb0702..c33c79cdb 100644 --- a/pgdog/src/frontend/router/parser/rewrite/statement/auto_id.rs +++ b/pgdog/src/frontend/router/parser/rewrite/statement/auto_id.rs @@ -62,14 +62,6 @@ impl StatementRewrite<'_> { .copied() .collect(); - if missing_columns.is_empty() { - return Ok(()); - } - - if mode == RewriteMode::Error { - return Err(Error::MissingPrimaryKey); - } - let rewrite = mode == RewriteMode::Rewrite || mode == RewriteMode::RewriteOmni && !is_sharded; @@ -82,6 +74,14 @@ impl StatementRewrite<'_> { } } + if missing_columns.is_empty() { + return Ok(()); + } + + if mode == RewriteMode::Error { + return Err(Error::MissingPrimaryKey); + } + if rewrite { for column in missing_columns { self.inject_column_with_unique_id(column)?; From 9413f8f09b3e5a99ebb3a3fafaca728b56769c7a Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Thu, 2 Apr 2026 12:27:55 -0700 Subject: [PATCH 10/10] handle with identity columns --- pgdog/src/backend/schema/columns.sql | 2 +- pgdog/src/backend/schema/mod.rs | 9 ++++++++- pgdog/src/backend/schema/setup.sql | 14 +++++++++++++- 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/pgdog/src/backend/schema/columns.sql b/pgdog/src/backend/schema/columns.sql index 2365185e3..22d63ba25 100644 --- a/pgdog/src/backend/schema/columns.sql +++ b/pgdog/src/backend/schema/columns.sql @@ -3,7 +3,7 @@ SELECT c.table_schema::text, c.table_name::text, c.column_name::text, - c.column_default::text, + COALESCE(c.column_default, CASE WHEN c.is_identity = 'YES' THEN 'generated ' || lower(c.identity_generation) || ' as identity' ELSE NULL END)::text AS column_default, (c.is_nullable != 'NO')::text AS is_nullable, c.data_type::text, c.ordinal_position::int, diff --git 
a/pgdog/src/backend/schema/mod.rs b/pgdog/src/backend/schema/mod.rs index 9b3479f1c..d21f4a3e1 100644 --- a/pgdog/src/backend/schema/mod.rs +++ b/pgdog/src/backend/schema/mod.rs @@ -138,11 +138,18 @@ impl Schema { server.addr(), ); + let supported_defaults = [ + "nextval", + "generated always as identity", + "generated by default as identity", + ]; + for table in tables { for column in table.columns().iter().filter(|column| { column.1.is_primary_key // Only primary keys. && matches!(column.1.data_type.as_str(), "bigint" | "int8") // Only BIGINT. - && column.1.column_default.contains("nextval") // Only the ones that rely on a sequence. + && supported_defaults.iter().any(|p| column.1.column_default.contains(p)) + // Only the ones that rely on a sequence. }) { info!( "[schema] creating sharded sequence for \"{}\".\"{}\".\"{}\"", diff --git a/pgdog/src/backend/schema/setup.sql b/pgdog/src/backend/schema/setup.sql index ed767f885..10ce69e54 100644 --- a/pgdog/src/backend/schema/setup.sql +++ b/pgdog/src/backend/schema/setup.sql @@ -268,11 +268,23 @@ BEGIN ); -- Create sequence. - EXECUTE format('CREATE SEQUENCE IF NOT EXISTS pgdog_shadow."%s"', shadow_seq_name); + EXECUTE format('CREATE SEQUENCE IF NOT EXISTS pgdog_shadow."%s" CACHE 100', shadow_seq_name); -- Make the sequence owned by the shadow table. EXECUTE format('ALTER SEQUENCE pgdog_shadow."%s" OWNED BY pgdog_shadow.%s.%s', shadow_seq_name, shadow_table_name, column_name); + -- Drop identity constraint if one exists, since we're replacing it with a custom default. 
+ IF EXISTS ( + SELECT 1 + FROM information_schema.columns c + WHERE c.table_schema = install_shadow_table.schema_name + AND c.table_name = install_shadow_table.table_name + AND c.column_name = install_shadow_table.column_name + AND c.is_identity = 'YES' + ) THEN + EXECUTE format('ALTER TABLE "%s"."%s" ALTER COLUMN "%s" DROP IDENTITY', schema_name, table_name, column_name); + END IF; + -- Set it as the default for the target table, allowing automatic ID generation. EXECUTE format('ALTER TABLE "%s"."%s" ALTER COLUMN "%s" SET DEFAULT pgdog.next_id_seq(''pgdog_shadow.%s''::regclass, ''pgdog_shadow.%s'')', schema_name,