Skip to content

Commit 059e5cf

Browse files
committed
feat: add state management to job data and update job creation logic
1 parent 036ddec commit 059e5cf

File tree

5 files changed

+30
-4
lines changed

5 files changed

+30
-4
lines changed

custom/NavbarJobs.vue

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,18 @@
9797
if (data.finishedAt) {
9898
jobs.value[jobIndex].finishedAt = data.finishedAt;
9999
}
100+
if (data.state) {
101+
jobs.value[jobIndex].state = {
102+
...jobs.value[jobIndex].state,
103+
...data.state,
104+
};
105+
}
100106
} else {
101107
jobs.value.unshift({
102108
id: data.jobId,
103109
name: data.name || 'Unknown Job',
104110
status: data.status || 'IN_PROGRESS',
111+
state: data.state || {},
105112
progress: data.progress || 0,
106113
createdAt: data.createdAt || new Date().toISOString(),
107114
customComponent: data.customComponent,

custom/utils.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ export interface IJob {
33
id: string;
44
name: string;
55
status: 'IN_PROGRESS' | 'DONE' | 'DONE_WITH_ERRORS' | 'CANCELLED';
6+
state: Record<string, any>;
67
progress: number; // 0 to 100
78
createdAt: Date;
89
finishedAt?: Date;

index.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
import { AdminForthPlugin, Filters, Sorts } from "adminforth";
2-
import type { IAdminForth, IHttpServer, AdminForthResourcePages, AdminForthResourceColumn, AdminForthDataTypes, AdminForthResource, AdminUser, AdminForthComponentDeclarationFull } from "adminforth";
2+
import type { IAdminForth, IHttpServer, AdminForthResource, AdminUser, AdminForthComponentDeclarationFull } from "adminforth";
33
import type { PluginOptions } from './types.js';
44
import { afLogger } from "adminforth";
55
import pLimit from 'p-limit';
66
import { Level } from 'level';
77
import fs from 'fs/promises';
8+
import {Mutex, MutexInterface, Semaphore, SemaphoreInterface, withTimeout} from 'async-mutex';
9+
10+
const mutex = new Mutex();
811

912
type TaskStatus = 'SCHEDULED' | 'IN_PROGRESS' | 'DONE' | 'FAILED';
1013
type setStateFieldParams = (state: Record<string, any>) => void;
1114
type getStateFieldParams = () => any;
12-
type taskHandlerType = ( { setTaskStateField, getTaskStateField }: { setTaskStateField: setStateFieldParams; getTaskStateField: getStateFieldParams } ) => Promise<void>;
15+
type taskHandlerType = ( { jobId, setTaskStateField, getTaskStateField }: { jobId: string; setTaskStateField: setStateFieldParams; getTaskStateField: getStateFieldParams } ) => Promise<void>;
1316
type taskType = {
1417
skip?: boolean;
1518
state: Record<string, any>;
@@ -146,7 +149,7 @@ export default class BackgroundJobsPlugin extends AdminForthPlugin {
146149
adminUser: AdminUser,
147150
tasks: taskType[],
148151
jobHandlerName: string,
149-
) {
152+
): Promise<string> {
150153

151154
const handleTask: taskHandlerType = this.taskHandlers[jobHandlerName];
152155
if (!handleTask) {
@@ -161,6 +164,7 @@ export default class BackgroundJobsPlugin extends AdminForthPlugin {
161164
[this.options.progressField]: 0,
162165
[this.options.statusField]: 'IN_PROGRESS',
163166
[this.options.jobHandlerField]: jobHandlerName,
167+
[this.options.stateField]: '{}'
164168
}
165169

166170
const creationResult = await this.adminforth.resource(this.getResourceId()).create(objectToSave);
@@ -193,6 +197,7 @@ export default class BackgroundJobsPlugin extends AdminForthPlugin {
193197
await Promise.all(createTaskRecordsPromises);
194198

195199
this.runProcessingTasks(tasks, jobLevelDb, jobId, handleTask, parrallelLimit);
200+
return jobId;
196201
}
197202

198203
private async runProcessingTasks(
@@ -238,7 +243,7 @@ export default class BackgroundJobsPlugin extends AdminForthPlugin {
238243

239244
//handling the task
240245
try {
241-
await handleTask({ setTaskStateField, getTaskStateField });
246+
await handleTask({ jobId, setTaskStateField, getTaskStateField });
242247

243248
//Set task status to completed in level db
244249
await this.setLevelDbTaskStatusField(jobLevelDb, taskIndex.toString(), 'DONE');
@@ -348,6 +353,7 @@ export default class BackgroundJobsPlugin extends AdminForthPlugin {
348353
const state = jobRecord[this.options.stateField];
349354
const parsedState = JSON.parse(state);
350355
parsedState[key] = value;
356+
this.adminforth.websocket.publish(`/background-jobs`, { jobId, state: parsedState });
351357
await this.adminforth.resource(this.getResourceId()).update(jobId, {
352358
[this.options.stateField]: JSON.stringify(parsedState),
353359
});
@@ -415,6 +421,7 @@ export default class BackgroundJobsPlugin extends AdminForthPlugin {
415421
createdAt: job[this.options.createdAtField],
416422
finishedAt: job[this.options.finishedAtField] || null,
417423
status: job[this.options.statusField],
424+
state: JSON.parse(job[this.options.stateField]),
418425
progress: job[this.options.progressField],
419426
customComponent: this.jobCustomComponents[job[this.options.jobHandlerField]],
420427
}

package-lock.json

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
"dependencies": {
2222
"@vueuse/core": "^14.2.1",
2323
"adminforth": "latest",
24+
"async-mutex": "^0.5.0",
2425
"level": "^10.0.0",
2526
"p-limit": "^7.3.0"
2627
}

0 commit comments

Comments
 (0)