Skip to content

Commit 23d3571

Browse files
committed
feat: Extend \dt psql command output with shard metadata (#709)
Intercept \dt command, and add a `Shard` column to the output. Add a flag to `Route` that indicates if \dt is being executed so the Shard column is conditionally applied. Add `shard_map` HashMap to `Route` as well that stores tables with their corresponding shard. Introduce `forward_with_shard` function in backend/pool/connection/binding.rs that exposes the shard_map property to be streamed in the query engine. Add engine logic to populate the new column correctly and handle tables sharded across multiple databases Ex. output: List of tables Schema | Name | Type | Owner | Shard --------+-----------+-------+--------+--------- public | only_on_0 | table | ubuntu | 0 public | only_on_1 | table | ubuntu | 1 public | only_on_2 | table | ubuntu | 2 public | users | table | ubuntu | 0, 1, 2 Signed-off-by: Aditya Gollamudi <adigollamudi@gmail.com>
1 parent e717983 commit 23d3571

7 files changed

Lines changed: 132 additions & 5 deletions

File tree

pgdog/src/backend/pool/connection/binding.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@
22
33
use crate::{
44
frontend::{client::query_engine::TwoPcPhase, ClientRequest},
5-
net::{parameter::Parameters, BackendKeyData, ProtocolMessage, Query},
5+
net::{parameter::Parameters, BackendKeyData, Message, ProtocolMessage, Query},
66
state::State,
77
};
88

99
use futures::future::join_all;
10+
use std::collections::HashMap;
1011

1112
use super::*;
1213

@@ -53,6 +54,13 @@ impl Binding {
5354
self.disconnect();
5455
}
5556

57+
pub fn forward_with_shard(&self) -> Option<HashMap<String, Vec<usize>>> {
58+
match self {
59+
Binding::MultiShard(_shards, state) => state.table_shard_map(),
60+
_ => None,
61+
}
62+
}
63+
5664
/// Are we connected to a backend?
5765
pub fn connected(&self) -> bool {
5866
match self {
@@ -91,13 +99,25 @@ impl Binding {
9199
return Ok(message);
92100
}
93101
let mut read = false;
94-
for server in shards.iter_mut() {
102+
103+
for (shard, server) in shards.iter_mut().enumerate() {
95104
if !server.has_more_messages() {
96105
continue;
97106
}
98107

99108
let message = server.read().await?;
100109

110+
if state.display_table() {
111+
if let Some(table_name) = message.table_name_from_dt().unwrap() {
112+
let mut map: HashMap<String, Vec<usize>> =
113+
state.table_shard_map().unwrap_or_default();
114+
map.entry(table_name.clone())
115+
.or_insert_with(Vec::new)
116+
.push(shard);
117+
state.set_table_shard_map(Some(map));
118+
}
119+
}
120+
101121
read = true;
102122
if let Some(message) = state.forward(message)? {
103123
return Ok(message);

pgdog/src/backend/pool/connection/multi_shard/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! Multi-shard connection state.
22
33
use context::Context;
4+
use std::collections::HashMap;
45

56
use crate::{
67
frontend::{router::Route, PreparedStatements},
@@ -345,4 +346,16 @@ impl MultiShard {
345346
}
346347
}
347348
}
349+
350+
pub fn display_table(&self) -> bool {
351+
self.route.display_table()
352+
}
353+
354+
pub fn set_table_shard_map(&mut self, map: Option<HashMap<String, Vec<usize>>>) {
355+
self.route.set_table_shard_map(map);
356+
}
357+
358+
pub fn table_shard_map(&self) -> Option<HashMap<String, Vec<usize>>> {
359+
self.route.table_shard_map()
360+
}
348361
}

pgdog/src/frontend/client/query_engine/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::{
1010
state::State,
1111
};
1212

13+
use std::collections::HashSet;
1314
use tracing::debug;
1415

1516
pub mod connect;
@@ -78,6 +79,7 @@ pub struct QueryEngine {
7879
notify_buffer: NotifyBuffer,
7980
pending_explain: Option<ExplainResponseState>,
8081
hooks: QueryEngineHooks,
82+
seen_tables: HashSet<String>,
8183
}
8284

8385
impl QueryEngine {
@@ -105,6 +107,7 @@ impl QueryEngine {
105107
pending_explain: None,
106108
begin_stmt: None,
107109
router: Router::default(),
110+
seen_tables: HashSet::new(),
108111
})
109112
}
110113

pgdog/src/frontend/client/query_engine/query.rs

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::{
77
router::parser::{explain_trace::ExplainTrace, rewrite::statement::plan::RewriteResult},
88
},
99
net::{
10-
DataRow, FromBytes, Message, Protocol, ProtocolMessage, Query, ReadyForQuery,
10+
DataRow, Field, FromBytes, Message, Protocol, ProtocolMessage, Query, ReadyForQuery,
1111
RowDescription, ToBytes, TransactionState,
1212
},
1313
state::State,
@@ -36,7 +36,7 @@ impl QueryEngine {
3636
// We need to run a query now.
3737
if context.in_transaction() {
3838
// Connect to one shard if not sharded or to all shards
39-
// for a cross-shard tranasction.
39+
// for a cross-shard transaction.
4040
if !self.connect_transaction(context).await? {
4141
return Ok(());
4242
}
@@ -123,8 +123,23 @@ impl QueryEngine {
123123
) -> Result<(), Error> {
124124
self.streaming = message.streaming();
125125

126+
let should_rewrite_for_display_table =
127+
if let Some(route) = context.client_request.route.as_ref() {
128+
route.display_table()
129+
} else {
130+
false
131+
};
132+
126133
let code = message.code();
127134
let payload = if code == 'T' {
135+
if should_rewrite_for_display_table {
136+
let mut fields = RowDescription::from_bytes(message.payload())
137+
.unwrap()
138+
.fields
139+
.to_vec();
140+
fields.push(Field::text("Shard"));
141+
message = RowDescription::new(&fields).message()?;
142+
}
128143
Some(message.payload())
129144
} else {
130145
None
@@ -152,6 +167,38 @@ impl QueryEngine {
152167
self.pending_explain = None;
153168
}
154169

170+
if code == 'D' {
171+
if should_rewrite_for_display_table {
172+
let mut dr = DataRow::from_bytes(message.payload()).unwrap();
173+
let col = dr.column(1).unwrap();
174+
175+
let shard_map = self.backend.forward_with_shard();
176+
let table_lookup = std::str::from_utf8(&col).unwrap();
177+
178+
if let Some(map) = shard_map {
179+
if self.seen_tables.contains(table_lookup) {
180+
return Ok(());
181+
}
182+
183+
self.seen_tables.insert(table_lookup.to_string());
184+
185+
let mut new_col = String::new();
186+
for (i, val) in map[table_lookup].iter().enumerate() {
187+
if i > 0 {
188+
new_col.push_str(", ")
189+
}
190+
new_col.push_str(&val.to_string());
191+
}
192+
dr.add(new_col);
193+
} else {
194+
dr.add(None);
195+
}
196+
197+
message = dr.message()?;
198+
Some(message.payload());
199+
}
200+
}
201+
155202
// Messages that we need to send to the client immediately.
156203
// ReadyForQuery (B) | CopyInResponse (B) | ErrorResponse(B) | NoticeResponse(B) | NotificationResponse (B)
157204
let flush = matches!(code, 'Z' | 'G' | 'E' | 'N' | 'A')

pgdog/src/frontend/router/parser/query/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,18 @@ impl QueryParser {
108108
Command::default()
109109
};
110110

111+
// Check if we are executing \dt command
112+
if let Command::Query(route) = &mut command {
113+
let q = context.query().unwrap();
114+
if q.contains("pg_catalog.pg_class")
115+
&& q.contains("pg_catalog.pg_namespace")
116+
&& q.contains("relkind")
117+
&& q.contains("pg_toast")
118+
{
119+
route.set_display_table(true);
120+
}
121+
}
122+
111123
if let Command::Query(route) = &mut command {
112124
if route.is_cross_shard() && context.shards == 1 {
113125
context

pgdog/src/frontend/router/parser/route.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{fmt::Display, ops::Deref};
1+
use std::{collections::HashMap, fmt::Display, ops::Deref};
22

33
use lazy_static::lazy_static;
44

@@ -90,6 +90,8 @@ pub struct Route {
9090
rollback_savepoint: bool,
9191
search_path_driven: bool,
9292
schema_changed: bool,
93+
display_table: bool,
94+
table_shard_map: Option<HashMap<String, Vec<usize>>>,
9395
}
9496

9597
impl Display for Route {
@@ -326,6 +328,25 @@ impl Route {
326328
ShardSource::Table(TableReason::Omni) | ShardSource::RoundRobin(RoundRobinReason::Omni)
327329
)
328330
}
331+
pub fn set_display_table(&mut self, v: bool) {
332+
self.display_table = v;
333+
}
334+
335+
pub fn display_table(&self) -> bool {
336+
self.display_table
337+
}
338+
339+
pub fn table_shard_map(&self) -> Option<HashMap<String, Vec<usize>>> {
340+
if self.table_shard_map == None {
341+
Some(HashMap::new())
342+
} else {
343+
self.table_shard_map.clone()
344+
}
345+
}
346+
347+
pub fn set_table_shard_map(&mut self, map: Option<HashMap<String, Vec<usize>>>) {
348+
self.table_shard_map = map;
349+
}
329350
}
330351

331352
/// Shard source.

pgdog/src/net/messages/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,17 @@ impl Message {
256256
pub fn transaction_error(&self) -> bool {
257257
self.code() == 'Z' && self.payload[5] as char == 'E'
258258
}
259+
260+
pub fn table_name_from_dt(&self) -> Result<Option<String>, Error> {
261+
if self.code() != 'D' {
262+
return Ok(None);
263+
}
264+
let byte_name = DataRow::from_bytes(self.payload()).unwrap().column(1);
265+
266+
let table_name = std::str::from_utf8(&byte_name.unwrap())?.to_string();
267+
268+
return Ok(Some(table_name));
269+
}
259270
}
260271

261272
/// Check that the message we received is what we expected.

0 commit comments

Comments
 (0)