Skip to content

Commit 76960e9

Browse files
[sync] feat(v2): add computed task run-by-id command (#1465) (#2782)
Synced from teableio/teable-ee@48768e3 Co-authored-by: nichenqin <nichenqin@hotmail.com>
1 parent 05af419 commit 76960e9

21 files changed

Lines changed: 689 additions & 203 deletions
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import { domainError, type DomainError, InternalCommand } from '@teable/v2-core';
2+
import { err, ok, type Result } from 'neverthrow';
3+
import { z } from 'zod';
4+
5+
const runComputedTaskByIdInputSchema = z.object({
6+
taskId: z.string().trim().min(1, 'Task ID is required'),
7+
workerId: z.string().trim().min(1, 'Worker ID is required'),
8+
allowProcessingTakeover: z.boolean().optional().default(true),
9+
});
10+
11+
export type IRunComputedTaskByIdCommandInput = z.input<typeof runComputedTaskByIdInputSchema>;
12+
13+
export type RunComputedTaskByIdResult = {
14+
taskId: string;
15+
workerId: string;
16+
processed: true;
17+
};
18+
19+
export class RunComputedTaskByIdCommand extends InternalCommand {
20+
private constructor(
21+
readonly taskId: string,
22+
readonly workerId: string,
23+
readonly allowProcessingTakeover: boolean
24+
) {
25+
super();
26+
}
27+
28+
static create(raw: unknown): Result<RunComputedTaskByIdCommand, DomainError> {
29+
const parsed = runComputedTaskByIdInputSchema.safeParse(raw);
30+
if (!parsed.success) {
31+
return err(
32+
domainError.validation({
33+
message: 'Invalid RunComputedTaskByIdCommand input',
34+
details: parsed.error.format(),
35+
})
36+
);
37+
}
38+
39+
return ok(
40+
new RunComputedTaskByIdCommand(
41+
parsed.data.taskId,
42+
parsed.data.workerId,
43+
parsed.data.allowProcessingTakeover
44+
)
45+
);
46+
}
47+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import { domainError, type DomainError, type IExecutionContext } from '@teable/v2-core';
2+
import { inject, injectable } from '@teable/v2-di';
3+
import { err, ok, type Result } from 'neverthrow';
4+
5+
import { v2RecordRepositoryPostgresTokens } from '../di/tokens';
6+
import type { ComputedUpdateWorker } from './worker/ComputedUpdateWorker';
7+
import { CommandHandler, type ICommandHandler } from '@teable/v2-core';
8+
import {
9+
RunComputedTaskByIdCommand,
10+
type RunComputedTaskByIdResult,
11+
} from './RunComputedTaskByIdCommand';
12+
13+
@CommandHandler(RunComputedTaskByIdCommand)
14+
@injectable()
15+
export class RunComputedTaskByIdHandler
16+
implements ICommandHandler<RunComputedTaskByIdCommand, RunComputedTaskByIdResult>
17+
{
18+
constructor(
19+
@inject(v2RecordRepositoryPostgresTokens.computedUpdateWorker)
20+
private readonly worker: ComputedUpdateWorker
21+
) {}
22+
23+
async handle(
24+
context: IExecutionContext,
25+
command: RunComputedTaskByIdCommand
26+
): Promise<Result<RunComputedTaskByIdResult, DomainError>> {
27+
const result = await this.worker.runTaskById({
28+
taskId: command.taskId,
29+
workerId: command.workerId,
30+
actorId: context.actorId,
31+
tracer: context.tracer,
32+
requestId: context.requestId,
33+
allowProcessingTakeover: command.allowProcessingTakeover,
34+
});
35+
if (result.isErr()) return err(result.error);
36+
37+
if (!result.value) {
38+
return err(
39+
domainError.notFound({
40+
code: 'computed_task.not_retryable',
41+
message: `Computed task not found or not retryable: ${command.taskId}`,
42+
})
43+
);
44+
}
45+
46+
return ok({
47+
taskId: command.taskId,
48+
workerId: command.workerId,
49+
processed: true,
50+
});
51+
}
52+
}

packages/v2/adapter-table-repository-postgres/src/record/computed/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ export * from './ComputedUpdateLock';
66
export * from './ComputedUpdatePlanner';
77
export * from './ComputedUpdateRun';
88
export * from './ExternalComputedRefreshService';
9+
export * from './RunComputedTaskByIdCommand';
10+
export * from './RunComputedTaskByIdHandler';
911
export * from './UserRenamePropagationService';
1012
export * from './FieldDependencyGraph';
1113
export * from './UpdateFromSelectBuilder';

packages/v2/adapter-table-repository-postgres/src/record/computed/outbox/ComputedUpdateOutbox.ts

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import { defaultComputedUpdateOutboxConfig } from './IComputedUpdateOutbox';
3030
import type {
3131
IComputedUpdateOutbox,
3232
ClaimBatchParams,
33+
ClaimByIdParams,
3334
ComputedUpdateOutboxConfig,
3435
AnyOutboxItem,
3536
FieldBackfillOutboxItem,
@@ -438,6 +439,67 @@ export class ComputedUpdateOutbox implements IComputedUpdateOutbox {
438439
}
439440
}
440441

442+
async claimById(
443+
params: ClaimByIdParams,
444+
context?: IExecutionContext
445+
): Promise<Result<AnyOutboxItem | null, DomainError>> {
446+
const span = context?.tracer?.startSpan('teable.outbox.claimById', {
447+
'outbox.taskId': params.taskId,
448+
'outbox.workerId': params.workerId,
449+
});
450+
451+
const executeClaim = async (): Promise<Result<AnyOutboxItem | null, DomainError>> => {
452+
const now = params.now ?? new Date();
453+
const db = resolvePostgresDbOrTx(this.db, context) as unknown as Kysely<DynamicDB>;
454+
const retryableStatuses = params.allowProcessingTakeover
455+
? [DEFAULT_STATUS, 'processing']
456+
: [DEFAULT_STATUS];
457+
458+
return runInTransaction(db, context, async (trx) => {
459+
const row = await trx
460+
.selectFrom(OUTBOX_TABLE)
461+
.selectAll()
462+
.where('id', '=', params.taskId)
463+
.where('status', 'in', retryableStatuses)
464+
.forUpdate()
465+
.executeTakeFirst();
466+
467+
if (!row) return ok(null);
468+
469+
await trx
470+
.updateTable(OUTBOX_TABLE)
471+
.set({
472+
status: 'processing',
473+
locked_at: now,
474+
locked_by: params.workerId,
475+
updated_at: now,
476+
})
477+
.where('id', '=', params.taskId)
478+
.execute();
479+
480+
const seedMap = await this.loadSeedRecords(trx, [row]);
481+
const claimedRow = {
482+
...row,
483+
status: 'processing',
484+
locked_at: now,
485+
locked_by: params.workerId,
486+
updated_at: now,
487+
};
488+
489+
return ok(toAnyOutboxItem(claimedRow, seedMap.get(String(row.id)) ?? []));
490+
});
491+
};
492+
493+
try {
494+
if (span && context?.tracer) {
495+
return await context.tracer.withSpan(span, executeClaim);
496+
}
497+
return await executeClaim();
498+
} finally {
499+
span?.end();
500+
}
501+
}
502+
441503
async markDone(taskId: string, context?: IExecutionContext): Promise<Result<void, DomainError>> {
442504
const span = context?.tracer?.startSpan('teable.outbox.markDone', {
443505
'outbox.taskId': taskId,

packages/v2/adapter-table-repository-postgres/src/record/computed/outbox/IComputedUpdateOutbox.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,13 @@ export type ClaimBatchParams = {
3232
now?: Date;
3333
};
3434

35+
export type ClaimByIdParams = {
36+
taskId: string;
37+
workerId: string;
38+
now?: Date;
39+
allowProcessingTakeover?: boolean;
40+
};
41+
3542
/**
3643
* Outbox item for field backfill tasks.
3744
*/
@@ -113,6 +120,11 @@ export interface IComputedUpdateOutbox {
113120
context?: IExecutionContext
114121
): Promise<Result<ReadonlyArray<AnyOutboxItem>, DomainError>>;
115122

123+
claimById(
124+
params: ClaimByIdParams,
125+
context?: IExecutionContext
126+
): Promise<Result<AnyOutboxItem | null, DomainError>>;
127+
116128
markDone(taskId: string, context?: IExecutionContext): Promise<Result<void, DomainError>>;
117129

118130
markFailed(

packages/v2/adapter-table-repository-postgres/src/record/computed/strategies/HybridWithOutboxStrategy.spec.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ const createOutboxStub = () => {
128128
enqueueSeedTask,
129129
enqueueFieldBackfill,
130130
claimBatch: async () => ok([]),
131+
claimById: async () => ok(null),
131132
markDone: async () => ok(undefined),
132133
markFailed: async () => ok(undefined),
133134
};

packages/v2/adapter-table-repository-postgres/src/record/computed/worker/ComputedUpdateWorker.spec.ts

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ describe('ComputedUpdateWorker', () => {
105105
enqueueSeedTask: vi.fn(),
106106
enqueueFieldBackfill: vi.fn(),
107107
claimBatch: vi.fn().mockResolvedValue(ok([])),
108+
claimById: vi.fn(),
108109
markDone: vi.fn(),
109110
markFailed: vi.fn(),
110111
};
@@ -142,6 +143,7 @@ describe('ComputedUpdateWorker', () => {
142143
enqueueSeedTask: vi.fn(),
143144
enqueueFieldBackfill: vi.fn(),
144145
claimBatch: vi.fn().mockResolvedValue(ok([task])),
146+
claimById: vi.fn(),
145147
markDone: vi.fn(),
146148
markFailed,
147149
};
@@ -191,6 +193,7 @@ describe('ComputedUpdateWorker', () => {
191193
enqueueSeedTask: vi.fn(),
192194
enqueueFieldBackfill: vi.fn(),
193195
claimBatch: vi.fn().mockResolvedValue(ok([task])),
196+
claimById: vi.fn(),
194197
markDone,
195198
markFailed: vi.fn(),
196199
};
@@ -242,6 +245,7 @@ describe('ComputedUpdateWorker', () => {
242245
enqueueSeedTask: vi.fn(),
243246
enqueueFieldBackfill: vi.fn(),
244247
claimBatch: vi.fn().mockResolvedValue(ok([task1, task2, task3])),
248+
claimById: vi.fn(),
245249
markDone,
246250
markFailed: vi.fn(),
247251
};
@@ -305,6 +309,7 @@ describe('ComputedUpdateWorker', () => {
305309
enqueueSeedTask: vi.fn(),
306310
enqueueFieldBackfill: vi.fn(),
307311
claimBatch: vi.fn().mockResolvedValue(ok([task])),
312+
claimById: vi.fn(),
308313
markDone,
309314
markFailed: vi.fn(),
310315
};
@@ -369,6 +374,7 @@ describe('ComputedUpdateWorker', () => {
369374
enqueueSeedTask: vi.fn(),
370375
enqueueFieldBackfill: vi.fn(),
371376
claimBatch: vi.fn().mockResolvedValue(ok([task])),
377+
claimById: vi.fn(),
372378
markDone: vi.fn(),
373379
markFailed: vi.fn().mockResolvedValue(ok(undefined)),
374380
};
@@ -425,6 +431,7 @@ describe('ComputedUpdateWorker', () => {
425431
enqueueSeedTask: vi.fn(),
426432
enqueueFieldBackfill: vi.fn(),
427433
claimBatch: vi.fn().mockResolvedValue(ok([task])),
434+
claimById: vi.fn(),
428435
markDone: vi.fn(),
429436
markFailed,
430437
};
@@ -470,4 +477,92 @@ describe('ComputedUpdateWorker', () => {
470477
);
471478
});
472479
});
480+
481+
describe('runTaskById', () => {
482+
it('claims and processes the specified task id', async () => {
483+
const task = createMockTask();
484+
const markDone = vi.fn().mockResolvedValue(ok(undefined));
485+
const claimById = vi.fn().mockResolvedValue(ok(task));
486+
487+
const outbox: IComputedUpdateOutbox = {
488+
enqueueOrMerge: vi.fn(),
489+
enqueueSeedTask: vi.fn(),
490+
enqueueFieldBackfill: vi.fn(),
491+
claimBatch: vi.fn().mockResolvedValue(ok([])),
492+
claimById,
493+
markDone,
494+
markFailed: vi.fn(),
495+
};
496+
497+
const updater = createUpdaterStub({
498+
execute: vi.fn().mockResolvedValue(ok({ changesByStep: [] })),
499+
collectDirtySeedGroups: vi.fn().mockResolvedValue(ok([])),
500+
});
501+
502+
const planner = {
503+
planStage: vi.fn().mockResolvedValue(ok({ steps: [], edges: [] })),
504+
} as unknown as ComputedUpdatePlanner;
505+
506+
const worker = new ComputedUpdateWorker(
507+
outbox,
508+
updater,
509+
planner,
510+
createUnitOfWork(),
511+
createLogger(),
512+
createHasher(),
513+
createTableRepository(),
514+
createBackfillService(),
515+
createEventBus()
516+
);
517+
518+
const result = await worker.runTaskById({
519+
taskId: task.id,
520+
workerId: 'manual-worker',
521+
});
522+
523+
expect(result.isOk()).toBe(true);
524+
expect(result._unsafeUnwrap()).toBe(true);
525+
expect(claimById).toHaveBeenCalledWith(
526+
expect.objectContaining({
527+
taskId: task.id,
528+
workerId: 'manual-worker',
529+
allowProcessingTakeover: true,
530+
}),
531+
expect.anything()
532+
);
533+
expect(markDone).toHaveBeenCalledWith(task.id, expect.anything());
534+
});
535+
536+
it('returns false when the task cannot be claimed by id', async () => {
537+
const outbox: IComputedUpdateOutbox = {
538+
enqueueOrMerge: vi.fn(),
539+
enqueueSeedTask: vi.fn(),
540+
enqueueFieldBackfill: vi.fn(),
541+
claimBatch: vi.fn().mockResolvedValue(ok([])),
542+
claimById: vi.fn().mockResolvedValue(ok(null)),
543+
markDone: vi.fn(),
544+
markFailed: vi.fn(),
545+
};
546+
547+
const worker = new ComputedUpdateWorker(
548+
outbox,
549+
createUpdaterStub(),
550+
{} as ComputedUpdatePlanner,
551+
createUnitOfWork(),
552+
createLogger(),
553+
createHasher(),
554+
createTableRepository(),
555+
createBackfillService(),
556+
createEventBus()
557+
);
558+
559+
const result = await worker.runTaskById({
560+
taskId: 'cuo-missing',
561+
workerId: 'manual-worker',
562+
});
563+
564+
expect(result.isOk()).toBe(true);
565+
expect(result._unsafeUnwrap()).toBe(false);
566+
});
567+
});
473568
});

0 commit comments

Comments
 (0)