diff --git a/packages/server/src/api/rpc/index.ts b/packages/server/src/api/rpc/index.ts index 6f572e82d..7b3cbcb39 100644 --- a/packages/server/src/api/rpc/index.ts +++ b/packages/server/src/api/rpc/index.ts @@ -1,5 +1,5 @@ import { lowerCaseFirst, safeJSONStringify } from '@zenstackhq/common-helpers'; -import { ORMError, ORMErrorReason, type ClientContract } from '@zenstackhq/orm'; +import { CoreCrudOperations, ORMError, ORMErrorReason, type ClientContract } from '@zenstackhq/orm'; import type { SchemaDef } from '@zenstackhq/orm/schema'; import SuperJSON from 'superjson'; import { match } from 'ts-pattern'; @@ -11,6 +11,9 @@ import { loggerSchema } from '../common/schemas'; import { processSuperJsonRequestPayload, unmarshalQ } from '../common/utils'; import { log, registerCustomSerializers } from '../utils'; +const TRANSACTION_ROUTE_PREFIX = '$transaction' as const; +const VALID_OPS = new Set(CoreCrudOperations as unknown as string[]); + registerCustomSerializers(); /** @@ -71,6 +74,15 @@ export class RPCApiHandler implements ApiH }); } + if (model === TRANSACTION_ROUTE_PREFIX) { + return this.handleTransaction({ + client, + method: method.toUpperCase(), + type: op, + requestBody, + }); + } + model = lowerCaseFirst(model); method = method.toUpperCase(); let args: unknown; @@ -185,6 +197,91 @@ export class RPCApiHandler implements ApiH } } + private async handleTransaction({ + client, + method, + type, + requestBody, + }: { + client: ClientContract; + method: string; + type: string; + requestBody?: unknown; + }): Promise { + if (method !== 'POST') { + return this.makeBadInputErrorResponse('invalid request method, only POST is supported'); + } + + if (type !== 'sequential') { + return this.makeBadInputErrorResponse(`unsupported transaction type: ${type}`); + } + + if (!requestBody || !Array.isArray(requestBody) || requestBody.length === 0) { + return this.makeBadInputErrorResponse('request body must be a non-empty array of operations'); + } + + const processedOps: Array<{ model: string; op: string; args: unknown }> = []; + + for (let i = 0; i < requestBody.length; i++) { + const item = requestBody[i]; + if (!item || typeof item !== 'object') { + return this.makeBadInputErrorResponse(`operation at index ${i} must be an object`); + } + const { model: itemModel, op: itemOp, args: itemArgs } = item as any; + if (!itemModel || typeof itemModel !== 'string') { + return this.makeBadInputErrorResponse(`operation at index ${i} is missing a valid "model" field`); + } + if (!itemOp || typeof itemOp !== 'string') { + return this.makeBadInputErrorResponse(`operation at index ${i} is missing a valid "op" field`); + } + if (!VALID_OPS.has(itemOp)) { + return this.makeBadInputErrorResponse(`operation at index ${i} has invalid op: ${itemOp}`); + } + if (!this.isValidModel(client, lowerCaseFirst(itemModel))) { + return this.makeBadInputErrorResponse(`operation at index ${i} has unknown model: ${itemModel}`); + } + if (itemArgs !== undefined && itemArgs !== null && (typeof itemArgs !== 'object' || Array.isArray(itemArgs))) { + return this.makeBadInputErrorResponse(`operation at index ${i} has invalid "args" field`); + } + + const { result: processedArgs, error: argsError } = await this.processRequestPayload(itemArgs ?? {}); + if (argsError) { + return this.makeBadInputErrorResponse(`operation at index ${i}: ${argsError}`); + } + processedOps.push({ model: lowerCaseFirst(itemModel), op: itemOp, args: processedArgs }); + } + + try { + const promises = processedOps.map(({ model, op, args }) => { + return (client as any)[model][op](args); + }); + + log(this.options.log, 'debug', () => `handling "$transaction" request with ${promises.length} operations`); + + const clientResult = await client.$transaction(promises as any); + + const { json, meta } = SuperJSON.serialize(clientResult); + const responseBody: any = { data: json }; + if (meta) { + responseBody.meta = { serialization: meta }; + } + + const response = { status: 200, body: responseBody }; + log( + this.options.log, + 'debug', + () => `sending response for "$transaction" request: ${safeJSONStringify(response)}`, + ); + return response; + } catch (err) { + log(this.options.log, 'error', `error occurred when handling "$transaction" request`, err); + if (err instanceof ORMError) { + return this.makeORMErrorResponse(err); + } + return this.makeGenericErrorResponse(err); + } + } + private async handleProcedureRequest({ client, method, diff --git a/packages/server/test/api/rpc.test.ts b/packages/server/test/api/rpc.test.ts index 30818ae8a..d98fecaa0 100644 --- a/packages/server/test/api/rpc.test.ts +++ b/packages/server/test/api/rpc.test.ts @@ -763,6 +763,274 @@ procedure echoOverview(o: Overview): Overview expect(r.data.stringList).toEqual(['d', 'e', 'f']); }); + describe('transaction', () => { + it('runs sequential operations atomically', async () => { + const handleRequest = makeHandler(); + + // Clean up + await rawClient.post.deleteMany(); + await rawClient.user.deleteMany(); + + const r = await handleRequest({ + method: 'post', + path: '/$transaction/sequential', + requestBody: [ + { + model: 'User', + op: 'create', + args: { data: { id: 'txuser1', email: 'txuser1@abc.com' } }, + }, + { + model: 'Post', + op: 'create', + args: { data: { id: 'txpost1', title: 'Tx Post', authorId: 'txuser1' } }, + }, + { + model: 'Post', + op: 'findMany', + args: { where: { authorId: 'txuser1' } }, + }, + ], + client: rawClient, + }); + + expect(r.status).toBe(200); + expect(Array.isArray(r.data)).toBe(true); + expect(r.data).toHaveLength(3); + expect(r.data[0]).toMatchObject({ id: 'txuser1', email: 'txuser1@abc.com' }); + expect(r.data[1]).toMatchObject({ id: 'txpost1', title: 'Tx Post' }); + expect(r.data[2]).toHaveLength(1); + expect(r.data[2][0]).toMatchObject({ id: 'txpost1' }); + + // Clean up + await rawClient.post.deleteMany(); + await rawClient.user.deleteMany(); + }); + + it('rejects non-POST methods', async () => { + const handleRequest = makeHandler(); + + const r = await handleRequest({ + method: 'get', + path: '/$transaction/sequential', + client: rawClient, + }); + expect(r.status).toBe(400); + expect(r.error.message).toMatch(/only POST is supported/i); + }); + + it('rejects missing or non-array body', async () => { + const handleRequest = makeHandler(); + + let r = await handleRequest({ + method: 'post', + path: '/$transaction/sequential', + client: rawClient, + }); + expect(r.status).toBe(400); + expect(r.error.message).toMatch(/non-empty array/i); + + r = await handleRequest({ + method: 'post', + path: '/$transaction/sequential', + requestBody: [], + client: rawClient, + }); + expect(r.status).toBe(400); + expect(r.error.message).toMatch(/non-empty array/i); + + r = await handleRequest({ + method: 'post', + path: '/$transaction/sequential', + requestBody: { model: 'User', op: 'findMany', args: {} }, + client: rawClient, + }); + expect(r.status).toBe(400); + expect(r.error.message).toMatch(/non-empty array/i); + }); + + it('rejects unknown model in operation', async () => { + const handleRequest = makeHandler(); + + const r = await handleRequest({ + method: 'post', + path: '/$transaction/sequential', + requestBody: [{ model: 'Ghost', op: 'create', args: { data: {} } }], + client: rawClient, + }); + expect(r.status).toBe(400); + expect(r.error.message).toMatch(/unknown model/i); + }); + + it('rejects invalid op in operation', async () => { + const handleRequest = makeHandler(); + + const r = await handleRequest({ + method: 'post', + path: '/$transaction/sequential', + requestBody: [{ model: 'User', op: 'dropTable', args: {} }], + client: rawClient, + }); + expect(r.status).toBe(400); + expect(r.error.message).toMatch(/invalid op/i); + }); + + it('rejects operation missing model or op field', async () => { + const handleRequest = makeHandler(); + + let r = await handleRequest({ + method: 'post', + path: '/$transaction/sequential', + requestBody: [{ op: 'create', args: { data: {} } }], + client: rawClient, + }); + expect(r.status).toBe(400); + expect(r.error.message).toMatch(/"model"/i); + + r = await handleRequest({ + method: 'post', + path: '/$transaction/sequential', + requestBody: [{ model: 'User', args: { data: {} } }], + client: rawClient, + }); + expect(r.status).toBe(400); + expect(r.error.message).toMatch(/"op"/i); + }); + + it('returns error for invalid args (non-existent field in where clause)', async () => { + const handleRequest = makeHandler(); + + // findMany with a non-existent field in where → ORM validation error + let r = await handleRequest({ + method: 'post', + path: '/$transaction/sequential', + requestBody: [ + { + model: 'User', + op: 'findMany', + args: { where: { nonExistentField: 'value' } }, + }, + ], + client: rawClient, + }); + expect(r.status).toBe(422); + expect(r.error.message).toMatch(/validation error/i); + + // findUnique missing required where clause → ORM validation error + r = await handleRequest({ + method: 'post', + path: '/$transaction/sequential', + requestBody: [ + { + model: 'Post', + op: 'findUnique', + args: {}, + }, + ], + client: rawClient, + }); + expect(r.status).toBe(422); + expect(r.error.message).toMatch(/validation error/i); + + // create with missing required field → ORM validation error + r = await handleRequest({ + method: 'post', + path: '/$transaction/sequential', + requestBody: [ + { + model: 'Post', + op: 'create', + // title is required but omitted + args: { data: {} }, + }, + ], + client: rawClient, + }); + expect(r.status).toBe(422); + expect(r.error.message).toMatch(/validation error/i); + }); + + it('deserializes SuperJSON-encoded args per operation', async () => { + const handleRequest = makeHandler(); + + // Clean up + await rawClient.post.deleteMany(); + await rawClient.user.deleteMany(); + + // Serialize args containing a Date so they need SuperJSON deserialization + const publishedAt = new Date('2025-01-15T00:00:00.000Z'); + const serialized = SuperJSON.serialize({ + data: { id: 'txuser3', email: 'txuser3@abc.com' }, + }); + const serializedPost = SuperJSON.serialize({ + data: { id: 'txpost3', title: 'Dated Post', authorId: 'txuser3', publishedAt }, + }); + + const r = await handleRequest({ + method: 'post', + path: '/$transaction/sequential', + requestBody: [ + { + model: 'User', + op: 'create', + args: { ...(serialized.json as any), meta: { serialization: serialized.meta } }, + }, + { + model: 'Post', + op: 'create', + args: { ...(serializedPost.json as any), meta: { serialization: serializedPost.meta } }, + }, + ], + client: rawClient, + }); + + expect(r.status).toBe(200); + expect(r.data).toHaveLength(2); + expect(r.data[0]).toMatchObject({ id: 'txuser3' }); + expect(r.data[1]).toMatchObject({ id: 'txpost3' }); + + // Verify the Date was stored correctly + const post = await (rawClient as any).post.findUnique({ where: { id: 'txpost3' } }); + expect(post?.publishedAt instanceof Date).toBe(true); + expect((post?.publishedAt as Date)?.toISOString()).toBe(publishedAt.toISOString()); + + // Clean up + await rawClient.post.deleteMany(); + await rawClient.user.deleteMany(); + }); + + it('rolls back all operations when one fails', async () => { + const handleRequest = makeHandler(); + + // Ensure no users before + await rawClient.user.deleteMany(); + + const r = await handleRequest({ + method: 'post', + path: '/$transaction/sequential', + requestBody: [ + { + model: 'User', + op: 'create', + args: { data: { id: 'txuser2', email: 'txuser2@abc.com' } }, + }, + // duplicate id will cause a DB error → whole tx rolls back + { + model: 'User', + op: 'create', + args: { data: { id: 'txuser2', email: 'txuser2@abc.com' } }, + }, + ], + client: rawClient, + }); + expect(r.status).toBeGreaterThanOrEqual(400); + + // User should not have been committed + const count = await rawClient.user.count(); + expect(count).toBe(0); + }); + }); + function makeHandler() { const handler = new RPCApiHandler({ schema: client.$schema }); return async (args: any) => {