Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .schema/pgdog.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
"tls_verify": "prefer",
"two_phase_commit": false,
"two_phase_commit_auto": null,
"unique_id_function": "standard",
"unique_id_min": 0,
"workers": 2
}
Expand Down Expand Up @@ -1025,6 +1026,11 @@
],
"default": null
},
"unique_id_function": {
"description": "Unique ID generation function.",
"$ref": "#/$defs/UniqueIdFunction",
"default": "standard"
},
"unique_id_min": {
"description": "Minimum ID for unique ID generator.",
"type": "integer",
Expand Down Expand Up @@ -1495,6 +1501,11 @@
"description": "Automatically rewrite the query and execute it.",
"type": "string",
"const": "rewrite"
},
{
"description": "Rewrite only for omnisharded tables.",
"type": "string",
"const": "rewrite_omni"
}
]
},
Expand Down Expand Up @@ -1817,6 +1828,20 @@
}
]
},
"UniqueIdFunction": {
"oneOf": [
{
"description": "Standard 64-bit function using the entire 64-bit range.",
"type": "string",
"const": "standard"
},
{
"description": "Compact function using the leftest 53-bit range, making it\nJavaScript-safe, so you can pass it as an integer directly\nto the frontend apps.\n\nThe year is 2026 and JavaScript continues to be a pain in the ass.",
"type": "string",
"const": "compact"
}
]
},
"Vector": {
"type": "object",
"properties": {
Expand Down
2 changes: 1 addition & 1 deletion integration/copy_data/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ DROP SCHEMA IF EXISTS copy_data CASCADE;
\c pgdog2
DROP SCHEMA IF EXISTS copy_data CASCADE;
\c pgdog
-- DROP SCHEMA IF EXISTS copy_data CASCADE;
DROP SCHEMA IF EXISTS copy_data CASCADE;
SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;
\i setup.sql
8 changes: 8 additions & 0 deletions integration/copy_data/pgdog.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,13 @@ database = "destination"
column = "tenant_id"
data_type = "bigint"

[[sharded_tables]]
database = "source"
column = "tenant_id"
data_type = "bigint"

[admin]
password = "pgdog"

[rewrite]
primary_key = "rewrite_omni"
6 changes: 6 additions & 0 deletions integration/copy_data/setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ CREATE TABLE copy_data.with_identity(
tenant_id BIGINT NOT NULL
);

CREATE TABLE copy_data.settings (
id BIGSERIAL PRIMARY KEY,
setting_name TEXT NOT NULL UNIQUE,
setting_value TEXT NOT NULL
);

DROP PUBLICATION IF EXISTS pgdog;
CREATE PUBLICATION pgdog FOR TABLES IN SCHEMA copy_data;

Expand Down
2 changes: 1 addition & 1 deletion integration/pgdog.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ query_parser_engine = "pg_query_raw"
system_catalogs = "omnisharded_sticky"
reload_schema_on_ddl = false
#idle_healthcheck_delay = 50000000

unique_id_function = "standard"

[memory]
net_buffer = 8096
Expand Down
12 changes: 11 additions & 1 deletion integration/rust/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,19 @@ pub async fn connections_sqlx() -> Vec<Pool<Postgres>> {
}

pub async fn connection_sqlx_direct() -> Pool<Postgres> {
connection_sqlx_direct_db("pgdog").await
}

pub async fn connection_sqlx_direct_db(name: &str) -> Pool<Postgres> {
PgPoolOptions::new()
.max_connections(1)
.connect("postgres://pgdog:pgdog@127.0.0.1:5432/pgdog?application_name=sqlx_direct")
.connect(
format!(
"postgres://pgdog:pgdog@127.0.0.1:5432/{}?application_name=sqlx_direct",
name
)
.as_str(),
)
.await
.unwrap()
}
Expand Down
1 change: 1 addition & 0 deletions integration/rust/tests/integration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod prepared;
pub mod reload;
pub mod reset;
pub mod rewrite;
pub mod rewrite_omni;
pub mod savepoint;
pub mod set_in_transaction;
pub mod set_sharding_key;
Expand Down
131 changes: 131 additions & 0 deletions integration/rust/tests/integration/rewrite_omni.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
use rust::setup::*;
use sqlx::Executor;

#[tokio::test]
async fn test_omni_only_pk_rewrite() {
    // Integration test for the `rewrite_omni` primary-key rewrite mode:
    // omnisharded tables (no sharding key) should get their PK from
    // pgdog.unique_id(), while sharded tables keep their local sequence.
    //
    // Connects directly to each physical shard (bypassing the proxy) to
    // set up schema and per-shard sharding config.
    let shard_0 = connection_sqlx_direct_db("shard_0").await;
    let shard_1 = connection_sqlx_direct_db("shard_1").await;

    for (shard, pool) in [&shard_0, &shard_1].iter().enumerate() {
        // Start each run from a clean slate on both shards.
        pool.execute(
            "DROP TABLE IF EXISTS public.test_omni_rewrite_pk_omni, test_omni_rewrite_pk_sharded CASCADE",
        )
        .await
        .unwrap();

        // One omnisharded table (no sharding key column) and one sharded
        // table keyed on customer_id.
        pool.execute("CREATE TABLE IF NOT EXISTS public.test_omni_rewrite_pk_omni(id BIGSERIAL PRIMARY KEY, value TEXT NOT NULL)").await.unwrap();
        pool.execute("CREATE TABLE IF NOT EXISTS public.test_omni_rewrite_pk_sharded(id BIGSERIAL PRIMARY KEY, customer_id BIGINT NOT NULL, value TEXT NOT NULL)").await.unwrap();

        // Install pgdog's shadow table for the sharded table's `id` column
        // (presumably enables PK rewrite tracking — verify against pgdog docs).
        pool.execute(
            "SELECT pgdog.install_shadow_table('public', 'test_omni_rewrite_pk_sharded', 'id')",
        )
        .await
        .unwrap();

        // Configure sharding: each shard records its own index out of 2.
        let mut t = pool.begin().await.unwrap();
        t.execute("DELETE FROM pgdog.config").await.unwrap();
        t.execute(
            format!(
                "INSERT INTO pgdog.config (shard, shards) VALUES ({}, 2)",
                shard
            )
            .as_str(),
        )
        .await
        .unwrap();
        t.commit().await.unwrap();
    }

    // Proxied (sharded) connection and admin connection through pgdog.
    let sharded = connections_sqlx().await.pop().unwrap();
    let admin = admin_sqlx().await;

    // This will reload the schema as well.
    admin
        .execute("SET rewrite_primary_key TO 'rewrite_omni'")
        .await
        .unwrap();

    // Baseline: any ID minted after this point should exceed it
    // (assumes pgdog.unique_id() is monotonically increasing — TODO confirm).
    let starting_id: (i64,) = sqlx::query_as("SELECT pgdog.unique_id()")
        .fetch_one(&sharded)
        .await
        .unwrap();

    // Phase 1: rewrite_omni — only the omnisharded table gets unique_id PKs.
    for run in 0..25 {
        let omni_id: (i64,) = sqlx::query_as(
            "INSERT INTO public.test_omni_rewrite_pk_omni (value) VALUES ($1) RETURNING id",
        )
        .bind(format!("test_{}", run))
        .fetch_one(&sharded)
        .await
        .unwrap();

        // Omni PK must come from the generator, i.e. above the baseline.
        assert!(
            omni_id.0 > starting_id.0,
            "omni ID should be unique_id, but got {}",
            omni_id.0
        );

        let sharded_id: (i64,) = sqlx::query_as(
            "INSERT INTO public.test_omni_rewrite_pk_sharded (customer_id, value) VALUES ($1, $2) RETURNING id",
        )
        .bind(run as i64)
        .bind(format!("test_{}", run))
        .fetch_one(&sharded)
        .await
        .unwrap();

        // Sharded PK must still come from the local BIGSERIAL sequence,
        // which stays far below generator-minted IDs.
        assert!(
            sharded_id.0 < omni_id.0,
            "sharded ID should not be unique_id, but got {}",
            sharded_id.0
        );
    }

    // Reconnect so the new client doesn't reuse cached rewrites.
    sharded.close().await;
    let sharded = connections_sqlx().await.pop().unwrap();

    // Re-enable sharded unique ID.
    admin
        .execute("SET rewrite_primary_key TO 'rewrite'")
        .await
        .unwrap();

    // The rewrite is cached in prepared statements
    // and the query cache, so we need to be careful to reset it
    // _after_ the client disconnected. In production, changing this setting
    // definitely requires a restart.
    admin.execute("RESET QUERY_CACHE").await.unwrap();
    admin.execute("RESET PREPARED").await.unwrap();

    // Phase 2: rewrite — both tables now get unique_id PKs.
    for run in 25..50 {
        let omni_id: (i64,) = sqlx::query_as(
            "INSERT INTO public.test_omni_rewrite_pk_omni (value) VALUES ($1) RETURNING id",
        )
        .bind(format!("test_{}", run))
        .fetch_one(&sharded)
        .await
        .unwrap();

        assert!(
            omni_id.0 > starting_id.0,
            "omni ID should be unique_id, but got {}",
            omni_id.0
        );

        let sharded_id: (i64,) = sqlx::query_as(
            "INSERT INTO public.test_omni_rewrite_pk_sharded (customer_id, value) VALUES ($1, $2) RETURNING id",
        )
        .bind(run as i64)
        .bind(format!("test_{}", run))
        .fetch_one(&sharded)
        .await
        .unwrap();

        // The sharded insert runs after the omni insert, so with a
        // monotonic generator its ID must be the larger of the two.
        assert!(
            sharded_id.0 > omni_id.0,
            "sharded ID should be unique_id, but got {}",
            sharded_id.0
        );
    }
}
25 changes: 19 additions & 6 deletions integration/schema_sync/dev.sh
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,25 @@ EOF

diff source.sql destination.sql > diff.txt || true

# Extract column name and type from diff lines, ignoring everything else
# This normalizes across different PG versions and constraint syntaxes
ACTUAL_CONVERSIONS=$(grep '^[<>]' diff.txt | \
grep -E '\b(integer|bigint)\b' | \
sed -E 's/.*[[:space:]]([a-z_]+)[[:space:]]+(integer|bigint).*/\1 \2/' | \
sort -u)
# Extract integer -> bigint conversions from the diff
# 1. Get removed (< integer) and added (> bigint) column lines
# 2. Only keep columns that appear as integer in source AND bigint in destination
REMOVED_INT=$(grep '^<' diff.txt | \
sed -E 's/.*[[:space:]]([a-z_]+)[[:space:]]+integer\b.*/\1/' | \
grep -E '^[a-z_]+$' | sort -u)

ADDED_BIGINT=$(grep '^>' diff.txt | \
sed -E 's/.*[[:space:]]([a-z_]+)[[:space:]]+bigint\b.*/\1/' | \
grep -E '^[a-z_]+$' | sort -u)

# Columns that changed from integer to bigint
CONVERTED=$(comm -12 <(echo "$REMOVED_INT") <(echo "$ADDED_BIGINT"))

# Build the expected format: column_name integer \n column_name bigint
ACTUAL_CONVERSIONS=$(echo "$CONVERTED" | while read col; do
echo "$col integer"
echo "$col bigint"
done | sort -u)

EXPECTED_SORTED=$(echo "$EXPECTED_CONVERSIONS" | sort -u)

Expand Down
4 changes: 4 additions & 0 deletions integration/schema_sync/pgdog.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,7 @@ database_name = "pgdog1"
name = "destination"
host = "127.0.0.1"
database_name = "pgdog2"

[[sharded_tables]]
database = "destination"
column = "user_id"
10 changes: 10 additions & 0 deletions pgdog-config/src/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::str::FromStr;
use std::time::Duration;

use crate::pooling::ConnectionRecovery;
use crate::UniqueIdFunction;
use crate::{
CopyFormat, CutoverTimeoutAction, LoadSchema, QueryParserEngine, QueryParserLevel,
SystemCatalogsBehavior,
Expand Down Expand Up @@ -545,6 +546,10 @@ pub struct General {
#[serde(default)]
pub unique_id_min: u64,

/// Unique ID generation function.
#[serde(default)]
pub unique_id_function: UniqueIdFunction,

/// Changes how system catalog tables (like `pg_database`, `pg_class`, etc.) are treated by the query router.
///
/// _Default:_ `omnisharded_sticky`
Expand Down Expand Up @@ -718,6 +723,7 @@ impl Default for General {
cutover_timeout: Self::cutover_timeout(),
cutover_timeout_action: Self::cutover_timeout_action(),
cutover_save_config: bool::default(),
unique_id_function: Self::unique_id_function(),
}
}
}
Expand Down Expand Up @@ -817,6 +823,10 @@ impl General {
Self::env_or_default("PGDOG_BAN_REPLICA_LAG_BYTES", i64::MAX as u64)
}

fn unique_id_function() -> UniqueIdFunction {
Self::env_enum_or_default("PGDOG_UNIQUE_ID_FUNCTION")
}

fn cutover_replication_lag_threshold() -> u64 {
Self::env_or_default("PGDOG_CUTOVER_REPLICATION_LAG_THRESHOLD", 0)
// 0 bytes
Expand Down
6 changes: 5 additions & 1 deletion pgdog-config/src/rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::str::FromStr;
#[derive(
Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, JsonSchema,
)]
#[serde(rename_all = "lowercase")]
#[serde(rename_all = "snake_case")]
#[derive(Default)]
pub enum RewriteMode {
/// Forward the query unchanged.
Expand All @@ -19,6 +19,8 @@ pub enum RewriteMode {
Error,
/// Automatically rewrite the query and execute it.
Rewrite,
/// Rewrite only for omnisharded tables.
RewriteOmni,
}

impl fmt::Display for RewriteMode {
Expand All @@ -27,6 +29,7 @@ impl fmt::Display for RewriteMode {
RewriteMode::Error => "error",
RewriteMode::Rewrite => "rewrite",
RewriteMode::Ignore => "ignore",
RewriteMode::RewriteOmni => "rewrite_omni",
};
f.write_str(value)
}
Expand All @@ -40,6 +43,7 @@ impl FromStr for RewriteMode {
"error" => Ok(RewriteMode::Error),
"rewrite" => Ok(RewriteMode::Rewrite),
"ignore" => Ok(RewriteMode::Ignore),
"rewrite_omni" => Ok(RewriteMode::RewriteOmni),
_ => Err(()),
}
}
Expand Down
Loading
Loading