Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion packages/pg-pool/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ function makeIdleListener(pool, client) {
client.on('error', () => {
pool.log('additional client error after disconnection due to error', err)
})
pool._remove(client)
pool._remove(client, pool._pulseQueue.bind(pool))
// TODO - document that once the pool emits an error
// the client has already been closed & purged and is unusable
pool.emit('error', err, client)
Expand Down Expand Up @@ -124,6 +124,16 @@ class Pool extends EventEmitter {
return this._clients.length > this.options.min
}

_ensureMinimum() {
if (this.ending || this.ended) return
const needed = this.options.min - this._clients.length
for (let i = 0; i < needed; i++) {
this.newClient(new PendingItem((err, client, clientRelease) => {
if (!err) clientRelease()
}))
}
}

_pulseQueue() {
this.log('pulse queue')
if (this.ended) {
Expand Down Expand Up @@ -177,6 +187,7 @@ class Pool extends EventEmitter {
}

this._clients = this._clients.filter((c) => c !== client)
this._ensureMinimum()
const context = this
client.end(() => {
context.emit('remove', client)
Expand Down
166 changes: 166 additions & 0 deletions packages/pg-pool/test/min-connections.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
'use strict'
const EventEmitter = require('events').EventEmitter
const expect = require('expect.js')
const co = require('co')

const describe = require('mocha').describe
const it = require('mocha').it

const Pool = require('../')

const wait = (ms) => new Promise((resolve) => setTimeout(resolve, ms))

// Minimal mock client that connects immediately without a real database
class MockClient extends EventEmitter {
constructor() {
super()
this._queryable = true
this._ending = false
}

connect(cb) {
process.nextTick(() => cb(null))
}

end(cb) {
this._ending = true
process.nextTick(() => {
this.emit('end')
if (cb) cb()
})
}

query(text, values, cb) {
if (typeof values === 'function') {
cb = values
}
process.nextTick(() => cb && cb(null, { rows: [] }))
}
}

describe('min connections maintenance', () => {
describe('refills to min after maxUses retirement', () => {
it(
'creates replacement connections when clients are retired due to maxUses',
co.wrap(function* () {
const pool = new Pool({ min: 2, max: 5, maxUses: 2, Client: MockClient })

// Acquire 2 clients and exhaust them (2 uses each with maxUses=2)
const c1 = yield pool.connect()
const c2 = yield pool.connect()
c1.release()
c2.release()

yield wait(10)

// Second acquisition returns the same clients
const c1b = yield pool.connect()
const c2b = yield pool.connect()
// On release, both hit maxUses=2 and are retired
c1b.release()
c2b.release()

// _ensureMinimum should schedule 2 replacement connections
yield wait(50)

expect(pool.totalCount).to.equal(2)
expect(pool.idleCount).to.equal(2)

return pool.end()
})
)

it(
'does not exceed min when multiple retirements happen simultaneously',
co.wrap(function* () {
const pool = new Pool({ min: 2, max: 10, maxUses: 2, Client: MockClient })

// Exhaust 5 clients simultaneously
const clients = []
for (let i = 0; i < 5; i++) {
clients.push(yield pool.connect())
}
for (const c of clients) c.release()

yield wait(10)

const clients2 = []
for (let i = 0; i < 5; i++) {
clients2.push(yield pool.connect())
}
for (const c of clients2) c.release()

yield wait(50)

// Should refill to exactly min, not more
expect(pool.totalCount).to.equal(2)

return pool.end()
})
)
})

describe('refills to min after idle client background errors', () => {
it(
'creates a replacement when an idle client emits an error',
co.wrap(function* () {
const pool = new Pool({ min: 2, max: 5, Client: MockClient })

// Build up 2 idle connections
const c1 = yield pool.connect()
const c2 = yield pool.connect()
c1.release()
c2.release()

yield wait(10)
expect(pool.totalCount).to.equal(2)
expect(pool.idleCount).to.equal(2)

// Suppress pool-level error event to prevent unhandled rejection
pool.on('error', () => {})

// Background error on one idle client
c1.emit('error', new Error('connection reset'))

yield wait(50)

// Pool should have refilled back to min=2
expect(pool.totalCount).to.equal(2)

return pool.end()
})
)
})

describe('serves pending requests after idle client error (makeIdleListener fix)', () => {
it(
'serves a queued connect after the only idle client errors',
co.wrap(function* () {
const pool = new Pool({ max: 1, Client: MockClient })

// Place the single client into idle
const c1 = yield pool.connect()
c1.release()

yield wait(10)
expect(pool.idleCount).to.equal(1)

pool.on('error', () => {})

// Queue a pending connect while the client is idle
const connectPromise = pool.connect()
expect(pool.waitingCount).to.equal(1)

// Error destroys the only idle client
c1.emit('error', new Error('connection reset'))

// The pending request must be served by a new connection
const c2 = yield connectPromise
expect(c2).not.to.equal(c1)
c2.release()

return pool.end()
})
)
})
})
9 changes: 6 additions & 3 deletions packages/pg-pool/test/sizing.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,16 @@ describe('pool size of 1', () => {
)

it(
'does remove clients when at or below min if maxLifetimeSeconds is reached',
'retires and replaces clients at or below min when maxLifetimeSeconds is reached',
co.wrap(function* () {
const pool = new Pool({ max: 1, min: 1, idleTimeoutMillis: 10, maxLifetimeSeconds: 1 })
const client = yield pool.connect()
client.release()
yield new Promise((resolve) => setTimeout(resolve, 1020))
expect(pool.idleCount).to.equal(0)
// Wait long enough for the original client to expire and a replacement to connect
yield new Promise((resolve) => setTimeout(resolve, 1500))
// Original client was retired; pool refills to min with a fresh connection
expect(pool.totalCount).to.equal(1)
expect(pool.idleCount).to.equal(1)
return yield pool.end()
})
)
Expand Down
Loading