Skip to content

Commit 9987500

Browse files
Copilotwangzuo
andauthored
Refactor: Extract duplicated code in job runners (#19)
* Initial plan * Initial plan for refactoring duplicated code Co-authored-by: wangzuo <1039026+wangzuo@users.noreply.github.com> * Refactor: Extract duplicated code into shared utilities Co-authored-by: wangzuo <1039026+wangzuo@users.noreply.github.com> * refactor: remove unnecessary comments and improve loop syntax in utils --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: wangzuo <1039026+wangzuo@users.noreply.github.com> Co-authored-by: Zuo Wang <wzuoadjusted@gmail.com>
1 parent 64d80a7 commit 9987500

6 files changed

Lines changed: 78 additions & 77 deletions

File tree

bun.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/fluent-ai/src/job/fal.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import type { ImageJob } from "~/src/job/schema";
22
import { createHTTPJob, downloadImages } from "~/src/job/http";
3+
import { getApiKey } from "~/src/job/utils";
34

45
// TODO: switch to fal queue api
56
const BASE_URL = "https://fal.run";
67

78
export const runner = {
89
image: async (input: ImageJob["input"], options?: ImageJob["options"]) => {
9-
const apiKey = options?.apiKey || process.env.FAL_API_KEY;
10+
const apiKey = getApiKey(options, "FAL_API_KEY");
1011

1112
const request = new Request(`${BASE_URL}/${input.model}`, {
1213
method: "POST",

packages/fluent-ai/src/job/openai.ts

Lines changed: 12 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,19 @@
1-
import { EventSourceParserStream } from "eventsource-parser/stream";
2-
import type { ChatJob, ChatTool, ModelsJob } from "~/src/job/schema";
1+
import type { ChatJob, ModelsJob } from "~/src/job/schema";
32
import { createHTTPJob } from "~/src/job/http";
3+
import {
4+
getApiKey,
5+
transformToolsToFunctions,
6+
transformUsageData,
7+
createStreamingGenerator,
8+
} from "~/src/job/utils";
49

510
const BASE_URL = "https://api.openai.com/v1";
611

712
export const runner = {
813
chat: async (input: ChatJob["input"], options?: ChatJob["options"]) => {
9-
const apiKey = options?.apiKey || process.env.OPENAI_API_KEY;
14+
const apiKey = getApiKey(options, "OPENAI_API_KEY");
1015

11-
const tools = input.tools?.map((tool: ChatTool) => ({
12-
type: "function",
13-
function: {
14-
name: tool.name,
15-
description: tool.description,
16-
parameters: tool.input,
17-
},
18-
}));
16+
const tools = transformToolsToFunctions(input.tools);
1917

2018
const request = new Request(`${BASE_URL}/chat/completions`, {
2119
method: "POST",
@@ -35,25 +33,7 @@ export const runner = {
3533

3634
return createHTTPJob(request, async (response: Response) => {
3735
if (input.stream) {
38-
return (async function* () {
39-
const eventStream = response
40-
.body!.pipeThrough(new TextDecoderStream())
41-
.pipeThrough(new EventSourceParserStream());
42-
const reader = eventStream.getReader();
43-
44-
try {
45-
for (;;) {
46-
const { done, value } = await reader.read();
47-
if (done || value.data === "[DONE]") {
48-
break;
49-
}
50-
const chunk = JSON.parse(value.data);
51-
yield { raw: chunk };
52-
}
53-
} finally {
54-
reader.releaseLock();
55-
}
56-
})();
36+
return createStreamingGenerator(response);
5737
}
5838

5939
const data = await response.json();
@@ -66,13 +46,7 @@ export const runner = {
6646
tool_calls: data.choices[0].message.tool_calls,
6747
},
6848
],
69-
usage: data.usage
70-
? {
71-
promptTokens: data.usage.prompt_tokens,
72-
completionTokens: data.usage.completion_tokens,
73-
totalTokens: data.usage.total_tokens,
74-
}
75-
: undefined,
49+
usage: transformUsageData(data.usage),
7650
};
7751
});
7852
},
@@ -81,7 +55,7 @@ export const runner = {
8155
input?: ModelsJob["input"],
8256
options?: ModelsJob["options"],
8357
) => {
84-
const apiKey = options?.apiKey || process.env.OPENAI_API_KEY;
58+
const apiKey = getApiKey(options, "OPENAI_API_KEY");
8559

8660
const request = new Request(`${BASE_URL}/models`, {
8761
method: "GET",

packages/fluent-ai/src/job/openrouter.ts

Lines changed: 11 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,19 @@
1-
import { EventSourceParserStream } from "eventsource-parser/stream";
2-
import type { ChatJob, ChatTool } from "~/src/job/schema";
1+
import type { ChatJob } from "~/src/job/schema";
32
import { createHTTPJob } from "~/src/job/http";
3+
import {
4+
getApiKey,
5+
transformToolsToFunctions,
6+
transformUsageData,
7+
createStreamingGenerator,
8+
} from "~/src/job/utils";
49

510
const BASE_URL = "https://openrouter.ai/api/v1";
611

712
export const runner = {
813
chat: async (input: ChatJob["input"], options?: ChatJob["options"]) => {
9-
const apiKey = options?.apiKey || process.env.OPENROUTER_API_KEY;
14+
const apiKey = getApiKey(options, "OPENROUTER_API_KEY");
1015

11-
const tools = input.tools?.map((tool: ChatTool) => ({
12-
type: "function",
13-
function: {
14-
name: tool.name,
15-
description: tool.description,
16-
parameters: tool.input,
17-
},
18-
}));
16+
const tools = transformToolsToFunctions(input.tools);
1917

2018
const request = new Request(`${BASE_URL}/chat/completions`, {
2119
method: "POST",
@@ -35,25 +33,7 @@ export const runner = {
3533

3634
return createHTTPJob(request, async (response: Response) => {
3735
if (input.stream) {
38-
return (async function* () {
39-
const eventStream = response
40-
.body!.pipeThrough(new TextDecoderStream())
41-
.pipeThrough(new EventSourceParserStream());
42-
const reader = eventStream.getReader();
43-
44-
try {
45-
for (;;) {
46-
const { done, value } = await reader.read();
47-
if (done || value.data === "[DONE]") {
48-
break;
49-
}
50-
const chunk = JSON.parse(value.data);
51-
yield { raw: chunk };
52-
}
53-
} finally {
54-
reader.releaseLock();
55-
}
56-
})();
36+
return createStreamingGenerator(response);
5737
}
5838

5939
const data = await response.json();
@@ -66,13 +46,7 @@ export const runner = {
6646
tool_calls: data.choices[0].message.tool_calls,
6747
},
6848
],
69-
usage: data.usage
70-
? {
71-
promptTokens: data.usage.prompt_tokens,
72-
completionTokens: data.usage.completion_tokens,
73-
totalTokens: data.usage.total_tokens,
74-
}
75-
: undefined,
49+
usage: transformUsageData(data.usage),
7650
};
7751
});
7852
},
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import { EventSourceParserStream } from "eventsource-parser/stream";
2+
import type { ChatTool } from "~/src/job/schema";
3+
4+
export function getApiKey(
5+
options: { apiKey?: string } | undefined,
6+
envVarName: string,
7+
): string | undefined {
8+
return options?.apiKey || process.env[envVarName];
9+
}
10+
11+
export function transformToolsToFunctions(tools?: ChatTool[]) {
12+
return tools?.map((tool: ChatTool) => ({
13+
type: "function",
14+
function: {
15+
name: tool.name,
16+
description: tool.description,
17+
parameters: tool.input,
18+
},
19+
}));
20+
}
21+
22+
export function transformUsageData(usage?: any) {
23+
return usage
24+
? {
25+
promptTokens: usage.prompt_tokens,
26+
completionTokens: usage.completion_tokens,
27+
totalTokens: usage.total_tokens,
28+
}
29+
: undefined;
30+
}
31+
32+
export async function* createStreamingGenerator(response: Response) {
33+
const eventStream = response
34+
.body!.pipeThrough(new TextDecoderStream())
35+
.pipeThrough(new EventSourceParserStream());
36+
const reader = eventStream.getReader();
37+
38+
try {
39+
while (true) {
40+
const { done, value } = await reader.read();
41+
if (done || value.data === "[DONE]") {
42+
break;
43+
}
44+
const chunk = JSON.parse(value.data);
45+
yield { raw: chunk };
46+
}
47+
} finally {
48+
reader.releaseLock();
49+
}
50+
}

packages/fluent-ai/src/job/voyage.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { EmbeddingJob } from "~/src/job/schema";
22
import { createHTTPJob } from "~/src/job/http";
3+
import { getApiKey } from "~/src/job/utils";
34

45
const BASE_URL = "https://api.voyageai.com/v1";
56

@@ -8,7 +9,7 @@ export const runner = {
89
input: EmbeddingJob["input"],
910
options?: EmbeddingJob["options"],
1011
) => {
11-
const apiKey = options?.apiKey || process.env.VOYAGE_API_KEY;
12+
const apiKey = getApiKey(options, "VOYAGE_API_KEY");
1213

1314
const request = new Request(`${BASE_URL}/embeddings`, {
1415
method: "POST",

0 commit comments

Comments
 (0)