From e477689f1076cf9e61ebc90a0f81596dadf3d473 Mon Sep 17 00:00:00 2001 From: Kenny Daniel Date: Wed, 18 Feb 2026 13:49:55 -0800 Subject: [PATCH] Update squirreling --- bin/tools/parquetDataSource.js | 108 ++++++++++++++++++--------------- package.json | 2 +- 2 files changed, 61 insertions(+), 49 deletions(-) diff --git a/bin/tools/parquetDataSource.js b/bin/tools/parquetDataSource.js index 6ea9ab5..b08f872 100644 --- a/bin/tools/parquetDataSource.js +++ b/bin/tools/parquetDataSource.js @@ -1,4 +1,4 @@ -import { parquetMetadataAsync, parquetReadObjects } from 'hyparquet' +import { parquetReadObjects, parquetSchema } from 'hyparquet' import { whereToParquetFilter } from './parquetFilter.js' /** @@ -16,61 +16,73 @@ import { whereToParquetFilter } from './parquetFilter.js' */ export function parquetDataSource(file, metadata, compressors) { return { - async *scan({ hints, signal }) { - metadata ??= await parquetMetadataAsync(file) - + scan({ columns, where, limit, offset, signal }) { // Convert WHERE AST to hyparquet filter format - const whereFilter = hints?.where && whereToParquetFilter(hints.where) + const whereFilter = where && whereToParquetFilter(where) /** @type {ParquetQueryFilter | undefined} */ - const filter = hints?.where ? whereFilter : undefined - const filterApplied = !filter || whereFilter - - // Emit rows by row group - let groupStart = 0 - let remainingLimit = hints?.limit ?? Infinity - for (const rowGroup of metadata.row_groups) { - if (signal?.aborted) break - const rowCount = Number(rowGroup.num_rows) + const filter = where ? whereFilter : undefined + const appliedWhere = Boolean(filter && whereFilter) + const appliedLimitOffset = !where || appliedWhere - // Skip row groups by offset if where is fully applied - let safeOffset = 0 - let safeLimit = rowCount - if (filterApplied) { - if (hints?.offset !== undefined && groupStart < hints.offset) { - safeOffset = Math.min(rowCount, hints.offset - groupStart) + // Ensure columns exist in metadata if provided + if (columns) { + const schema = parquetSchema(metadata) + for (const col of columns) { + if (!schema.children.some(child => child.element.name === col)) { + throw new Error(`Column "${col}" not found in parquet schema`) } - safeLimit = Math.min(rowCount - safeOffset, remainingLimit) - if (safeLimit <= 0 && safeOffset < rowCount) break - } - for (let i = 0; i < safeOffset; i++) { - // yield empty rows - yield asyncRow({}) - } - if (safeOffset === rowCount) { - groupStart += rowCount - continue } + } - // Read objects from this row group - const data = await parquetReadObjects({ - file, - metadata, - rowStart: groupStart + safeOffset, - rowEnd: groupStart + safeOffset + safeLimit, - columns: hints?.columns, - filter, - filterStrict: false, - compressors, - useOffsetIndex: true, - }) + return { + rows: (async function* () { + // Emit rows by row group + let groupStart = 0 + let remainingLimit = limit ?? Infinity + for (const rowGroup of metadata.row_groups) { + if (signal?.aborted) break + const rowCount = Number(rowGroup.num_rows) - // Yield each row - for (const row of data) { - yield asyncRow(row) - } + // Skip row groups by offset if where is fully applied + let safeOffset = 0 + let safeLimit = rowCount + if (appliedLimitOffset) { + if (offset !== undefined && groupStart < offset) { + safeOffset = Math.min(rowCount, offset - groupStart) + } + safeLimit = Math.min(rowCount - safeOffset, remainingLimit) + if (safeLimit <= 0 && safeOffset < rowCount) break + } + if (safeOffset === rowCount) { + groupStart += rowCount + continue + } + + // Read objects from this row group + // TODO: move to worker + const data = await parquetReadObjects({ + file, + metadata, + rowStart: groupStart + safeOffset, + rowEnd: groupStart + safeOffset + safeLimit, + columns, + filter, + filterStrict: false, + compressors, + useOffsetIndex: true, + }) - remainingLimit -= data.length - groupStart += rowCount + // Yield each row + for (const row of data) { + yield asyncRow(row) + } + + remainingLimit -= data.length + groupStart += rowCount + } + })(), + appliedWhere, + appliedLimitOffset, } }, } diff --git a/package.json b/package.json index 6c09c03..77ee219 100644 --- a/package.json +++ b/package.json @@ -59,7 +59,7 @@ "hyparquet": "1.25.0", "hyparquet-compressors": "1.1.1", "icebird": "0.3.1", - "squirreling": "0.7.9" + "squirreling": "0.9.1" }, "devDependencies": { "@storybook/react-vite": "10.2.10",