Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 103 additions & 13 deletions genkit-tools/common/src/manager/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -274,14 +274,23 @@ export class RuntimeManager {
'Content-Type': 'application/json',
},
responseType: 'stream',
signal: input.abortSignal,
}
)
.catch((err) =>
this.handleStreamError(
.catch((err) => {
if (axios.isCancel(err)) {
return null;
}
return this.handleStreamError(
err,
`Error running action key='${input.key}'.`
)
);
);
});

if (!response) {
return {} as RunActionResponse;
}

let genkitVersion: string;
if (response.headers['x-genkit-version']) {
genkitVersion = response.headers['x-genkit-version'];
Expand Down Expand Up @@ -317,7 +326,22 @@ export class RuntimeManager {
resolver = resolve;
rejecter = reject;
});

const onAbort = () => {
stream.destroy();
resolver({} as RunActionResponse);
};
if (input.abortSignal) {
input.abortSignal.addEventListener('abort', onAbort);
}
const cleanup = () => {
if (input.abortSignal) {
input.abortSignal.removeEventListener('abort', onAbort);
}
};

stream.on('end', () => {
cleanup();
const parsedBuffer = JSON.parse(buffer);
if (parsedBuffer.error) {
const err = new GenkitToolsError(
Expand All @@ -342,6 +366,7 @@ export class RuntimeManager {
resolver(actionResponse);
});
stream.on('error', (err: Error) => {
cleanup();
rejecter(err);
});
return promise;
Expand All @@ -353,13 +378,21 @@ export class RuntimeManager {
'Content-Type': 'application/json',
},
responseType: 'stream', // Use stream to get early headers
signal: input.abortSignal,
})
.catch((err) =>
this.handleStreamError(
.catch((err) => {
if (axios.isCancel(err)) {
return null;
}
return this.handleStreamError(
err,
`Error running action key='${input.key}'.`
)
);
);
});

if (!response) {
return {} as RunActionResponse;
}

const traceId = response.headers['x-genkit-trace-id'];
if (traceId && onTraceId) {
Expand All @@ -369,11 +402,25 @@ export class RuntimeManager {
return new Promise<RunActionResponse>((resolve, reject) => {
let buffer = '';

const onAbort = () => {
response.data.destroy();
resolve({} as RunActionResponse);
};
if (input.abortSignal) {
input.abortSignal.addEventListener('abort', onAbort);
}
const cleanup = () => {
if (input.abortSignal) {
input.abortSignal.removeEventListener('abort', onAbort);
}
};

response.data.on('data', (chunk: Buffer) => {
buffer += chunk.toString();
});

response.data.on('end', () => {
cleanup();
try {
const responseData = JSON.parse(buffer);

Expand Down Expand Up @@ -410,6 +457,7 @@ export class RuntimeManager {
});

response.data.on('error', (err: Error) => {
cleanup();
reject(err);
});
});
Expand Down Expand Up @@ -519,7 +567,7 @@ export class RuntimeManager {
input: apis.StreamTraceRequest,
streamingCallback: StreamingCallback<any>
): Promise<void> {
const { traceId } = input;
const { traceId, abortSignal } = input;

if (!this.telemetryServerUrl) {
throw new Error(
Expand All @@ -533,19 +581,41 @@ export class RuntimeManager {
Accept: 'text/event-stream',
},
responseType: 'stream',
signal: abortSignal,
})
.catch((err) =>
this.httpErrorHandler(
.catch((err) => {
if (axios.isCancel(err)) {
return null;
}
return this.httpErrorHandler(
err,
`Error streaming trace for traceId='${traceId}'`
)
);
);
});

if (!response) {
return;
}

const stream = response.data;
let buffer = '';

// Return a promise that resolves when the stream ends
return new Promise<void>((resolve, reject) => {
const onAbort = () => {
stream.destroy();
resolve();
};
if (abortSignal) {
abortSignal.addEventListener('abort', onAbort);
}

const cleanup = () => {
if (abortSignal) {
abortSignal.removeEventListener('abort', onAbort);
}
};

stream.on('data', (chunk: Buffer) => {
buffer += chunk.toString();

Expand All @@ -566,17 +636,37 @@ export class RuntimeManager {
: message;
const parsed = JSON.parse(jsonData);
streamingCallback(parsed);

// If this is a full trace snapshot and it's already ended, we are done
if (parsed.endTime) {
cleanup();
stream.destroy();
resolve();
}

// If this is a span_end event for the root span, we are done
if (
parsed.type === 'span_end' &&
parsed.span &&
!parsed.span.parentSpanId
) {
cleanup();
stream.destroy();
resolve();
}
} catch (err) {
logger.error(`Error parsing stream data: ${err}`);
}
}
});

stream.on('end', () => {
cleanup();
resolve();
});

stream.on('error', (err: Error) => {
cleanup();
logger.error(`Stream error for traceId='${traceId}': ${err}`);
reject(err);
});
Expand Down
Loading
Loading