From 3fb1515590a22e1b13d71393a209a2a2d7f6af7e Mon Sep 17 00:00:00 2001 From: AkshatBaranwal Date: Fri, 3 Apr 2026 02:03:18 +0530 Subject: [PATCH] fix(gax): abort stalled REST fallback fetches --- core/packages/gax/src/fallbackServiceStub.ts | 48 +++++++- core/packages/gax/test/unit/grpc-fallback.ts | 122 ++++++++++++++++++- 2 files changed, 163 insertions(+), 7 deletions(-) diff --git a/core/packages/gax/src/fallbackServiceStub.ts b/core/packages/gax/src/fallbackServiceStub.ts index 119e3b3aab4..edf2c8e7f3f 100644 --- a/core/packages/gax/src/fallbackServiceStub.ts +++ b/core/packages/gax/src/fallbackServiceStub.ts @@ -114,11 +114,19 @@ export function generateServiceStub( for (const [rpcName, rpc] of Object.entries(rpcs)) { serviceStub[rpcName] = ( request: {}, - options?: {[name: string]: string}, - _metadata?: {} | Function, + metadata?: {[name: string]: string[]}, + callOptions?: {deadline?: Date} | Function, callback?: Function, ) => { - options ??= {}; + metadata ??= {}; + if (typeof callOptions === 'function' && callback === undefined) { + callback = callOptions; + callOptions = {}; + } + const normalizedCallOptions = + typeof callOptions === 'function' || callOptions === undefined + ? {} + : callOptions; // We cannot use async-await in this function because we need to return the canceller object as soon as possible. // Using plain old promises instead. @@ -150,11 +158,12 @@ export function generateServiceStub( let cancelRequested = false; const url = fetchParameters.url; const headers = new Headers(fetchParameters.headers); - for (const key of Object.keys(options)) { - headers.set(key, options[key][0]); + for (const key of Object.keys(metadata)) { + headers.set(key, metadata[key][0]); } const streamArrayParser = new StreamArrayParser(rpc); let response204Ok = false; + let deadlineTimer: ReturnType | undefined; const fetchRequest: gaxios.GaxiosOptions = { headers: headers, body: @@ -174,9 +183,36 @@ export function generateServiceStub( delete fetchRequest['body']; } + if (normalizedCallOptions.deadline instanceof Date) { + const timeoutMs = Math.max( + 0, + normalizedCallOptions.deadline.getTime() - Date.now(), + ); + deadlineTimer = setTimeout(() => { + cancelRequested = true; + cancelController.abort(); + }, timeoutMs); + } + + const clearDeadlineTimer = () => { + if (deadlineTimer) { + clearTimeout(deadlineTimer); + deadlineTimer = undefined; + } + }; + + const originalCancel = streamArrayParser.cancel.bind(streamArrayParser); + streamArrayParser.cancel = () => { + cancelRequested = true; + clearDeadlineTimer(); + cancelController.abort(); + originalCancel(); + }; + auth .fetch(url, fetchRequest) .then((response: Response | NodeFetchResponse) => { + clearDeadlineTimer(); // There is a legacy Apiary configuration that some services // use which allows 204 empty responses on success instead of // a 200 OK. This most commonly is seen in delete RPCs, @@ -245,6 +281,7 @@ export function generateServiceStub( } }) .catch((err: unknown) => { + clearDeadlineTimer(); if (rpc.responseStream) { if (callback) { callback(err); @@ -263,6 +300,7 @@ export function generateServiceStub( return { cancel: () => { cancelRequested = true; + clearDeadlineTimer(); cancelController.abort(); }, }; diff --git a/core/packages/gax/test/unit/grpc-fallback.ts b/core/packages/gax/test/unit/grpc-fallback.ts index 3ea2a551129..0bd6599112d 100644 --- a/core/packages/gax/test/unit/grpc-fallback.ts +++ b/core/packages/gax/test/unit/grpc-fallback.ts @@ -24,6 +24,7 @@ import * as sinon from 'sinon'; import echoProtoJson = require('../fixtures/echo.json'); import {GrpcClient} from '../../src/fallback'; import {ClientStubOptions, GoogleAuth, GoogleError} from '../../src'; +import {generateServiceStub} from '../../src/fallbackServiceStub'; import {PassThroughClient} from 'google-auth-library'; import {setMockFallbackResponse} from './utils'; @@ -486,8 +487,125 @@ describe('grpc-fallback', () => { call.cancel(); - // @ts-ignore - assert.strictEqual(createdAbortControllers[0].abortCalled, true); + assert.strictEqual( + (createdAbortControllers[0] as AbortController & {abortCalled: boolean}) + .abortCalled, + true, + ); + }); + + it('should abort unary fallback fetch when deadline expires', async () => { + const stub = generateServiceStub( + {echo: echoService.methods.Echo as protobuf.Method}, + 'https', + 'foo.example.com', + 443, + { + fetch: async () => new Promise(() => {}), + } as unknown as GoogleAuth, + () => ({ + url: 'https://foo.example.com/v1beta1/echo:echo', + headers: {}, + body: '{}', + method: 'POST' as const, + }), + () => { + throw new Error('responseDecoder should not be reached'); + }, + false, + false, + ); + + stub.echo( + {content: 'content'}, + {}, + {deadline: new Date(Date.now() + 5)}, + () => {}, + ); + + await new Promise(resolve => setTimeout(resolve, 20)); + + assert.strictEqual( + (createdAbortControllers[0] as AbortController & {abortCalled: boolean}) + .abortCalled, + true, + ); + }); + + it('should abort server-streaming fallback fetch when deadline expires', async () => { + const stub = generateServiceStub( + {expand: echoService.methods.Expand as protobuf.Method}, + 'https', + 'foo.example.com', + 443, + { + fetch: async () => new Promise(() => {}), + } as unknown as GoogleAuth, + () => ({ + url: 'https://foo.example.com/v1beta1/echo:expand', + headers: {}, + body: '{}', + method: 'POST' as const, + }), + () => { + throw new Error('responseDecoder should not be reached'); + }, + false, + false, + ); + + stub.expand( + {content: 'content'}, + {}, + {deadline: new Date(Date.now() + 5)}, + () => {}, + ); + + await new Promise(resolve => setTimeout(resolve, 20)); + + assert.strictEqual( + (createdAbortControllers[0] as AbortController & {abortCalled: boolean}) + .abortCalled, + true, + ); + }); + + it('should abort the underlying fetch when canceling a server stream', () => { + const stub = generateServiceStub( + {expand: echoService.methods.Expand as protobuf.Method}, + 'https', + 'foo.example.com', + 443, + { + fetch: async () => new Promise(() => {}), + } as unknown as GoogleAuth, + () => ({ + url: 'https://foo.example.com/v1beta1/echo:expand', + headers: {}, + body: '{}', + method: 'POST' as const, + }), + () => { + throw new Error('responseDecoder should not be reached'); + }, + false, + false, + ); + + const call = stub.expand( + {content: 'content'}, + {}, + {}, + () => {}, + ); + + call.cancel(); + + assert.strictEqual( + (createdAbortControllers[0] as AbortController & {abortCalled: boolean}) + .abortCalled, + true, + ); }); it('should have close method', done => {