-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwasm.ts
More file actions
145 lines (123 loc) · 5.29 KB
/
wasm.ts
File metadata and controls
145 lines (123 loc) · 5.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import { builder } from './src/build'
import { delta_scan, parquet_scan, read_csv, read_json, read_json_objects, read_parquet, read_text, read_xlsx } from './io'
export { delta_scan, parquet_scan, read_csv, read_json, read_json_objects, read_parquet, read_text, read_xlsx }
// @ts-ignore
import type * as DuckdbTyped from '@duckdb/duckdb-wasm/dist/types/src/index'
// @ts-ignore
import * as _Duckdb from '@duckdb/duckdb-wasm'
import { BuckDBBase } from './core'
import { isBucket } from './src/utils'
// Runtime module re-typed with the declarations imported above as `DuckdbTyped`,
// so every call made through `Duckdb` is checked against those typings.
const Duckdb = _Duckdb as typeof DuckdbTyped
/**
 * DuckDB-WASM backend.
 *
 * The database is booted lazily inside a Web Worker the first time a query,
 * stream or run call is made, and a single connection is then reused until
 * `close()` is called.
 */
class BuckDBWasm extends BuckDBBase {
  readonly type = 'wasm' as const
  /** Database handle; `null` until initialization succeeds and again after `close()`. */
  _db: DuckdbTyped.AsyncDuckDB | null = null
  /** Connection shared by all calls; `null` under the same conditions as `_db`. */
  _con: DuckdbTyped.AsyncDuckDBConnection | null = null
  /** In-flight or completed initialization. Cleared on failure so a later call can retry. */
  _initPromise: Promise<void> | null = null

  /**
   * Starts initialization once and returns the shared promise for it.
   *
   * A failed attempt is not cached: the stored promise is cleared so the next
   * call starts a fresh attempt instead of replaying the old rejection.
   */
  private _initDB(): Promise<void> {
    if (this._initPromise) {
      return this._initPromise
    }
    if (this._db && this._con) {
      return Promise.resolve()
    }
    const pending: Promise<void> = this._instantiateWasm().catch((err: unknown) => {
      // Only clear our own attempt; close() may already have reset the slot.
      if (this._initPromise === pending) {
        this._initPromise = null
      }
      throw err
    })
    this._initPromise = pending
    return pending
  }

  /**
   * Selects a jsDelivr bundle, spawns the worker, opens the database and
   * connects. `_db` and `_con` are only assigned once every step succeeded.
   *
   * @throws Whatever the underlying DuckDB-WASM calls throw; the worker is
   *   terminated before the error is rethrown.
   */
  private async _instantiateWasm(): Promise<void> {
    const bundle = await Duckdb.selectBundle(Duckdb.getJsDelivrBundles())
    // The worker script is loaded through a same-origin blob that imports the bundle's worker.
    const workerUrl = URL.createObjectURL(
      new Blob([`importScripts("${bundle.mainWorker}");`], { type: 'text/javascript' }),
    )
    let worker: Worker | undefined
    try {
      worker = new Worker(workerUrl)
      const db = new Duckdb.AsyncDuckDB(new Duckdb.VoidLogger(), worker)
      await db.instantiate(bundle.mainModule, bundle.pthreadWorker)
      // Bucket handles and empty handles both fall back to an in-memory database.
      const path = isBucket(this.handle) ? ':memory:' : (this.handle || ':memory:')
      await db.open({
        path,
        useDirectIO: true,
        // accessMode: Duckdb.DuckDBAccessMode.READ_ONLY,
        filesystem: {
          allowFullHTTPReads: true,
        },
        query: { castTimestampToDate: true, castBigIntToDouble: true },
      })
      const con = await db.connect()
      this._db = db
      this._con = con
      // Debug handle on the global object. `globalThis` equals `window` on the
      // main thread and still exists where `window` does not.
      // @ts-ignore
      globalThis.db = con
    } catch (err: unknown) {
      // Do not leave an orphaned worker behind a failed boot.
      worker?.terminate()
      throw err
    } finally {
      // Released on every path, including failures.
      URL.revokeObjectURL(workerUrl)
    }
  }

  async ensureSchema(_uri: string): Promise<void> {
    // todo
  }

  /**
   * Runs every command currently held in the queue, in order, on the connection.
   *
   * @throws Error if the connection has not been established.
   */
  private async _executeQueuedCommands(): Promise<void> {
    const con = this._con
    if (!con) throw new Error('Database connection not initialized.')
    // Use query for setup commands like attach, extensions, settings
    // send might be slightly more appropriate for non-select, but query works
    for (const cmd of this.queue.flush()) {
      await con.query(cmd)
    }
  }

  /**
   * Initializes the database if needed, flushes queued setup commands and
   * returns the live connection.
   *
   * @throws Error if no connection exists after initialization.
   */
  private async _readyConnection(): Promise<DuckdbTyped.AsyncDuckDBConnection> {
    await this._initDB()
    await this._executeQueuedCommands() // Ensure setup commands run first
    if (!this._con) throw new Error('Database connection not initialized.')
    return this._con
  }

  /**
   * Executes `sql` and returns all rows.
   *
   * @param sql Statement to execute.
   * @param opts `rows`: return each row as an array of its values instead of
   *   an object. `withSchema`: additionally run `DESCRIBE <sql>` (skipped for
   *   statements starting with COPY) and attach the result as a
   *   non-enumerable `schema` property on the returned array.
   * @returns The result rows converted with `toJSON()`.
   */
  async query<T = Record<string, any>>(sql: string, opts: { rows?: boolean; withSchema?: boolean } = {}): Promise<T[]> {
    const con = await this._readyConnection()
    const reader = await con.query(sql)
    let rtn = reader.toArray().map(e => e.toJSON() as T)
    if (opts?.rows) {
      rtn = rtn.map(row => Object.values(row)) as T[]
    }
    if (opts?.withSchema && !sql.trim().toUpperCase().startsWith('COPY')) {
      const schemaReader = await con.query('DESCRIBE ' + sql)
      const schema = schemaReader.toArray().map(e => e.toJSON())
      Object.defineProperty(rtn, 'schema', { value: schema, enumerable: false })
    }
    return rtn
  }

  /**
   * Executes `sql` and yields each row converted with `toJSON()`.
   *
   * NOTE(review): the result is obtained through `query`, so it appears to be
   * fully fetched before the first row is yielded. Confirm whether a lazily
   * read result is wanted here.
   */
  async *stream(sql: string): AsyncGenerator<any, void, unknown> {
    const con = await this._readyConnection()
    const reader = await con.query(sql)
    for (const row of reader) {
      yield row.toJSON()
    }
  }

  /** Executes `sql` through the connection's `send` and discards whatever it returns. */
  async run(sql: string): Promise<void> {
    const con = await this._readyConnection()
    await con.send(sql)
  }

  /**
   * Closes the connection and terminates the database, then resets all state
   * so the instance can be initialized again later.
   *
   * The database is terminated even when closing the connection throws; that
   * error still propagates to the caller.
   */
  async close(): Promise<void> {
    const pending = this._initPromise
    if (pending) {
      // Let a running init settle first. A failure was already reported to the
      // caller that triggered it and must not block cleanup here.
      await pending.catch(() => undefined)
    }
    const con = this._con
    const db = this._db
    this._con = null
    this._db = null
    this._initPromise = null // Reset init state
    try {
      if (con) await con.close()
    } finally {
      if (db) await db.terminate()
    }
  }
}
// Factory produced by handing the WASM backend class to the shared builder.
export const Buck = builder(BuckDBWasm)
// Database created from an empty handle, kept for existing importers,
// together with its `from` entry point re-exported at module level.
export const MemoryDB = Buck('')
export const { from } = MemoryDB
// The BuckDBWasm class itself is intentionally not exported from this module.