Skip to content

Commit a8990f6

Browse files
authored
Merge pull request #12 from devit-tel/feature/add-sync-worker
Added sync update worker
2 parents d33d858 + 2093585 commit a8990f6

5 files changed

Lines changed: 311 additions & 4 deletions

File tree

package-lock.json

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
"license": "Apache-2.0",
2020
"dependencies": {
2121
"@melonade/melonade-declaration": "^0.19.2",
22+
"axios": "^0.20.0",
2223
"node-rdkafka": "^2.9.1",
2324
"ramda": "^0.26.1",
2425
"tslib": "^2.0.1"

src/example/syncWorker.ts

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import { State, Task } from '@melonade/melonade-declaration';
2+
import { TaskStates } from '..';
3+
import { SyncWorker } from '../syncWorker';
4+
5+
const kafkaServers = process.env['MELONADE_KAFKA_SERVERS'];
6+
const namespace = process.env['MELONADE_NAMESPACE'];
7+
const processManagerUrl =
8+
process.env['MELONADE_PROCESS_MANAGER_URL'] || 'http://localhost:8081';
9+
10+
const sleep = (ms: number) => new Promise((res) => setTimeout(res, ms));
11+
12+
for (const forkID in new Array(1).fill(null)) {
13+
for (const workerId of [1, 2, 3]) {
14+
const worker = new SyncWorker(
15+
// task name
16+
`t${workerId}`,
17+
// process task
18+
async (task, updateTask) => {
19+
await updateTask(task, { status: TaskStates.Inprogress });
20+
console.log(`Processing ${task.taskName}`);
21+
await sleep(5000);
22+
await updateTask(task, { status: State.TaskStates.Completed });
23+
},
24+
// compensate task
25+
async (task, updateTask) => {
26+
await updateTask(task, { status: TaskStates.Inprogress });
27+
console.log(`Compenstating ${task.taskName}`);
28+
await sleep(10);
29+
await updateTask(task, { status: TaskStates.Completed });
30+
},
31+
// configs
32+
{
33+
processManagerUrl,
34+
kafkaServers,
35+
namespace,
36+
},
37+
);
38+
39+
worker.once('ready', () => {
40+
console.log(`Fork ${forkID} Worker t${workerId} is ready!`);
41+
});
42+
43+
worker.on('task-timeout', (task: Task.ITask) => {
44+
console.log(
45+
`Worker skiped ${task.taskName}: ${task.taskId} because it already timed out`,
46+
);
47+
});
48+
}
49+
}

src/syncWorker.ts

Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
import { Event, Kafka, Task } from '@melonade/melonade-declaration';
2+
import axios from 'axios';
3+
import { EventEmitter } from 'events';
4+
import {
5+
ConsumerGlobalConfig,
6+
KafkaConsumer,
7+
LibrdKafkaError,
8+
Message,
9+
} from 'node-rdkafka';
10+
import { jsonTryParse } from './utils/common';
11+
import {
12+
isTaskTimeout,
13+
ITaskRef,
14+
ITaskResponse,
15+
mapTaskNameToTopic,
16+
} from './worker';
17+
18+
export interface ISyncWorkerConfig {
19+
processManagerUrl: string;
20+
kafkaServers: string;
21+
namespace?: string;
22+
maximumPollingTasks?: number;
23+
pollingCooldown?: number;
24+
processTimeoutTask?: boolean;
25+
autoStart?: boolean;
26+
latencyCompensationMs?: number;
27+
trackingRunningTasks?: boolean;
28+
}
29+
30+
export interface ISyncUpdateTask {
31+
(task: ITaskRef, result: ITaskResponse): Promise<void>;
32+
}
33+
34+
const DEFAULT_WORKER_CONFIG = {
35+
namespace: 'node',
36+
maximumPollingTasks: 100,
37+
pollingCooldown: 1,
38+
processTimeoutTask: false,
39+
autoStart: true,
40+
latencyCompensationMs: 50,
41+
trackingRunningTasks: false,
42+
} as ISyncWorkerConfig;
43+
44+
// Maybe use kafka streamAPI
45+
export class SyncWorker extends EventEmitter {
46+
private consumer: KafkaConsumer;
47+
workerConfig: ISyncWorkerConfig;
48+
private isSubscribed: boolean = false;
49+
private taskCallback: (
50+
task: Task.ITask,
51+
updateTask: ISyncUpdateTask,
52+
isTimeout: boolean,
53+
) => void | Promise<void>;
54+
private compensateCallback: (
55+
task: Task.ITask,
56+
updateTask: ISyncUpdateTask,
57+
isTimeout: boolean,
58+
) => void | Promise<void>;
59+
private runningTasks: {
60+
[taskId: string]: Task.ITask;
61+
} = {};
62+
private tasksName: string | string[];
63+
64+
constructor(
65+
tasksName: string | string[],
66+
taskCallback: (
67+
task: Task.ITask,
68+
updateTask: ISyncUpdateTask,
69+
isTimeout: boolean,
70+
) => void | Promise<void>,
71+
compensateCallback: (
72+
task: Task.ITask,
73+
updateTask: ISyncUpdateTask,
74+
isTimeout: boolean,
75+
) => void | Promise<void>,
76+
workerConfig: ISyncWorkerConfig,
77+
kafkaConfig: ConsumerGlobalConfig = {},
78+
) {
79+
super();
80+
81+
this.tasksName = tasksName;
82+
this.taskCallback = taskCallback;
83+
this.compensateCallback = compensateCallback;
84+
this.workerConfig = {
85+
...DEFAULT_WORKER_CONFIG,
86+
...workerConfig,
87+
};
88+
89+
this.consumer = new KafkaConsumer(
90+
{
91+
'bootstrap.servers': workerConfig.kafkaServers,
92+
'group.id': `melonade-${this.workerConfig.namespace}.client`,
93+
'enable.auto.commit': false,
94+
...kafkaConfig,
95+
},
96+
{ 'auto.offset.reset': 'earliest' },
97+
);
98+
99+
this.consumer.on('ready', () => {
100+
this.emit('ready');
101+
102+
if (Array.isArray(tasksName)) {
103+
this.consumer.subscribe(
104+
tasksName.map((taskName: string) =>
105+
mapTaskNameToTopic(taskName, this.workerConfig.namespace),
106+
),
107+
);
108+
} else {
109+
this.consumer.subscribe([
110+
mapTaskNameToTopic(tasksName, this.workerConfig.namespace),
111+
]);
112+
}
113+
114+
if (this.workerConfig.autoStart) {
115+
this.subscribe();
116+
}
117+
});
118+
this.consumer.setDefaultConsumeTimeout(this.workerConfig.pollingCooldown);
119+
this.consumer.connect();
120+
121+
process.once('SIGTERM', () => {
122+
this.consumer.unsubscribe();
123+
});
124+
}
125+
126+
get health(): {
127+
consumer: 'connected' | 'disconnected';
128+
tasks: { [taskId: string]: Task.ITask };
129+
} {
130+
return {
131+
consumer: this.consumer.isConnected() ? 'connected' : 'disconnected',
132+
tasks: this.runningTasks,
133+
};
134+
}
135+
136+
consume = (
137+
messageNumber: number = this.workerConfig.maximumPollingTasks,
138+
): Promise<Task.ITask[]> => {
139+
return new Promise((resolve: Function, reject: Function) => {
140+
this.consumer.consume(
141+
messageNumber,
142+
(error: LibrdKafkaError, messages: Message[]) => {
143+
if (error) {
144+
setTimeout(() => reject(error), 1000);
145+
} else {
146+
resolve(
147+
messages.map((message: Kafka.kafkaConsumerMessage) =>
148+
jsonTryParse(message.value.toString(), undefined),
149+
),
150+
);
151+
}
152+
},
153+
);
154+
});
155+
};
156+
157+
updateTask = async (task: ITaskRef, result: ITaskResponse) => {
158+
await axios.post(
159+
'v1/transaction/update',
160+
{
161+
transactionId: task.transactionId,
162+
taskId: task.taskId,
163+
status: result.status,
164+
output: result.output,
165+
logs: result.logs,
166+
isSystem: false,
167+
doNotRetry: result.doNotRetry,
168+
} as Event.ITaskUpdate,
169+
{
170+
baseURL: this.workerConfig.processManagerUrl,
171+
},
172+
);
173+
return;
174+
};
175+
176+
commit = () => {
177+
return this.consumer.commit();
178+
};
179+
180+
private dispatchTask = async (task: Task.ITask, isTimeout: boolean) => {
181+
switch (task.type) {
182+
case Task.TaskTypes.Task:
183+
return await this.taskCallback(task, this.updateTask, isTimeout);
184+
case Task.TaskTypes.Compensate:
185+
return await this.compensateCallback(task, this.updateTask, isTimeout);
186+
default:
187+
throw new Error(`Task type: "${task.type}" is invalid`);
188+
}
189+
};
190+
191+
private processTask = async (task: Task.ITask) => {
192+
const isTimeout = isTaskTimeout(
193+
task,
194+
this.workerConfig.latencyCompensationMs,
195+
);
196+
if (isTimeout && this.workerConfig.processTimeoutTask === false) {
197+
this.emit('task-timeout', task);
198+
return;
199+
}
200+
201+
if (this.workerConfig.trackingRunningTasks) {
202+
this.runningTasks[task.taskId] = task;
203+
}
204+
205+
try {
206+
await this.dispatchTask(task, isTimeout);
207+
} catch (error) {
208+
console.warn(this.tasksName, error);
209+
} finally {
210+
if (this.workerConfig.trackingRunningTasks) {
211+
delete this.runningTasks[task.taskId];
212+
}
213+
}
214+
};
215+
216+
private poll = async () => {
217+
// https://github.com/nodejs/node/issues/6673
218+
while (this.isSubscribed) {
219+
try {
220+
const tasks = await this.consume();
221+
if (tasks.length > 0) {
222+
await Promise.all(tasks.map(this.processTask));
223+
this.commit();
224+
}
225+
} catch (err) {
226+
// In case of consume error
227+
console.log(this.tasksName, err);
228+
}
229+
}
230+
231+
console.log(`Stop subscribed ${this.tasksName}`);
232+
};
233+
234+
subscribe = () => {
235+
if (!this.isSubscribed) {
236+
this.isSubscribed = true;
237+
this.poll();
238+
}
239+
};
240+
241+
unsubscribe = () => {
242+
this.isSubscribed = false;
243+
};
244+
}

src/worker.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,14 @@ const DEFAULT_WORKER_CONFIG = {
5050
trackingRunningTasks: false,
5151
} as IWorkerConfig;
5252

53-
const alwaysCompleteFunction = (): ITaskResponse => ({
53+
export const alwaysCompleteFunction = (): ITaskResponse => ({
5454
status: State.TaskStates.Completed,
5555
});
5656

57-
const mapTaskNameToTopic = (taskName: string, prefix: string) =>
57+
export const mapTaskNameToTopic = (taskName: string, prefix: string) =>
5858
`melonade.${prefix}.task.${taskName}`;
5959

60-
const isTaskTimeout = (
60+
export const isTaskTimeout = (
6161
task: Task.ITask,
6262
latencyCompensationMs: number = 0,
6363
): boolean => {
@@ -68,7 +68,7 @@ const isTaskTimeout = (
6868
);
6969
};
7070

71-
const validateTaskResult = (result: ITaskResponse): ITaskResponse => {
71+
export const validateTaskResult = (result: ITaskResponse): ITaskResponse => {
7272
const status = R.prop('status', result);
7373
if (
7474
![

0 commit comments

Comments
 (0)