-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathparquetDataSource.ts
More file actions
128 lines (117 loc) · 4.62 KB
/
parquetDataSource.ts
File metadata and controls
128 lines (117 loc) · 4.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import { parquetReadObjects, parquetSchema } from 'hyparquet'
import { parquetReadAsync } from 'hyparquet/src/read.js'
import { assembleAsync } from 'hyparquet/src/rowgroup.js'
import type { AsyncBuffer, AsyncRowGroup, Compressors, FileMetaData } from 'hyparquet'
import { AsyncDataSource, ScanOptions, asyncRow } from 'squirreling'
import { whereToParquetFilter } from './parquetFilter.js'
import { extractSpatialFilter, rowGroupOverlaps } from './parquetSpatial.js'
/**
 * Creates a parquet data source for use with squirreling SQL engine.
 *
 * @param file - async buffer providing byte-range access to the parquet file
 * @param metadata - parsed parquet footer metadata for `file`
 * @param compressors - codecs used to decompress column chunk data
 * @returns an AsyncDataSource exposing row-oriented (`scan`) and
 *   column-oriented (`scanColumn`) reads over the file
 */
export function parquetDataSource(file: AsyncBuffer, metadata: FileMetaData, compressors: Compressors): AsyncDataSource {
  const schema = parquetSchema(metadata)
  return {
    numRows: Number(metadata.num_rows),
    columns: schema.children.map(child => child.element.name),
    scan({ columns, where, limit, offset, signal }: ScanOptions) {
      // Convert WHERE AST to hyparquet filter format. If the AST cannot be
      // expressed as a parquet filter, `filter` is undefined and the engine
      // must re-apply the predicate itself (appliedWhere stays false).
      const filter = where ? whereToParquetFilter(where) : undefined
      const appliedWhere = Boolean(filter)
      // limit/offset can only be pushed down to the reader when there is no
      // WHERE clause, or when the WHERE clause is fully handled by `filter`.
      const appliedLimitOffset = !where || appliedWhere
      // Extract spatial filter (if any) for row group pruning
      const spatialFilter = extractSpatialFilter(where)
      // Validate requested columns against the parquet schema up front
      if (columns) {
        for (const col of columns) {
          if (!schema.children.some(child => child.element.name === col)) {
            throw new Error(`Column "${col}" not found in parquet schema`)
          }
        }
      }
      return {
        rows: (async function* () {
          // Emit rows one row group at a time, tracking the absolute row
          // index of each group's first row in `groupStart`.
          let groupStart = 0
          let remainingLimit = limit ?? Infinity
          for (const rowGroup of metadata.row_groups) {
            if (signal?.aborted) break
            const rowCount = Number(rowGroup.num_rows)
            // Skip row groups whose geospatial statistics prove no overlap
            if (spatialFilter && !rowGroupOverlaps(rowGroup, spatialFilter)) {
              groupStart += rowCount
              continue
            }
            // Apply offset/limit within this group only when the WHERE
            // clause (if any) is fully pushed down; otherwise read the
            // whole group and let the engine trim.
            let safeOffset = 0
            let safeLimit = rowCount
            if (appliedLimitOffset) {
              if (offset !== undefined && groupStart < offset) {
                safeOffset = Math.min(rowCount, offset - groupStart)
              }
              safeLimit = Math.min(rowCount - safeOffset, remainingLimit)
              // Limit exhausted mid-group (not merely offset-skipped): stop
              if (safeLimit <= 0 && safeOffset < rowCount) break
            }
            // Entire group falls before the offset: skip it
            if (safeOffset === rowCount) {
              groupStart += rowCount
              continue
            }
            // Read objects from this row group
            const data = await parquetReadObjects({
              file,
              metadata,
              rowStart: groupStart + safeOffset,
              rowEnd: groupStart + safeOffset + safeLimit,
              columns,
              filter,
              filterStrict: false,
              compressors,
              useOffsetIndex: true,
            })
            // Yield each row
            for (const row of data) {
              yield asyncRow(row, Object.keys(row))
            }
            remainingLimit -= data.length
            groupStart += rowCount
          }
        })(),
        appliedWhere,
        appliedLimitOffset,
      }
    },
    async *scanColumn({ column, limit, offset, signal }) {
      const rowStart = offset ?? 0
      const rowEnd = limit !== undefined ? rowStart + limit : undefined
      const asyncGroups = parquetReadAsync({
        file,
        metadata,
        rowStart,
        rowEnd,
        columns: [column],
        compressors,
      })
      // Reuse the schema tree computed once for the whole data source
      // (identical to parquetSchema(metadata)).
      const assembled = asyncGroups.map((arg: AsyncRowGroup) => assembleAsync(arg, schema))
      for (const rg of assembled) {
        if (signal?.aborted) throw new DOMException('Aborted', 'AbortError')
        const [firstCol] = rg.asyncColumns
        if (!firstCol) continue
        const { skipped, data } = await firstCol.data
        if (signal?.aborted) throw new DOMException('Aborted', 'AbortError')
        // Absolute row index of the first row in the first page; pages may
        // begin before rowStart, so trim each page to the requested window.
        let dataStart = rg.groupStart + skipped
        for (const page of data) {
          const pageRows = page.length
          const selectStart = Math.max(rowStart - dataStart, 0)
          const selectEnd = Math.min((rowEnd ?? Infinity) - dataStart, pageRows)
          if (selectEnd > selectStart) {
            // Avoid copying when the whole page is inside the window
            yield selectStart > 0 || selectEnd < pageRows
              ? page.slice(selectStart, selectEnd)
              : page
          }
          dataStart += pageRows
        }
      }
    },
  }
}