Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions integration/js/pg_tests/test/postgres_js.js
Original file line number Diff line number Diff line change
Expand Up @@ -450,3 +450,80 @@ describe("postgres.js sql.array()", function () {
assert.strictEqual(rows.length, 2); // a and c
});
});

describe("postgres.js unsafe stress test (50k unique statements)", function () {
  this.timeout(300000);

  before(async function () {
    await adminSet("prepared_statements", "extended_anonymous");
    // Warmup: ensure pool connections are established after databases::init()
    // recreates backend pools (same pattern as other test suites).
    await sql.unsafe("SELECT 1");
  });

  after(async function () {
    await adminSet("prepared_statements", "extended");
  });

  it("50k unique query texts with 25 rotating parameters", async function () {
    const QUERY_TOTAL = 50000;
    const PARAM_COUNT = 25;
    const CONCURRENCY = 100;

    // Fixed pool of parameter values, rotated across queries.
    const paramPool = Array.from({ length: PARAM_COUNT }, (_, idx) => idx * 7 + 1);

    let succeeded = 0;
    const failures = [];

    // Issue queries in fixed-size concurrent waves so we never have more
    // than CONCURRENCY requests in flight at once.
    let cursor = 0;
    while (cursor < QUERY_TOTAL) {
      const upper = Math.min(cursor + CONCURRENCY, QUERY_TOTAL);
      const inFlight = [];

      for (let qi = cursor; qi < upper; qi++) {
        const expected = paramPool[qi % PARAM_COUNT];
        // Unique alias per query forces a unique statement text.
        const text = `SELECT $1::int AS r_${qi}`;

        inFlight.push(
          sql
            .unsafe(text, [expected])
            .then((rows) => {
              assert.strictEqual(rows[0][`r_${qi}`], expected);
              succeeded++;
            })
            .catch((err) => {
              failures.push({ i: qi, err: err.message });
            }),
        );
      }

      await Promise.all(inFlight);
      cursor = upper;
    }

    assert.strictEqual(
      failures.length,
      0,
      `${failures.length} failures, first 5: ${JSON.stringify(failures.slice(0, 5))}`,
    );
    assert.strictEqual(succeeded, QUERY_TOTAL);

    // Verify backend prepared statement evictions are happening.
    // With 50k unique statements, pool_size=10, and capacity=500,
    // each connection handles ~5k queries → ~4500 evictions each.
    const res = await fetch("http://localhost:9090");
    const body = await res.text();
    let evictions = 0;
    for (const line of body.split("\n")) {
      if (
        line.startsWith("pgdog_total_prepared_evictions") &&
        line.includes('database="pgdog"') &&
        line.includes('user="pgdog"')
      ) {
        evictions += parseInt(line.split(" ").pop(), 10);
      }
    }
    assert.ok(
      evictions > 0,
      `expected prepared statement evictions, got ${evictions}`,
    );
  });
});
11 changes: 6 additions & 5 deletions pgdog/src/backend/pool/pool_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,7 @@ impl Pool {
self.comms().ready.notified().await;
}

let (server, granted_at) = if let Some(mut server) = server {
server
.prepared_statements_mut()
.set_capacity(self.inner.config.prepared_statements_limit);
server.set_pooler_mode(self.inner.config.pooler_mode);
let (mut server, granted_at) = if let Some(server) = server {
(Guard::new(pool, server, granted_at), granted_at)
} else {
// Slow path, pool is empty, will create new connection
Expand All @@ -165,6 +161,11 @@ impl Pool {
waiting.wait().await?
};

server
.prepared_statements_mut()
.set_capacity(self.inner.config.prepared_statements_limit);
server.set_pooler_mode(self.inner.config.pooler_mode);

match self
.maybe_healthcheck(
server,
Expand Down
55 changes: 54 additions & 1 deletion pgdog/src/frontend/prepared_statements/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl PreparedStatements {
// Key already existed, only value changed.
self.memory_used = self.memory_used.saturating_sub(str_mem(&old_value));
self.memory_used += str_mem(&name);
self.global.write().decrement(&name);
self.global.write().decrement(&old_value);
} else {
// New entry.
self.memory_used += str_mem(key) + str_mem(&name);
Expand Down Expand Up @@ -273,4 +273,57 @@ mod test {
0
);
}

/// Regression test: anonymous statements with different query texts
/// must decrement the OLD global entry, not the new one.
/// Previously, the new entry was immediately set to used=0 (evictable)
/// while the old entry leaked at used=1 forever.
#[test]
fn test_anonymous_different_queries_decrement_old() {
    let mut prepared = PreparedStatements::default();

    // First anonymous Parse: "" → __pgdog_1, used: 1
    let mut first_parse = ProtocolMessage::from(Parse::new_anonymous("SELECT 1"));
    prepared.maybe_rewrite(&mut first_parse).unwrap();

    // Scope the read guard so the lock is released before the next rewrite.
    let first_name = {
        let global = prepared.global.read();
        let entry = global.statements().values().next().unwrap();
        assert_eq!(entry.used, 1);
        entry.name()
    };

    // Second anonymous Parse with DIFFERENT query: "" → __pgdog_2
    // This replaces the local "" mapping.
    let mut second_parse = ProtocolMessage::from(Parse::new_anonymous("SELECT 2"));
    prepared.maybe_rewrite(&mut second_parse).unwrap();

    {
        let global = prepared.global.read();
        assert_eq!(global.statements().len(), 2);

        for (_, stmt) in global.statements() {
            if stmt.name() == first_name {
                // Old entry: should be decremented to 0 (no longer referenced).
                assert_eq!(stmt.used, 0, "old entry should be decremented");
            } else {
                // New entry: should stay at 1 (actively referenced).
                assert_eq!(stmt.used, 1, "new entry should remain at used=1");
            }
        }
    }

    // Third anonymous Parse with yet another query.
    let mut third_parse = ProtocolMessage::from(Parse::new_anonymous("SELECT 3"));
    prepared.maybe_rewrite(&mut third_parse).unwrap();

    let global = prepared.global.read();
    assert_eq!(global.statements().len(), 3);

    // Exactly one entry should have used=1 (the latest).
    let active = global
        .statements()
        .values()
        .filter(|s| s.used == 1)
        .count();
    assert_eq!(active, 1, "exactly one statement should be active");

    // The other two should have used=0.
    let unused = global
        .statements()
        .values()
        .filter(|s| s.used == 0)
        .count();
    assert_eq!(unused, 2, "old statements should be unused");
}
}
Loading