Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions lib/asyncqueryexecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ export class AsyncQueryExecutor extends QueryExecutor {
const body = JSON.stringify(encodedOptions)

return await runWithRetry(
() => {
const generic = this._cluster.httpClient.genericRequestOptions()
async () => {
const generic = await this._cluster.httpClient.requestOptions()
const requestOptions: http.RequestOptions = {
...generic,
method: 'POST',
Expand Down Expand Up @@ -108,8 +108,8 @@ export class AsyncQueryExecutor extends QueryExecutor {
this._requestContext.setGenericRequestContextFields('', statusHandle, 'GET')

return await runWithRetry(
() => {
const generic = this._cluster.httpClient.genericRequestOptions()
async () => {
const generic = await this._cluster.httpClient.requestOptions()
const requestOptions: http.RequestOptions = {
...generic,
method: 'GET',
Expand All @@ -136,8 +136,8 @@ export class AsyncQueryExecutor extends QueryExecutor {
const body = `request_id=${encodeURIComponent(requestId)}`

return await runWithRetry(
() => {
const generic = this._cluster.httpClient.genericRequestOptions()
async () => {
const generic = await this._cluster.httpClient.requestOptions()
const requestOptions: http.RequestOptions = {
...generic,
method: 'DELETE',
Expand Down Expand Up @@ -168,8 +168,8 @@ export class AsyncQueryExecutor extends QueryExecutor {
this._requestContext.setGenericRequestContextFields('', resultHandle, 'GET')

return await runWithRetry(
() => {
const generic = this._cluster.httpClient.genericRequestOptions()
async () => {
const generic = await this._cluster.httpClient.requestOptions()
const requestOptions: http.RequestOptions = {
...generic,
method: 'GET',
Expand Down Expand Up @@ -197,8 +197,8 @@ export class AsyncQueryExecutor extends QueryExecutor {
)

return await runWithRetry(
() => {
const generic = this._cluster.httpClient.genericRequestOptions()
async () => {
const generic = await this._cluster.httpClient.requestOptions()
const requestOptions: http.RequestOptions = {
...generic,
method: 'DELETE',
Expand Down Expand Up @@ -228,7 +228,7 @@ export class AsyncQueryExecutor extends QueryExecutor {
requestOptions,
(res) => {
CouchbaseLogger.debug(
`Received startQuery response from ${requestOptions.hostname}:${requestOptions.port}. statusCode=${res.statusCode} clientContextId=${this._clientContextId}`
`Received startQuery response from ${requestOptions.host}:${requestOptions.port}. statusCode=${res.statusCode} clientContextId=${this._clientContextId}`
)
this._handleJsonResponse(res, resolve, reject, ['requestID', 'handle'])
}
Expand All @@ -240,7 +240,7 @@ export class AsyncQueryExecutor extends QueryExecutor {

req.on('error', (err) => {
CouchbaseLogger.error(
`Error sending startQuery request to ${requestOptions.hostname}:${requestOptions.port}, details: ${err.message}. clientContextId=${this._clientContextId}`
`Error sending startQuery request to ${requestOptions.host}:${requestOptions.port}, details: ${err.message}. clientContextId=${this._clientContextId}`
)
req.destroy()
this._signal.removeEventListener('abort', abortHandler)
Expand All @@ -249,7 +249,7 @@ export class AsyncQueryExecutor extends QueryExecutor {

req.on('connectTimeout', () => {
CouchbaseLogger.error(
`Connection timeout for startQuery request to ${requestOptions.hostname}:${requestOptions.port}. clientContextId=${this._clientContextId}`
`Connection timeout for startQuery request to ${requestOptions.host}:${requestOptions.port}. clientContextId=${this._clientContextId}`
)
req.destroy()
this._signal.removeEventListener('abort', abortHandler)
Expand All @@ -259,7 +259,7 @@ export class AsyncQueryExecutor extends QueryExecutor {
this._attachConnectTimeout(req)

CouchbaseLogger.debug(
`Sending startQuery request to ${requestOptions.hostname}:${requestOptions.port}. body=${body}. clientContextId=${this._clientContextId}`
`Sending startQuery request to ${requestOptions.host}:${requestOptions.port}. body=${body}. clientContextId=${this._clientContextId}`
)
req.write(body)
req.end()
Expand Down Expand Up @@ -292,7 +292,7 @@ export class AsyncQueryExecutor extends QueryExecutor {
requestOptions,
(res) => {
CouchbaseLogger.debug(
`Received response from ${requestOptions.hostname}:${requestOptions.port}. statusCode=${res.statusCode} path=${requestOptions.path} clientContextId=${this._clientContextId}`
`Received response from ${requestOptions.host}:${requestOptions.port}. statusCode=${res.statusCode} path=${requestOptions.path} clientContextId=${this._clientContextId}`
)
this._handleJsonResponse(res, resolve, reject)
}
Expand All @@ -304,7 +304,7 @@ export class AsyncQueryExecutor extends QueryExecutor {

req.on('error', (err) => {
CouchbaseLogger.error(
`Error sending request to ${requestOptions.hostname}:${requestOptions.port}${requestOptions.path}, details: ${err.message}. clientContextId=${this._clientContextId}`
`Error sending request to ${requestOptions.host}:${requestOptions.port}${requestOptions.path}, details: ${err.message}. clientContextId=${this._clientContextId}`
)
req.destroy()
this._signal.removeEventListener('abort', abortHandler)
Expand Down
56 changes: 25 additions & 31 deletions lib/errorhandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,43 +211,37 @@ export class ErrorHandler {
return false
}

return !connectionDenyList.has(nodeError.code)
// Per the RFC, only DNS and TCP-dial failures retry; anything after the
// socket is dialed (TLS handshake, cert verification) must fail fast.
return connectionRetryAllowList.has(nodeError.code)
}
}

/**
* Taken from https://github.com/sindresorhus/is-retry-allowed
* Node `err.code`s for the DNS and TCP-dial failures the RFC marks retriable.
*
* @internal
*/
const connectionDenyList = new Set([
const connectionRetryAllowList = new Set([
// DNS resolution failures
'EAI_AGAIN',
'ENOTFOUND',
// TCP-dial / connection failures
'ECONNREFUSED',
'ECONNRESET',
'ECONNABORTED',
'ETIMEDOUT',
'EHOSTUNREACH',
'EHOSTDOWN',
'ENETUNREACH',
'UNABLE_TO_GET_ISSUER_CERT',
'UNABLE_TO_GET_CRL',
'UNABLE_TO_DECRYPT_CERT_SIGNATURE',
'UNABLE_TO_DECRYPT_CRL_SIGNATURE',
'UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY',
'CERT_SIGNATURE_FAILURE',
'CRL_SIGNATURE_FAILURE',
'CERT_NOT_YET_VALID',
'CERT_HAS_EXPIRED',
'CRL_NOT_YET_VALID',
'CRL_HAS_EXPIRED',
'ERROR_IN_CERT_NOT_BEFORE_FIELD',
'ERROR_IN_CERT_NOT_AFTER_FIELD',
'ERROR_IN_CRL_LAST_UPDATE_FIELD',
'ERROR_IN_CRL_NEXT_UPDATE_FIELD',
'OUT_OF_MEM',
'DEPTH_ZERO_SELF_SIGNED_CERT',
'SELF_SIGNED_CERT_IN_CHAIN',
'UNABLE_TO_GET_ISSUER_CERT_LOCALLY',
'UNABLE_TO_VERIFY_LEAF_SIGNATURE',
'CERT_CHAIN_TOO_LONG',
'CERT_REVOKED',
'INVALID_CA',
'PATH_LENGTH_EXCEEDED',
'INVALID_PURPOSE',
'CERT_UNTRUSTED',
'CERT_REJECTED',
'HOSTNAME_MISMATCH',
'ENETDOWN',
'ENETRESET',
'EADDRNOTAVAIL',
// EPIPE looks like a post-dial write failure, but it only reaches here as a
// request-side error (isRequestError), meaning the peer tore down the
// connection before accepting our request (e.g. a closed keep-alive socket,
// an LB drop, or a restarting node). Nothing was processed, so retrying is
// safe. A broken pipe after the server began responding surfaces as a
// response-side error and fails fast instead.
'EPIPE',
])
84 changes: 51 additions & 33 deletions lib/httpclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { Agent as HttpAgent } from 'node:http'
import { Agent as HttpsAgent } from 'node:https'
import { isIP } from 'node:net'
import { AnalyticsError, InvalidArgumentError } from './errors.js'
import { ConnectionError } from './internalerrors.js'
import type { ClusterCredential } from './credential.js'
import { SecurityOptions } from './cluster.js'
import { Certificates } from './certificates.js'
Expand Down Expand Up @@ -47,7 +48,6 @@ export class HttpClient {
this._hostname = url.hostname
this._credential = credential
this._securityOptions = securityOptions
this.randomLookup = this.randomLookup.bind(this)

if (url.protocol === 'http:') {
if (credential.type === 'certificate') {
Expand Down Expand Up @@ -77,15 +77,15 @@ export class HttpClient {
}

/**
* Returns request options with the current credential's `Authorization`
* header set.
* Returns the credential/agent/port portion of the request options, with the
* current credential's `Authorization` header set. Does NOT resolve the host;
* see {@link requestOptions} for the per-request target selection.
*
* @internal
*/
genericRequestOptions(): http.RequestOptions {
const opts: http.RequestOptions = {
agent: this._agent,
hostname: this._hostname,
port: this._port,
}
if (this._credential.type !== 'certificate') {
Expand All @@ -94,6 +94,32 @@ export class HttpClient {
return opts
}

/**
* Builds the options for a single request. Per the RFC, resolves the
* hostname and picks a random A/AAAA record per request (so the keep-alive
* agent does not pin to one node), connecting to that IP. The hostname is
* kept for the TLS SNI `servername` (via the agent) and the `Host` header,
* so cert verification and vhost routing are unaffected. An IP literal is
* used as-is.
*
* @internal
*/
async requestOptions(): Promise<http.RequestOptions> {
const opts = this.genericRequestOptions()

if (isIP(this._hostname) !== 0) {
opts.host = this._hostname
return opts
}

opts.host = await this._selectRequestAddress()
opts.headers = {
...opts.headers,
Host: `${this._hostname}:${this._port}`,
}
return opts
}

/**
* Replace the credential used for subsequent requests. Cross-type
* rotation throws `InvalidArgumentError`.
Expand Down Expand Up @@ -129,7 +155,6 @@ export class HttpClient {
if (this._module === http) {
return new HttpAgent({
keepAlive: true,
lookup: this.randomLookup,
})
}
const tlsOptions = this._buildTlsOptions()
Expand All @@ -143,7 +168,6 @@ export class HttpClient {
}
return new HttpsAgent({
keepAlive: true,
lookup: this.randomLookup,
...tlsOptions,
})
}
Expand Down Expand Up @@ -194,35 +218,29 @@ export class HttpClient {
}

/**
* Resolves the hostname's A/AAAA records via `dns.lookup` (getaddrinfo) and
* returns one at random. DNS-resolution failures are wrapped as a request-side
* {@link ConnectionError} so they rejoin the retry path; we resolve before
* `http.request`, so they would otherwise never reach its `error` event. An
* empty result is treated like `ENOTFOUND`.
*
* @internal
*/
randomLookup(
hostname: string,
options: dns.LookupOptions,
callback: (
err: NodeJS.ErrnoException | null,
address: string | dns.LookupAddress[],
family?: number
) => void
): void {
// There are two flavours of the callback signature. if 'all' is true: (err, address[]) else (err, address, family)
// On Node.js versions > 18, 'all' is true by default, which means we have to handle both cases.
// See https://github.com/nodejs/node/issues/55762
const wantAll = options.all
dns.lookup(hostname, { ...options, all: true }, (err, addresses) => {
if (err || addresses.length === 0) {
const e = err ?? new Error(`No addresses found for ${hostname}`)
return callback(e, wantAll ? [] : '', undefined)
}
const selectedAddress =
addresses[Math.floor(Math.random() * addresses.length)]

if (wantAll) {
callback(null, [selectedAddress])
} else {
callback(null, selectedAddress.address, selectedAddress.family)
}
})
private async _selectRequestAddress(): Promise<string> {
let addresses: dns.LookupAddress[]
try {
addresses = await dns.promises.lookup(this._hostname, { all: true })
} catch (err) {
throw new ConnectionError(err as Error, true)
}
if (addresses.length === 0) {
const noRecords = new Error(
`No addresses found for ${this._hostname}`
) as NodeJS.ErrnoException
noRecords.code = 'ENOTFOUND'
throw new ConnectionError(noRecords, true)
}
return addresses[Math.floor(Math.random() * addresses.length)].address
}

/**
Expand Down
14 changes: 7 additions & 7 deletions lib/queryexecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ export class QueryExecutor {
const body = JSON.stringify(encodedOptions)

return await runWithRetry(
() => {
async () => {
// Rebuild per attempt so a credential rotated mid-query takes effect
// on the next retry.
const generic = this._cluster.httpClient.genericRequestOptions()
// on the next retry, and so each attempt re-selects an A/AAAA record.
const generic = await this._cluster.httpClient.requestOptions()
const requestOptions: http.RequestOptions = {
...generic,
method: 'POST',
Expand Down Expand Up @@ -167,7 +167,7 @@ export class QueryExecutor {
requestOptions,
(res) => {
CouchbaseLogger.debug(
`Received query response from ${requestOptions.hostname}:${requestOptions.port}. statusCode=${res.statusCode} clientContextId=${this._clientContextId}`
`Received query response from ${requestOptions.host}:${requestOptions.port}. statusCode=${res.statusCode} clientContextId=${this._clientContextId}`
)
this._handleStreamingResponse(res, resolve, reject, deadline)
}
Expand All @@ -179,7 +179,7 @@ export class QueryExecutor {

req.on('error', (err) => {
CouchbaseLogger.error(
`Error occurred while sending query request to ${requestOptions.hostname}:${requestOptions.port}, details: ${err.message}. clientContextId=${this._clientContextId}`
`Error occurred while sending query request to ${requestOptions.host}:${requestOptions.port}, details: ${err.message}. clientContextId=${this._clientContextId}`
)
req.destroy()
this._signal.removeEventListener('abort', abortHandler)
Expand All @@ -188,7 +188,7 @@ export class QueryExecutor {

req.on('connectTimeout', () => {
CouchbaseLogger.error(
`Connection timeout for query request to ${requestOptions.hostname}:${requestOptions.port}. clientContextId=${this._clientContextId}`
`Connection timeout for query request to ${requestOptions.host}:${requestOptions.port}. clientContextId=${this._clientContextId}`
)
req.destroy()
this._signal.removeEventListener('abort', abortHandler)
Expand All @@ -198,7 +198,7 @@ export class QueryExecutor {
this._attachConnectTimeout(req)

CouchbaseLogger.debug(
`Sending request to ${requestOptions.hostname}:${requestOptions.port}. body=${body}. clientContextId=${this._clientContextId}`
`Sending request to ${requestOptions.host}:${requestOptions.port}. body=${body}. clientContextId=${this._clientContextId}`
)

req.write(body)
Expand Down
Loading
Loading