Skip to content

Commit 7256483

Browse files
committed
wip: passthrough id generation
1 parent cbdb18d commit 7256483

4 files changed

Lines changed: 471 additions & 17 deletions

File tree

pgdog-config/src/rewrite.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ pub enum RewriteMode {
1919
Error,
2020
/// Automatically rewrite the query and execute it.
2121
Rewrite,
22+
/// Rewrite only for omnisharded tables.
23+
RewriteOmni,
2224
}
2325

2426
impl fmt::Display for RewriteMode {
@@ -27,6 +29,7 @@ impl fmt::Display for RewriteMode {
2729
RewriteMode::Error => "error",
2830
RewriteMode::Rewrite => "rewrite",
2931
RewriteMode::Ignore => "ignore",
32+
RewriteMode::RewriteOmni => "rewrite_omni",
3033
};
3134
f.write_str(value)
3235
}
@@ -40,6 +43,7 @@ impl FromStr for RewriteMode {
4043
"error" => Ok(RewriteMode::Error),
4144
"rewrite" => Ok(RewriteMode::Rewrite),
4245
"ignore" => Ok(RewriteMode::Ignore),
46+
"rewrite_omni" => Ok(RewriteMode::RewriteOmni),
4347
_ => Err(()),
4448
}
4549
}

pgdog/src/backend/schema/mod.rs

Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,13 @@ pub mod sync;
66
pub use pgdog_stats::{
77
Relation as StatsRelation, Relations as StatsRelations, Schema as StatsSchema, SchemaInner,
88
};
9+
use pg_query::{
10+
protobuf::{
11+
ColumnDef, ConstrType, Constraint, CreateStmt, OnCommitAction, RangeVar,
12+
String as PgString, TypeName,
13+
},
14+
Node, NodeEnum,
15+
};
916
use serde::{Deserialize, Serialize};
1017
use std::ops::DerefMut;
1118
use std::{collections::HashMap, ops::Deref};
@@ -223,6 +230,142 @@ impl Schema {
223230
pub fn search_path(&self) -> &[String] {
224231
&self.inner.search_path
225232
}
233+
234+
/// Generate a `CREATE TABLE` statement for a relation in the schema.
235+
pub fn to_sql(&self, schema: &str, table: &str) -> Option<String> {
236+
let relation = self.inner.get(schema, table)?;
237+
238+
let mut table_elts = Vec::with_capacity(relation.columns.len());
239+
let mut pk_columns = Vec::new();
240+
241+
for column in relation.columns.values() {
242+
let mut constraints = Vec::new();
243+
244+
if !column.is_nullable {
245+
constraints.push(Node {
246+
node: Some(NodeEnum::Constraint(Box::new(Constraint {
247+
contype: ConstrType::ConstrNotnull.into(),
248+
..Default::default()
249+
}))),
250+
});
251+
}
252+
253+
if !column.column_default.is_empty() {
254+
if let Some(expr) = Self::parse_default_expr(&column.column_default) {
255+
constraints.push(Node {
256+
node: Some(NodeEnum::Constraint(Box::new(Constraint {
257+
contype: ConstrType::ConstrDefault.into(),
258+
raw_expr: Some(Box::new(expr)),
259+
..Default::default()
260+
}))),
261+
});
262+
}
263+
}
264+
265+
if column.is_primary_key {
266+
pk_columns.push(Node {
267+
node: Some(NodeEnum::String(PgString {
268+
sval: column.column_name.clone(),
269+
})),
270+
});
271+
}
272+
273+
table_elts.push(Node {
274+
node: Some(NodeEnum::ColumnDef(Box::new(ColumnDef {
275+
colname: column.column_name.clone(),
276+
type_name: Some(Self::pg_type_name(&column.data_type)),
277+
is_local: true,
278+
constraints,
279+
..Default::default()
280+
}))),
281+
});
282+
}
283+
284+
if !pk_columns.is_empty() {
285+
table_elts.push(Node {
286+
node: Some(NodeEnum::Constraint(Box::new(Constraint {
287+
contype: ConstrType::ConstrPrimary.into(),
288+
keys: pk_columns,
289+
..Default::default()
290+
}))),
291+
});
292+
}
293+
294+
let create_stmt = CreateStmt {
295+
relation: Some(RangeVar {
296+
schemaname: schema.to_owned(),
297+
relname: table.to_owned(),
298+
inh: true,
299+
relpersistence: "p".to_owned(),
300+
..Default::default()
301+
}),
302+
table_elts,
303+
oncommit: OnCommitAction::OncommitNoop.into(),
304+
..Default::default()
305+
};
306+
307+
NodeEnum::CreateStmt(create_stmt).deparse().ok()
308+
}
309+
310+
/// Parse a column default expression into an AST node.
311+
fn parse_default_expr(default: &str) -> Option<Node> {
312+
let parsed = pg_query::parse(&format!("SELECT {default}")).ok()?;
313+
let stmt = parsed.protobuf.stmts.first()?;
314+
let node = stmt.stmt.as_ref()?;
315+
let NodeEnum::SelectStmt(ref select) = node.node.as_ref()? else {
316+
return None;
317+
};
318+
let target = select.target_list.first()?;
319+
let NodeEnum::ResTarget(ref res) = target.node.as_ref()? else {
320+
return None;
321+
};
322+
res.val.as_ref().map(|v| (**v).clone())
323+
}
324+
325+
/// Map an information_schema data type name to a pg_catalog [`TypeName`].
326+
fn pg_type_name(data_type: &str) -> TypeName {
327+
// Types that deparse correctly with pg_catalog qualification.
328+
let pg_catalog_name = match data_type {
329+
"bigint" => Some("int8"),
330+
"integer" => Some("int4"),
331+
"smallint" => Some("int2"),
332+
"boolean" => Some("bool"),
333+
"character varying" => Some("varchar"),
334+
"double precision" => Some("float8"),
335+
"real" => Some("float4"),
336+
"timestamp without time zone" => Some("timestamp"),
337+
"timestamp with time zone" => Some("timestamptz"),
338+
"character" => Some("bpchar"),
339+
_ => None,
340+
};
341+
342+
let names = if let Some(pg_name) = pg_catalog_name {
343+
vec![
344+
Node {
345+
node: Some(NodeEnum::String(PgString {
346+
sval: "pg_catalog".to_owned(),
347+
})),
348+
},
349+
Node {
350+
node: Some(NodeEnum::String(PgString {
351+
sval: pg_name.to_owned(),
352+
})),
353+
},
354+
]
355+
} else {
356+
vec![Node {
357+
node: Some(NodeEnum::String(PgString {
358+
sval: data_type.to_owned(),
359+
})),
360+
}]
361+
};
362+
363+
TypeName {
364+
names,
365+
typemod: -1,
366+
..Default::default()
367+
}
368+
}
226369
}
227370

228371
#[cfg(test)]
@@ -274,6 +417,87 @@ mod test {
274417
assert!(debug.first().unwrap().contains("PgDog Debug"));
275418
}
276419

420+
#[tokio::test]
421+
async fn test_install_next_id_seq() {
422+
use crate::backend::server::test::test_server;
423+
424+
let mut conn = test_server().await;
425+
426+
// Use a dedicated schema to avoid conflicts with test_schema
427+
// which drops the pgdog schema.
428+
conn.execute_checked("CREATE SCHEMA IF NOT EXISTS pgdog_test")
429+
.await
430+
.unwrap();
431+
432+
// Install pgdog schema (CREATE OR REPLACE is idempotent).
433+
Schema::setup(&mut conn).await.unwrap();
434+
435+
// Ensure shard config exists.
436+
let count = conn
437+
.fetch_all::<i64>("SELECT COUNT(*) FROM pgdog.config")
438+
.await
439+
.unwrap();
440+
if count.first().copied() == Some(0) {
441+
conn.execute_checked(
442+
"INSERT INTO pgdog.config (shard, shards) VALUES (0, 1)",
443+
)
444+
.await
445+
.unwrap();
446+
}
447+
448+
// Clean up from previous runs and create a test table with BIGSERIAL primary key.
449+
conn.execute_checked("DROP TABLE IF EXISTS pgdog_test.ids")
450+
.await
451+
.unwrap();
452+
conn.execute_checked(
453+
"CREATE TABLE pgdog_test.ids (id BIGSERIAL PRIMARY KEY, value TEXT)",
454+
)
455+
.await
456+
.unwrap();
457+
458+
// Install the sharded sequence via install_next_id_seq.
459+
let result = conn
460+
.fetch_all::<String>(
461+
"SELECT pgdog.install_next_id_seq('pgdog_test', 'ids', 'id')",
462+
)
463+
.await
464+
.unwrap();
465+
assert!(
466+
result.first().unwrap().contains("installed"),
467+
"{}",
468+
result.first().unwrap()
469+
);
470+
471+
// Insert rows and collect generated IDs.
472+
conn.execute_checked("INSERT INTO pgdog_test.ids (value) VALUES ('a')")
473+
.await
474+
.unwrap();
475+
conn.execute_checked("INSERT INTO pgdog_test.ids (value) VALUES ('b')")
476+
.await
477+
.unwrap();
478+
conn.execute_checked("INSERT INTO pgdog_test.ids (value) VALUES ('c')")
479+
.await
480+
.unwrap();
481+
482+
let ids = conn
483+
.fetch_all::<i64>("SELECT id FROM pgdog_test.ids ORDER BY id")
484+
.await
485+
.unwrap();
486+
487+
assert_eq!(ids.len(), 3);
488+
489+
// All IDs should be unique.
490+
let mut unique = ids.clone();
491+
unique.sort();
492+
unique.dedup();
493+
assert_eq!(unique.len(), 3, "IDs are not unique: {:?}", ids);
494+
495+
// Clean up.
496+
conn.execute_checked("DROP SCHEMA pgdog_test CASCADE")
497+
.await
498+
.unwrap();
499+
}
500+
277501
#[test]
278502
fn test_resolve_search_path_default() {
279503
let schema = Schema::from_parts(vec!["$user".into(), "public".into()], HashMap::new());
@@ -415,4 +639,50 @@ mod test {
415639
assert!(result.is_some());
416640
assert_eq!(result.unwrap().schema(), "custom");
417641
}
642+
643+
#[test]
fn test_to_sql() {
    use crate::backend::schema::columns::Column;

    // Helper: build a stats column and convert it into the schema's Column type.
    fn col(
        name: &str,
        table: &str,
        data_type: &str,
        ordinal: i32,
        pk: bool,
        nullable: bool,
    ) -> Column {
        pgdog_stats::Column {
            table_catalog: String::new(),
            table_schema: "public".into(),
            table_name: table.into(),
            column_name: name.into(),
            column_default: String::new(),
            is_nullable: nullable,
            data_type: data_type.into(),
            ordinal_position: ordinal,
            is_primary_key: pk,
            foreign_keys: vec![],
        }
        .into()
    }

    let columns = IndexMap::from([
        ("id".to_owned(), col("id", "users", "bigint", 1, true, false)),
        ("name".to_owned(), col("name", "users", "text", 2, false, false)),
        (
            "email".to_owned(),
            col("email", "users", "character varying", 3, false, true),
        ),
    ]);

    let relations: HashMap<(String, String), Relation> = HashMap::from([(
        ("public".into(), "users".into()),
        Relation::test_table("public", "users", columns),
    )]);
    let schema = Schema::from_parts(vec!["public".into()], relations);

    let sql = schema.to_sql("public", "users").unwrap();
    assert!(sql.contains("CREATE TABLE"), "{sql}");
    assert!(sql.contains("public"), "{sql}");
    assert!(sql.contains("users"), "{sql}");
    assert!(sql.contains("id"), "{sql}");
    assert!(sql.contains("name"), "{sql}");
    assert!(sql.contains("email"), "{sql}");
    assert!(sql.contains("PRIMARY KEY"), "{sql}");
}
682+
683+
#[test]
fn test_to_sql_not_found() {
    // An empty schema has no relations, so to_sql must return None.
    let schema = Schema::from_parts(vec!["public".into()], HashMap::new());
    assert!(schema.to_sql("public", "nonexistent").is_none());
}
418688
}

0 commit comments

Comments
 (0)