Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 43 additions & 5 deletions core/packages/gax/src/fallbackServiceStub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,19 @@ export function generateServiceStub(
for (const [rpcName, rpc] of Object.entries(rpcs)) {
serviceStub[rpcName] = (
request: {},
options?: {[name: string]: string},
_metadata?: {} | Function,
metadata?: {[name: string]: string[]},
callOptions?: {deadline?: Date} | Function,
callback?: Function,
) => {
options ??= {};
metadata ??= {};
if (typeof callOptions === 'function' && callback === undefined) {
callback = callOptions;
callOptions = {};
}
const normalizedCallOptions =
typeof callOptions === 'function' || callOptions === undefined
? {}
: callOptions;

// We cannot use async-await in this function because we need to return the canceller object as soon as possible.
// Using plain old promises instead.
Expand Down Expand Up @@ -150,11 +158,12 @@ export function generateServiceStub(
let cancelRequested = false;
const url = fetchParameters.url;
const headers = new Headers(fetchParameters.headers);
for (const key of Object.keys(options)) {
headers.set(key, options[key][0]);
for (const key of Object.keys(metadata)) {
headers.set(key, metadata[key][0]);
}
const streamArrayParser = new StreamArrayParser(rpc);
let response204Ok = false;
let deadlineTimer: ReturnType<typeof setTimeout> | undefined;
const fetchRequest: gaxios.GaxiosOptions = {
headers: headers,
body:
Expand All @@ -174,9 +183,36 @@ export function generateServiceStub(
delete fetchRequest['body'];
}

if (normalizedCallOptions.deadline instanceof Date) {
const timeoutMs = Math.max(
0,
normalizedCallOptions.deadline.getTime() - Date.now(),
);
deadlineTimer = setTimeout(() => {
cancelRequested = true;
cancelController.abort();
}, timeoutMs);
}

const clearDeadlineTimer = () => {
if (deadlineTimer) {
clearTimeout(deadlineTimer);
deadlineTimer = undefined;
}
};

const originalCancel = streamArrayParser.cancel.bind(streamArrayParser);
streamArrayParser.cancel = () => {
cancelRequested = true;
clearDeadlineTimer();
cancelController.abort();
originalCancel();
};

auth
.fetch(url, fetchRequest)
.then((response: Response | NodeFetchResponse) => {
clearDeadlineTimer();
// There is a legacy Apiary configuration that some services
// use which allows 204 empty responses on success instead of
// a 200 OK. This most commonly is seen in delete RPCs,
Expand Down Expand Up @@ -245,6 +281,7 @@ export function generateServiceStub(
}
})
.catch((err: unknown) => {
clearDeadlineTimer();
if (rpc.responseStream) {
if (callback) {
callback(err);
Expand All @@ -263,6 +300,7 @@ export function generateServiceStub(
return {
cancel: () => {
cancelRequested = true;
clearDeadlineTimer();
cancelController.abort();
},
};
Expand Down
122 changes: 120 additions & 2 deletions core/packages/gax/test/unit/grpc-fallback.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import * as sinon from 'sinon';
import echoProtoJson = require('../fixtures/echo.json');
import {GrpcClient} from '../../src/fallback';
import {ClientStubOptions, GoogleAuth, GoogleError} from '../../src';
import {generateServiceStub} from '../../src/fallbackServiceStub';
import {PassThroughClient} from 'google-auth-library';
import {setMockFallbackResponse} from './utils';

Expand Down Expand Up @@ -486,8 +487,125 @@ describe('grpc-fallback', () => {

call.cancel();

// @ts-ignore
assert.strictEqual(createdAbortControllers[0].abortCalled, true);
assert.strictEqual(
(createdAbortControllers[0] as AbortController & {abortCalled: boolean})
.abortCalled,
true,
);
});

it('should abort unary fallback fetch when deadline expires', async () => {
const stub = generateServiceStub(
{echo: echoService.methods.Echo as protobuf.Method},
'https',
'foo.example.com',
443,
{
fetch: async () => new Promise(() => {}),
} as unknown as GoogleAuth,
() => ({
url: 'https://foo.example.com/v1beta1/echo:echo',
headers: {},
body: '{}',
method: 'POST' as const,
}),
() => {
throw new Error('responseDecoder should not be reached');
},
false,
false,
);

stub.echo(
{content: 'content'},
{},
{deadline: new Date(Date.now() + 5)},
() => {},
);

await new Promise(resolve => setTimeout(resolve, 20));

assert.strictEqual(
(createdAbortControllers[0] as AbortController & {abortCalled: boolean})
.abortCalled,
true,
);
});

it('should abort server-streaming fallback fetch when deadline expires', async () => {
const stub = generateServiceStub(
{expand: echoService.methods.Expand as protobuf.Method},
'https',
'foo.example.com',
443,
{
fetch: async () => new Promise(() => {}),
} as unknown as GoogleAuth,
() => ({
url: 'https://foo.example.com/v1beta1/echo:expand',
headers: {},
body: '{}',
method: 'POST' as const,
}),
() => {
throw new Error('responseDecoder should not be reached');
},
false,
false,
);

stub.expand(
{content: 'content'},
{},
{deadline: new Date(Date.now() + 5)},
() => {},
);

await new Promise(resolve => setTimeout(resolve, 20));

assert.strictEqual(
(createdAbortControllers[0] as AbortController & {abortCalled: boolean})
.abortCalled,
true,
);
});

it('should abort the underlying fetch when canceling a server stream', () => {
const stub = generateServiceStub(
{expand: echoService.methods.Expand as protobuf.Method},
'https',
'foo.example.com',
443,
{
fetch: async () => new Promise(() => {}),
} as unknown as GoogleAuth,
() => ({
url: 'https://foo.example.com/v1beta1/echo:expand',
headers: {},
body: '{}',
method: 'POST' as const,
}),
() => {
throw new Error('responseDecoder should not be reached');
},
false,
false,
);

const call = stub.expand(
{content: 'content'},
{},
{},
() => {},
);

call.cancel();

assert.strictEqual(
(createdAbortControllers[0] as AbortController & {abortCalled: boolean})
.abortCalled,
true,
);
});

it('should have close method', done => {
Expand Down
Loading