Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions packages/client/src/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1051,3 +1051,62 @@ describe('ObjectStackClient locale → Accept-Language', () => {
expect(lastHeaders(fetchMock)['Accept-Language']).toBeUndefined();
});
});

describe('Import-job namespace', () => {
it('createImportJob POSTs the payload to /data/:object/import/jobs', async () => {
const { client, fetchMock } = createMockClient({ jobId: 'imp_x', object: 'task', status: 'pending', total: 3, createdAt: '2026-07-01T00:00:00Z' });
const res = await client.data.createImportJob('task', { format: 'json', rows: [{ id: 'a' }] } as any);
const [url, init] = fetchMock.mock.calls[0];
expect(url).toBe('http://localhost:3000/api/v1/data/task/import/jobs');
expect(init.method).toBe('POST');
expect(res).toMatchObject({ jobId: 'imp_x', status: 'pending', total: 3 });
});

it('getImportJobProgress GETs /data/import/jobs/:jobId', async () => {
const { client, fetchMock } = createMockClient({ jobId: 'imp_x', object: 'task', status: 'running', percentComplete: 40 });
const res = await client.data.getImportJobProgress('imp_x');
expect(fetchMock.mock.calls[0][0]).toBe('http://localhost:3000/api/v1/data/import/jobs/imp_x');
expect(res.percentComplete).toBe(40);
});

it('getImportJobResults GETs the /results sub-route', async () => {
const { client, fetchMock } = createMockClient({ jobId: 'imp_x', status: 'succeeded', results: [{ row: 1, ok: true, action: 'created' }], resultsTruncated: false });
const res = await client.data.getImportJobResults('imp_x');
expect(fetchMock.mock.calls[0][0]).toBe('http://localhost:3000/api/v1/data/import/jobs/imp_x/results');
expect(res.results).toHaveLength(1);
expect(res.resultsTruncated).toBe(false);
});

it('listImportJobs builds the query string and unwraps the jobs array', async () => {
const { client, fetchMock } = createMockClient({ jobs: [{ jobId: 'imp_x', object: 'task', status: 'succeeded' }] });
const jobs = await client.data.listImportJobs({ object: 'task', status: 'succeeded', limit: 10, offset: 5 });
const url = fetchMock.mock.calls[0][0] as string;
expect(url.startsWith('http://localhost:3000/api/v1/data/import/jobs?')).toBe(true);
expect(url).toContain('object=task');
expect(url).toContain('status=succeeded');
expect(url).toContain('limit=10');
expect(url).toContain('offset=5');
expect(jobs).toHaveLength(1);
expect(jobs[0].jobId).toBe('imp_x');
});

it('cancelImportJob POSTs the /cancel sub-route', async () => {
const { client, fetchMock } = createMockClient({ success: true });
const res = await client.data.cancelImportJob('imp_x');
const [url, init] = fetchMock.mock.calls[0];
expect(url).toBe('http://localhost:3000/api/v1/data/import/jobs/imp_x/cancel');
expect(init.method).toBe('POST');
expect(res.success).toBe(true);
});

it('undoImportJob POSTs the /undo sub-route', async () => {
const { client, fetchMock } = createMockClient({ success: true, jobId: 'imp_x', object: 'task', deleted: 3, restored: 2, failed: 0 });
const res = await client.data.undoImportJob('imp_x');
const [url, init] = fetchMock.mock.calls[0];
expect(url).toBe('http://localhost:3000/api/v1/data/import/jobs/imp_x/undo');
expect(init.method).toBe('POST');
expect(res.success).toBe(true);
expect(res.deleted).toBe(3);
expect(res.restored).toBe(2);
});
});
165 changes: 165 additions & 0 deletions packages/client/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,16 @@ import {
UnsubscribeResponse,
WellKnownCapabilities,
ApiRoutes,
ImportRequest,
ImportResponse,
CreateImportJobRequest,
CreateImportJobResponse,
ImportJobProgress,
ImportJobResults,
ImportJobSummary,
ListImportJobsRequest,
ListImportJobsResponse,
UndoImportJobResponse,
} from '@objectstack/spec/api';
import type {
ApprovalRequestRow,
Expand Down Expand Up @@ -3046,6 +3056,95 @@ export class ObjectStackClient {
return this.unwrapResponse<T[]>(res);
},

/**
* Bulk-import rows (CSV text or JSON row objects) into an object.
*
* The server coerces each cell to its storage value using the object's field
* metadata (booleans, numbers, dates→ISO, select label→code, lookup name→id),
* so callers send raw spreadsheet values plus an optional column `mapping`.
* `writeMode` selects insert / update / upsert (the latter two need
* `matchFields`); `dryRun` validates + previews without persisting. The
* response carries per-row outcomes for an import report.
*/
import: async (object: string, request: ImportRequest): Promise<ImportResponse> => {
const route = this.getRoute('data');
const res = await this.fetch(`${this.baseUrl}${route}/${object}/import`, {
method: 'POST',
body: JSON.stringify(request),
});
return this.unwrapResponse<ImportResponse>(res);
},

/**
* Import-job namespace — the asynchronous counterpart to {@link import} for
* large files (up to 50,000 rows). `createImportJob` posts the whole payload
* once and returns immediately with a `jobId`; a server worker processes the
* batch in the background. Poll {@link getImportJobProgress} for live
* counters, {@link getImportJobResults} for the capped per-row report, and
* {@link listImportJobs} for history. {@link cancelImportJob} stops a
* pending/running job cooperatively.
*
* These routes require a server new enough to expose them — older servers
* return 404, which surfaces here as a rejected promise. Callers that want
* graceful degradation should feature-detect (e.g. try the job, fall back
* to the synchronous {@link import} on 404).
*/
createImportJob: async (object: string, request: CreateImportJobRequest): Promise<CreateImportJobResponse> => {
const route = this.getRoute('data');
const res = await this.fetch(`${this.baseUrl}${route}/${object}/import/jobs`, {
method: 'POST',
body: JSON.stringify(request),
});
return this.unwrapResponse<CreateImportJobResponse>(res);
},

getImportJobProgress: async (jobId: string): Promise<ImportJobProgress> => {
const route = this.getRoute('data');
const res = await this.fetch(`${this.baseUrl}${route}/import/jobs/${encodeURIComponent(jobId)}`);
return this.unwrapResponse<ImportJobProgress>(res);
},

getImportJobResults: async (jobId: string): Promise<ImportJobResults> => {
const route = this.getRoute('data');
const res = await this.fetch(`${this.baseUrl}${route}/import/jobs/${encodeURIComponent(jobId)}/results`);
return this.unwrapResponse<ImportJobResults>(res);
},

listImportJobs: async (query: Partial<ListImportJobsRequest> = {}): Promise<ImportJobSummary[]> => {
const route = this.getRoute('data');
const qs = new URLSearchParams();
if (query.object) qs.set('object', query.object);
if (query.status) qs.set('status', query.status);
if (query.limit != null) qs.set('limit', String(query.limit));
if (query.offset != null) qs.set('offset', String(query.offset));
const suffix = qs.toString() ? `?${qs.toString()}` : '';
const res = await this.fetch(`${this.baseUrl}${route}/import/jobs${suffix}`);
const body = await this.unwrapResponse<ListImportJobsResponse>(res);
return body.jobs;
},

cancelImportJob: async (jobId: string): Promise<{ success: boolean }> => {
const route = this.getRoute('data');
const res = await this.fetch(`${this.baseUrl}${route}/import/jobs/${encodeURIComponent(jobId)}/cancel`, {
method: 'POST',
});
return this.unwrapResponse<{ success: boolean }>(res);
},

/**
* Logically roll back a finished import: delete the records it created and
* restore the fields it updated to their pre-import values. Only jobs that
* captured an undo log (small, non-dry-run, not yet reverted) are undoable —
* others return 422. See {@link ImportJobProgress.undoable}.
*/
undoImportJob: async (jobId: string): Promise<UndoImportJobResponse> => {
const route = this.getRoute('data');
const res = await this.fetch(`${this.baseUrl}${route}/import/jobs/${encodeURIComponent(jobId)}/undo`, {
method: 'POST',
});
return this.unwrapResponse<UndoImportJobResponse>(res);
},

update: async <T = any>(
object: string,
id: string,
Expand Down Expand Up @@ -3456,6 +3555,64 @@ export class ScopedProjectClient {
});
return this.parent._unwrap<T[]>(res);
},
/**
* Bulk-import rows (CSV text or JSON row objects) into an object. The server
* coerces each cell to its storage value from field metadata (booleans,
* numbers, dates→ISO, select label→code, lookup name→id); callers send raw
* values plus an optional column `mapping`. `writeMode` selects
* insert/update/upsert (update/upsert need `matchFields`); `dryRun`
* validates + previews without persisting.
*/
import: async (object: string, request: ImportRequest): Promise<ImportResponse> => {
const res = await this.parent._fetch(this.url(`/data/${object}/import`), {
method: 'POST',
body: JSON.stringify(request),
});
return this.parent._unwrap<ImportResponse>(res);
},
/**
* Asynchronous import jobs (scoped) — see the top-level `data.createImportJob`
* for semantics. Large payloads are posted once; a server worker processes
* them in the background while callers poll progress / results / history.
*/
createImportJob: async (object: string, request: CreateImportJobRequest): Promise<CreateImportJobResponse> => {
const res = await this.parent._fetch(this.url(`/data/${object}/import/jobs`), {
method: 'POST',
body: JSON.stringify(request),
});
return this.parent._unwrap<CreateImportJobResponse>(res);
},
getImportJobProgress: async (jobId: string): Promise<ImportJobProgress> => {
const res = await this.parent._fetch(this.url(`/data/import/jobs/${encodeURIComponent(jobId)}`));
return this.parent._unwrap<ImportJobProgress>(res);
},
getImportJobResults: async (jobId: string): Promise<ImportJobResults> => {
const res = await this.parent._fetch(this.url(`/data/import/jobs/${encodeURIComponent(jobId)}/results`));
return this.parent._unwrap<ImportJobResults>(res);
},
listImportJobs: async (query: Partial<ListImportJobsRequest> = {}): Promise<ImportJobSummary[]> => {
const qs = new URLSearchParams();
if (query.object) qs.set('object', query.object);
if (query.status) qs.set('status', query.status);
if (query.limit != null) qs.set('limit', String(query.limit));
if (query.offset != null) qs.set('offset', String(query.offset));
const suffix = qs.toString() ? `?${qs.toString()}` : '';
const res = await this.parent._fetch(this.url(`/data/import/jobs${suffix}`));
const body = await this.parent._unwrap<ListImportJobsResponse>(res);
return body.jobs;
},
cancelImportJob: async (jobId: string): Promise<{ success: boolean }> => {
const res = await this.parent._fetch(this.url(`/data/import/jobs/${encodeURIComponent(jobId)}/cancel`), {
method: 'POST',
});
return this.parent._unwrap<{ success: boolean }>(res);
},
undoImportJob: async (jobId: string): Promise<UndoImportJobResponse> => {
const res = await this.parent._fetch(this.url(`/data/import/jobs/${encodeURIComponent(jobId)}/undo`), {
method: 'POST',
});
return this.parent._unwrap<UndoImportJobResponse>(res);
},
update: async <T = any>(object: string, id: string, data: Partial<T>): Promise<UpdateDataResult<T>> => {
const res = await this.parent._fetch(this.url(`/data/${object}/${id}`), {
method: 'PATCH',
Expand Down Expand Up @@ -3638,6 +3795,14 @@ export type {
AuthProviderInfo,
EmailPasswordConfigPublic,
AuthFeaturesConfig,
CreateImportJobRequest,
CreateImportJobResponse,
ImportJobProgress,
ImportJobResults,
ImportJobSummary,
ListImportJobsRequest,
ListImportJobsResponse,
UndoImportJobResponse,
} from '@objectstack/spec/api';

// Approval runtime types (ADR-0019) — surfaced so SDK consumers can type the
Expand Down
1 change: 1 addition & 0 deletions packages/platform-objects/src/audit/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ export { SysReportSchedule } from './sys-report-schedule.object.js';
export { SysJob } from './sys-job.object.js';
export { SysJobRun } from './sys-job-run.object.js';
export { SysJobQueue } from './sys-job-queue.object.js';
export { SysImportJob } from './sys-import-job.object.js';
109 changes: 109 additions & 0 deletions packages/platform-objects/src/audit/sys-import-job.object.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.

import { ObjectSchema, Field } from '@objectstack/spec/data';

/**
* sys_import_job — Asynchronous Data Import Job
*
* Each row tracks one bulk import submitted through the async import API
* (`POST /data/:object/import/jobs`). The client sends the whole payload
* (rows[] or a base64 xlsx) in one request; the server persists this row,
* processes the batch in the background, and streams progress by updating the
* counters below. Readers poll `progress` / `results` and list history.
*
* Persisting to the DB (rather than in-memory) means progress and history
* survive a server restart and are queryable per object / per user.
*
* Writers: the rest-server import-job worker.
* Readers: the Import Wizard (progress + history), dashboards.
*
* @namespace sys
*/
export const SysImportJob = ObjectSchema.create({
name: 'sys_import_job',
label: 'Import Job',
pluralLabel: 'Import Jobs',
icon: 'upload',
isSystem: true,
managedBy: 'system',
description: 'Asynchronous bulk-import job state, progress, and history',
displayNameField: 'object_name',
nameField: 'object_name', // [ADR-0079] canonical primary-title pointer
titleFormat: '{object_name} import @ {created_at}',
compactLayout: ['object_name', 'status', 'processed_rows', 'total_rows', 'created_at'],

fields: {
id: Field.text({ label: 'Job ID', required: true, readonly: true, group: 'System' }),

object_name: Field.text({
label: 'Object',
required: true,
maxLength: 255,
searchable: true,
description: 'API name of the object being imported into',
group: 'Identity',
}),

status: Field.select(
['pending', 'running', 'succeeded', 'failed', 'cancelled'],
{ label: 'Status', required: true, defaultValue: 'pending', group: 'State' },
),

// ── progress counters (updated as the worker streams through the batch) ──
total_rows: Field.number({ label: 'Total Rows', required: true, defaultValue: 0, group: 'Progress' }),
processed_rows: Field.number({ label: 'Processed Rows', required: true, defaultValue: 0, group: 'Progress' }),
created_count: Field.number({ label: 'Created', required: false, defaultValue: 0, group: 'Progress' }),
updated_count: Field.number({ label: 'Updated', required: false, defaultValue: 0, group: 'Progress' }),
skipped_count: Field.number({ label: 'Skipped', required: false, defaultValue: 0, group: 'Progress' }),
error_count: Field.number({ label: 'Errors', required: false, defaultValue: 0, group: 'Progress' }),

// ── request echo (so history is self-describing without the payload) ──
write_mode: Field.select(
['insert', 'update', 'upsert'],
{ label: 'Write Mode', required: false, defaultValue: 'insert', group: 'Request' },
),
dry_run: Field.boolean({ label: 'Dry Run', required: false, defaultValue: false, group: 'Request' }),
run_automations: Field.boolean({ label: 'Run Automations', required: false, defaultValue: false, group: 'Request' }),

// ── outcome ──
error: Field.textarea({ label: 'Fatal Error', required: false, group: 'Outcome' }),
results: Field.json({
label: 'Row Results (sample)',
required: false,
description: 'Capped sample of per-row results (failures first) for the UI',
group: 'Outcome',
}),

// ── undo / logical rollback ──
undo_log: Field.json({
label: 'Undo Log',
required: false,
description: 'Reversal instructions ({created:[ids], updated:[{id,before}]}) captured for small non-dry-run jobs so the import can be undone',
group: 'Outcome',
}),
reverted_at: Field.datetime({
label: 'Reverted At',
required: false,
description: 'Set when the import was undone (created records deleted, updated records restored)',
group: 'Outcome',
}),

// ── lifecycle timestamps ──
started_at: Field.datetime({ label: 'Started At', required: false, group: 'State' }),
completed_at: Field.datetime({ label: 'Completed At', required: false, group: 'State' }),
created_by: Field.text({ label: 'Created By', required: false, readonly: true, group: 'System' }),
created_at: Field.datetime({
label: 'Created At',
required: true,
defaultValue: 'NOW()',
readonly: true,
group: 'System',
}),
},

indexes: [
{ fields: ['object_name', 'created_at'] },
{ fields: ['status', 'created_at'] },
{ fields: ['created_by', 'created_at'] },
],
});
1 change: 1 addition & 0 deletions packages/rest/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
},
"dependencies": {
"@objectstack/core": "workspace:*",
"@objectstack/platform-objects": "workspace:*",
"@objectstack/service-package": "workspace:*",
"@objectstack/spec": "workspace:*",
"exceljs": "^4.4.0",
Expand Down
Loading