Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 60 additions & 48 deletions bin/tools/parquetDataSource.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { parquetMetadataAsync, parquetReadObjects } from 'hyparquet'
import { parquetReadObjects, parquetSchema } from 'hyparquet'
import { whereToParquetFilter } from './parquetFilter.js'

/**
Expand All @@ -16,61 +16,73 @@ import { whereToParquetFilter } from './parquetFilter.js'
*/
export function parquetDataSource(file, metadata, compressors) {
return {
async *scan({ hints, signal }) {
metadata ??= await parquetMetadataAsync(file)

scan({ columns, where, limit, offset, signal }) {
// Convert WHERE AST to hyparquet filter format
const whereFilter = hints?.where && whereToParquetFilter(hints.where)
const whereFilter = where && whereToParquetFilter(where)
/** @type {ParquetQueryFilter | undefined} */
const filter = hints?.where ? whereFilter : undefined
const filterApplied = !filter || whereFilter

// Emit rows by row group
let groupStart = 0
let remainingLimit = hints?.limit ?? Infinity
for (const rowGroup of metadata.row_groups) {
if (signal?.aborted) break
const rowCount = Number(rowGroup.num_rows)
const filter = where ? whereFilter : undefined
const appliedWhere = Boolean(filter && whereFilter)
const appliedLimitOffset = !where || appliedWhere

// Skip row groups by offset if where is fully applied
let safeOffset = 0
let safeLimit = rowCount
if (filterApplied) {
if (hints?.offset !== undefined && groupStart < hints.offset) {
safeOffset = Math.min(rowCount, hints.offset - groupStart)
// Ensure columns exist in metadata if provided
if (columns) {
const schema = parquetSchema(metadata)
for (const col of columns) {
if (!schema.children.some(child => child.element.name === col)) {
throw new Error(`Column "${col}" not found in parquet schema`)
}
safeLimit = Math.min(rowCount - safeOffset, remainingLimit)
if (safeLimit <= 0 && safeOffset < rowCount) break
}
for (let i = 0; i < safeOffset; i++) {
// yield empty rows
yield asyncRow({})
}
if (safeOffset === rowCount) {
groupStart += rowCount
continue
}
}

// Read objects from this row group
const data = await parquetReadObjects({
file,
metadata,
rowStart: groupStart + safeOffset,
rowEnd: groupStart + safeOffset + safeLimit,
columns: hints?.columns,
filter,
filterStrict: false,
compressors,
useOffsetIndex: true,
})
return {
rows: (async function* () {
// Emit rows by row group
let groupStart = 0
let remainingLimit = limit ?? Infinity
for (const rowGroup of metadata.row_groups) {
if (signal?.aborted) break
const rowCount = Number(rowGroup.num_rows)

// Yield each row
for (const row of data) {
yield asyncRow(row)
}
// Skip row groups by offset if where is fully applied
let safeOffset = 0
let safeLimit = rowCount
if (appliedLimitOffset) {
if (offset !== undefined && groupStart < offset) {
safeOffset = Math.min(rowCount, offset - groupStart)
}
safeLimit = Math.min(rowCount - safeOffset, remainingLimit)
if (safeLimit <= 0 && safeOffset < rowCount) break
}
if (safeOffset === rowCount) {
groupStart += rowCount
continue
}

// Read objects from this row group
// TODO: move to worker
const data = await parquetReadObjects({
file,
metadata,
rowStart: groupStart + safeOffset,
rowEnd: groupStart + safeOffset + safeLimit,
columns,
filter,
filterStrict: false,
compressors,
useOffsetIndex: true,
})

remainingLimit -= data.length
groupStart += rowCount
// Yield each row
for (const row of data) {
yield asyncRow(row)
}

remainingLimit -= data.length
groupStart += rowCount
}
})(),
appliedWhere,
appliedLimitOffset,
}
},
}
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
"hyparquet": "1.25.0",
"hyparquet-compressors": "1.1.1",
"icebird": "0.3.1",
"squirreling": "0.7.9"
"squirreling": "0.9.1"
},
"devDependencies": {
"@storybook/react-vite": "10.2.10",
Expand Down