Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 102 additions & 23 deletions apps/nestjs-backend/src/features/v2/v2-action-trigger.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
FieldCreated,
FieldId,
FieldUpdated,
RecordsBatchUpdated,
TableActionTriggerRequested,
TableId,
type IExecutionContext,
Expand All @@ -14,7 +15,11 @@ import { describe, expect, it } from 'vitest';
import type { ShareDbService } from '../../share-db/share-db.service';
import { V2ActionTriggerService } from './v2-action-trigger.service';

type PresencePayload = Array<{ actionKey: string; payload?: Record<string, unknown> }>;
type IPresencePayload = Array<{ actionKey: string; payload?: Record<string, unknown> }>;

const defaultTimeZone = 'UTC';
const defaultDateFormat = 'YYYY-MM-DD';
const sourceFieldId = 'fldSource0000000001';

const waitForPresenceFlush = async () => {
await new Promise<void>((resolve) => {
Expand Down Expand Up @@ -55,15 +60,15 @@ const createIds = () => {
describe('V2ActionTriggerService', () => {
it('emits setField presence payload with changed new values', async () => {
let channelSubmitted: string | undefined;
let submitted: PresencePayload | undefined;
let submitted: IPresencePayload | undefined;

const shareDbService = {
connect: () => ({
getPresence: (channel: string) => {
channelSubmitted = channel;
return {
create: () => ({
submit: (data: PresencePayload, cb?: (error?: unknown) => void) => {
submit: (data: IPresencePayload, cb?: (error?: unknown) => void) => {
submitted = data;
cb?.();
},
Expand Down Expand Up @@ -136,15 +141,15 @@ describe('V2ActionTriggerService', () => {

it('emits addField and setRecord presence payloads for field created', async () => {
let channelSubmitted: string | undefined;
let submitted: PresencePayload | undefined;
let submitted: IPresencePayload | undefined;

const shareDbService = {
connect: () => ({
getPresence: (channel: string) => {
channelSubmitted = channel;
return {
create: () => ({
submit: (data: PresencePayload, cb?: (error?: unknown) => void) => {
submit: (data: IPresencePayload, cb?: (error?: unknown) => void) => {
submitted = data;
cb?.();
},
Expand Down Expand Up @@ -206,13 +211,13 @@ describe('V2ActionTriggerService', () => {
});

it('emits setField presence payload for formatting-only field updates', async () => {
let submitted: PresencePayload | undefined;
let submitted: IPresencePayload | undefined;

const shareDbService = {
connect: () => ({
getPresence: () => ({
create: () => ({
submit: (data: PresencePayload, cb?: (error?: unknown) => void) => {
submit: (data: IPresencePayload, cb?: (error?: unknown) => void) => {
submitted = data;
cb?.();
},
Expand Down Expand Up @@ -249,14 +254,14 @@ describe('V2ActionTriggerService', () => {
changes: {
formatting: {
oldValue: {
date: 'YYYY-MM-DD',
date: defaultDateFormat,
time: 'None',
timeZone: 'UTC',
timeZone: defaultTimeZone,
},
newValue: {
date: 'YYYY-MM-DD',
date: defaultDateFormat,
time: 'hh:mm A',
timeZone: 'UTC',
timeZone: defaultTimeZone,
},
},
},
Expand All @@ -278,9 +283,9 @@ describe('V2ActionTriggerService', () => {
updatedProperties: ['formatting'],
options: {
formatting: {
date: 'YYYY-MM-DD',
date: defaultDateFormat,
time: 'hh:mm A',
timeZone: 'UTC',
timeZone: defaultTimeZone,
},
},
},
Expand All @@ -290,13 +295,13 @@ describe('V2ActionTriggerService', () => {
});

it('does not emit setField action for unrelated field property updates', async () => {
let submitted: PresencePayload | undefined;
let submitted: IPresencePayload | undefined;

const shareDbService = {
connect: () => ({
getPresence: () => ({
create: () => ({
submit: (data: PresencePayload, cb?: (error?: unknown) => void) => {
submit: (data: IPresencePayload, cb?: (error?: unknown) => void) => {
submitted = data;
cb?.();
},
Expand Down Expand Up @@ -343,15 +348,15 @@ describe('V2ActionTriggerService', () => {

it('emits requested action trigger payload for schema-driven presence events', async () => {
let channelSubmitted: string | undefined;
let submitted: PresencePayload | undefined;
let submitted: IPresencePayload | undefined;

const shareDbService = {
connect: () => ({
getPresence: (channel: string) => {
channelSubmitted = channel;
return {
create: () => ({
submit: (data: PresencePayload, cb?: (error?: unknown) => void) => {
submit: (data: IPresencePayload, cb?: (error?: unknown) => void) => {
submitted = data;
cb?.();
},
Expand Down Expand Up @@ -388,9 +393,9 @@ describe('V2ActionTriggerService', () => {
payload: {
tableId: tableId.toString(),
field: {
id: 'fldSource0000000001',
id: sourceFieldId,
},
fieldIds: ['fldSource0000000001', 'fldComputed00000002'],
fieldIds: [sourceFieldId, 'fldComputed00000002'],
},
});

Expand All @@ -405,22 +410,96 @@ describe('V2ActionTriggerService', () => {
payload: {
tableId: tableId.toString(),
field: {
id: 'fldSource0000000001',
id: sourceFieldId,
},
fieldIds: ['fldSource0000000001', 'fldComputed00000002'],
fieldIds: [sourceFieldId, 'fldComputed00000002'],
},
},
]);
});

it('emits setRecord presence payload with fieldIds for large batch updates', async () => {
let channelSubmitted: string | undefined;
let submitted: IPresencePayload | undefined;

const shareDbService = {
connect: () => ({
getPresence: (channel: string) => {
channelSubmitted = channel;
return {
create: () => ({
submit: (data: IPresencePayload, cb?: (error?: unknown) => void) => {
submitted = data;
cb?.();
},
}),
};
},
}),
} as unknown as ShareDbService;

const registered: Array<{ instance: unknown }> = [];
const container = {
registerInstance: (_token: unknown, instance: unknown) => {
registered.push({ instance });
return container;
},
} as unknown as DependencyContainer;

const service = new V2ActionTriggerService(shareDbService);
service.registerProjections(container);

const projection = registered.find(
(item) =>
(item.instance as { constructor?: { name?: string } }).constructor?.name ===
'V2RecordsBatchUpdatedActionTriggerProjection'
)?.instance as IEventHandler<RecordsBatchUpdated> | undefined;

expect(projection).toBeDefined();

const { baseId, tableId, fieldId } = createIds();
const event = RecordsBatchUpdated.create({
baseId,
tableId,
source: 'user',
updates: Array.from({ length: 1001 }, (_, index) => ({
recordId: `rec${index.toString().padStart(16, '0')}`,
oldVersion: 1,
newVersion: 2,
changes: [
{
fieldId: fieldId.toString(),
oldValue: `old-${index}`,
newValue: `new-${index}`,
},
],
})),
});

const result = await projection?.handle({} as IExecutionContext, event);
expect(result?.isOk()).toBe(true);
await waitForPresenceFlush();

expect(channelSubmitted).toBe(getActionTriggerChannel(tableId.toString()));
expect(submitted).toEqual([
{
actionKey: 'setRecord',
payload: {
tableId: tableId.toString(),
fieldIds: [fieldId.toString()],
},
},
]);
});

it('batches field patch and schema-refresh setField actions into one presence submit for schema-driven updates', async () => {
const submissions: PresencePayload[] = [];
const submissions: IPresencePayload[] = [];

const shareDbService = {
connect: () => ({
getPresence: () => ({
create: () => ({
submit: (data: PresencePayload, cb?: (error?: unknown) => void) => {
submit: (data: IPresencePayload, cb?: (error?: unknown) => void) => {
submissions.push(data);
cb?.();
},
Expand Down
31 changes: 29 additions & 2 deletions apps/nestjs-backend/src/features/v2/v2-action-trigger.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
ProjectionHandler,
ok,
serializeFieldUpdatedValue,
isLargeRecordBatchMutation,
} from '@teable/v2-core';
import type { IExecutionContext, IEventHandler, DomainError, Result } from '@teable/v2-core';
import type { DependencyContainer } from '@teable/v2-di';
Expand All @@ -25,7 +26,7 @@ export interface IActionTriggerData {
payload?: Record<string, unknown>;
}

type PendingActionTriggerBatch = {
type IPendingActionTriggerBatch = {
shareDbService: ShareDbService;
tableId: string;
data: IActionTriggerData[];
Expand Down Expand Up @@ -77,12 +78,24 @@ const buildUpdatedFieldPatch = (event: FieldUpdated): Record<string, unknown> =>
return patch;
};

const collectChangedFieldIds = (updates: RecordsBatchUpdated['updates']): string[] => {
const fieldIds = new Set<string>();

for (const update of updates) {
for (const change of update.changes) {
fieldIds.add(change.fieldId);
}
}

return [...fieldIds];
};

/**
* Helper to emit action triggers via ShareDB presence.
* Batches actions per table to avoid later submits overwriting earlier ones
* within the same schema update turn.
*/
const pendingActionTriggerBatches = new Map<string, PendingActionTriggerBatch>();
const pendingActionTriggerBatches = new Map<string, IPendingActionTriggerBatch>();
let flushScheduled = false;

const deferFlush = (flush: () => void) => {
Expand Down Expand Up @@ -187,6 +200,20 @@ class V2RecordsBatchUpdatedActionTriggerProjection implements IEventHandler<Reco
_context: IExecutionContext,
event: RecordsBatchUpdated
): Promise<Result<void, DomainError>> {
if (isLargeRecordBatchMutation(event.updates.length)) {
const fieldIds = collectChangedFieldIds(event.updates);
emitActionTrigger(this.shareDbService, event.tableId.toString(), [
{
actionKey: 'setRecord',
payload: {
tableId: event.tableId.toString(),
fieldIds,
},
},
]);
return ok(undefined);
}

emitActionTrigger(this.shareDbService, event.tableId.toString(), [{ actionKey: 'setRecord' }]);
return ok(undefined);
}
Expand Down
Loading
Loading