Skip to content

Commit 9935a95

Browse files
authored
fix: race in extended_anonymous prepared statements (#863)
- fix: anonymous prepared statements were self-evicting immediately upon insertion because they are all named `""`. - refactor: server connections were getting the prepared statement and pooler mode update on fast path only; this didn't affect anything materially, since the fast path was executed first, but created confusion
1 parent b97efe5 commit 9935a95

3 files changed

Lines changed: 137 additions & 6 deletions

File tree

integration/js/pg_tests/test/postgres_js.js

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,3 +450,80 @@ describe("postgres.js sql.array()", function () {
450450
assert.strictEqual(rows.length, 2); // a and c
451451
});
452452
});
453+
454+
describe("postgres.js unsafe stress test (50k unique statements)", function () {
455+
this.timeout(300000);
456+
457+
before(async function () {
458+
await adminSet("prepared_statements", "extended_anonymous");
459+
// Warmup: ensure pool connections are established after databases::init()
460+
// recreates backend pools (same pattern as other test suites).
461+
await sql.unsafe("SELECT 1");
462+
});
463+
464+
after(async function () {
465+
await adminSet("prepared_statements", "extended");
466+
});
467+
468+
it("50k unique query texts with 25 rotating parameters", async function () {
469+
const TOTAL_QUERIES = 50000;
470+
const NUM_PARAMS = 25;
471+
const BATCH_SIZE = 100;
472+
473+
const params = Array.from({ length: NUM_PARAMS }, (_, i) => i * 7 + 1);
474+
475+
let completed = 0;
476+
const errors = [];
477+
478+
for (let batchStart = 0; batchStart < TOTAL_QUERIES; batchStart += BATCH_SIZE) {
479+
const batchEnd = Math.min(batchStart + BATCH_SIZE, TOTAL_QUERIES);
480+
const promises = [];
481+
482+
for (let i = batchStart; i < batchEnd; i++) {
483+
const paramVal = params[i % NUM_PARAMS];
484+
const queryText = `SELECT $1::int AS r_${i}`;
485+
486+
const p = sql
487+
.unsafe(queryText, [paramVal])
488+
.then((rows) => {
489+
assert.strictEqual(rows[0][`r_${i}`], paramVal);
490+
completed++;
491+
})
492+
.catch((err) => {
493+
errors.push({ i, err: err.message });
494+
});
495+
496+
promises.push(p);
497+
}
498+
499+
await Promise.all(promises);
500+
}
501+
502+
assert.strictEqual(
503+
errors.length,
504+
0,
505+
`${errors.length} failures, first 5: ${JSON.stringify(errors.slice(0, 5))}`,
506+
);
507+
assert.strictEqual(completed, TOTAL_QUERIES);
508+
509+
// Verify backend prepared statement evictions are happening.
510+
// With 50k unique statements, pool_size=10, and capacity=500,
511+
// each connection handles ~5k queries → ~4500 evictions each.
512+
const res = await fetch("http://localhost:9090");
513+
const metrics = await res.text();
514+
const evictions = metrics
515+
.split("\n")
516+
.filter(
517+
(l) =>
518+
l.startsWith("pgdog_total_prepared_evictions") &&
519+
l.includes('database="pgdog"') &&
520+
l.includes('user="pgdog"'),
521+
)
522+
.map((l) => parseInt(l.split(" ").pop(), 10))
523+
.reduce((a, b) => a + b, 0);
524+
assert.ok(
525+
evictions > 0,
526+
`expected prepared statement evictions, got ${evictions}`,
527+
);
528+
});
529+
});

pgdog/src/backend/pool/pool_impl.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -152,11 +152,7 @@ impl Pool {
152152
self.comms().ready.notified().await;
153153
}
154154

155-
let (server, granted_at) = if let Some(mut server) = server {
156-
server
157-
.prepared_statements_mut()
158-
.set_capacity(self.inner.config.prepared_statements_limit);
159-
server.set_pooler_mode(self.inner.config.pooler_mode);
155+
let (mut server, granted_at) = if let Some(server) = server {
160156
(Guard::new(pool, server, granted_at), granted_at)
161157
} else {
162158
// Slow path, pool is empty, will create new connection
@@ -165,6 +161,11 @@ impl Pool {
165161
waiting.wait().await?
166162
};
167163

164+
server
165+
.prepared_statements_mut()
166+
.set_capacity(self.inner.config.prepared_statements_limit);
167+
server.set_pooler_mode(self.inner.config.pooler_mode);
168+
168169
match self
169170
.maybe_healthcheck(
170171
server,

pgdog/src/frontend/prepared_statements/mod.rs

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ impl PreparedStatements {
7878
// Key already existed, only value changed.
7979
self.memory_used = self.memory_used.saturating_sub(str_mem(&old_value));
8080
self.memory_used += str_mem(&name);
81-
self.global.write().decrement(&name);
81+
self.global.write().decrement(&old_value);
8282
} else {
8383
// New entry.
8484
self.memory_used += str_mem(key) + str_mem(&name);
@@ -273,4 +273,57 @@ mod test {
273273
0
274274
);
275275
}
276+
277+
/// Regression test: anonymous statements with different query texts
278+
/// must decrement the OLD global entry, not the new one.
279+
/// Previously, the new entry was immediately set to used=0 (evictable)
280+
/// while the old entry leaked at used=1 forever.
281+
#[test]
282+
fn test_anonymous_different_queries_decrement_old() {
283+
let mut statements = PreparedStatements::default();
284+
285+
// First anonymous Parse: "" → __pgdog_1, used: 1
286+
let mut parse1 = ProtocolMessage::from(Parse::new_anonymous("SELECT 1"));
287+
statements.maybe_rewrite(&mut parse1).unwrap();
288+
289+
let global = statements.global.read();
290+
let first = global.statements().values().next().unwrap();
291+
assert_eq!(first.used, 1);
292+
let first_name = first.name();
293+
drop(global);
294+
295+
// Second anonymous Parse with DIFFERENT query: "" → __pgdog_2
296+
// This replaces the local "" mapping.
297+
let mut parse2 = ProtocolMessage::from(Parse::new_anonymous("SELECT 2"));
298+
statements.maybe_rewrite(&mut parse2).unwrap();
299+
300+
let global = statements.global.read();
301+
assert_eq!(global.statements().len(), 2);
302+
303+
for (_, stmt) in global.statements() {
304+
if stmt.name() == first_name {
305+
// Old entry: should be decremented to 0 (no longer referenced).
306+
assert_eq!(stmt.used, 0, "old entry should be decremented");
307+
} else {
308+
// New entry: should stay at 1 (actively referenced).
309+
assert_eq!(stmt.used, 1, "new entry should remain at used=1");
310+
}
311+
}
312+
drop(global);
313+
314+
// Third anonymous Parse with yet another query.
315+
let mut parse3 = ProtocolMessage::from(Parse::new_anonymous("SELECT 3"));
316+
statements.maybe_rewrite(&mut parse3).unwrap();
317+
318+
let global = statements.global.read();
319+
assert_eq!(global.statements().len(), 3);
320+
321+
// Exactly one entry should have used=1 (the latest).
322+
let active = global.statements().values().filter(|s| s.used == 1).count();
323+
assert_eq!(active, 1, "exactly one statement should be active");
324+
325+
// The other two should have used=0.
326+
let unused = global.statements().values().filter(|s| s.used == 0).count();
327+
assert_eq!(unused, 2, "old statements should be unused");
328+
}
276329
}

0 commit comments

Comments
 (0)