diff --git a/packages/client/src/client.test.ts b/packages/client/src/client.test.ts index e583cd549c..344ea06a8d 100644 --- a/packages/client/src/client.test.ts +++ b/packages/client/src/client.test.ts @@ -1051,3 +1051,62 @@ describe('ObjectStackClient locale → Accept-Language', () => { expect(lastHeaders(fetchMock)['Accept-Language']).toBeUndefined(); }); }); + +describe('Import-job namespace', () => { + it('createImportJob POSTs the payload to /data/:object/import/jobs', async () => { + const { client, fetchMock } = createMockClient({ jobId: 'imp_x', object: 'task', status: 'pending', total: 3, createdAt: '2026-07-01T00:00:00Z' }); + const res = await client.data.createImportJob('task', { format: 'json', rows: [{ id: 'a' }] } as any); + const [url, init] = fetchMock.mock.calls[0]; + expect(url).toBe('http://localhost:3000/api/v1/data/task/import/jobs'); + expect(init.method).toBe('POST'); + expect(res).toMatchObject({ jobId: 'imp_x', status: 'pending', total: 3 }); + }); + + it('getImportJobProgress GETs /data/import/jobs/:jobId', async () => { + const { client, fetchMock } = createMockClient({ jobId: 'imp_x', object: 'task', status: 'running', percentComplete: 40 }); + const res = await client.data.getImportJobProgress('imp_x'); + expect(fetchMock.mock.calls[0][0]).toBe('http://localhost:3000/api/v1/data/import/jobs/imp_x'); + expect(res.percentComplete).toBe(40); + }); + + it('getImportJobResults GETs the /results sub-route', async () => { + const { client, fetchMock } = createMockClient({ jobId: 'imp_x', status: 'succeeded', results: [{ row: 1, ok: true, action: 'created' }], resultsTruncated: false }); + const res = await client.data.getImportJobResults('imp_x'); + expect(fetchMock.mock.calls[0][0]).toBe('http://localhost:3000/api/v1/data/import/jobs/imp_x/results'); + expect(res.results).toHaveLength(1); + expect(res.resultsTruncated).toBe(false); + }); + + it('listImportJobs builds the query string and unwraps the jobs array', async () => { + const { client, fetchMock } = createMockClient({ jobs: [{ jobId: 'imp_x', object: 'task', status: 'succeeded' }] }); + const jobs = await client.data.listImportJobs({ object: 'task', status: 'succeeded', limit: 10, offset: 5 }); + const url = fetchMock.mock.calls[0][0] as string; + expect(url.startsWith('http://localhost:3000/api/v1/data/import/jobs?')).toBe(true); + expect(url).toContain('object=task'); + expect(url).toContain('status=succeeded'); + expect(url).toContain('limit=10'); + expect(url).toContain('offset=5'); + expect(jobs).toHaveLength(1); + expect(jobs[0].jobId).toBe('imp_x'); + }); + + it('cancelImportJob POSTs the /cancel sub-route', async () => { + const { client, fetchMock } = createMockClient({ success: true }); + const res = await client.data.cancelImportJob('imp_x'); + const [url, init] = fetchMock.mock.calls[0]; + expect(url).toBe('http://localhost:3000/api/v1/data/import/jobs/imp_x/cancel'); + expect(init.method).toBe('POST'); + expect(res.success).toBe(true); + }); + + it('undoImportJob POSTs the /undo sub-route', async () => { + const { client, fetchMock } = createMockClient({ success: true, jobId: 'imp_x', object: 'task', deleted: 3, restored: 2, failed: 0 }); + const res = await client.data.undoImportJob('imp_x'); + const [url, init] = fetchMock.mock.calls[0]; + expect(url).toBe('http://localhost:3000/api/v1/data/import/jobs/imp_x/undo'); + expect(init.method).toBe('POST'); + expect(res.success).toBe(true); + expect(res.deleted).toBe(3); + expect(res.restored).toBe(2); + }); +}); diff --git a/packages/client/src/index.ts b/packages/client/src/index.ts index 18830952ed..2fec75a751 100644 --- a/packages/client/src/index.ts +++ b/packages/client/src/index.ts @@ -82,6 +82,16 @@ import { UnsubscribeResponse, WellKnownCapabilities, ApiRoutes, + ImportRequest, + ImportResponse, + CreateImportJobRequest, + CreateImportJobResponse, + ImportJobProgress, + ImportJobResults, + ImportJobSummary, + ListImportJobsRequest, + ListImportJobsResponse, + UndoImportJobResponse, } from '@objectstack/spec/api'; import type { ApprovalRequestRow, @@ -3046,6 +3056,95 @@ export class ObjectStackClient { return this.unwrapResponse(res); }, + /** + * Bulk-import rows (CSV text or JSON row objects) into an object. + * + * The server coerces each cell to its storage value using the object's field + * metadata (booleans, numbers, dates→ISO, select label→code, lookup name→id), + * so callers send raw spreadsheet values plus an optional column `mapping`. + * `writeMode` selects insert / update / upsert (the latter two need + * `matchFields`); `dryRun` validates + previews without persisting. The + * response carries per-row outcomes for an import report. + */ + import: async (object: string, request: ImportRequest): Promise => { + const route = this.getRoute('data'); + const res = await this.fetch(`${this.baseUrl}${route}/${object}/import`, { + method: 'POST', + body: JSON.stringify(request), + }); + return this.unwrapResponse(res); + }, + + /** + * Import-job namespace — the asynchronous counterpart to {@link import} for + * large files (up to 50,000 rows). `createImportJob` posts the whole payload + * once and returns immediately with a `jobId`; a server worker processes the + * batch in the background. Poll {@link getImportJobProgress} for live + * counters, {@link getImportJobResults} for the capped per-row report, and + * {@link listImportJobs} for history. {@link cancelImportJob} stops a + * pending/running job cooperatively. + * + * These routes require a server new enough to expose them — older servers + * return 404, which surfaces here as a rejected promise. Callers that want + * graceful degradation should feature-detect (e.g. try the job, fall back + * to the synchronous {@link import} on 404). + */ + createImportJob: async (object: string, request: CreateImportJobRequest): Promise => { + const route = this.getRoute('data'); + const res = await this.fetch(`${this.baseUrl}${route}/${object}/import/jobs`, { + method: 'POST', + body: JSON.stringify(request), + }); + return this.unwrapResponse(res); + }, + + getImportJobProgress: async (jobId: string): Promise => { + const route = this.getRoute('data'); + const res = await this.fetch(`${this.baseUrl}${route}/import/jobs/${encodeURIComponent(jobId)}`); + return this.unwrapResponse(res); + }, + + getImportJobResults: async (jobId: string): Promise => { + const route = this.getRoute('data'); + const res = await this.fetch(`${this.baseUrl}${route}/import/jobs/${encodeURIComponent(jobId)}/results`); + return this.unwrapResponse(res); + }, + + listImportJobs: async (query: Partial = {}): Promise => { + const route = this.getRoute('data'); + const qs = new URLSearchParams(); + if (query.object) qs.set('object', query.object); + if (query.status) qs.set('status', query.status); + if (query.limit != null) qs.set('limit', String(query.limit)); + if (query.offset != null) qs.set('offset', String(query.offset)); + const suffix = qs.toString() ? `?${qs.toString()}` : ''; + const res = await this.fetch(`${this.baseUrl}${route}/import/jobs${suffix}`); + const body = await this.unwrapResponse(res); + return body.jobs; + }, + + cancelImportJob: async (jobId: string): Promise<{ success: boolean }> => { + const route = this.getRoute('data'); + const res = await this.fetch(`${this.baseUrl}${route}/import/jobs/${encodeURIComponent(jobId)}/cancel`, { + method: 'POST', + }); + return this.unwrapResponse<{ success: boolean }>(res); + }, + + /** + * Logically roll back a finished import: delete the records it created and + * restore the fields it updated to their pre-import values. Only jobs that + * captured an undo log (small, non-dry-run, not yet reverted) are undoable — + * others return 422. See {@link ImportJobProgress.undoable}. + */ + undoImportJob: async (jobId: string): Promise => { + const route = this.getRoute('data'); + const res = await this.fetch(`${this.baseUrl}${route}/import/jobs/${encodeURIComponent(jobId)}/undo`, { + method: 'POST', + }); + return this.unwrapResponse(res); + }, + update: async ( object: string, id: string, @@ -3456,6 +3555,64 @@ export class ScopedProjectClient { }); return this.parent._unwrap(res); }, + /** + * Bulk-import rows (CSV text or JSON row objects) into an object. The server + * coerces each cell to its storage value from field metadata (booleans, + * numbers, dates→ISO, select label→code, lookup name→id); callers send raw + * values plus an optional column `mapping`. `writeMode` selects + * insert/update/upsert (update/upsert need `matchFields`); `dryRun` + * validates + previews without persisting. + */ + import: async (object: string, request: ImportRequest): Promise => { + const res = await this.parent._fetch(this.url(`/data/${object}/import`), { + method: 'POST', + body: JSON.stringify(request), + }); + return this.parent._unwrap(res); + }, + /** + * Asynchronous import jobs (scoped) — see the top-level `data.createImportJob` + * for semantics. Large payloads are posted once; a server worker processes + * them in the background while callers poll progress / results / history. + */ + createImportJob: async (object: string, request: CreateImportJobRequest): Promise => { + const res = await this.parent._fetch(this.url(`/data/${object}/import/jobs`), { + method: 'POST', + body: JSON.stringify(request), + }); + return this.parent._unwrap(res); + }, + getImportJobProgress: async (jobId: string): Promise => { + const res = await this.parent._fetch(this.url(`/data/import/jobs/${encodeURIComponent(jobId)}`)); + return this.parent._unwrap(res); + }, + getImportJobResults: async (jobId: string): Promise => { + const res = await this.parent._fetch(this.url(`/data/import/jobs/${encodeURIComponent(jobId)}/results`)); + return this.parent._unwrap(res); + }, + listImportJobs: async (query: Partial = {}): Promise => { + const qs = new URLSearchParams(); + if (query.object) qs.set('object', query.object); + if (query.status) qs.set('status', query.status); + if (query.limit != null) qs.set('limit', String(query.limit)); + if (query.offset != null) qs.set('offset', String(query.offset)); + const suffix = qs.toString() ? `?${qs.toString()}` : ''; + const res = await this.parent._fetch(this.url(`/data/import/jobs${suffix}`)); + const body = await this.parent._unwrap(res); + return body.jobs; + }, + cancelImportJob: async (jobId: string): Promise<{ success: boolean }> => { + const res = await this.parent._fetch(this.url(`/data/import/jobs/${encodeURIComponent(jobId)}/cancel`), { + method: 'POST', + }); + return this.parent._unwrap<{ success: boolean }>(res); + }, + undoImportJob: async (jobId: string): Promise => { + const res = await this.parent._fetch(this.url(`/data/import/jobs/${encodeURIComponent(jobId)}/undo`), { + method: 'POST', + }); + return this.parent._unwrap(res); + }, update: async (object: string, id: string, data: Partial): Promise> => { const res = await this.parent._fetch(this.url(`/data/${object}/${id}`), { method: 'PATCH', @@ -3638,6 +3795,14 @@ export type { AuthProviderInfo, EmailPasswordConfigPublic, AuthFeaturesConfig, + CreateImportJobRequest, + CreateImportJobResponse, + ImportJobProgress, + ImportJobResults, + ImportJobSummary, + ListImportJobsRequest, + ListImportJobsResponse, + UndoImportJobResponse, } from '@objectstack/spec/api'; // Approval runtime types (ADR-0019) — surfaced so SDK consumers can type the diff --git a/packages/platform-objects/src/audit/index.ts b/packages/platform-objects/src/audit/index.ts index dccdb30dc4..c474dad81d 100644 --- a/packages/platform-objects/src/audit/index.ts +++ b/packages/platform-objects/src/audit/index.ts @@ -19,3 +19,4 @@ export { SysReportSchedule } from './sys-report-schedule.object.js'; export { SysJob } from './sys-job.object.js'; export { SysJobRun } from './sys-job-run.object.js'; export { SysJobQueue } from './sys-job-queue.object.js'; +export { SysImportJob } from './sys-import-job.object.js'; diff --git a/packages/platform-objects/src/audit/sys-import-job.object.ts b/packages/platform-objects/src/audit/sys-import-job.object.ts new file mode 100644 index 0000000000..55ff63cf59 --- /dev/null +++ b/packages/platform-objects/src/audit/sys-import-job.object.ts @@ -0,0 +1,109 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import { ObjectSchema, Field } from '@objectstack/spec/data'; + +/** + * sys_import_job — Asynchronous Data Import Job + * + * Each row tracks one bulk import submitted through the async import API + * (`POST /data/:object/import/jobs`). The client sends the whole payload + * (rows[] or a base64 xlsx) in one request; the server persists this row, + * processes the batch in the background, and streams progress by updating the + * counters below. Readers poll `progress` / `results` and list history. + * + * Persisting to the DB (rather than in-memory) means progress and history + * survive a server restart and are queryable per object / per user. + * + * Writers: the rest-server import-job worker. + * Readers: the Import Wizard (progress + history), dashboards. + * + * @namespace sys + */ +export const SysImportJob = ObjectSchema.create({ + name: 'sys_import_job', + label: 'Import Job', + pluralLabel: 'Import Jobs', + icon: 'upload', + isSystem: true, + managedBy: 'system', + description: 'Asynchronous bulk-import job state, progress, and history', + displayNameField: 'object_name', + nameField: 'object_name', // [ADR-0079] canonical primary-title pointer + titleFormat: '{object_name} import @ {created_at}', + compactLayout: ['object_name', 'status', 'processed_rows', 'total_rows', 'created_at'], + + fields: { + id: Field.text({ label: 'Job ID', required: true, readonly: true, group: 'System' }), + + object_name: Field.text({ + label: 'Object', + required: true, + maxLength: 255, + searchable: true, + description: 'API name of the object being imported into', + group: 'Identity', + }), + + status: Field.select( + ['pending', 'running', 'succeeded', 'failed', 'cancelled'], + { label: 'Status', required: true, defaultValue: 'pending', group: 'State' }, + ), + + // ── progress counters (updated as the worker streams through the batch) ── + total_rows: Field.number({ label: 'Total Rows', required: true, defaultValue: 0, group: 'Progress' }), + processed_rows: Field.number({ label: 'Processed Rows', required: true, defaultValue: 0, group: 'Progress' }), + created_count: Field.number({ label: 'Created', required: false, defaultValue: 0, group: 'Progress' }), + updated_count: Field.number({ label: 'Updated', required: false, defaultValue: 0, group: 'Progress' }), + skipped_count: Field.number({ label: 'Skipped', required: false, defaultValue: 0, group: 'Progress' }), + error_count: Field.number({ label: 'Errors', required: false, defaultValue: 0, group: 'Progress' }), + + // ── request echo (so history is self-describing without the payload) ── + write_mode: Field.select( + ['insert', 'update', 'upsert'], + { label: 'Write Mode', required: false, defaultValue: 'insert', group: 'Request' }, + ), + dry_run: Field.boolean({ label: 'Dry Run', required: false, defaultValue: false, group: 'Request' }), + run_automations: Field.boolean({ label: 'Run Automations', required: false, defaultValue: false, group: 'Request' }), + + // ── outcome ── + error: Field.textarea({ label: 'Fatal Error', required: false, group: 'Outcome' }), + results: Field.json({ + label: 'Row Results (sample)', + required: false, + description: 'Capped sample of per-row results (failures first) for the UI', + group: 'Outcome', + }), + + // ── undo / logical rollback ── + undo_log: Field.json({ + label: 'Undo Log', + required: false, + description: 'Reversal instructions ({created:[ids], updated:[{id,before}]}) captured for small non-dry-run jobs so the import can be undone', + group: 'Outcome', + }), + reverted_at: Field.datetime({ + label: 'Reverted At', + required: false, + description: 'Set when the import was undone (created records deleted, updated records restored)', + group: 'Outcome', + }), + + // ── lifecycle timestamps ── + started_at: Field.datetime({ label: 'Started At', required: false, group: 'State' }), + completed_at: Field.datetime({ label: 'Completed At', required: false, group: 'State' }), + created_by: Field.text({ label: 'Created By', required: false, readonly: true, group: 'System' }), + created_at: Field.datetime({ + label: 'Created At', + required: true, + defaultValue: 'NOW()', + readonly: true, + group: 'System', + }), + }, + + indexes: [ + { fields: ['object_name', 'created_at'] }, + { fields: ['status', 'created_at'] }, + { fields: ['created_by', 'created_at'] }, + ], +}); diff --git a/packages/rest/package.json b/packages/rest/package.json index dd9cb590de..ddef39fdcf 100644 --- a/packages/rest/package.json +++ b/packages/rest/package.json @@ -20,6 +20,7 @@ }, "dependencies": { "@objectstack/core": "workspace:*", + "@objectstack/platform-objects": "workspace:*", "@objectstack/service-package": "workspace:*", "@objectstack/spec": "workspace:*", "exceljs": "^4.4.0", diff --git a/packages/rest/src/import-coerce.test.ts b/packages/rest/src/import-coerce.test.ts new file mode 100644 index 0000000000..100bb99412 --- /dev/null +++ b/packages/rest/src/import-coerce.test.ts @@ -0,0 +1,187 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +/** + * Unit tests for the import value-coercion module (`import-coerce.ts`) — the + * inverse of `export-format.ts`. These are pure (no engine): scalar parsers plus + * `coerceRow` driven by a fake reference resolver. + */ + +import { describe, it, expect } from 'vitest'; +import { + parseBooleanCell, + parseNumberCell, + parseDateCell, + matchOption, + splitMulti, + coerceRow, +} from './import-coerce'; +import type { ExportFieldMeta } from './export-format'; + +describe('parseBooleanCell', () => { + it('accepts common truthy spellings across languages', () => { + for (const t of ['true', 'TRUE', 'yes', 'Y', '1', 'on', '是', '对', '✓', true, 1]) { + expect(parseBooleanCell(t)).toBe(true); + } + }); + it('accepts common falsy spellings', () => { + for (const f of ['false', 'No', 'n', '0', 'off', '否', '错', false, 0]) { + expect(parseBooleanCell(f)).toBe(false); + } + }); + it('returns undefined for gibberish', () => { + expect(parseBooleanCell('maybe')).toBeUndefined(); + expect(parseBooleanCell(2)).toBeUndefined(); + }); +}); + +describe('parseNumberCell', () => { + it('strips thousands separators, currency symbols, and percent signs', () => { + expect(parseNumberCell('1,234.5')).toBe(1234.5); + expect(parseNumberCell('$1,000')).toBe(1000); + expect(parseNumberCell('¥2,500.75')).toBe(2500.75); + expect(parseNumberCell('25%')).toBe(25); + }); + it('handles accounting-style parenthesised negatives', () => { + expect(parseNumberCell('(1,234)')).toBe(-1234); + }); + it('rejects non-numeric residue', () => { + expect(parseNumberCell('abc')).toBeUndefined(); + expect(parseNumberCell('12x3')).toBeUndefined(); + expect(parseNumberCell('')).toBeUndefined(); + }); +}); + +describe('parseDateCell', () => { + it('normalises bare calendar dates without timezone drift', () => { + expect(parseDateCell('2026-06-30', 'date')).toBe('2026-06-30'); + expect(parseDateCell('2026/6/3', 'date')).toBe('2026-06-03'); + }); + it('emits full ISO for datetime', () => { + expect(parseDateCell('2026-06-30', 'datetime')).toBe('2026-06-30T00:00:00.000Z'); + }); + it('accepts and normalises time-of-day', () => { + expect(parseDateCell('14:30', 'time')).toBe('14:30:00'); + expect(parseDateCell('09:05:07', 'time')).toBe('09:05:07'); + }); + it('rejects nonsense', () => { + expect(parseDateCell('not-a-date', 'date')).toBeUndefined(); + expect(parseDateCell('2026-13-40', 'date')).toBeUndefined(); + }); +}); + +describe('matchOption', () => { + const options = [{ label: '高', value: 'high' }, { label: '低', value: 'low' }]; + it('matches by option value (code)', () => { + expect(matchOption('high', options)).toBe('high'); + }); + it('matches by human label, case-insensitively', () => { + expect(matchOption('高', options)).toBe('high'); + expect(matchOption('LOW', [{ label: 'Low', value: 'low' }])).toBe('low'); + }); + it('returns undefined when nothing matches', () => { + expect(matchOption('medium', options)).toBeUndefined(); + }); +}); + +describe('splitMulti', () => { + it('splits on commas, semicolons, Chinese comma, and newlines', () => { + expect(splitMulti('a, b;c、d\ne')).toEqual(['a', 'b', 'c', 'd', 'e']); + }); + it('passes arrays through, trimming blanks', () => { + expect(splitMulti([' x ', '', 'y'])).toEqual(['x', 'y']); + }); +}); + +describe('coerceRow', () => { + const meta = (defs: Record>): Map => { + const m = new Map(); + for (const [name, d] of Object.entries(defs)) m.set(name, { name, ...d }); + return m; + }; + + it('coerces every special value type to its storage shape', async () => { + const metaMap = meta({ + done: { type: 'boolean' }, + amount: { type: 'currency' }, + priority: { type: 'select', options: [{ label: '高', value: 'high' }] }, + tags: { type: 'multiselect', options: [{ label: 'A', value: 'a' }, { label: 'B', value: 'b' }] }, + due: { type: 'date' }, + note: { type: 'text' }, + }); + const { data, errors } = await coerceRow( + { done: '是', amount: '$1,200.50', priority: '高', tags: 'A, B', due: '2026/07/01', note: ' hi ' }, + metaMap, + {}, + ); + expect(errors).toEqual([]); + expect(data).toEqual({ + done: true, + amount: 1200.5, + priority: 'high', + tags: ['a', 'b'], + due: '2026-07-01', + note: 'hi', + }); + }); + + it('resolves reference fields via the async resolver (name → id)', async () => { + const metaMap = meta({ owner: { type: 'lookup', reference: 'user', displayField: 'name' } }); + const seen: string[] = []; + const resolveRef = async (obj: string, display: string) => { + seen.push(`${obj}:${display}`); + return display === '张三' ? 'u1' : undefined; + }; + const ok = await coerceRow({ owner: '张三' }, metaMap, { resolveRef }); + expect(ok.data).toEqual({ owner: 'u1' }); + expect(seen).toEqual(['user:张三']); + + const bad = await coerceRow({ owner: '王五' }, metaMap, { resolveRef }); + expect(bad.data.owner).toBeUndefined(); + expect(bad.errors[0]).toMatchObject({ field: 'owner', code: 'reference_not_found' }); + }); + + it('accepts a structured resolver result and flags ambiguous matches', async () => { + const metaMap = meta({ owner: { type: 'lookup', reference: 'user', displayField: 'name' } }); + const resolveRef = async (_obj: string, display: string) => { + if (display === '张三') return { id: 'u1', matchedField: 'name' }; + if (display === '李四') return { ambiguous: true, matchedField: 'name' }; + return {}; + }; + const ok = await coerceRow({ owner: '张三' }, metaMap, { resolveRef }); + expect(ok.data).toEqual({ owner: 'u1' }); + + const dup = await coerceRow({ owner: '李四' }, metaMap, { resolveRef }); + expect(dup.data.owner).toBeUndefined(); + expect(dup.errors[0]).toMatchObject({ field: 'owner', code: 'reference_ambiguous' }); + + const none = await coerceRow({ owner: '无名' }, metaMap, { resolveRef }); + expect(none.errors[0]).toMatchObject({ field: 'owner', code: 'reference_not_found' }); + }); + + it('reports coercion errors per field instead of throwing', async () => { + const metaMap = meta({ n: { type: 'number' }, b: { type: 'boolean' } }); + const { data, errors } = await coerceRow({ n: 'abc', b: 'maybe' }, metaMap, {}); + expect(data).toEqual({}); + expect(errors.map((e) => e.code).sort()).toEqual(['invalid_boolean', 'invalid_number']); + }); + + it('drops blank cells so schema defaults / existing values win', async () => { + const metaMap = meta({ a: { type: 'text' }, b: { type: 'number' } }); + const { data } = await coerceRow({ a: '', b: ' ' }, metaMap, {}); + expect(data).toEqual({}); + }); + + it('honours createMissingOptions by keeping the raw select value', async () => { + const metaMap = meta({ s: { type: 'select', options: [{ label: 'A', value: 'a' }] } }); + const strict = await coerceRow({ s: 'zzz' }, metaMap, {}); + expect(strict.errors[0]?.code).toBe('invalid_option'); + const lax = await coerceRow({ s: 'zzz' }, metaMap, { createMissingOptions: true }); + expect(lax.errors).toEqual([]); + expect(lax.data).toEqual({ s: 'zzz' }); + }); + + it('passes unknown columns through untouched', async () => { + const { data } = await coerceRow({ mystery: 'raw' }, new Map(), {}); + expect(data).toEqual({ mystery: 'raw' }); + }); +}); diff --git a/packages/rest/src/import-coerce.ts b/packages/rest/src/import-coerce.ts new file mode 100644 index 0000000000..8a907efc41 --- /dev/null +++ b/packages/rest/src/import-coerce.ts @@ -0,0 +1,350 @@ +/** + * Type-aware value *coercion* for the bulk-import route + * (`POST /data/:object/import`). + * + * This is the inverse of `export-format.ts`. A spreadsheet / CSV cell arrives as + * a raw string (or, for JSON payloads, an arbitrary primitive); the storage + * layer, on the other hand, expects *storage* values — booleans as real + * booleans, numbers as numbers, dates as ISO strings, select fields as their + * option **code** (not the human label), and lookup / user fields as the + * referenced record **id** (not its name). The engine deliberately does not + * coerce for storage (see `record-validator.ts`, which coerces only to *check* + * a value and then discards the coerced form), so import has to do it here. + * + * The accepted storage shapes below are dictated by what + * `validateFieldValue` in `packages/objectql` will accept: + * - number / currency / percent / rating / slider → a finite `number` + * - boolean / toggle → a real `boolean` + * - date / datetime → an ISO-8601 string + * - time → `HH:MM` / `HH:MM:SS` + * - select / radio → an option *value* + * - multiselect / checkboxes / tags → an array of option values + * - lookup / master_detail / user / reference → a record id (resolved async) + * + * Contract: when a field carries no usable metadata the value passes through + * untouched, so an import stays byte-identical to the pre-coercion behaviour. + */ + +import type { ExportFieldMeta } from './export-format.js'; + +/** Field types whose stored value points at another record (id). */ +const REFERENCE_TYPES = new Set(['lookup', 'master_detail', 'user', 'reference', 'tree']); +/** Single-select option types (store one option value). */ +const OPTION_TYPES = new Set(['select', 'radio']); +/** Multi-select option types (store an array of option values). */ +const MULTI_OPTION_TYPES = new Set(['multiselect', 'checkboxes', 'tags']); +/** Numeric field types (store a finite number). */ +const NUMBER_TYPES = new Set(['number', 'currency', 'percent', 'rating', 'slider']); +/** Boolean field types (store a real boolean). */ +const BOOL_TYPES = new Set(['boolean', 'toggle']); + +/** + * Structured outcome of a reference lookup. `id` set → a single record matched. + * `ambiguous` → the display value matched more than one record, so linking any + * one of them would be a guess the importer refuses to make. `matchedField` + * names the field the match came from (for diagnostics). An empty object means + * nothing matched. A bare `string | undefined` is still accepted from legacy + * resolvers and normalised to this shape. + */ +export interface RefMatch { + id?: string; + ambiguous?: boolean; + matchedField?: string; +} + +/** + * Resolve a reference field's display value (a name / email / id typed by the + * user) to the referenced record's id. Return `undefined` / `{}` when nothing + * matches (caller surfaces "not found"), a bare id string / `{ id }` on a unique + * hit, or `{ ambiguous: true }` when several records share the value. Legacy + * resolvers that return `string | undefined` keep working. Implementations are + * expected to cache — the same name shows up on many rows. + */ +export type RefResolver = ( + referenceObject: string, + displayValue: string, + meta: ExportFieldMeta, +) => Promise; + +/** Normalise a resolver result (legacy string or structured) to a RefMatch. */ +function normalizeRefMatch(result: string | undefined | RefMatch): RefMatch { + if (result == null) return {}; + if (typeof result === 'string') return result ? { id: result } : {}; + return result; +} + +export interface CoerceContext { + /** Trim leading/trailing whitespace from string-ish cells (default true). */ + trimWhitespace?: boolean; + /** Extra strings (besides `''`) treated as null, e.g. `['N/A', 'null']`. */ + nullValues?: string[]; + /** + * When a select/multiselect cell matches no known option, keep the raw value + * instead of failing. Note: the engine still validates option membership, so + * this only helps when the option is (or will be) present in the schema. + */ + createMissingOptions?: boolean; + /** Async reference resolver (name/email/id → record id). Optional. */ + resolveRef?: RefResolver; +} + +/** A per-field coercion failure, shaped like the engine's validation errors. */ +export interface FieldCoerceError { + field: string; + code: string; + message: string; +} + +// ── blank / null handling ────────────────────────────────────────── + +function isBlank(value: unknown, nullValues?: string[]): boolean { + if (value === null || value === undefined) return true; + if (typeof value === 'string') { + const s = value.trim(); + if (s === '') return true; + if (nullValues && nullValues.some((nv) => nv === value || nv === s)) return true; + } + return false; +} + +// ── boolean ──────────────────────────────────────────────────────── + +const BOOL_TRUE = new Set(['true', 't', 'yes', 'y', '1', 'on', '是', '对', '✓', '√']); +const BOOL_FALSE = new Set(['false', 'f', 'no', 'n', '0', 'off', '否', '错', '✗', '×']); + +/** Parse a spreadsheet cell into a boolean, or `undefined` if unrecognised. */ +export function parseBooleanCell(raw: unknown): boolean | undefined { + if (typeof raw === 'boolean') return raw; + if (typeof raw === 'number') { + if (raw === 1) return true; + if (raw === 0) return false; + return undefined; + } + const s = String(raw).trim().toLowerCase(); + if (BOOL_TRUE.has(s)) return true; + if (BOOL_FALSE.has(s)) return false; + return undefined; +} + +// ── numbers ──────────────────────────────────────────────────────── + +/** + * Parse a numeric cell, tolerating the punctuation spreadsheets add: thousands + * separators (`1,234`), a leading currency symbol (`$` `¥` `€` `£` `¥`), a + * trailing percent sign (`25%` → `25`), and accounting-style parenthesised + * negatives (`(1,234)` → `-1234`). Returns `undefined` when the residue is not + * a finite number. + */ +export function parseNumberCell(raw: unknown): number | undefined { + if (typeof raw === 'number') return Number.isFinite(raw) ? raw : undefined; + let s = String(raw).trim(); + if (s === '') return undefined; + let negative = false; + if (/^\(.*\)$/.test(s)) { negative = true; s = s.slice(1, -1).trim(); } + s = s.replace(/^[$¥€£¥]\s*/, ''); // leading currency symbol + s = s.replace(/%$/, '').trim(); // trailing percent + s = s.replace(/,/g, ''); // thousands separators + if (s === '' || !/^[+-]?\d*\.?\d+(e[+-]?\d+)?$/i.test(s)) return undefined; + const n = Number(s); + if (!Number.isFinite(n)) return undefined; + return negative ? -n : n; +} + +// ── dates ────────────────────────────────────────────────────────── + +function pad2(n: number): string { + return n < 10 ? `0${n}` : String(n); +} + +const TIME_OF_DAY = /^([01]\d|2[0-3]):[0-5]\d(:[0-5]\d)?$/; + +/** + * Coerce a cell into the string shape the engine accepts for a date-ish field: + * - `date` → `YYYY-MM-DD` + * - `datetime` → full ISO-8601 (`toISOString`) + * - `time` → `HH:MM` / `HH:MM:SS` + * Returns `undefined` when the cell is not a recognisable date/time. + * + * Unambiguous `YYYY-MM-DD` / `YYYY/MM/DD` inputs are normalised directly to + * avoid timezone drift; everything else falls back to `Date.parse` (which + * covers ISO datetimes and locale-default `MM/DD/YYYY`). + */ +export function parseDateCell(raw: unknown, kind: 'date' | 'datetime' | 'time'): string | undefined { + if (raw instanceof Date) { + if (Number.isNaN(raw.getTime())) return undefined; + if (kind === 'datetime') return raw.toISOString(); + if (kind === 'date') return `${raw.getUTCFullYear()}-${pad2(raw.getUTCMonth() + 1)}-${pad2(raw.getUTCDate())}`; + return `${pad2(raw.getUTCHours())}:${pad2(raw.getUTCMinutes())}:${pad2(raw.getUTCSeconds())}`; + } + const s = String(raw).trim(); + if (s === '') return undefined; + + if (kind === 'time') { + if (TIME_OF_DAY.test(s)) return s.length === 5 ? `${s}:00` : s; + // A full datetime for a time field: take its clock component. + const t = new Date(s); + if (!Number.isNaN(t.getTime())) return `${pad2(t.getUTCHours())}:${pad2(t.getUTCMinutes())}:${pad2(t.getUTCSeconds())}`; + return undefined; + } + + // Fast path: bare calendar date, no timezone games. + const ymd = s.match(/^(\d{4})[-/](\d{1,2})[-/](\d{1,2})$/); + if (ymd) { + const y = Number(ymd[1]); + const mo = Number(ymd[2]); + const d = Number(ymd[3]); + if (mo < 1 || mo > 12 || d < 1 || d > 31) return undefined; + if (kind === 'date') return `${y}-${pad2(mo)}-${pad2(d)}`; + return new Date(Date.UTC(y, mo - 1, d)).toISOString(); + } + + const parsed = new Date(s); + if (Number.isNaN(parsed.getTime())) return undefined; + if (kind === 'date') { + return `${parsed.getUTCFullYear()}-${pad2(parsed.getUTCMonth() + 1)}-${pad2(parsed.getUTCDate())}`; + } + return parsed.toISOString(); +} + +// ── options (select / multiselect) ───────────────────────────────── + +/** + * Match a cell against a field's options, accepting **either** the option value + * (code) or its human label (case-insensitive). Returns the canonical option + * value to store, or `undefined` on no match. + */ +export function matchOption( + raw: unknown, + options?: Array<{ label?: string; value?: unknown }>, +): unknown | undefined { + const s = String(raw).trim(); + if (!options || options.length === 0) return s; // no option list → accept as-is + // Exact value match first (preserves the option's original value type). + for (const o of options) { + if (o && o.value !== undefined && String(o.value) === s) return o.value; + } + // Case-insensitive label match. + const lower = s.toLowerCase(); + for (const o of options) { + if (o && typeof o.label === 'string' && o.label.trim().toLowerCase() === lower) return o.value; + } + return undefined; +} + +/** Split a multi-value cell on commas / semicolons / Chinese comma / newlines. */ +export function splitMulti(raw: unknown): string[] { + if (Array.isArray(raw)) return raw.map((v) => String(v).trim()).filter((v) => v !== ''); + return String(raw) + .split(/[,;、\n]/) + .map((v) => v.trim()) + .filter((v) => v !== ''); +} + +// ── per-field orchestration ──────────────────────────────────────── + +/** + * Coerce one raw cell to its storage value using the field metadata. On success + * returns `{ value }` (value may be `undefined`, meaning "drop this key"); on a + * hard coercion failure returns `{ error }`. + */ +export async function coerceFieldValue( + raw: unknown, + meta: ExportFieldMeta | undefined, + ctx: CoerceContext, +): Promise<{ value?: unknown } | { error: FieldCoerceError }> { + const trim = ctx.trimWhitespace !== false; + const field = meta?.name ?? ''; + + // Blank → leave the field unset so schema defaults / existing values win. + if (isBlank(raw, ctx.nullValues)) return { value: undefined }; + + const t = meta?.type; + if (!t) return { value: trim && typeof raw === 'string' ? raw.trim() : raw }; + + if (BOOL_TYPES.has(t)) { + const b = parseBooleanCell(raw); + if (b === undefined) return { error: { field, code: 'invalid_boolean', message: `${field}: "${String(raw)}" is not a boolean` } }; + return { value: b }; + } + + if (NUMBER_TYPES.has(t)) { + const n = parseNumberCell(raw); + if (n === undefined) return { error: { field, code: 'invalid_number', message: `${field}: "${String(raw)}" is not a number` } }; + return { value: n }; + } + + if (t === 'date' || t === 'datetime' || t === 'time') { + const d = parseDateCell(raw, t); + if (d === undefined) return { error: { field, code: 'invalid_date', message: `${field}: "${String(raw)}" is not a valid ${t}` } }; + return { value: d }; + } + + if (OPTION_TYPES.has(t)) { + const v = matchOption(raw, meta?.options); + if (v === undefined) { + if (ctx.createMissingOptions) return { value: String(raw).trim() }; + return { error: { field, code: 'invalid_option', message: `${field}: "${String(raw)}" is not a known option` } }; + } + return { value: v }; + } + + if (MULTI_OPTION_TYPES.has(t)) { + const parts = splitMulti(raw); + const out: unknown[] = []; + for (const part of parts) { + const v = matchOption(part, meta?.options); + if (v === undefined) { + if (ctx.createMissingOptions) { out.push(part); continue; } + return { error: { field, code: 'invalid_option', message: `${field}: "${part}" is not a known option` } }; + } + out.push(v); + } + return { value: out }; + } + + if (REFERENCE_TYPES.has(t)) { + const display = String(raw).trim(); + // If it already looks resolved (an id was pasted) or we have no resolver / + // no target object, store the raw value and let referential integrity be + // enforced downstream. + if (!ctx.resolveRef || !meta?.reference) return { value: display }; + const match = normalizeRefMatch(await ctx.resolveRef(meta.reference, display, meta)); + if (match.ambiguous) { + return { error: { field, code: 'reference_ambiguous', message: `${field}: "${display}" matches more than one ${meta.reference} — use a unique value or the record id` } }; + } + if (match.id === undefined) { + return { error: { field, code: 'reference_not_found', message: `${field}: no ${meta.reference} matches "${display}"` } }; + } + return { value: match.id }; + } + + // Everything else (text, email, phone, json, html, file, …): pass through, + // trimming string cells so stray spreadsheet padding doesn't leak into storage. + return { value: trim && typeof raw === 'string' ? raw.trim() : raw }; +} + +/** + * Coerce a whole raw row into a storage-ready record. Unknown columns (no + * matching field metadata) pass through untouched so ad-hoc / schemaless + * objects still import. Collects every field error rather than stopping at the + * first, so a UI can show all problems in a row at once. + */ +export async function coerceRow( + rawRow: Record, + metaMap: Map, + ctx: CoerceContext, +): Promise<{ data: Record; errors: FieldCoerceError[] }> { + const data: Record = {}; + const errors: FieldCoerceError[] = []; + for (const [key, raw] of Object.entries(rawRow)) { + const meta = metaMap.get(key); + const res = await coerceFieldValue(raw, meta ? meta : undefined, ctx); + if ('error' in res) { + // Attribute the error to the column even when metadata was missing. + errors.push({ ...res.error, field: res.error.field || key }); + continue; + } + if (res.value !== undefined) data[key] = res.value; + } + return { data, errors }; +} diff --git a/packages/rest/src/import-integration.test.ts b/packages/rest/src/import-integration.test.ts new file mode 100644 index 0000000000..ae82f94b74 --- /dev/null +++ b/packages/rest/src/import-integration.test.ts @@ -0,0 +1,311 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +/** + * End-to-end import integration: the REAL `POST /data/:object/import` route + * driven by a REAL {@link ObjectQL} engine + {@link ObjectStackProtocolImplementation}, + * an in-memory driver, and real registered objects — no protocol mocks. + * + * Mirrors `export-integration.test.ts`. It proves the server-side coercion + + * upsert pipeline against the SAME metadata accessor (`getMetaItem`) and write + * path (`createData`/`updateData`) a deployed server runs: human cells + * (是→true, 高→high, name→id) become storage values, and writeMode routes each + * row to create / update / skip. + */ + +import { describe, it, expect, beforeEach } from 'vitest'; +import { ObjectQL, ObjectStackProtocolImplementation } from '@objectstack/objectql'; +import { RestServer } from './rest-server'; + +// --------------------------------------------------------------------------- +// In-memory driver — equality + `$in` (what matchFields / $expand issue). +// --------------------------------------------------------------------------- +function makeMemoryDriver() { + const stores = new Map>>(); + const storeFor = (o: string) => { + let s = stores.get(o); + if (!s) { s = new Map(); stores.set(o, s); } + return s; + }; + let nextId = 0; + const matchOne = (cell: unknown, cond: unknown): boolean => { + if (cond && typeof cond === 'object' && !Array.isArray(cond)) { + const c = cond as Record; + if ('$in' in c) return Array.isArray(c.$in) && c.$in.some((x) => (cell ?? null) === (x ?? null)); + if ('$eq' in c) return (cell ?? null) === ((c.$eq as unknown) ?? null); + if ('$ne' in c) return (cell ?? null) !== ((c.$ne as unknown) ?? null); + } + return (cell ?? null) === ((cond as unknown) ?? null); + }; + const matches = (row: Record, where: any): boolean => { + if (!where || typeof where !== 'object') return true; + for (const [k, v] of Object.entries(where)) { + if (k.startsWith('$')) continue; + if (!matchOne(row[k], v)) return false; + } + return true; + }; + const driver: any = { + name: 'memory', version: '0.0.0', supports: {}, + async connect() {}, async disconnect() {}, async checkHealth() { return true; }, async execute() { return null; }, + async find(o: string, ast: any) { + const rows = Array.from(storeFor(o).values()).filter((r) => matches(r, ast?.where)); + const skip = Number(ast?.skip ?? ast?.offset ?? 0) || 0; + const limit = ast?.limit ?? ast?.top; + return limit != null ? rows.slice(skip, skip + Number(limit)) : rows.slice(skip); + }, + findStream() { throw new Error('ns'); }, + async findOne(o: string, ast: any) { for (const r of storeFor(o).values()) if (matches(r, ast?.where)) return r; return null; }, + async create(o: string, data: Record) { + nextId += 1; const id = (data.id as string) ?? `r_${nextId}`; const row = { ...data, id }; storeFor(o).set(id, row); return row; + }, + async update(o: string, id: string, data: Record) { + const s = storeFor(o); const cur = s.get(id); if (!cur) throw new Error(`nf ${o}/${id}`); + const up = { ...cur, ...data, id }; s.set(id, up); return up; + }, + async upsert(o: string, data: Record) { const id = data.id as string | undefined; return id && storeFor(o).has(id) ? this.update(o, id, data) : this.create(o, data); }, + async delete(o: string, id: string) { return storeFor(o).delete(id); }, + async count(o: string, ast: any) { return (await this.find(o, ast)).length; }, + async bulkCreate(o: string, rows: Record[]) { return Promise.all(rows.map((r) => this.create(o, r))); }, + async bulkUpdate() { return []; }, async bulkDelete() {}, + async beginTransaction() { return { commit: async () => {}, rollback: async () => {} }; }, async commit() {}, async rollback() {}, + }; + return { driver, stores }; +} + +const USER = { + name: 'user', label: 'User', systemFields: false, + fields: { + id: { name: 'id', type: 'text' as const, primaryKey: true }, + name: { name: 'name', type: 'text' as const, label: '姓名' }, + email: { name: 'email', type: 'email' as const, label: '邮箱' }, + }, +}; + +const TASK = { + name: 'task', label: 'Task', systemFields: false, + fields: { + id: { name: 'id', type: 'text' as const, primaryKey: true, label: 'ID' }, + title: { name: 'title', type: 'text' as const, label: '标题' }, + done: { name: 'done', type: 'boolean' as const, label: '完成' }, + priority: { + name: 'priority', type: 'select' as const, label: '优先级', + options: [{ label: '高', value: 'high' }, { label: '低', value: 'low' }], + }, + score: { name: 'score', type: 'number' as const, label: '分数' }, + due: { name: 'due', type: 'date' as const, label: '截止' }, + owner: { name: 'owner', type: 'lookup' as const, label: '负责人', reference: 'user', displayField: 'name' }, + }, +}; + +function createMockServer() { + const noop = () => {}; + return { get: noop, post: noop, put: noop, delete: noop, patch: noop, use: noop, listen: async () => {}, close: async () => {} }; +} + +function makeRes() { + const res: any = { + write: () => true, end: () => {}, + header: () => res, + status: (code: number) => { res._status = code; return res; }, + json: (body: any) => { res._json = body; return res; }, + }; + return res; +} + +async function boot() { + const { driver } = makeMemoryDriver(); + const engine = new ObjectQL(); + engine.registerDriver(driver, true); + await engine.init(); + engine.registry.registerObject(USER as any); + engine.registry.registerObject(TASK as any); + await engine.insert('user', { id: 'u1', name: '张三', email: 'zhang@x.com' }); + await engine.insert('user', { id: 'u2', name: '李四', email: 'li@x.com' }); + + const protocol = new ObjectStackProtocolImplementation(engine as any); + const rest = new RestServer(createMockServer() as any, protocol as any); + rest.registerRoutes(); + const route = rest.getRoutes().find( + (r: any) => r.method === 'POST' && r.path === '/api/v1/data/:object/import', + ); + return { engine, protocol, route }; +} + +const call = (route: any, body: any) => { + const res = makeRes(); + return route.handler({ params: { object: 'task' }, body } as any, res).then(() => res); +}; + +describe('import route — real engine + protocol integration', () => { + let route: any; + let engine: any; + + beforeEach(async () => { + ({ route, engine } = await boot()); + expect(route).toBeDefined(); + }); + + it('coerces every special value type on insert (是→true, 高→high, name→id, date→ISO)', async () => { + const csv = [ + 'ID,标题,完成,优先级,分数,截止,负责人', + '1,写代码,是,高,"1,200",2026/06/30,张三', + ].join('\n'); + const res = await call(route, { + format: 'csv', csv, + mapping: { ID: 'id', 标题: 'title', 完成: 'done', 优先级: 'priority', 分数: 'score', 截止: 'due', 负责人: 'owner' }, + }); + expect(res._json).toMatchObject({ total: 1, ok: 1, errors: 0, created: 1 }); + const stored = await engine.findOne('task', { where: { id: '1' } }); + expect(stored).toMatchObject({ + title: '写代码', done: true, priority: 'high', score: 1200, owner: 'u1', + }); + expect(String(stored.due)).toContain('2026-06-30'); + }); + + it('resolves a lookup by email when displayField is not the match, and reports not-found', async () => { + // Default candidate fields include email — resolve 李四 via email. + const res = await call(route, { + format: 'json', + rows: [ + { id: 'a', title: 'x', owner: 'li@x.com' }, + { id: 'b', title: 'y', owner: '查无此人' }, + ], + }); + expect(res._json.created).toBe(1); + expect(res._json.errors).toBe(1); + const failed = res._json.results.find((r: any) => !r.ok); + expect(failed).toMatchObject({ field: 'owner', code: 'reference_not_found' }); + const a = await engine.findOne('task', { where: { id: 'a' } }); + expect(a.owner).toBe('u2'); + }); + + it('reports reference_ambiguous when a name matches more than one record', async () => { + // Second 张三 makes the name non-unique; the importer must refuse to guess. + await engine.insert('user', { id: 'u3', name: '张三', email: 'zhang2@x.com' }); + const res = await call(route, { + format: 'json', + rows: [ + { id: 'a', title: 'x', owner: '张三' }, // ambiguous name + { id: 'b', title: 'y', owner: 'zhang2@x.com' }, // unique email → resolves + ], + }); + expect(res._json.errors).toBe(1); + expect(res._json.results.find((r: any) => !r.ok)).toMatchObject({ field: 'owner', code: 'reference_ambiguous' }); + const b = await engine.findOne('task', { where: { id: 'b' } }); + expect(b.owner).toBe('u3'); + }); + + it('accepts a pasted record id directly via the id fast-path', async () => { + const res = await call(route, { + format: 'json', + rows: [{ id: 'c', title: 'z', owner: 'u1' }], + }); + expect(res._json.errors).toBe(0); + const c = await engine.findOne('task', { where: { id: 'c' } }); + expect(c.owner).toBe('u1'); + }); + + it('surfaces per-row coercion errors without aborting the batch', async () => { + const res = await call(route, { + format: 'json', + rows: [ + { id: 'ok', title: 'fine', score: '42' }, + { id: 'bad', title: 'nope', score: 'not-a-number' }, + ], + }); + expect(res._json.ok).toBe(1); + expect(res._json.errors).toBe(1); + expect(res._json.results.find((r: any) => !r.ok)).toMatchObject({ field: 'score', code: 'invalid_number' }); + }); + + it('dryRun coerces + previews create/update without persisting', async () => { + const res = await call(route, { + format: 'json', dryRun: true, + rows: [{ id: 'z', title: 'preview', done: '否' }], + }); + expect(res._json).toMatchObject({ dryRun: true, ok: 1, created: 1 }); + expect(await engine.findOne('task', { where: { id: 'z' } })).toBeNull(); + }); + + it('writeMode:update touches an existing match and skips non-matches', async () => { + await engine.insert('task', { id: '100', title: 'old', score: 1 }); + const res = await call(route, { + format: 'json', writeMode: 'update', matchFields: ['id'], + rows: [ + { id: '100', title: 'new', score: 2 }, // matches → update + { id: '999', title: 'ghost' }, // no match → skip + ], + }); + expect(res._json).toMatchObject({ updated: 1, skipped: 1, created: 0 }); + const row = await engine.findOne('task', { where: { id: '100' } }); + expect(row).toMatchObject({ title: 'new', score: 2 }); + expect(await engine.findOne('task', { where: { id: '999' } })).toBeNull(); + }); + + it('writeMode:upsert updates a match by a non-id field, else creates', async () => { + await engine.insert('task', { id: '200', title: 'Acme', score: 1 }); + const res = await call(route, { + format: 'json', writeMode: 'upsert', matchFields: ['title'], + rows: [ + { title: 'Acme', score: 9 }, // matches by title → update + { title: 'Umbrella', score: 5 }, // no match → create + ], + }); + expect(res._json).toMatchObject({ updated: 1, created: 1 }); + const acme = await engine.findOne('task', { where: { title: 'Acme' } }); + expect(acme).toMatchObject({ id: '200', score: 9 }); + const umbrella = await engine.findOne('task', { where: { title: 'Umbrella' } }); + expect(umbrella?.score).toBe(5); + }); + + it('rejects update/upsert without matchFields', async () => { + const res = await call(route, { format: 'json', writeMode: 'upsert', rows: [{ title: 'x' }] }); + expect(res._status).toBe(400); + expect(res._json.code).toBe('INVALID_REQUEST'); + }); + + it('parses a native xlsx workbook server-side and coerces cells like csv', async () => { + const ExcelJS: any = (await import('exceljs')).default ?? (await import('exceljs')); + const wb = new ExcelJS.Workbook(); + const ws = wb.addWorksheet('Sheet1'); + ws.addRow(['ID', '标题', '完成', '优先级', '分数', '截止', '负责人']); + ws.addRow(['1', '写代码', '是', '高', 1200, new Date('2026-06-30T00:00:00Z'), '张三']); + ws.addRow(['2', '测试', '否', '低', 3, '2026/07/01', '李四']); + const buf = await wb.xlsx.writeBuffer(); + const xlsxBase64 = Buffer.from(buf).toString('base64'); + + const res = await call(route, { + format: 'xlsx', xlsxBase64, + mapping: { ID: 'id', 标题: 'title', 完成: 'done', 优先级: 'priority', 分数: 'score', 截止: 'due', 负责人: 'owner' }, + }); + expect(res._json).toMatchObject({ total: 2, ok: 2, errors: 0, created: 2 }); + const one = await engine.findOne('task', { where: { id: '1' } }); + expect(one).toMatchObject({ title: '写代码', done: true, priority: 'high', score: 1200, owner: 'u1' }); + expect(String(one.due)).toContain('2026-06-30'); + const two = await engine.findOne('task', { where: { id: '2' } }); + expect(two).toMatchObject({ title: '测试', done: false, priority: 'low', score: 3, owner: 'u2' }); + }); + + it('reads xlsxBase64 without an explicit format and honors the sheet selector', async () => { + const ExcelJS: any = (await import('exceljs')).default ?? (await import('exceljs')); + const wb = new ExcelJS.Workbook(); + wb.addWorksheet('Empty'); // decoy first sheet + const ws = wb.addWorksheet('Data'); + ws.addRow(['id', 'title', 'score']); + ws.addRow(['x1', 'from-named-sheet', 7]); + const buf = await wb.xlsx.writeBuffer(); + const xlsxBase64 = Buffer.from(buf).toString('base64'); + + const res = await call(route, { xlsxBase64, sheet: 'Data' }); + expect(res._json).toMatchObject({ total: 1, ok: 1, created: 1 }); + const row = await engine.findOne('task', { where: { id: 'x1' } }); + expect(row).toMatchObject({ title: 'from-named-sheet', score: 7 }); + }); + + it('rejects a malformed xlsx payload with 400', async () => { + const res = await call(route, { format: 'xlsx', xlsxBase64: Buffer.from('not a workbook').toString('base64') }); + expect(res._status).toBe(400); + expect(res._json.code).toBe('INVALID_REQUEST'); + expect(String(res._json.error)).toMatch(/xlsx/i); + }); +}); diff --git a/packages/rest/src/import-job-integration.test.ts b/packages/rest/src/import-job-integration.test.ts new file mode 100644 index 0000000000..eadc62bc48 --- /dev/null +++ b/packages/rest/src/import-job-integration.test.ts @@ -0,0 +1,299 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +/** + * End-to-end async import-job integration: the REAL create / progress / results + * / list / cancel routes driven by a REAL {@link ObjectQL} engine + + * {@link ObjectStackProtocolImplementation}, an in-memory driver, and real + * registered objects (including a `sys_import_job` mirror) — no protocol mocks. + * + * Proves the P1 async pipeline: a create request persists a job row and returns + * immediately; the background worker streams the batch through the SAME shared + * runner the sync route uses, updating progress on the row; and readers can poll + * progress, fetch a capped results report, and list history. + */ + +import { describe, it, expect, beforeEach } from 'vitest'; +import { ObjectQL, ObjectStackProtocolImplementation } from '@objectstack/objectql'; +import { RestServer } from './rest-server'; + +// In-memory driver — equality + `$in`, with skip/limit (mirrors import-integration). +function makeMemoryDriver() { + const stores = new Map>>(); + const storeFor = (o: string) => { + let s = stores.get(o); + if (!s) { s = new Map(); stores.set(o, s); } + return s; + }; + let nextId = 0; + const matchOne = (cell: unknown, cond: unknown): boolean => { + if (cond && typeof cond === 'object' && !Array.isArray(cond)) { + const c = cond as Record; + if ('$in' in c) return Array.isArray(c.$in) && c.$in.some((x) => (cell ?? null) === (x ?? null)); + if ('$eq' in c) return (cell ?? null) === ((c.$eq as unknown) ?? null); + if ('$ne' in c) return (cell ?? null) !== ((c.$ne as unknown) ?? null); + } + return (cell ?? null) === ((cond as unknown) ?? null); + }; + const matches = (row: Record, where: any): boolean => { + if (!where || typeof where !== 'object') return true; + for (const [k, v] of Object.entries(where)) { + if (k.startsWith('$')) continue; + if (!matchOne(row[k], v)) return false; + } + return true; + }; + const driver: any = { + name: 'memory', version: '0.0.0', supports: {}, + async connect() {}, async disconnect() {}, async checkHealth() { return true; }, async execute() { return null; }, + async find(o: string, ast: any) { + const rows = Array.from(storeFor(o).values()).filter((r) => matches(r, ast?.where)); + const skip = Number(ast?.skip ?? ast?.offset ?? 0) || 0; + const limit = ast?.limit ?? ast?.top; + return limit != null ? rows.slice(skip, skip + Number(limit)) : rows.slice(skip); + }, + findStream() { throw new Error('ns'); }, + async findOne(o: string, ast: any) { for (const r of storeFor(o).values()) if (matches(r, ast?.where)) return r; return null; }, + async create(o: string, data: Record) { + nextId += 1; const id = (data.id as string) ?? `r_${nextId}`; const row = { ...data, id }; storeFor(o).set(id, row); return row; + }, + async update(o: string, id: string, data: Record) { + const s = storeFor(o); const cur = s.get(id); if (!cur) throw new Error(`nf ${o}/${id}`); + const up = { ...cur, ...data, id }; s.set(id, up); return up; + }, + async upsert(o: string, data: Record) { const id = data.id as string | undefined; return id && storeFor(o).has(id) ? this.update(o, id, data) : this.create(o, data); }, + async delete(o: string, id: string) { return storeFor(o).delete(id); }, + async count(o: string, ast: any) { return (await this.find(o, ast)).length; }, + async bulkCreate(o: string, rows: Record[]) { return Promise.all(rows.map((r) => this.create(o, r))); }, + async bulkUpdate() { return []; }, async bulkDelete() {}, + async beginTransaction() { return { commit: async () => {}, rollback: async () => {} }; }, async commit() {}, async rollback() {}, + }; + return { driver, stores }; +} + +const TASK = { + name: 'task', label: 'Task', systemFields: false, + fields: { + id: { name: 'id', type: 'text' as const, primaryKey: true, label: 'ID' }, + title: { name: 'title', type: 'text' as const, label: '标题' }, + done: { name: 'done', type: 'boolean' as const, label: '完成' }, + score: { name: 'score', type: 'number' as const, label: '分数' }, + }, +}; + +// Minimal sys_import_job mirror the routes read/write through the protocol. +const SYS_IMPORT_JOB = { + name: 'sys_import_job', label: 'Import Job', systemFields: false, + fields: { + id: { name: 'id', type: 'text' as const, primaryKey: true }, + object_name: { name: 'object_name', type: 'text' as const }, + status: { name: 'status', type: 'text' as const }, + total_rows: { name: 'total_rows', type: 'number' as const }, + processed_rows: { name: 'processed_rows', type: 'number' as const }, + created_count: { name: 'created_count', type: 'number' as const }, + updated_count: { name: 'updated_count', type: 'number' as const }, + skipped_count: { name: 'skipped_count', type: 'number' as const }, + error_count: { name: 'error_count', type: 'number' as const }, + write_mode: { name: 'write_mode', type: 'text' as const }, + dry_run: { name: 'dry_run', type: 'boolean' as const }, + run_automations: { name: 'run_automations', type: 'boolean' as const }, + error: { name: 'error', type: 'textarea' as const }, + results: { name: 'results', type: 'json' as const }, + started_at: { name: 'started_at', type: 'text' as const }, + completed_at: { name: 'completed_at', type: 'text' as const }, + created_by: { name: 'created_by', type: 'text' as const }, + created_at: { name: 'created_at', type: 'text' as const }, + }, +}; + +function createMockServer() { + const noop = () => {}; + return { get: noop, post: noop, put: noop, delete: noop, patch: noop, use: noop, listen: async () => {}, close: async () => {} }; +} + +function makeRes() { + const res: any = { + write: () => true, end: () => {}, + header: () => res, + status: (code: number) => { res._status = code; return res; }, + json: (body: any) => { res._json = body; return res; }, + }; + return res; +} + +async function boot() { + const { driver } = makeMemoryDriver(); + const engine = new ObjectQL(); + engine.registerDriver(driver, true); + await engine.init(); + engine.registry.registerObject(TASK as any); + engine.registry.registerObject(SYS_IMPORT_JOB as any); + + const protocol = new ObjectStackProtocolImplementation(engine as any); + const rest = new RestServer(createMockServer() as any, protocol as any); + rest.registerRoutes(); + const routes = rest.getRoutes(); + const find = (method: string, path: string) => routes.find((r: any) => r.method === method && r.path === path); + return { + engine, protocol, + create: find('POST', '/api/v1/data/:object/import/jobs'), + progress: find('GET', '/api/v1/data/import/jobs/:jobId'), + results: find('GET', '/api/v1/data/import/jobs/:jobId/results'), + list: find('GET', '/api/v1/data/import/jobs'), + cancel: find('POST', '/api/v1/data/import/jobs/:jobId/cancel'), + undo: find('POST', '/api/v1/data/import/jobs/:jobId/undo'), + }; +} + +const callCreate = (route: any, body: any) => { + const res = makeRes(); + return route.handler({ params: { object: 'task' }, body } as any, res).then(() => res); +}; +const callJob = (route: any, jobId: string, query: any = {}) => { + const res = makeRes(); + return route.handler({ params: { jobId }, query } as any, res).then(() => res); +}; + +/** Poll the progress route until the job reaches a terminal state. */ +async function waitForTerminal(progress: any, jobId: string, tries = 100): Promise { + for (let i = 0; i < tries; i++) { + const res = await callJob(progress, jobId); + const status = res._json?.status; + if (status === 'succeeded' || status === 'failed' || status === 'cancelled') return res._json; + await new Promise((r) => setTimeout(r, 5)); + } + throw new Error(`import job ${jobId} did not finish`); +} + +describe('async import job — real engine + protocol integration', () => { + let ctx: Awaited>; + beforeEach(async () => { + ctx = await boot(); + expect(ctx.create).toBeDefined(); + expect(ctx.progress && ctx.results && ctx.list && ctx.cancel && ctx.undo).toBeTruthy(); + }); + + it('creates a job, processes it in the background, and persists rows', async () => { + const res = await callCreate(ctx.create, { + format: 'json', + rows: [ + { id: 'a', title: 'one', done: '是', score: '10' }, + { id: 'b', title: 'two', done: '否', score: '20' }, + { id: 'c', title: 'three', score: 'not-a-number' }, // one failure + ], + }); + expect(res._status).toBe(201); + expect(res._json).toMatchObject({ object: 'task', status: 'pending', total: 3 }); + const jobId = res._json.jobId; + expect(jobId).toMatch(/^imp_/); + + const done = await waitForTerminal(ctx.progress, jobId); + expect(done).toMatchObject({ status: 'succeeded', total: 3, processed: 3, created: 2, errors: 1 }); + expect(done.percentComplete).toBe(100); + + // Records really landed (coerced: 是→true, "10"→10). + const a = await ctx.engine.findOne('task', { where: { id: 'a' } }); + expect(a).toMatchObject({ title: 'one', done: true, score: 10 }); + + // Results route returns the capped per-row report (failure present). + const results = await callJob(ctx.results, jobId); + expect(results._json.resultsTruncated).toBe(false); + expect(results._json.results.find((r: any) => !r.ok)).toMatchObject({ field: 'score', code: 'invalid_number' }); + }); + + it('rejects a payload above the 50k async ceiling with 413', async () => { + const rows = Array.from({ length: 50_001 }, (_, i) => ({ id: `x${i}`, title: 't' })); + const res = await callCreate(ctx.create, { format: 'json', rows }); + expect(res._status).toBe(413); + expect(res._json.code).toBe('PAYLOAD_TOO_LARGE'); + expect(String(res._json.error)).toMatch(/50000/); + }); + + it('lists jobs in history and filters by status', async () => { + const r1 = await callCreate(ctx.create, { format: 'json', rows: [{ id: 'l1', title: 'a' }] }); + await waitForTerminal(ctx.progress, r1._json.jobId); + const r2 = await callCreate(ctx.create, { format: 'json', rows: [{ id: 'l2', title: 'b' }] }); + await waitForTerminal(ctx.progress, r2._json.jobId); + + const all = await callJob(ctx.list, '', {}); + expect(all._json.jobs.length).toBe(2); + expect(all._json.jobs[0]).toHaveProperty('jobId'); + expect(all._json.jobs[0]).toHaveProperty('createdAt'); + + const succeeded = await callJob(ctx.list, '', { status: 'succeeded' }); + expect(succeeded._json.jobs.length).toBe(2); + const failedOnly = await callJob(ctx.list, '', { status: 'failed' }); + expect(failedOnly._json.jobs.length).toBe(0); + }); + + it('404s progress/results/cancel for an unknown job id', async () => { + const p = await callJob(ctx.progress, 'imp_nope'); + expect(p._status).toBe(404); + const r = await callJob(ctx.results, 'imp_nope'); + expect(r._status).toBe(404); + const c = await callJob(ctx.cancel, 'imp_nope'); + expect(c._status).toBe(404); + }); + + it('cancel on an already-finished job is a no-op success', async () => { + const created = await callCreate(ctx.create, { format: 'json', rows: [{ id: 'k', title: 'x' }] }); + const jobId = created._json.jobId; + await waitForTerminal(ctx.progress, jobId); + const c = await callJob(ctx.cancel, jobId); + expect(c._json).toMatchObject({ success: true }); + // Terminal state preserved. + const after = await callJob(ctx.progress, jobId); + expect(after._json.status).toBe('succeeded'); + }); + + it('undoes a job: deletes created records and restores updated ones', async () => { + // Seed one existing record so an upsert both creates and updates. + await ctx.protocol.createData({ object: 'task', data: { id: 'u_existing', title: 'old title', score: 1 } }); + + const created = await callCreate(ctx.create, { + format: 'json', writeMode: 'upsert', matchFields: ['id'], + rows: [ + { id: 'u_existing', title: 'new title', score: 99 }, // update + { id: 'u_new1', title: 'fresh one' }, // create + { id: 'u_new2', title: 'fresh two' }, // create + ], + }); + const jobId = created._json.jobId; + const done = await waitForTerminal(ctx.progress, jobId); + expect(done).toMatchObject({ status: 'succeeded', created: 2, updated: 1 }); + expect(done.undoable).toBe(true); + + // Writes really landed. + expect(await ctx.engine.findOne('task', { where: { id: 'u_new1' } })).toBeTruthy(); + expect(await ctx.engine.findOne('task', { where: { id: 'u_existing' } })).toMatchObject({ title: 'new title', score: 99 }); + + // Undo. + const u = await callJob(ctx.undo, jobId); + expect(u._json).toMatchObject({ success: true, deleted: 2, restored: 1, failed: 0 }); + + // Created records are gone; the updated record is back to its pre-import values. + expect(await ctx.engine.findOne('task', { where: { id: 'u_new1' } })).toBeFalsy(); + expect(await ctx.engine.findOne('task', { where: { id: 'u_new2' } })).toBeFalsy(); + expect(await ctx.engine.findOne('task', { where: { id: 'u_existing' } })).toMatchObject({ title: 'old title', score: 1 }); + + // Job now flags as reverted + no longer undoable. + const after = await callJob(ctx.progress, jobId); + expect(after._json.undoable).toBe(false); + expect(after._json.revertedAt).toBeTruthy(); + }); + + it('undoing twice is rejected (409 already reverted)', async () => { + const created = await callCreate(ctx.create, { format: 'json', rows: [{ id: 'uu1', title: 'x' }] }); + const jobId = created._json.jobId; + await waitForTerminal(ctx.progress, jobId); + const first = await callJob(ctx.undo, jobId); + expect(first._json.success).toBe(true); + const second = await callJob(ctx.undo, jobId); + expect(second._status).toBe(409); + expect(second._json.code).toBe('ALREADY_REVERTED'); + }); + + it('404s undo for an unknown job id', async () => { + const res = await callJob(ctx.undo, 'imp_nope'); + expect(res._status).toBe(404); + }); +}); diff --git a/packages/rest/src/import-runner.ts b/packages/rest/src/import-runner.ts new file mode 100644 index 0000000000..f6b343fcef --- /dev/null +++ b/packages/rest/src/import-runner.ts @@ -0,0 +1,281 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import { coerceRow, type RefResolver, type RefMatch } from './import-coerce.js'; +import type { ExportFieldMeta } from './export-format.js'; + +/** + * import-runner — the shared row-processing core for bulk import. + * + * Both the synchronous `POST /data/:object/import` route and the asynchronous + * import-job worker feed rows through {@link runImport}. Extracting the loop + * keeps the two paths byte-for-byte identical in coercion, upsert matching, and + * per-row reporting — the async worker only adds progress persistence and + * cancellation on top. + */ + +export type ImportAction = 'created' | 'updated' | 'skipped' | 'failed'; + +export interface ImportRowResult { + row: number; + ok: boolean; + action: ImportAction; + id?: string; + field?: string; + error?: string; + code?: string; +} + +/** Running tallies handed to {@link RunImportOptions.onProgress}. */ +export interface ImportProgress { + processed: number; + total: number; + created: number; + updated: number; + skipped: number; + errors: number; +} + +/** + * Records exactly what a non-dry-run import changed, so the job can be undone: + * created records are deleted, and updated records have the touched fields + * restored to their pre-import values. Only the fields the import wrote are + * captured (keyed to `before`), keeping the log precise and bounded. + */ +export interface ImportUndoLog { + /** Ids of records this import created (delete to undo). */ + created: string[]; + /** Per updated record: the touched fields' values *before* the import. */ + updated: Array<{ id: string; before: Record }>; +} + +export interface ImportRunSummary extends ImportProgress { + ok: number; + results: ImportRowResult[]; + cancelled: boolean; + /** Present only when `captureUndo` was set — the reversal instructions. */ + undoLog?: ImportUndoLog; +} + +/** Minimal protocol surface the runner needs (find / create / update). */ +export interface ImportProtocolLike { + findData(args: any): Promise; + createData(args: any): Promise; + updateData(args: any): Promise; +} + +export interface RunImportOptions { + /** Protocol/engine to read & write through. */ + p: ImportProtocolLike; + objectName: string; + environmentId?: string; + /** Exec context threaded onto reads and (with automation toggle) writes. */ + context?: any; + /** Already-mapped rows (source columns renamed to target fields). */ + rows: Array>; + /** Field metadata for value coercion (name→id lookups, select codes, …). */ + metaMap: Map; + writeMode: 'insert' | 'update' | 'upsert'; + matchFields: string[]; + dryRun: boolean; + runAutomations: boolean; + trimWhitespace: boolean; + nullValues?: string[]; + createMissingOptions: boolean; + skipBlankMatchKey: boolean; + /** + * Progress callback, invoked every {@link RunImportOptions.progressEvery} + * processed rows and once at the end. May be async; the runner awaits it so a + * DB write of progress completes before the next chunk. + */ + onProgress?: (p: ImportProgress) => void | Promise; + /** Rows between onProgress calls (default 200). */ + progressEvery?: number; + /** + * Cooperative cancellation. Checked at each progress boundary; when it returns + * truthy the runner stops and returns `cancelled: true` with partial results. + */ + shouldCancel?: () => boolean | Promise; + /** + * When true (and not a dry run), accumulate an {@link ImportUndoLog} so the + * import can be reverted later. Callers gate this on row count to bound the + * stored snapshot size. + */ + captureUndo?: boolean; +} + +export function runImport(opts: RunImportOptions): Promise { + const { + p, objectName, environmentId, context, rows, metaMap, + writeMode, matchFields, dryRun, runAutomations, + trimWhitespace, nullValues, createMissingOptions, skipBlankMatchKey, + onProgress, shouldCancel, captureUndo, + } = opts; + const collectUndo = !!captureUndo && !dryRun; + const undoLog: ImportUndoLog = { created: [], updated: [] }; + // Snapshot only the fields the import touched, so undo restores exactly what + // changed. A field absent before the import is recorded as null → undo clears + // it. Never captured on dry runs (nothing was written). + const captureBefore = (before: Record, written: Record): Record => { + const snap: Record = {}; + for (const k of Object.keys(written)) snap[k] = before[k] ?? null; + return snap; + }; + const progressEvery = Math.max(1, opts.progressEvery ?? 200); + + const findRows = (r: any): any[] => + Array.isArray(r?.records) ? r.records + : Array.isArray(r?.data) ? r.data + : Array.isArray(r?.rows) ? r.rows + : Array.isArray(r) ? r : []; + const findArgsBase = (query: any) => ({ + object: '', + query, + ...(environmentId ? { environmentId } : {}), + ...(context ? { context } : {}), + }); + + // Reference resolver: name/email/id → referenced record id. Cached per + // (object, display) so a name repeated across rows costs one query. + const refCache = new Map(); + const resolveRef: RefResolver = async (referenceObject, display, meta) => { + const cacheKey = `${referenceObject}::${display}`; + const cached = refCache.get(cacheKey); + if (cached) return cached; + // Try an exact id first (authoritative + unique when the user pasted an id), + // then the configured display field, then the usual human identifiers. + // De-dupe so a field isn't queried twice. The first candidate field to match + // wins; if that field matches >1 record we stop and report ambiguity rather + // than silently linking the first. + const candidates = [...new Set([ + 'id', + ...(meta.displayField ? [meta.displayField] : []), + 'name', 'title', 'label', 'full_name', 'email', 'username', + ])]; + let match: RefMatch = {}; + for (const f of candidates) { + try { + const r = await p.findData({ + ...findArgsBase({ $filter: { [f]: display }, $top: 2 }), + object: referenceObject, + }); + const recs = findRows(r); + if (recs.length === 0) continue; + if (recs.length > 1) { match = { ambiguous: true, matchedField: f }; break; } + if (recs[0]?.id != null) { match = { id: String(recs[0].id), matchedField: f }; break; } + } catch { /* field absent on target object — try the next candidate */ } + } + refCache.set(cacheKey, match); + return match; + }; + + // Locate an existing record for update/upsert by matchFields. Returns the + // record, or a sentinel: 'blank' (a match field was empty), 'none' (no + // match), 'ambiguous' (>1 match — too risky to update). + const findExisting = async ( + data: Record, + ): Promise | 'blank' | 'none' | 'ambiguous'> => { + const filter: Record = {}; + for (const f of matchFields) { + const v = data[f]; + if (v === undefined || v === null || v === '') return 'blank'; + filter[f] = v; + } + const r = await p.findData({ ...findArgsBase({ $filter: filter, $top: 2 }), object: objectName }); + const recs = findRows(r); + if (recs.length === 0) return 'none'; + if (recs.length > 1) return 'ambiguous'; + return recs[0]; + }; + + const writeCtx = { ...(context ?? {}), skipAutomations: !runAutomations }; + + const results: ImportRowResult[] = []; + let okCount = 0, errCount = 0, created = 0, updated = 0, skipped = 0; + let cancelled = false; + + const snapshot = (processed: number): ImportProgress => ({ + processed, total: rows.length, created, updated, skipped, errors: errCount, + }); + + return (async () => { + for (let i = 0; i < rows.length; i++) { + const rowNo = i + 1; + try { + // 1. Coerce every cell to its storage value (+ resolve lookups). + const { data, errors } = await coerceRow(rows[i], metaMap, { + trimWhitespace, nullValues, createMissingOptions, resolveRef, + }); + if (errors.length > 0) { + const first = errors[0]; + errCount++; + results.push({ row: rowNo, ok: false, action: 'failed', field: first.field, code: first.code, error: first.message }); + } else { + // 2. Decide create vs update vs skip. + let existing: Record | 'blank' | 'none' | 'ambiguous' = 'none'; + let handled = false; + if (writeMode !== 'insert') { + existing = await findExisting(data); + if (existing === 'ambiguous') { + errCount++; + results.push({ row: rowNo, ok: false, action: 'failed', code: 'AMBIGUOUS_MATCH', error: `matchFields matched more than one ${objectName} record` }); + handled = true; + } else if (existing === 'blank' && (skipBlankMatchKey || writeMode === 'update')) { + // Blank match key: skip when asked, else fall through to create. + skipped++; + results.push({ row: rowNo, ok: true, action: 'skipped', code: 'BLANK_MATCH_KEY' }); + handled = true; + } + } + + if (!handled) { + const willUpdate = existing && typeof existing === 'object'; + const willCreate = !willUpdate && (writeMode === 'insert' || writeMode === 'upsert'); + + if (!willUpdate && !willCreate) { + // update mode, no match → skip. + skipped++; + results.push({ row: rowNo, ok: true, action: 'skipped', code: 'NO_MATCH' }); + } else if (dryRun) { + okCount++; + if (willUpdate) { updated++; results.push({ row: rowNo, ok: true, action: 'updated', id: String((existing as any).id ?? '') || undefined }); } + else { created++; results.push({ row: rowNo, ok: true, action: 'created' }); } + } else if (willUpdate) { + const target = existing as Record; + const res2 = await p.updateData({ object: objectName, id: target.id, data, context: writeCtx, ...(environmentId ? { environmentId } : {}) }); + const id = (res2 as any)?.id ?? (res2 as any)?.record?.id ?? target.id; + okCount++; updated++; + if (collectUndo && target.id != null) { + undoLog.updated.push({ id: String(target.id), before: captureBefore(target, data) }); + } + results.push({ row: rowNo, ok: true, action: 'updated', id: id != null ? String(id) : undefined }); + } else { + const res2 = await p.createData({ object: objectName, data, context: writeCtx, ...(environmentId ? { environmentId } : {}) }); + const id = (res2 as any)?.id ?? (res2 as any)?.record?.id; + okCount++; created++; + if (collectUndo && id != null) undoLog.created.push(String(id)); + results.push({ row: rowNo, ok: true, action: 'created', id: id != null ? String(id) : undefined }); + } + } + } + } catch (err: any) { + errCount++; + const code = err?.code ?? 'IMPORT_ROW_FAILED'; + const message = typeof err?.message === 'string' ? err.message.slice(0, 300) : 'Row failed'; + results.push({ row: rowNo, ok: false, action: 'failed', error: message, code }); + } + + const processed = i + 1; + if (onProgress && (processed % progressEvery === 0 || processed === rows.length)) { + await onProgress(snapshot(processed)); + } + if (shouldCancel && processed < rows.length && (processed % progressEvery === 0)) { + if (await shouldCancel()) { cancelled = true; break; } + } + } + + return { + ...snapshot(results.length), ok: okCount, results, cancelled, + ...(collectUndo ? { undoLog } : {}), + }; + })(); +} diff --git a/packages/rest/src/rest-api-plugin.ts b/packages/rest/src/rest-api-plugin.ts index 0b297d9d90..3dfd8d8a90 100644 --- a/packages/rest/src/rest-api-plugin.ts +++ b/packages/rest/src/rest-api-plugin.ts @@ -6,6 +6,7 @@ import { ObjectStackProtocol, RestServerConfig } from '@objectstack/spec/api'; import { registerPackageRoutes } from './package-routes.js'; import { registerExternalDatasourceRoutes } from './external-datasource-routes.js'; import type { PackageService } from '@objectstack/service-package'; +import { SysImportJob } from '@objectstack/platform-objects/audit'; export interface RestApiPluginConfig { serverServiceName?: string; @@ -32,8 +33,25 @@ export function createRestApiPlugin(config: RestApiPluginConfig = {}): Plugin { name: 'com.objectstack.rest.api', version: '1.0.0', - init: async (_ctx: PluginContext) => { - // No service registration, this is a consumer plugin + init: async (ctx: PluginContext) => { + // Register the async-import job object so its state/progress/history + // is queryable in Studio and readable by the import-job routes. + // The REST plugin owns the import feature, so it owns this object + // (there is no separate import service). Mirrors JobServicePlugin. + try { + ctx.getService<{ register(m: any): void }>('manifest').register({ + id: 'com.objectstack.rest.api', + name: 'REST API', + version: '1.0.0', + type: 'plugin', + scope: 'system', + defaultDatasource: 'cloud', + namespace: 'sys', + objects: [SysImportJob], + }); + } catch (err) { + ctx.logger.warn('RestApiPlugin: manifest service unavailable; sys_import_job not registered', err as any); + } }, start: async (ctx: PluginContext) => { diff --git a/packages/rest/src/rest-server.ts b/packages/rest/src/rest-server.ts index ab57f8cf84..65c497067e 100644 --- a/packages/rest/src/rest-server.ts +++ b/packages/rest/src/rest-server.ts @@ -12,6 +12,7 @@ import { formatRowForJson, type ExportFieldMeta, } from './export-format.js'; +import { runImport } from './import-runner.js'; // Node-safe logger — avoids importing 'console' which is absent from ES2020 lib typings. const logError = (...args: unknown[]) => (globalThis as any).console?.error(...args); @@ -381,6 +382,287 @@ function parseCsvToRows(csv: string, mapping: Record = {}): Arra return out; } +/** + * Flatten one ExcelJS cell value to the raw string the coercion layer expects. + * ExcelJS hands back rich objects for formulas / hyperlinks / rich text / dates; + * we reduce each to the human-visible text so a server-parsed xlsx yields the + * same cells a CSV export would (dates → ISO, so parseDateCell can re-read them). + */ +function xlsxCellToString(value: any): string { + if (value === null || value === undefined) return ''; + if (typeof value === 'string') return value; + if (typeof value === 'number' || typeof value === 'boolean' || typeof value === 'bigint') return String(value); + if (value instanceof Date) return value.toISOString(); + if (typeof value === 'object') { + // Formula cell → prefer its computed result. + if ('result' in value && value.result !== undefined && value.result !== null) return xlsxCellToString(value.result); + // Hyperlink cell → the visible text, not the target. + if ('text' in value && typeof value.text === 'string') return value.text; + if ('hyperlink' in value && typeof value.hyperlink === 'string') return value.hyperlink; + // Rich text → concatenate runs. + if (Array.isArray(value.richText)) return value.richText.map((r: any) => r?.text ?? '').join(''); + if ('error' in value && value.error) return String(value.error); + } + try { return String(value); } catch { return ''; } +} + +/** + * Parse an .xlsx workbook (raw bytes) into row objects, mirroring + * {@link parseCsvToRows}: first non-empty row is the header, each subsequent row + * becomes `{ header→cell }` with the optional `mapping` renaming columns. Reads + * the named/indexed `sheet` when given, else the first worksheet. Dynamically + * imports ExcelJS (already a dependency of the export path) so CSV/JSON imports + * don't pay for it. + */ +async function parseXlsxToRows( + buffer: Buffer | ArrayBuffer, + mapping: Record = {}, + sheet?: string | number, +): Promise>> { + const ExcelJS: any = (await import('exceljs')).default ?? (await import('exceljs')); + const wb = new ExcelJS.Workbook(); + await wb.xlsx.load(buffer); + const ws = sheet !== undefined ? wb.getWorksheet(sheet as any) : wb.worksheets[0]; + if (!ws) return []; + + const cells: string[][] = []; + ws.eachRow({ includeEmpty: false }, (row: any) => { + const values = row.values as any[]; // 1-based; index 0 is unused + const line: string[] = []; + for (let c = 1; c < values.length; c++) line.push(xlsxCellToString(values[c])); + cells.push(line); + }); + while (cells.length > 0 && cells[cells.length - 1].every(c => c === '')) cells.pop(); + if (cells.length < 2) return []; + + const header = cells[0].map(h => h.trim()); + const fields = header.map(h => mapping[h] ?? h); + const out: Array> = []; + for (let r = 1; r < cells.length; r++) { + const line = cells[r]; + const obj: Record = {}; + for (let c = 0; c < fields.length; c++) { + const key = fields[c]; + if (!key) continue; + obj[key] = line[c] ?? ''; + } + out.push(obj); + } + return out; +} + +/** Everything the import runner needs, parsed & validated from a request body. */ +interface PreparedImport { + rows: Array>; + metaMap: Map; + writeMode: 'insert' | 'update' | 'upsert'; + matchFields: string[]; + dryRun: boolean; + runAutomations: boolean; + trimWhitespace: boolean; + nullValues?: string[]; + createMissingOptions: boolean; + skipBlankMatchKey: boolean; +} + +type PrepareImportResult = + | { ok: true; prepared: PreparedImport } + | { ok: false; status: number; code: string; error: string }; + +/** + * Parse & validate a bulk-import request body into a {@link PreparedImport}. + * + * Shared by the synchronous `POST /data/:object/import` route and the async + * import-job create route so both accept byte-identical payloads (writeMode + + * matchFields, mapping in either shape, rows[]/csv/xlsxBase64) and resolve the + * same field metadata. The only knob that differs is `maxRows` (5k sync vs + * 50k async). Returns a discriminated result; the caller maps `!ok` to an HTTP + * error using the returned status/code/error. + */ +async function prepareImportRequest( + body: any, + opts: { p: any; objectName: string; environmentId?: string; maxRows: number }, +): Promise { + const { p, objectName, environmentId, maxRows } = opts; + const dryRun = body?.dryRun === true; + + const writeMode: 'insert' | 'update' | 'upsert' = + body?.writeMode === 'update' || body?.writeMode === 'upsert' ? body.writeMode : 'insert'; + const matchFields: string[] = Array.isArray(body?.matchFields) + ? body.matchFields.filter((f: any) => typeof f === 'string' && f.length > 0) + : []; + const runAutomations = body?.runAutomations === true; + const trimWhitespace = body?.trimWhitespace !== false; + const nullValues: string[] | undefined = Array.isArray(body?.nullValues) + ? body.nullValues.filter((v: any) => typeof v === 'string') + : undefined; + const createMissingOptions = body?.createMissingOptions === true; + const skipBlankMatchKey = body?.skipBlankMatchKey === true; + + if (writeMode !== 'insert' && matchFields.length === 0) { + return { ok: false, status: 400, code: 'INVALID_REQUEST', error: `writeMode "${writeMode}" requires a non-empty matchFields[]` }; + } + + // Normalize `mapping` to a `{ sourceColumn: targetField }` record. Accepts + // either that compact form or a FieldMappingEntry[]. + const mapping: Record = {}; + if (Array.isArray(body?.mapping)) { + for (const e of body.mapping) { + if (e && typeof e.sourceField === 'string' && typeof e.targetField === 'string') { + mapping[e.sourceField] = e.targetField; + } + } + } else if (body?.mapping && typeof body.mapping === 'object') { + for (const [k, v] of Object.entries(body.mapping)) { + if (typeof v === 'string') mapping[k] = v; + } + } + const applyMapping = (row: Record): Record => { + if (Object.keys(mapping).length === 0) return row; + const out: Record = {}; + for (const [k, val] of Object.entries(row)) out[mapping[k] ?? k] = val; + return out; + }; + + // Build rows[] from JSON array, CSV text, or a base64 xlsx. + let rows: Array> = []; + if (body?.format === 'json' && Array.isArray(body.rows)) { + rows = (body.rows as Array>).map(applyMapping); + } else if ((body?.format === 'csv' || typeof body?.csv === 'string') && typeof body?.csv === 'string') { + rows = parseCsvToRows(body.csv, mapping); + } else if ((body?.format === 'xlsx' || typeof body?.xlsxBase64 === 'string') && typeof body?.xlsxBase64 === 'string') { + // Native server-side xlsx parse — the client uploads raw workbook bytes + // (base64) instead of pre-flattening to CSV. + try { + const buf = Buffer.from(body.xlsxBase64, 'base64'); + rows = await parseXlsxToRows(buf, mapping, body.sheet); + } catch (e: any) { + return { ok: false, status: 400, code: 'INVALID_REQUEST', error: `Failed to parse xlsx: ${e?.message ?? String(e)}` }; + } + } else if (Array.isArray(body)) { + // Permissive: a bare JSON array at the top level. + rows = (body as Array>).map(applyMapping); + } else { + return { ok: false, status: 400, code: 'INVALID_REQUEST', error: 'Provide format:"csv" with csv text, format:"json" with rows[], or format:"xlsx" with xlsxBase64' }; + } + + if (rows.length > maxRows) { + return { ok: false, status: 413, code: 'PAYLOAD_TOO_LARGE', error: `Import limit is ${maxRows} rows per request (got ${rows.length}).` }; + } + + // Resolve the object's field metadata so cells coerce to storage values + // (booleans, numbers, dates→ISO, select label→code) and lookup names resolve + // to record ids. Best-effort: a failed lookup leaves `metaMap` empty and + // every value passes through untouched. + let metaMap = new Map(); + try { + let schema: any = undefined; + if (typeof p.getMetaItem === 'function') { + const r = await p.getMetaItem({ type: 'object', name: objectName }); + schema = isMetaEnvelope(r) ? r.item : r; + } + if (!schema && typeof p.getObjectSchema === 'function') { + schema = await p.getObjectSchema(objectName, environmentId); + } + metaMap = buildFieldMetaMap(schema); + } catch { /* pass-through coercion */ } + + return { + ok: true, + prepared: { + rows, metaMap, writeMode, matchFields, dryRun, runAutomations, + trimWhitespace, nullValues, createMissingOptions, skipBlankMatchKey, + }, + }; +} + +/** Platform object backing async import jobs (see sys-import-job.object.ts). */ +const IMPORT_JOB_OBJECT = 'sys_import_job'; +/** Hard ceiling on rows per async import job (mirrors spec IMPORT_JOB_MAX_ROWS). */ +const IMPORT_JOB_MAX_ROWS = 50_000; +/** Cap on per-row results persisted on the job (failures first). */ +const IMPORT_JOB_RESULTS_CAP = 500; +/** Undo (logical rollback) is only recorded for jobs at or under this row + * count — larger jobs skip the undo log to bound the stored before-snapshots. */ +const IMPORT_JOB_UNDO_MAX_ROWS = 5_000; + +/** Generate a sortable-ish, collision-resistant import job id. */ +function newImportJobId(): string { + return `imp_${Date.now().toString(36)}${Math.random().toString(36).slice(2, 10)}`; +} + +/** Cap a results list to {@link IMPORT_JOB_RESULTS_CAP}, keeping failures first. */ +function capImportResults(results: Array<{ ok: boolean }>): { items: any[]; truncated: boolean } { + if (results.length <= IMPORT_JOB_RESULTS_CAP) return { items: results, truncated: false }; + const failures = results.filter(r => !r.ok); + const successes = results.filter(r => r.ok); + const items = [...failures, ...successes].slice(0, IMPORT_JOB_RESULTS_CAP); + return { items, truncated: true }; +} + +/** Parse the persisted undo log (json column may arrive as object or string). */ +function parseUndoLog(raw: any): { created: string[]; updated: Array<{ id: string; before: Record }> } | undefined { + if (!raw) return undefined; + let v = raw; + if (typeof v === 'string') { try { v = JSON.parse(v); } catch { return undefined; } } + if (!v || typeof v !== 'object') return undefined; + const created = Array.isArray(v.created) ? v.created.map(String) : []; + const updated = Array.isArray(v.updated) + ? v.updated.filter((u: any) => u && u.id != null).map((u: any) => ({ id: String(u.id), before: u.before ?? {} })) + : []; + return { created, updated }; +} + +/** True when a job can still be undone: it wrote data (undo log present with + * entries), hasn't already been reverted, and finished in a terminal state. */ +function importJobUndoable(row: any): boolean { + if (row?.reverted_at) return false; + const status = String(row?.status ?? ''); + if (status !== 'succeeded' && status !== 'cancelled') return false; + const log = parseUndoLog(row?.undo_log); + return !!log && (log.created.length > 0 || log.updated.length > 0); +} + +/** Map a persisted `sys_import_job` row to the ImportJobProgress DTO. */ +function importJobToProgress(row: any): Record { + const total = Number(row?.total_rows ?? 0); + const processed = Number(row?.processed_rows ?? 0); + return { + undoable: importJobUndoable(row), + ...(row?.reverted_at ? { revertedAt: String(row.reverted_at) } : {}), + jobId: String(row?.id ?? ''), + object: String(row?.object_name ?? ''), + status: String(row?.status ?? 'pending'), + dryRun: !!row?.dry_run, + writeMode: String(row?.write_mode ?? 'insert'), + total, + processed, + created: Number(row?.created_count ?? 0), + updated: Number(row?.updated_count ?? 0), + skipped: Number(row?.skipped_count ?? 0), + errors: Number(row?.error_count ?? 0), + percentComplete: total > 0 ? Math.min(100, Math.round((processed / total) * 100)) : (processed > 0 ? 100 : 0), + ...(row?.error ? { error: String(row.error) } : {}), + ...(row?.started_at ? { startedAt: String(row.started_at) } : {}), + ...(row?.completed_at ? { completedAt: String(row.completed_at) } : {}), + createdAt: String(row?.created_at ?? ''), + }; +} + +/** Map a persisted `sys_import_job` row to the ImportJobSummary DTO (list). */ +function importJobToSummary(row: any): Record { + const p = importJobToProgress(row); + return { + jobId: p.jobId, object: p.object, status: p.status, + total: p.total, processed: p.processed, + created: p.created, updated: p.updated, skipped: p.skipped, errors: p.errors, + createdAt: p.createdAt, + undoable: p.undoable, + ...(p.completedAt ? { completedAt: p.completedAt } : {}), + ...(p.revertedAt ? { revertedAt: p.revertedAt } : {}), + }; +} + /** * Escape a single value into an RFC-4180 CSV cell. Values containing * commas, quotes, CR, or LF are wrapped in double-quotes with embedded @@ -627,6 +909,13 @@ export class RestServer { * capability gates (ADR-0057 D10) — resolveExecCtx sets no kernel in * single-kernel deployments, so this prevents the gate failing open. */ private serviceExistsProvider?: (name: string) => boolean; + /** + * In-flight async import jobs the caller has asked to cancel. The worker + * checks membership at each progress boundary and stops cooperatively. This + * is process-local (single-node); the persisted `sys_import_job.status` is + * the durable source of truth a restarted/other node reads. + */ + private readonly cancelledImportJobs = new Set(); constructor( server: IHttpServer, @@ -3208,72 +3497,35 @@ export class RestServer { } if (await this.enforceApiAccess(req, res, p, environmentId, 'import')) return; const body = req.body ?? {}; - const dryRun = body.dryRun === true; - const mapping: Record = body.mapping ?? {}; - - // Build rows[] from either explicit JSON array or CSV text. - let rows: Array> = []; - if (body.format === 'json' && Array.isArray(body.rows)) { - rows = body.rows as Array>; - } else if ((body.format === 'csv' || typeof body.csv === 'string') && typeof body.csv === 'string') { - rows = parseCsvToRows(body.csv, mapping); - } else if (Array.isArray(body)) { - // Permissive: a bare JSON array at the top level. - rows = body as Array>; - } else { - res.status(400).json({ - code: 'INVALID_REQUEST', - error: 'Provide either format:"csv" with csv text or format:"json" with rows[]', - }); - return; - } - const max = 5000; - if (rows.length > max) { - res.status(413).json({ - code: 'PAYLOAD_TOO_LARGE', - error: `Import limit is ${max} rows per request (got ${rows.length})`, - }); + // Parse + validate the payload (shared with the async job route). + // The synchronous path caps at 5k rows; larger files must use the + // async import-job endpoint. + const prep = await prepareImportRequest(body, { p, objectName, environmentId, maxRows: 5000 }); + if (!prep.ok) { + if (prep.status === 413) prep.error += ' Use an async import job for larger files.'; + res.status(prep.status).json({ code: prep.code, error: prep.error }); return; } + const { rows, writeMode, dryRun } = prep.prepared; - const results: Array<{ row: number; ok: boolean; id?: string; error?: string; code?: string }> = []; - let okCount = 0; - let errCount = 0; - - for (let i = 0; i < rows.length; i++) { - const data = rows[i]; - try { - if (dryRun) { - // Validate via protocol's metadata layer when available, else - // best-effort: treat any non-empty row as syntactically OK. - const validate = (p as any).validate; - if (typeof validate === 'function') { - await validate.call(p, { object: objectName, data, context }); - } - results.push({ row: i + 1, ok: true }); - okCount++; - } else { - const created = await (p as any).createData({ object: objectName, data, context }); - const id = (created as any)?.id ?? (created as any)?.record?.id; - results.push({ row: i + 1, ok: true, id }); - okCount++; - } - } catch (err: any) { - errCount++; - const code = err?.code ?? 'IMPORT_ROW_FAILED'; - const message = typeof err?.message === 'string' ? err.message.slice(0, 300) : 'Row failed'; - results.push({ row: i + 1, ok: false, error: message, code }); - } - } + // Delegate the per-row coercion + upsert loop to the shared + // runner (also used by the async import-job worker). + const summary = await runImport({ + p, objectName, environmentId, context, ...prep.prepared, + }); res.json({ object: objectName, dryRun, + writeMode, total: rows.length, - ok: okCount, - errors: errCount, - results, + ok: summary.ok, + errors: summary.errors, + created: summary.created, + updated: summary.updated, + skipped: summary.skipped, + results: summary.results, }); } catch (error: any) { logError('[REST] Unhandled error:', error); @@ -3286,6 +3538,328 @@ export class RestServer { }, }); + // ── Asynchronous import jobs (P1) ────────────────────────────────── + // + // For files too large for the synchronous route (up to 50k rows), the + // client POSTs the whole payload once; the server persists a + // `sys_import_job`, responds immediately with a jobId, then processes + // the batch in the background — updating progress on the job row as it + // streams. Callers poll progress/results and list history. These routes + // are registered inside registerDataActionEndpoints (before the greedy + // CRUD `:object/:id`), so the literal `import/jobs` segments win. + + // POST /data/:object/import/jobs — create an async import job. + this.routeManager.register({ + method: 'POST', + path: `${dataPath}/:object/import/jobs`, + handler: async (req: any, res: any) => { + try { + const environmentId = isScoped ? req.params?.environmentId : undefined; + const p = await this.resolveProtocol(environmentId, req); + const context = await this.resolveExecCtx(environmentId, req); + if (this.enforceAuth(req, res, context)) return; + const objectName = String(req.params.object || ''); + if (!objectName) { + res.status(400).json({ code: 'INVALID_REQUEST', error: 'object is required' }); + return; + } + if (await this.enforceApiAccess(req, res, p, environmentId, 'import')) return; + + const prep = await prepareImportRequest(req.body ?? {}, { p, objectName, environmentId, maxRows: IMPORT_JOB_MAX_ROWS }); + if (!prep.ok) { + if (prep.status === 413) prep.error += ` This is the async import ceiling; split the file into batches of ${IMPORT_JOB_MAX_ROWS}.`; + res.status(prep.status).json({ code: prep.code, error: prep.error }); + return; + } + const prepared = prep.prepared; + + const jobId = newImportJobId(); + const createdAt = new Date().toISOString(); + const createdBy = String((context as any)?.userId ?? (context as any)?.user?.id ?? '') || undefined; + const jobRow: Record = { + id: jobId, + object_name: objectName, + status: 'pending', + total_rows: prepared.rows.length, + processed_rows: 0, + created_count: 0, + updated_count: 0, + skipped_count: 0, + error_count: 0, + write_mode: prepared.writeMode, + dry_run: prepared.dryRun, + run_automations: prepared.runAutomations, + created_at: createdAt, + ...(createdBy ? { created_by: createdBy } : {}), + }; + + try { + await (p as any).createData({ object: IMPORT_JOB_OBJECT, data: jobRow, context, ...(environmentId ? { environmentId } : {}) }); + } catch (err: any) { + logError('[REST] Failed to persist import job:', err); + res.status(500).json({ code: 'IMPORT_JOB_CREATE_FAILED', error: 'Could not create import job' }); + return; + } + + // Respond immediately; process in the background. + res.status(201).json({ jobId, object: objectName, status: 'pending', total: prepared.rows.length, createdAt }); + + // Background worker. Fire-and-forget: it owns its own error + // handling and persists terminal state to the job row. + const patch = async (data: Record) => { + try { + await (p as any).updateData({ object: IMPORT_JOB_OBJECT, id: jobId, data, context, ...(environmentId ? { environmentId } : {}) }); + } catch (err) { + logError('[REST] import job progress write failed:', err); + } + }; + // Record undo instructions for small non-dry-run jobs so the + // import can be logically rolled back later. + const captureUndo = !prepared.dryRun && prepared.rows.length <= IMPORT_JOB_UNDO_MAX_ROWS; + void (async () => { + await patch({ status: 'running', started_at: new Date().toISOString() }); + try { + const summary = await runImport({ + p, objectName, environmentId, context, ...prepared, + captureUndo, + progressEvery: 200, + onProgress: (pr) => patch({ + processed_rows: pr.processed, + created_count: pr.created, + updated_count: pr.updated, + skipped_count: pr.skipped, + error_count: pr.errors, + }), + shouldCancel: () => this.cancelledImportJobs.has(jobId), + }); + await patch({ + status: summary.cancelled ? 'cancelled' : 'succeeded', + processed_rows: summary.processed, + created_count: summary.created, + updated_count: summary.updated, + skipped_count: summary.skipped, + error_count: summary.errors, + results: capImportResults(summary.results), + completed_at: new Date().toISOString(), + ...(summary.undoLog ? { undo_log: summary.undoLog } : {}), + }); + } catch (err: any) { + await patch({ + status: 'failed', + error: String(err?.message ?? err).slice(0, 1000), + completed_at: new Date().toISOString(), + }); + } finally { + this.cancelledImportJobs.delete(jobId); + } + })(); + } catch (error: any) { + logError('[REST] Unhandled error:', error); + sendError(res, error, String(req.params?.object || '')); + } + }, + metadata: { + summary: 'Create an asynchronous import job (large files, up to 50k rows)', + tags: ['data', 'import'], + }, + }); + + // Shared loader for the read routes: fetch one job row by id. + const loadImportJob = async (p: any, jobId: string, environmentId?: string, context?: any): Promise => { + const r = await p.findData({ + object: IMPORT_JOB_OBJECT, + query: { $filter: { id: jobId }, $top: 1 }, + ...(environmentId ? { environmentId } : {}), + ...(context ? { context } : {}), + }); + const rows = Array.isArray(r?.records) ? r.records + : Array.isArray(r?.data) ? r.data + : Array.isArray(r?.rows) ? r.rows + : Array.isArray(r) ? r : []; + return rows[0]; + }; + + // POST /data/import/jobs/:jobId/cancel — request cancellation. + this.routeManager.register({ + method: 'POST', + path: `${dataPath}/import/jobs/:jobId/cancel`, + handler: async (req: any, res: any) => { + try { + const environmentId = isScoped ? req.params?.environmentId : undefined; + const p = await this.resolveProtocol(environmentId, req); + const context = await this.resolveExecCtx(environmentId, req); + if (this.enforceAuth(req, res, context)) return; + const jobId = String(req.params.jobId || ''); + const row = await loadImportJob(p, jobId, environmentId, context); + if (!row) { + res.status(404).json({ code: 'NOT_FOUND', error: `No import job ${jobId}` }); + return; + } + const status = String(row.status ?? ''); + if (status === 'pending' || status === 'running') { + // Signal the in-process worker and mark the durable row. + this.cancelledImportJobs.add(jobId); + try { + await (p as any).updateData({ object: IMPORT_JOB_OBJECT, id: jobId, data: { status: 'cancelled', completed_at: new Date().toISOString() }, context, ...(environmentId ? { environmentId } : {}) }); + } catch { /* worker will still stop via the in-memory flag */ } + } + res.json({ success: true }); + } catch (error: any) { + logError('[REST] Unhandled error:', error); + sendError(res, error, ''); + } + }, + metadata: { summary: 'Cancel an in-flight import job', tags: ['data', 'import'] }, + }); + + // POST /data/import/jobs/:jobId/undo — logical rollback of a finished + // job: delete the records it created and restore the fields it updated + // to their pre-import values (from the captured undo log). + this.routeManager.register({ + method: 'POST', + path: `${dataPath}/import/jobs/:jobId/undo`, + handler: async (req: any, res: any) => { + try { + const environmentId = isScoped ? req.params?.environmentId : undefined; + const p = await this.resolveProtocol(environmentId, req); + const context = await this.resolveExecCtx(environmentId, req); + if (this.enforceAuth(req, res, context)) return; + const jobId = String(req.params.jobId || ''); + const row = await loadImportJob(p, jobId, environmentId, context); + if (!row) { + res.status(404).json({ code: 'NOT_FOUND', error: `No import job ${jobId}` }); + return; + } + if (row.reverted_at) { + res.status(409).json({ code: 'ALREADY_REVERTED', error: 'This import has already been undone' }); + return; + } + if (!importJobUndoable(row)) { + res.status(422).json({ code: 'NOT_UNDOABLE', error: 'This import cannot be undone (too large, still running, or nothing was written)' }); + return; + } + const objectName = String(row.object_name ?? ''); + const log = parseUndoLog(row.undo_log)!; + // Undo automations too: reversing writes shouldn't re-fire triggers. + const writeCtx = { ...(context ?? {}), skipAutomations: true }; + let deleted = 0, restored = 0, failed = 0; + + // Delete created records first (they didn't exist before). + for (const id of log.created) { + try { + await (p as any).deleteData({ object: objectName, id, context: writeCtx, ...(environmentId ? { environmentId } : {}) }); + deleted++; + } catch { failed++; } + } + // Restore the touched fields on updated records. + for (const u of log.updated) { + try { + await (p as any).updateData({ object: objectName, id: u.id, data: u.before, context: writeCtx, ...(environmentId ? { environmentId } : {}) }); + restored++; + } catch { failed++; } + } + + await (p as any).updateData({ + object: IMPORT_JOB_OBJECT, id: jobId, + data: { reverted_at: new Date().toISOString() }, + context, ...(environmentId ? { environmentId } : {}), + }); + res.json({ success: true, jobId, object: objectName, deleted, restored, failed }); + } catch (error: any) { + logError('[REST] Unhandled error:', error); + sendError(res, error, ''); + } + }, + metadata: { summary: 'Undo (logically roll back) a finished import job', tags: ['data', 'import'] }, + }); + + // GET /data/import/jobs/:jobId/results — progress + capped per-row report. + this.routeManager.register({ + method: 'GET', + path: `${dataPath}/import/jobs/:jobId/results`, + handler: async (req: any, res: any) => { + try { + const environmentId = isScoped ? req.params?.environmentId : undefined; + const p = await this.resolveProtocol(environmentId, req); + const context = await this.resolveExecCtx(environmentId, req); + if (this.enforceAuth(req, res, context)) return; + const jobId = String(req.params.jobId || ''); + const row = await loadImportJob(p, jobId, environmentId, context); + if (!row) { + res.status(404).json({ code: 'NOT_FOUND', error: `No import job ${jobId}` }); + return; + } + const stored = row.results; + const items = Array.isArray(stored?.items) ? stored.items : Array.isArray(stored) ? stored : []; + res.json({ ...importJobToProgress(row), results: items, resultsTruncated: !!stored?.truncated }); + } catch (error: any) { + logError('[REST] Unhandled error:', error); + sendError(res, error, ''); + } + }, + metadata: { summary: 'Import job results (capped per-row report)', tags: ['data', 'import'] }, + }); + + // GET /data/import/jobs/:jobId — live progress counters. + this.routeManager.register({ + method: 'GET', + path: `${dataPath}/import/jobs/:jobId`, + handler: async (req: any, res: any) => { + try { + const environmentId = isScoped ? req.params?.environmentId : undefined; + const p = await this.resolveProtocol(environmentId, req); + const context = await this.resolveExecCtx(environmentId, req); + if (this.enforceAuth(req, res, context)) return; + const jobId = String(req.params.jobId || ''); + const row = await loadImportJob(p, jobId, environmentId, context); + if (!row) { + res.status(404).json({ code: 'NOT_FOUND', error: `No import job ${jobId}` }); + return; + } + res.json(importJobToProgress(row)); + } catch (error: any) { + logError('[REST] Unhandled error:', error); + sendError(res, error, ''); + } + }, + metadata: { summary: 'Import job progress', tags: ['data', 'import'] }, + }); + + // GET /data/import/jobs — history list (newest first). + this.routeManager.register({ + method: 'GET', + path: `${dataPath}/import/jobs`, + handler: async (req: any, res: any) => { + try { + const environmentId = isScoped ? req.params?.environmentId : undefined; + const p = await this.resolveProtocol(environmentId, req); + const context = await this.resolveExecCtx(environmentId, req); + if (this.enforceAuth(req, res, context)) return; + const q = req.query ?? {}; + const filter: Record = {}; + if (typeof q.object === 'string' && q.object) filter.object_name = q.object; + if (typeof q.status === 'string' && q.status) filter.status = q.status; + const limit = Math.min(200, Math.max(1, Number(q.limit) || 50)); + const offset = Math.max(0, Number(q.offset) || 0); + const r = await (p as any).findData({ + object: IMPORT_JOB_OBJECT, + query: { $filter: filter, $orderby: { created_at: 'desc' }, $top: limit, $skip: offset }, + ...(environmentId ? { environmentId } : {}), + ...(context ? { context } : {}), + }); + const rows = Array.isArray(r?.records) ? r.records + : Array.isArray(r?.data) ? r.data + : Array.isArray(r?.rows) ? r.rows + : Array.isArray(r) ? r : []; + res.json({ jobs: rows.map(importJobToSummary) }); + } catch (error: any) { + logError('[REST] Unhandled error:', error); + sendError(res, error, ''); + } + }, + metadata: { summary: 'List import jobs (history)', tags: ['data', 'import'] }, + }); + // GET /data/:object/export — streaming export (M10.21 / C.21) // // Query params: diff --git a/packages/spec/api-surface.json b/packages/spec/api-surface.json index a56b445e67..25bf545034 100644 --- a/packages/spec/api-surface.json +++ b/packages/spec/api-surface.json @@ -2217,6 +2217,10 @@ "CreateFlowRequestSchema (const)", "CreateFlowResponse (type)", "CreateFlowResponseSchema (const)", + "CreateImportJobRequest (type)", + "CreateImportJobRequestSchema (const)", + "CreateImportJobResponse (type)", + "CreateImportJobResponseSchema (const)", "CreateManyDataRequest (type)", "CreateManyDataRequestSchema (const)", "CreateManyDataResponse (type)", @@ -2546,13 +2550,31 @@ "HttpMethod (type)", "HttpStatusCode (type)", "I18nProtocol (interface)", + "IMPORT_JOB_MAX_ROWS (const)", "IdRequest (type)", "IdRequestSchema (const)", + "ImportJobApiContracts (const)", + "ImportJobProgress (type)", + "ImportJobProgressSchema (const)", + "ImportJobResults (type)", + "ImportJobResultsSchema (const)", + "ImportJobStatus (type)", + "ImportJobSummary (type)", + "ImportJobSummarySchema (const)", + "ImportMapping (type)", + "ImportMappingSchema (const)", + "ImportRequest (type)", + "ImportRequestSchema (const)", + "ImportResponse (type)", + "ImportResponseSchema (const)", + "ImportRowResult (type)", + "ImportRowResultSchema (const)", "ImportValidationConfig (type)", "ImportValidationConfigSchema (const)", "ImportValidationMode (type)", "ImportValidationResult (type)", "ImportValidationResultSchema (const)", + "ImportWriteMode (type)", "InitiateChunkedUploadRequest (type)", "InitiateChunkedUploadRequestSchema (const)", "InitiateChunkedUploadResponse (type)", @@ -2570,6 +2592,10 @@ "ListFlowsRequestSchema (const)", "ListFlowsResponse (type)", "ListFlowsResponseSchema (const)", + "ListImportJobsRequest (type)", + "ListImportJobsRequestSchema (const)", + "ListImportJobsResponse (type)", + "ListImportJobsResponseSchema (const)", "ListInstalledPackagesRequest (type)", "ListInstalledPackagesRequestSchema (const)", "ListInstalledPackagesResponse (type)", @@ -2893,6 +2919,8 @@ "TriggerFlowRequestSchema (const)", "TriggerFlowResponse (type)", "TriggerFlowResponseSchema (const)", + "UndoImportJobResponse (type)", + "UndoImportJobResponseSchema (const)", "UninstallPackageApiRequest (type)", "UninstallPackageApiRequestSchema (const)", "UninstallPackageApiResponse (type)", diff --git a/packages/spec/src/api/export.zod.ts b/packages/spec/src/api/export.zod.ts index 6d64167fe0..36b3386007 100644 --- a/packages/spec/src/api/export.zod.ts +++ b/packages/spec/src/api/export.zod.ts @@ -258,6 +258,242 @@ export const ExportImportTemplateSchema = lazySchema(() => z.object({ })); export type ExportImportTemplate = z.infer; +// ========================================== +// 4b. Import Request / Result (POST /data/:object/import) +// ========================================== + +/** + * Import Write Mode + * How each incoming row is committed against existing data. + */ +export const ImportWriteMode = z.enum([ + 'insert', // Always create a new record (default; ignores matchFields) + 'update', // Update an existing record matched by matchFields; skip if none + 'upsert', // Update when matched by matchFields, else create +]); +export type ImportWriteMode = z.infer; + +/** + * Field Mapping (import) + * Either a compact `{ sourceColumn: targetField }` record, or the richer + * `FieldMappingEntry[]` form (per-column transform + default + required). + */ +export const ImportMappingSchema = lazySchema(() => z.union([ + z.record(z.string(), z.string()), + z.array(FieldMappingEntrySchema), +])); +export type ImportMapping = z.infer; + +/** + * Import Request Schema + * Body for `POST /api/v1/data/:object/import`. + * + * The server coerces every cell to its storage value using the object's field + * metadata (booleans, numbers, dates→ISO, select label→code, lookup name→id), + * so the client sends raw spreadsheet values plus an optional column mapping. + * + * @example + * { + * format: 'csv', csv: 'Name,Owner,Stage\nAcme,jane@x.com,Won', + * mapping: { Name: 'name', Owner: 'owner', Stage: 'stage' }, + * writeMode: 'upsert', matchFields: ['name'], runAutomations: false, + * } + */ +export const ImportRequestSchema = lazySchema(() => z.object({ + format: z.enum(['csv', 'json', 'xlsx']).optional() + .describe('Payload shape: csv text, a rows[] array, or a base64 xlsx (inferred when omitted)'), + csv: z.string().optional().describe('CSV text (when format = csv)'), + rows: z.array(z.record(z.string(), z.unknown())).optional() + .describe('Row objects (when format = json)'), + xlsxBase64: z.string().optional() + .describe('Base64-encoded .xlsx workbook bytes (when format = xlsx); parsed server-side'), + sheet: z.union([z.string(), z.number().int()]).optional() + .describe('Worksheet name or 1-based index to read (xlsx; defaults to the first sheet)'), + mapping: ImportMappingSchema.optional() + .describe('Source column → target field mapping'), + dryRun: z.boolean().default(false) + .describe('Validate + coerce every row without persisting'), + writeMode: ImportWriteMode.default('insert') + .describe('insert / update / upsert semantics'), + matchFields: z.array(z.string()).optional() + .describe('Fields that identify an existing record (required for update/upsert)'), + runAutomations: z.boolean().default(false) + .describe('Fire triggers/hooks for each imported row (off by default for bulk)'), + trimWhitespace: z.boolean().default(true) + .describe('Trim leading/trailing whitespace from string cells'), + nullValues: z.array(z.string()).optional() + .describe('Strings treated as null/blank (besides empty string)'), + createMissingOptions: z.boolean().default(false) + .describe('Keep unmatched select values instead of failing the row'), + skipBlankMatchKey: z.boolean().default(false) + .describe('Skip rows whose matchFields are blank (default: upsert creates them, update skips them)'), +})); +export type ImportRequest = z.infer; + +/** + * Import Row Result + * Per-row outcome so a UI can render an import report and offer a failed-row + * re-export. + */ +export const ImportRowResultSchema = lazySchema(() => z.object({ + row: z.number().int().describe('1-based row number in the source data'), + ok: z.boolean().describe('Whether the row succeeded'), + action: z.enum(['created', 'updated', 'skipped', 'failed']) + .describe('What happened to the row'), + id: z.string().optional().describe('Record id (created/updated rows)'), + field: z.string().optional().describe('Field that caused a coercion/validation error'), + code: z.string().optional().describe('Error code (failed rows)'), + error: z.string().optional().describe('Human-readable error message (failed rows)'), +})); +export type ImportRowResult = z.infer; + +/** + * Import Response Schema + * Aggregate summary + per-row results returned by the import route. + */ +export const ImportResponseSchema = lazySchema(() => z.object({ + object: z.string().describe('Target object name'), + dryRun: z.boolean().describe('Whether this was a validate-only pass'), + writeMode: ImportWriteMode.describe('Write mode used'), + total: z.number().int().describe('Rows processed'), + ok: z.number().int().describe('Rows that succeeded'), + errors: z.number().int().describe('Rows that failed'), + created: z.number().int().describe('Rows that created a new record'), + updated: z.number().int().describe('Rows that updated an existing record'), + skipped: z.number().int().describe('Rows skipped (no match in update mode, etc.)'), + results: z.array(ImportRowResultSchema).describe('Per-row outcomes'), +})); +export type ImportResponse = z.infer; + +// ========================================== +// 4b. Asynchronous Import Jobs +// ========================================== + +/** + * Hard ceiling on rows accepted by a single async import job. The client sends + * the whole payload in one request (rows[] or a base64 xlsx); this caps memory + * and worker time. Files larger than this must be split client-side. + */ +export const IMPORT_JOB_MAX_ROWS = 50_000; + +/** + * Import Job Status. Mirrors {@link ExportJobStatus} but with the terminal + * states the import worker actually uses (`succeeded` rather than `completed`). + */ +export const ImportJobStatus = z.enum([ + 'pending', // Row persisted, worker not yet started + 'running', // Worker streaming through the batch + 'succeeded', // Finished (rows may still have per-row errors) + 'failed', // Aborted on a fatal error + 'cancelled', // Cancelled by the caller before completion +]); +export type ImportJobStatus = z.infer; + +/** + * Create Import Job Request — body for `POST /api/v1/data/:object/import/jobs`. + * Identical to the synchronous {@link ImportRequestSchema} payload; the only + * difference is the endpoint processes it in the background and streams + * progress instead of blocking until done. + */ +export const CreateImportJobRequestSchema = ImportRequestSchema; +export type CreateImportJobRequest = z.infer; + +/** + * Create Import Job Response — the freshly-created job's id + initial status. + */ +export const CreateImportJobResponseSchema = lazySchema(() => z.object({ + jobId: z.string().describe('Import job id — poll progress/results with this'), + object: z.string().describe('Target object name'), + status: ImportJobStatus.describe('Initial job status (usually "pending")'), + total: z.number().int().describe('Rows accepted for processing'), + createdAt: z.string().describe('Job creation timestamp (ISO 8601)'), +})); +export type CreateImportJobResponse = z.infer; + +/** + * Import Job Progress — the live counters a client polls while the job runs. + */ +export const ImportJobProgressSchema = lazySchema(() => z.object({ + jobId: z.string().describe('Import job id'), + object: z.string().describe('Target object name'), + status: ImportJobStatus.describe('Current job status'), + dryRun: z.boolean().describe('Whether this is a validate-only pass'), + writeMode: ImportWriteMode.describe('Write mode used'), + total: z.number().int().describe('Total rows to process'), + processed: z.number().int().describe('Rows processed so far'), + created: z.number().int().describe('Rows that created a new record'), + updated: z.number().int().describe('Rows that updated an existing record'), + skipped: z.number().int().describe('Rows skipped'), + errors: z.number().int().describe('Rows that failed'), + percentComplete: z.number().min(0).max(100).describe('processed / total as a percentage'), + undoable: z.boolean().describe('Whether this job can still be logically rolled back (undo log captured, terminal state, not yet reverted)'), + revertedAt: z.string().optional().describe('When the job was undone / rolled back (ISO 8601)'), + error: z.string().optional().describe('Fatal error message (when status = failed)'), + startedAt: z.string().optional().describe('Processing start timestamp (ISO 8601)'), + completedAt: z.string().optional().describe('Completion timestamp (ISO 8601)'), + createdAt: z.string().describe('Job creation timestamp (ISO 8601)'), +})); +export type ImportJobProgress = z.infer; + +/** + * Import Job Results — the progress payload plus a capped sample of per-row + * outcomes (failures first) so a UI can render the report / failed-row export. + */ +export const ImportJobResultsSchema = lazySchema(() => ImportJobProgressSchema.extend({ + results: z.array(ImportRowResultSchema).describe('Capped sample of per-row outcomes (failures first)'), + resultsTruncated: z.boolean().describe('Whether `results` is a capped sample of a larger set'), +})); +export type ImportJobResults = z.infer; + +/** + * List Import Jobs Request — query params for the history endpoint. + */ +export const ListImportJobsRequestSchema = lazySchema(() => z.object({ + object: z.string().optional().describe('Filter to one target object'), + status: ImportJobStatus.optional().describe('Filter by job status'), + limit: z.number().int().min(1).max(200).default(50).describe('Max rows to return'), + offset: z.number().int().min(0).default(0).describe('Pagination offset'), +})); +export type ListImportJobsRequest = z.infer; + +/** One row in the import-job history list. */ +export const ImportJobSummarySchema = lazySchema(() => z.object({ + jobId: z.string().describe('Import job id'), + object: z.string().describe('Target object name'), + status: ImportJobStatus.describe('Job status'), + total: z.number().int().describe('Total rows'), + processed: z.number().int().describe('Rows processed'), + created: z.number().int().describe('Rows created'), + updated: z.number().int().describe('Rows updated'), + skipped: z.number().int().describe('Rows skipped'), + errors: z.number().int().describe('Rows failed'), + createdAt: z.string().describe('Job creation timestamp (ISO 8601)'), + completedAt: z.string().optional().describe('Completion timestamp (ISO 8601)'), + undoable: z.boolean().describe('Whether this job can still be logically rolled back'), + revertedAt: z.string().optional().describe('When the job was undone / rolled back (ISO 8601)'), +})); +export type ImportJobSummary = z.infer; + +/** List Import Jobs Response — newest first. */ +export const ListImportJobsResponseSchema = lazySchema(() => z.object({ + jobs: z.array(ImportJobSummarySchema).describe('Import jobs, newest first'), +})); +export type ListImportJobsResponse = z.infer; + +/** + * Undo Import Job Response — the outcome of a logical rollback: created records + * deleted, updated records restored to their pre-import field values. + */ +export const UndoImportJobResponseSchema = lazySchema(() => z.object({ + success: z.boolean().describe('Whether the undo completed'), + jobId: z.string().describe('Import job id'), + object: z.string().describe('Target object name'), + deleted: z.number().int().describe('Created records deleted'), + restored: z.number().int().describe('Updated records restored to pre-import values'), + failed: z.number().int().describe('Reversal operations that failed'), +})); +export type UndoImportJobResponse = z.infer; + // ========================================== // 5. Scheduled Export Jobs // ========================================== @@ -484,3 +720,51 @@ export const ExportApiContracts = { output: BaseResponseSchema, }, }; + +// ========================================== +// 10. Import API Contracts (async jobs) +// ========================================== + +/** + * Import Job API Contract Registry — the async counterpart to the synchronous + * `POST /api/v1/data/:object/import`. The wizard submits a large payload once, + * then polls progress/results and lists history. + */ +export const ImportJobApiContracts = { + createImportJob: { + method: 'POST' as const, + path: '/api/v1/data/:object/import/jobs', + input: CreateImportJobRequestSchema, + output: CreateImportJobResponseSchema, + }, + getImportJobProgress: { + method: 'GET' as const, + path: '/api/v1/data/import/jobs/:jobId', + input: z.object({ jobId: z.string() }), + output: ImportJobProgressSchema, + }, + getImportJobResults: { + method: 'GET' as const, + path: '/api/v1/data/import/jobs/:jobId/results', + input: z.object({ jobId: z.string() }), + output: ImportJobResultsSchema, + }, + listImportJobs: { + method: 'GET' as const, + path: '/api/v1/data/import/jobs', + input: ListImportJobsRequestSchema, + output: ListImportJobsResponseSchema, + }, + cancelImportJob: { + method: 'POST' as const, + path: '/api/v1/data/import/jobs/:jobId/cancel', + input: z.object({ jobId: z.string() }), + output: BaseResponseSchema, + }, + undoImportJob: { + method: 'POST' as const, + path: '/api/v1/data/import/jobs/:jobId/undo', + input: z.object({ jobId: z.string() }), + output: UndoImportJobResponseSchema, + }, +}; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 82b0c47166..e659a91ad0 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1558,6 +1558,9 @@ importers: '@objectstack/core': specifier: workspace:* version: link:../core + '@objectstack/platform-objects': + specifier: workspace:* + version: link:../platform-objects '@objectstack/service-package': specifier: workspace:* version: link:../services/service-package