-
Notifications
You must be signed in to change notification settings - Fork 82
Expand file tree
/
Copy pathindex.ts
More file actions
340 lines (300 loc) · 14.5 KB
/
index.ts
File metadata and controls
340 lines (300 loc) · 14.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
import { DurableObject } from "cloudflare:workers";
import { createResponse, createResponseFromOperationResponse, QueryRequest, QueryTransactionRequest } from './utils';
import { enqueueOperation, OperationQueueItem, processNextOperation } from './operation';
import { LiteREST } from './literest';
import handleStudioRequest from "./studio";
import { dumpDatabaseRoute } from './export/dump';
import { exportTableToJsonRoute } from './export/json';
import { exportTableToCsvRoute } from './export/csv';
import { importDumpRoute } from './import/dump';
import { importTableFromJsonRoute } from './import/json';
import { importTableFromCsvRoute } from './import/csv';
import { handleApiRequest } from "./api";
const DURABLE_OBJECT_ID = 'sql-durable-object';
export interface Env {
AUTHORIZATION_TOKEN: string;
DATABASE_DURABLE_OBJECT: DurableObjectNamespace;
REGION: string;
JURISDICTION?: Jurisdiction;
STUDIO_USER?: string;
STUDIO_PASS?: string;
// ## DO NOT REMOVE: TEMPLATE INTERFACE ##
}
enum RegionLocationHint {
AUTO = 'auto',
WNAM = 'wnam', // Western North America
ENAM = 'enam', // Eastern North America
SAM = 'sam', // South America
WEUR = 'weur', // Western Europe
EEUR = 'eeur', // Eastern Europe
APAC = 'apac', // Asia Pacific
OC = 'oc', // Oceania
AFR = 'afr', // Africa
ME = 'me', // Middle East
}
enum Jurisdiction {
EU = 'eu',
FEDRAMP = 'fedramp'
}
export class DatabaseDurableObject extends DurableObject<Env> {
// Durable storage for the SQL database
public sql: SqlStorage;
// Queue of operations to be processed, with each operation containing a list of queries to be executed
private operationQueue: Array<OperationQueueItem> = [];
// Flag to indicate if an operation is currently being processed
private processingOperation: { value: boolean } = { value: false };
// Map of WebSocket connections to their corresponding session IDs
private connections = new Map<string, WebSocket>();
// Instantiate LiteREST
private liteREST: LiteREST;
/**
* The constructor is invoked once upon creation of the Durable Object, i.e. the first call to
* `DurableObjectStub::get` for a given identifier (no-op constructors can be omitted)
*
* @param ctx - The interface for interacting with Durable Object state
* @param env - The interface to reference bindings declared in wrangler.toml
*/
constructor(ctx: DurableObjectState, env: Env) {
super(ctx, env);
this.sql = ctx.storage.sql;
// Initialize LiteREST for handling /lite routes
this.liteREST = new LiteREST(
ctx,
this.operationQueue,
this.processingOperation,
this.sql
);
}
/**
* Execute a raw SQL query on the database, typically used for external requests
* from other service bindings (e.g. auth). This serves as an exposed function for
* other service bindings to query the database without having to have knowledge of
* the current operation queue or processing state.
*
* @param sql - The SQL query to execute.
* @param params - Optional parameters for the SQL query.
* @returns A response containing the query result or an error message.
*/
async executeExternalQuery(sql: string, params: any[] | undefined): Promise<any> {
try {
const queries = [{ sql, params }];
const response = await enqueueOperation(
queries,
false,
false,
this.operationQueue,
() => processNextOperation(this.sql, this.operationQueue, this.ctx, this.processingOperation, this.env)
);
return response;
} catch (error: any) {
console.error('Execute External Query Error:', error);
return null;
}
}
async queryRoute(request: Request, isRaw: boolean): Promise<Response> {
try {
const contentType = request.headers.get('Content-Type') || '';
if (!contentType.includes('application/json')) {
return createResponse(undefined, 'Content-Type must be application/json.', 400);
}
const { sql, params, transaction } = await request.json() as QueryRequest & QueryTransactionRequest;
if (Array.isArray(transaction) && transaction.length) {
const queries = transaction.map((queryObj: any) => {
const { sql, params } = queryObj;
if (typeof sql !== 'string' || !sql.trim()) {
throw new Error('Invalid or empty "sql" field in transaction.');
} else if (params !== undefined && !Array.isArray(params)) {
throw new Error('Invalid "params" field in transaction.');
}
return { sql, params };
});
try {
const response = await enqueueOperation(
queries,
true,
isRaw,
this.operationQueue,
() => processNextOperation(this.sql, this.operationQueue, this.ctx, this.processingOperation, this.env)
);
return createResponseFromOperationResponse(response);
} catch (error: any) {
return createResponse(undefined, error.error || 'An unexpected error occurred.', error.status || 500);
}
} else if (typeof sql !== 'string' || !sql.trim()) {
return createResponse(undefined, 'Invalid or empty "sql" field.', 400);
} else if (params !== undefined && !Array.isArray(params)) {
return createResponse(undefined, 'Invalid "params" field.', 400);
}
try {
const queries = [{ sql, params }];
const response = await enqueueOperation(
queries,
false,
isRaw,
this.operationQueue,
() => processNextOperation(this.sql, this.operationQueue, this.ctx, this.processingOperation, this.env)
);
return createResponseFromOperationResponse(response);
} catch (error: any) {
return createResponse(undefined, error.error || 'An unexpected error occurred.', error.status || 500);
}
} catch (error: any) {
console.error('Query Route Error:', error);
return createResponse(undefined, error || 'An unexpected error occurred.', 500);
}
}
async statusRoute(_: Request): Promise<Response> {
return createResponse({
status: 'reachable',
timestamp: Date.now(),
usedDisk: await this.sql.databaseSize,
}, undefined, 200)
}
async fetch(request: Request): Promise<Response> {
const url = new URL(request.url);
if (request.method === 'POST' && url.pathname === '/query/raw') {
return this.queryRoute(request, true);
} else if (request.method === 'POST' && url.pathname === '/query') {
return this.queryRoute(request, false);
} else if (url.pathname === '/socket') {
return this.clientConnected();
} else if (request.method === 'GET' && url.pathname === '/status') {
return this.statusRoute(request);
} else if (request.method === 'GET' && url.pathname === '/status/trace') {
const response = await fetch('https://cloudflare.com/cdn-cgi/trace');
return new Response(response.body, {
headers: response.headers
});
} else if (url.pathname.startsWith('/rest')) {
return await this.liteREST.handleRequest(request);
} else if (request.method === 'GET' && url.pathname === '/export/dump') {
return dumpDatabaseRoute(this.sql, this.operationQueue, this.ctx, this.processingOperation);
} else if (request.method === 'GET' && url.pathname.startsWith('/export/json/')) {
const tableName = url.pathname.split('/').pop();
if (!tableName) {
return createResponse(undefined, 'Table name is required', 400);
}
return exportTableToJsonRoute(this.sql, this.operationQueue, this.ctx, this.processingOperation, tableName);
} else if (request.method === 'GET' && url.pathname.startsWith('/export/csv/')) {
const tableName = url.pathname.split('/').pop();
if (!tableName) {
return createResponse(undefined, 'Table name is required', 400);
}
return exportTableToCsvRoute(this.sql, this.operationQueue, this.ctx, this.processingOperation, tableName);
} else if (request.method === 'POST' && url.pathname === '/import/dump') {
return importDumpRoute(request, this.sql, this.operationQueue, this.ctx, this.processingOperation);
} else if (request.method === 'POST' && url.pathname.startsWith('/import/json/')) {
const tableName = url.pathname.split('/').pop();
if (!tableName) {
return createResponse(undefined, 'Table name is required', 400);
}
return importTableFromJsonRoute(this.sql, this.operationQueue, this.ctx, this.processingOperation, tableName, request);
} else if (request.method === 'POST' && url.pathname.startsWith('/import/csv/')) {
const tableName = url.pathname.split('/').pop();
if (!tableName) {
return createResponse(undefined, 'Table name is required', 400);
}
return importTableFromCsvRoute(this.sql, this.operationQueue, this.ctx, this.processingOperation, tableName, request);
} else if (url.pathname.startsWith('/api')) {
return await handleApiRequest(request);
} else {
return createResponse(undefined, 'Unknown operation', 400);
}
}
clientConnected() {
const webSocketPair = new WebSocketPair();
const [client, server] = Object.values(webSocketPair);
const wsSessionId = crypto.randomUUID();
this.ctx.acceptWebSocket(server, [wsSessionId]);
this.connections.set(wsSessionId, client);
return new Response(null, { status: 101, webSocket: client });
}
async webSocketMessage(ws: WebSocket, message: any) {
const { sql, params, action } = JSON.parse(message);
if (action === 'query') {
const queries = [{ sql, params }];
const response = await enqueueOperation(
queries,
false,
false,
this.operationQueue,
() => processNextOperation(this.sql, this.operationQueue, this.ctx, this.processingOperation, this.env)
);
ws.send(JSON.stringify(response.result));
}
}
async webSocketClose(ws: WebSocket, code: number, reason: string, wasClean: boolean) {
// If the client closes the connection, the runtime will invoke the webSocketClose() handler.
ws.close(code, "StarbaseDB is closing WebSocket connection");
// Remove the WebSocket connection from the map
const tags = this.ctx.getTags(ws);
if (tags.length) {
const wsSessionId = tags[0];
this.connections.delete(wsSessionId);
}
}
}
export default {
/**
* This is the standard fetch handler for a Cloudflare Worker
*
* @param request - The request submitted to the Worker from the client
* @param env - The interface to reference bindings declared in wrangler.toml
* @param ctx - The execution context of the Worker
* @returns The response to be sent back to the client
*/
async fetch(request, env, ctx): Promise<Response> {
const pathname = new URL(request.url).pathname;
const isWebSocket = request.headers.get("Upgrade") === "websocket";
/**
* If the request is a GET request to the /studio endpoint, we can handle the request
* directly in the Worker to avoid the need to deploy a separate Worker for the Studio.
* Studio provides a user interface to interact with the SQLite database in the Durable
* Object.
*/
if (env.STUDIO_USER && env.STUDIO_PASS && request.method === 'GET' && pathname === '/studio') {
return handleStudioRequest(request, {
username: env.STUDIO_USER,
password: env.STUDIO_PASS,
apiToken: env.AUTHORIZATION_TOKEN
});
}
/**
* Prior to proceeding to the Durable Object, we can perform any necessary validation or
* authorization checks here to ensure the request signature is valid and authorized to
* interact with the Durable Object.
*/
if (request.headers.get('Authorization') !== `Bearer ${env.AUTHORIZATION_TOKEN}` && !isWebSocket) {
return createResponse(undefined, 'Unauthorized request', 401)
} else if (isWebSocket) {
/**
* Web socket connections cannot pass in an Authorization header into their requests,
* so we can use a query parameter to validate the connection.
*/
const url = new URL(request.url);
const token = url.searchParams.get('token');
if (token !== env.AUTHORIZATION_TOKEN) {
return new Response('WebSocket connections are not supported at this endpoint.', { status: 440 });
}
}
/**
* Handle Durable Object creation with jurisdiction or region preferences
*/
let id: DurableObjectId;
let stub: DurableObjectStub;
if (env.JURISDICTION) {
// If jurisdiction is specified, it takes precedence over region
const namespace = env.DATABASE_DURABLE_OBJECT.jurisdiction(env.JURISDICTION as Jurisdiction);
id = namespace.idFromName(DURABLE_OBJECT_ID);
stub = namespace.get(id);
} else {
// Fall back to region-based routing if no jurisdiction is specified
id = env.DATABASE_DURABLE_OBJECT.idFromName(DURABLE_OBJECT_ID);
const region = env.REGION ?? RegionLocationHint.AUTO;
stub = region !== RegionLocationHint.AUTO
? env.DATABASE_DURABLE_OBJECT.get(id, { locationHint: region as DurableObjectLocationHint })
: env.DATABASE_DURABLE_OBJECT.get(id);
}
return await stub.fetch(request);
},
} satisfies ExportedHandler<Env>;