Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 98 additions & 1 deletion packages/server/src/api/rpc/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { lowerCaseFirst, safeJSONStringify } from '@zenstackhq/common-helpers';
import { ORMError, ORMErrorReason, type ClientContract } from '@zenstackhq/orm';
import { CoreCrudOperations, ORMError, ORMErrorReason, type ClientContract } from '@zenstackhq/orm';
import type { SchemaDef } from '@zenstackhq/orm/schema';
import SuperJSON from 'superjson';
import { match } from 'ts-pattern';
Expand All @@ -11,6 +11,9 @@ import { loggerSchema } from '../common/schemas';
import { processSuperJsonRequestPayload, unmarshalQ } from '../common/utils';
import { log, registerCustomSerializers } from '../utils';

const TRANSACTION_ROUTE_PREFIX = '$transaction' as const;
const VALID_OPS = new Set(CoreCrudOperations as unknown as string[]);

registerCustomSerializers();

/**
Expand Down Expand Up @@ -71,6 +74,15 @@ export class RPCApiHandler<Schema extends SchemaDef = SchemaDef> implements ApiH
});
}

if (model === TRANSACTION_ROUTE_PREFIX) {
return this.handleTransaction({
client,
method: method.toUpperCase(),
type: op,
requestBody,
});
}

model = lowerCaseFirst(model);
method = method.toUpperCase();
let args: unknown;
Expand Down Expand Up @@ -185,6 +197,91 @@ export class RPCApiHandler<Schema extends SchemaDef = SchemaDef> implements ApiH
}
}

private async handleTransaction({
client,
method,
type,
requestBody,
}: {
client: ClientContract<Schema>;
method: string;
type: string;
requestBody?: unknown;
}): Promise<Response> {
if (method !== 'POST') {
return this.makeBadInputErrorResponse('invalid request method, only POST is supported');
}

if (type !== 'sequential') {
return this.makeBadInputErrorResponse(`unsupported transaction type: ${type}`);
}

if (!requestBody || !Array.isArray(requestBody) || requestBody.length === 0) {
return this.makeBadInputErrorResponse('request body must be a non-empty array of operations');
}

const processedOps: Array<{ model: string; op: string; args: unknown }> = [];

for (let i = 0; i < requestBody.length; i++) {
const item = requestBody[i];
if (!item || typeof item !== 'object') {
return this.makeBadInputErrorResponse(`operation at index ${i} must be an object`);
}
const { model: itemModel, op: itemOp, args: itemArgs } = item as any;
if (!itemModel || typeof itemModel !== 'string') {
return this.makeBadInputErrorResponse(`operation at index ${i} is missing a valid "model" field`);
}
if (!itemOp || typeof itemOp !== 'string') {
return this.makeBadInputErrorResponse(`operation at index ${i} is missing a valid "op" field`);
}
if (!VALID_OPS.has(itemOp)) {
return this.makeBadInputErrorResponse(`operation at index ${i} has invalid op: ${itemOp}`);
}
if (!this.isValidModel(client, lowerCaseFirst(itemModel))) {
return this.makeBadInputErrorResponse(`operation at index ${i} has unknown model: ${itemModel}`);
}
if (itemArgs !== undefined && itemArgs !== null && (typeof itemArgs !== 'object' || Array.isArray(itemArgs))) {
return this.makeBadInputErrorResponse(`operation at index ${i} has invalid "args" field`);
}

const { result: processedArgs, error: argsError } = await this.processRequestPayload(itemArgs ?? {});
if (argsError) {
return this.makeBadInputErrorResponse(`operation at index ${i}: ${argsError}`);
}
processedOps.push({ model: lowerCaseFirst(itemModel), op: itemOp, args: processedArgs });
}

try {
const promises = processedOps.map(({ model, op, args }) => {
return (client as any)[model][op](args);
});

log(this.options.log, 'debug', () => `handling "$transaction" request with ${promises.length} operations`);

const clientResult = await client.$transaction(promises as any);

const { json, meta } = SuperJSON.serialize(clientResult);
const responseBody: any = { data: json };
if (meta) {
responseBody.meta = { serialization: meta };
}

const response = { status: 200, body: responseBody };
log(
this.options.log,
'debug',
() => `sending response for "$transaction" request: ${safeJSONStringify(response)}`,
);
return response;
} catch (err) {
log(this.options.log, 'error', `error occurred when handling "$transaction" request`, err);
if (err instanceof ORMError) {
return this.makeORMErrorResponse(err);
}
return this.makeGenericErrorResponse(err);
}
}

private async handleProcedureRequest({
client,
method,
Expand Down
268 changes: 268 additions & 0 deletions packages/server/test/api/rpc.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,274 @@ procedure echoOverview(o: Overview): Overview
expect(r.data.stringList).toEqual(['d', 'e', 'f']);
});

describe('transaction', () => {
it('runs sequential operations atomically', async () => {
const handleRequest = makeHandler();

// Clean up
await rawClient.post.deleteMany();
await rawClient.user.deleteMany();

const r = await handleRequest({
method: 'post',
path: '/$transaction/sequential',
requestBody: [
{
model: 'User',
op: 'create',
args: { data: { id: 'txuser1', email: 'txuser1@abc.com' } },
},
{
model: 'Post',
op: 'create',
args: { data: { id: 'txpost1', title: 'Tx Post', authorId: 'txuser1' } },
},
{
model: 'Post',
op: 'findMany',
args: { where: { authorId: 'txuser1' } },
},
],
client: rawClient,
});

expect(r.status).toBe(200);
expect(Array.isArray(r.data)).toBe(true);
expect(r.data).toHaveLength(3);
expect(r.data[0]).toMatchObject({ id: 'txuser1', email: 'txuser1@abc.com' });
expect(r.data[1]).toMatchObject({ id: 'txpost1', title: 'Tx Post' });
expect(r.data[2]).toHaveLength(1);
expect(r.data[2][0]).toMatchObject({ id: 'txpost1' });

// Clean up
await rawClient.post.deleteMany();
await rawClient.user.deleteMany();
});

it('rejects non-POST methods', async () => {
const handleRequest = makeHandler();

const r = await handleRequest({
method: 'get',
path: '/$transaction/sequential',
client: rawClient,
});
expect(r.status).toBe(400);
expect(r.error.message).toMatch(/only POST is supported/i);
});

it('rejects missing or non-array body', async () => {
const handleRequest = makeHandler();

let r = await handleRequest({
method: 'post',
path: '/$transaction/sequential',
client: rawClient,
});
expect(r.status).toBe(400);
expect(r.error.message).toMatch(/non-empty array/i);

r = await handleRequest({
method: 'post',
path: '/$transaction/sequential',
requestBody: [],
client: rawClient,
});
expect(r.status).toBe(400);
expect(r.error.message).toMatch(/non-empty array/i);

r = await handleRequest({
method: 'post',
path: '/$transaction/sequential',
requestBody: { model: 'User', op: 'findMany', args: {} },
client: rawClient,
});
expect(r.status).toBe(400);
expect(r.error.message).toMatch(/non-empty array/i);
});

it('rejects unknown model in operation', async () => {
const handleRequest = makeHandler();

const r = await handleRequest({
method: 'post',
path: '/$transaction/sequential',
requestBody: [{ model: 'Ghost', op: 'create', args: { data: {} } }],
client: rawClient,
});
expect(r.status).toBe(400);
expect(r.error.message).toMatch(/unknown model/i);
});

it('rejects invalid op in operation', async () => {
const handleRequest = makeHandler();

const r = await handleRequest({
method: 'post',
path: '/$transaction/sequential',
requestBody: [{ model: 'User', op: 'dropTable', args: {} }],
client: rawClient,
});
expect(r.status).toBe(400);
expect(r.error.message).toMatch(/invalid op/i);
});

it('rejects operation missing model or op field', async () => {
const handleRequest = makeHandler();

let r = await handleRequest({
method: 'post',
path: '/$transaction/sequential',
requestBody: [{ op: 'create', args: { data: {} } }],
client: rawClient,
});
expect(r.status).toBe(400);
expect(r.error.message).toMatch(/"model"/i);

r = await handleRequest({
method: 'post',
path: '/$transaction/sequential',
requestBody: [{ model: 'User', args: { data: {} } }],
client: rawClient,
});
expect(r.status).toBe(400);
expect(r.error.message).toMatch(/"op"/i);
});

it('returns error for invalid args (non-existent field in where clause)', async () => {
const handleRequest = makeHandler();

// findMany with a non-existent field in where → ORM validation error
let r = await handleRequest({
method: 'post',
path: '/$transaction/sequential',
requestBody: [
{
model: 'User',
op: 'findMany',
args: { where: { nonExistentField: 'value' } },
},
],
client: rawClient,
});
expect(r.status).toBe(422);
expect(r.error.message).toMatch(/validation error/i);

// findUnique missing required where clause → ORM validation error
r = await handleRequest({
method: 'post',
path: '/$transaction/sequential',
requestBody: [
{
model: 'Post',
op: 'findUnique',
args: {},
},
],
client: rawClient,
});
expect(r.status).toBe(422);
expect(r.error.message).toMatch(/validation error/i);

// create with missing required field → ORM validation error
r = await handleRequest({
method: 'post',
path: '/$transaction/sequential',
requestBody: [
{
model: 'Post',
op: 'create',
// title is required but omitted
args: { data: {} },
},
],
client: rawClient,
});
expect(r.status).toBe(422);
expect(r.error.message).toMatch(/validation error/i);
});

it('deserializes SuperJSON-encoded args per operation', async () => {
const handleRequest = makeHandler();

// Clean up
await rawClient.post.deleteMany();
await rawClient.user.deleteMany();

// Serialize args containing a Date so they need SuperJSON deserialization
const publishedAt = new Date('2025-01-15T00:00:00.000Z');
const serialized = SuperJSON.serialize({
data: { id: 'txuser3', email: 'txuser3@abc.com' },
});
const serializedPost = SuperJSON.serialize({
data: { id: 'txpost3', title: 'Dated Post', authorId: 'txuser3', publishedAt },
});

const r = await handleRequest({
method: 'post',
path: '/$transaction/sequential',
requestBody: [
{
model: 'User',
op: 'create',
args: { ...(serialized.json as any), meta: { serialization: serialized.meta } },
},
{
model: 'Post',
op: 'create',
args: { ...(serializedPost.json as any), meta: { serialization: serializedPost.meta } },
},
],
client: rawClient,
});

expect(r.status).toBe(200);
expect(r.data).toHaveLength(2);
expect(r.data[0]).toMatchObject({ id: 'txuser3' });
expect(r.data[1]).toMatchObject({ id: 'txpost3' });

// Verify the Date was stored correctly
const post = await (rawClient as any).post.findUnique({ where: { id: 'txpost3' } });
expect(post?.publishedAt instanceof Date).toBe(true);
expect((post?.publishedAt as Date)?.toISOString()).toBe(publishedAt.toISOString());

// Clean up
await rawClient.post.deleteMany();
await rawClient.user.deleteMany();
});

it('rolls back all operations when one fails', async () => {
const handleRequest = makeHandler();

// Ensure no users before
await rawClient.user.deleteMany();

const r = await handleRequest({
method: 'post',
path: '/$transaction/sequential',
requestBody: [
{
model: 'User',
op: 'create',
args: { data: { id: 'txuser2', email: 'txuser2@abc.com' } },
},
// duplicate id will cause a DB error → whole tx rolls back
{
model: 'User',
op: 'create',
args: { data: { id: 'txuser2', email: 'txuser2@abc.com' } },
},
],
client: rawClient,
});
expect(r.status).toBeGreaterThanOrEqual(400);

// User should not have been committed
const count = await rawClient.user.count();
expect(count).toBe(0);
});
});

function makeHandler() {
const handler = new RPCApiHandler({ schema: client.$schema });
return async (args: any) => {
Expand Down
Loading