Switch /data endpoint to Arrow IPC stream with pg cursor batching

Server streams rows from a pg cursor in 10k-row batches, building Arrow
record batches incrementally and piping them as chunked HTTP response —
Node.js heap stays bounded regardless of dataset size.

Client fetches as arrayBuffer() and loads directly into Perspective worker
(native Arrow path, no JSON deserialization). X-Row-Count header drives
a non-blocking banner for datasets >= 500k rows. validCols now derived
from col_meta rather than from row keys.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Paul Trowbridge 2026-04-27 22:57:47 -04:00
parent 11f5b02fc4
commit a6e6efd36e
4 changed files with 338 additions and 12 deletions

263
package-lock.json generated
View File

@ -8,6 +8,7 @@
"name": "pf_app",
"version": "1.0.0",
"dependencies": {
"apache-arrow": "^21.1.0",
"cors": "^2.8.5",
"dotenv": "^16.0.0",
"express": "^4.18.2",
@ -17,6 +18,36 @@
"nodemon": "^3.0.0"
}
},
"node_modules/@swc/helpers": {
"version": "0.5.21",
"resolved": "https://registry.npmjs.org/@swc/helpers/-/helpers-0.5.21.tgz",
"integrity": "sha512-jI/VAmtdjB/RnI8GTnokyX7Ug8c+g+ffD6QRLa6XQewtnGyukKkKSk3wLTM3b5cjt1jNh9x0jfVlagdN2gDKQg==",
"license": "Apache-2.0",
"dependencies": {
"tslib": "^2.8.0"
}
},
"node_modules/@types/command-line-args": {
"version": "5.2.3",
"resolved": "https://registry.npmjs.org/@types/command-line-args/-/command-line-args-5.2.3.tgz",
"integrity": "sha512-uv0aG6R0Y8WHZLTamZwtfsDLVRnOa+n+n5rEvFWL5Na5gZ8V2Teab/duDPFzIIIhs9qizDpcavCusCLJZu62Kw==",
"license": "MIT"
},
"node_modules/@types/command-line-usage": {
"version": "5.0.4",
"resolved": "https://registry.npmjs.org/@types/command-line-usage/-/command-line-usage-5.0.4.tgz",
"integrity": "sha512-BwR5KP3Es/CSht0xqBcUXS3qCAUVXwpRKsV2+arxeb65atasuXG9LykC9Ab10Cw3s2raH92ZqOeILaQbsB2ACg==",
"license": "MIT"
},
"node_modules/@types/node": {
"version": "24.12.2",
"resolved": "https://registry.npmjs.org/@types/node/-/node-24.12.2.tgz",
"integrity": "sha512-A1sre26ke7HDIuY/M23nd9gfB+nrmhtYyMINbjI1zHJxYteKR6qSMX56FsmjMcDb3SMcjJg5BiRRgOCC/yBD0g==",
"license": "MIT",
"dependencies": {
"undici-types": "~7.16.0"
}
},
"node_modules/accepts": {
"version": "1.3.8",
"resolved": "https://registry.npmjs.org/accepts/-/accepts-1.3.8.tgz",
@ -30,6 +61,21 @@
"node": ">= 0.6"
}
},
"node_modules/ansi-styles": {
"version": "4.3.0",
"resolved": "https://registry.npmjs.org/ansi-styles/-/ansi-styles-4.3.0.tgz",
"integrity": "sha512-zbB9rCJAT1rbjiVDb2hqKFHNYLxgtk8NURxZ3IZwD3F6NtxbXZQCnnSi1Lkx+IDohdPlFp222wVALIheZJQSEg==",
"license": "MIT",
"dependencies": {
"color-convert": "^2.0.1"
},
"engines": {
"node": ">=8"
},
"funding": {
"url": "https://github.com/chalk/ansi-styles?sponsor=1"
}
},
"node_modules/anymatch": {
"version": "3.1.3",
"resolved": "https://registry.npmjs.org/anymatch/-/anymatch-3.1.3.tgz",
@ -44,6 +90,35 @@
"node": ">= 8"
}
},
"node_modules/apache-arrow": {
"version": "21.1.0",
"resolved": "https://registry.npmjs.org/apache-arrow/-/apache-arrow-21.1.0.tgz",
"integrity": "sha512-kQrYLxhC+NTVVZ4CCzGF6L/uPVOzJmD1T3XgbiUnP7oTeVFOFgEUu6IKNwCDkpFoBVqDKQivlX4RUFqqnWFlEA==",
"license": "Apache-2.0",
"dependencies": {
"@swc/helpers": "^0.5.11",
"@types/command-line-args": "^5.2.3",
"@types/command-line-usage": "^5.0.4",
"@types/node": "^24.0.3",
"command-line-args": "^6.0.1",
"command-line-usage": "^7.0.1",
"flatbuffers": "^25.1.24",
"json-bignum": "^0.0.3",
"tslib": "^2.6.2"
},
"bin": {
"arrow2csv": "bin/arrow2csv.js"
}
},
"node_modules/array-back": {
"version": "6.2.3",
"resolved": "https://registry.npmjs.org/array-back/-/array-back-6.2.3.tgz",
"integrity": "sha512-SGDvmg6QTYiTxCBkYVmThcoa67uLl35pyzRHdpCGBOcqFy6BtwnphoFPk7LhJshD+Yk1Kt35WGWeZPTgwR4Fhw==",
"license": "MIT",
"engines": {
"node": ">=12.17"
}
},
"node_modules/array-flatten": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/array-flatten/-/array-flatten-1.1.1.tgz",
@ -161,6 +236,58 @@
"url": "https://github.com/sponsors/ljharb"
}
},
"node_modules/chalk": {
"version": "4.1.2",
"resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz",
"integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==",
"license": "MIT",
"dependencies": {
"ansi-styles": "^4.1.0",
"supports-color": "^7.1.0"
},
"engines": {
"node": ">=10"
},
"funding": {
"url": "https://github.com/chalk/chalk?sponsor=1"
}
},
"node_modules/chalk-template": {
"version": "0.4.0",
"resolved": "https://registry.npmjs.org/chalk-template/-/chalk-template-0.4.0.tgz",
"integrity": "sha512-/ghrgmhfY8RaSdeo43hNXxpoHAtxdbskUHjPpfqUWGttFgycUhYPGx3YZBCnUCvOa7Doivn1IZec3DEGFoMgLg==",
"license": "MIT",
"dependencies": {
"chalk": "^4.1.2"
},
"engines": {
"node": ">=12"
},
"funding": {
"url": "https://github.com/chalk/chalk-template?sponsor=1"
}
},
"node_modules/chalk/node_modules/has-flag": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/has-flag/-/has-flag-4.0.0.tgz",
"integrity": "sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ==",
"license": "MIT",
"engines": {
"node": ">=8"
}
},
"node_modules/chalk/node_modules/supports-color": {
"version": "7.2.0",
"resolved": "https://registry.npmjs.org/supports-color/-/supports-color-7.2.0.tgz",
"integrity": "sha512-qpCAvRl9stuOHveKsn7HncJRvv501qIacKzQlO/+Lwxc9+0q2wLyv4Dfvt80/DPn2pqOBsJdDiogXGR9+OvwRw==",
"license": "MIT",
"dependencies": {
"has-flag": "^4.0.0"
},
"engines": {
"node": ">=8"
}
},
"node_modules/chokidar": {
"version": "3.6.0",
"resolved": "https://registry.npmjs.org/chokidar/-/chokidar-3.6.0.tgz",
@ -186,6 +313,62 @@
"fsevents": "~2.3.2"
}
},
"node_modules/color-convert": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/color-convert/-/color-convert-2.0.1.tgz",
"integrity": "sha512-RRECPsj7iu/xb5oKYcsFHSppFNnsj/52OVTRKb4zP5onXwVF3zVmmToNcOfGC+CRDpfK/U584fMg38ZHCaElKQ==",
"license": "MIT",
"dependencies": {
"color-name": "~1.1.4"
},
"engines": {
"node": ">=7.0.0"
}
},
"node_modules/color-name": {
"version": "1.1.4",
"resolved": "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz",
"integrity": "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==",
"license": "MIT"
},
"node_modules/command-line-args": {
"version": "6.0.2",
"resolved": "https://registry.npmjs.org/command-line-args/-/command-line-args-6.0.2.tgz",
"integrity": "sha512-AIjYVxrV9X752LmPDLbVYv8aMCuHPSLZJXEo2qo/xJfv+NYhaZ4sMSF01rM+gHPaMgvPM0l5D/F+Qx+i2WfSmQ==",
"license": "MIT",
"dependencies": {
"array-back": "^6.2.3",
"find-replace": "^5.0.2",
"lodash.camelcase": "^4.3.0",
"typical": "^7.3.0"
},
"engines": {
"node": ">=12.20"
},
"peerDependencies": {
"@75lb/nature": "latest"
},
"peerDependenciesMeta": {
"@75lb/nature": {
"optional": true
}
}
},
"node_modules/command-line-usage": {
"version": "7.0.4",
"resolved": "https://registry.npmjs.org/command-line-usage/-/command-line-usage-7.0.4.tgz",
"integrity": "sha512-85UdvzTNx/+s5CkSgBm/0hzP80RFHAa7PsfeADE5ezZF3uHz3/Tqj9gIKGT9PTtpycc3Ua64T0oVulGfKxzfqg==",
"license": "MIT",
"dependencies": {
"array-back": "^6.2.2",
"chalk-template": "^0.4.0",
"table-layout": "^4.1.1",
"typical": "^7.3.0"
},
"engines": {
"node": ">=12.20.0"
}
},
"node_modules/content-disposition": {
"version": "0.5.4",
"resolved": "https://registry.npmjs.org/content-disposition/-/content-disposition-0.5.4.tgz",
@ -430,6 +613,29 @@
"node": ">= 0.8"
}
},
"node_modules/find-replace": {
"version": "5.0.2",
"resolved": "https://registry.npmjs.org/find-replace/-/find-replace-5.0.2.tgz",
"integrity": "sha512-Y45BAiE3mz2QsrN2fb5QEtO4qb44NcS7en/0y9PEVsg351HsLeVclP8QPMH79Le9sH3rs5RSwJu99W0WPZO43Q==",
"license": "MIT",
"engines": {
"node": ">=14"
},
"peerDependencies": {
"@75lb/nature": "latest"
},
"peerDependenciesMeta": {
"@75lb/nature": {
"optional": true
}
}
},
"node_modules/flatbuffers": {
"version": "25.9.23",
"resolved": "https://registry.npmjs.org/flatbuffers/-/flatbuffers-25.9.23.tgz",
"integrity": "sha512-MI1qs7Lo4Syw0EOzUl0xjs2lsoeqFku44KpngfIduHBYvzm8h2+7K8YMQh1JtVVVrUvhLpNwqVi4DERegUJhPQ==",
"license": "Apache-2.0"
},
"node_modules/forwarded": {
"version": "0.2.0",
"resolved": "https://registry.npmjs.org/forwarded/-/forwarded-0.2.0.tgz",
@ -668,6 +874,20 @@
"node": ">=0.12.0"
}
},
"node_modules/json-bignum": {
"version": "0.0.3",
"resolved": "https://registry.npmjs.org/json-bignum/-/json-bignum-0.0.3.tgz",
"integrity": "sha512-2WHyXj3OfHSgNyuzDbSxI1w2jgw5gkWSWhS7Qg4bWXx1nLk3jnbwfUeS0PSba3IzpTUWdHxBieELUzXRjQB2zg==",
"engines": {
"node": ">=0.8"
}
},
"node_modules/lodash.camelcase": {
"version": "4.3.0",
"resolved": "https://registry.npmjs.org/lodash.camelcase/-/lodash.camelcase-4.3.0.tgz",
"integrity": "sha512-TwuEnCnxbc3rAvhf/LbG7tJUDzhqXyFnv3dtzLOPgCG/hODL7WFnsbwktkD7yUV0RrreP/l1PALq/YSg6VvjlA==",
"license": "MIT"
},
"node_modules/math-intrinsics": {
"version": "1.1.0",
"resolved": "https://registry.npmjs.org/math-intrinsics/-/math-intrinsics-1.1.0.tgz",
@ -1300,6 +1520,19 @@
"node": ">=4"
}
},
"node_modules/table-layout": {
"version": "4.1.1",
"resolved": "https://registry.npmjs.org/table-layout/-/table-layout-4.1.1.tgz",
"integrity": "sha512-iK5/YhZxq5GO5z8wb0bY1317uDF3Zjpha0QFFLA8/trAoiLbQD0HUbMesEaxyzUgDxi2QlcbM8IvqOlEjgoXBA==",
"license": "MIT",
"dependencies": {
"array-back": "^6.2.2",
"wordwrapjs": "^5.1.0"
},
"engines": {
"node": ">=12.17"
}
},
"node_modules/to-regex-range": {
"version": "5.0.1",
"resolved": "https://registry.npmjs.org/to-regex-range/-/to-regex-range-5.0.1.tgz",
@ -1332,6 +1565,12 @@
"nodetouch": "bin/nodetouch.js"
}
},
"node_modules/tslib": {
"version": "2.8.1",
"resolved": "https://registry.npmjs.org/tslib/-/tslib-2.8.1.tgz",
"integrity": "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w==",
"license": "0BSD"
},
"node_modules/type-is": {
"version": "1.6.18",
"resolved": "https://registry.npmjs.org/type-is/-/type-is-1.6.18.tgz",
@ -1345,6 +1584,15 @@
"node": ">= 0.6"
}
},
"node_modules/typical": {
"version": "7.3.0",
"resolved": "https://registry.npmjs.org/typical/-/typical-7.3.0.tgz",
"integrity": "sha512-ya4mg/30vm+DOWfBg4YK3j2WD6TWtRkCbasOJr40CseYENzCUby/7rIvXA99JGsQHeNxLbnXdyLLxKSv3tauFw==",
"license": "MIT",
"engines": {
"node": ">=12.17"
}
},
"node_modules/undefsafe": {
"version": "2.0.5",
"resolved": "https://registry.npmjs.org/undefsafe/-/undefsafe-2.0.5.tgz",
@ -1352,6 +1600,12 @@
"dev": true,
"license": "MIT"
},
"node_modules/undici-types": {
"version": "7.16.0",
"resolved": "https://registry.npmjs.org/undici-types/-/undici-types-7.16.0.tgz",
"integrity": "sha512-Zz+aZWSj8LE6zoxD+xrjh4VfkIG8Ya6LvYkZqtUQGJPZjYl53ypCaUwWqo7eI0x66KBGeRo+mlBEkMSeSZ38Nw==",
"license": "MIT"
},
"node_modules/unpipe": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/unpipe/-/unpipe-1.0.0.tgz",
@ -1379,6 +1633,15 @@
"node": ">= 0.8"
}
},
"node_modules/wordwrapjs": {
"version": "5.1.1",
"resolved": "https://registry.npmjs.org/wordwrapjs/-/wordwrapjs-5.1.1.tgz",
"integrity": "sha512-0yweIbkINJodk27gX9LBGMzyQdBDan3s/dEAiwBOj+Mf0PPyWL6/rikalkv8EeD0E8jm4o5RXEOrFTP3NXbhJg==",
"license": "MIT",
"engines": {
"node": ">=12.17"
}
},
"node_modules/xtend": {
"version": "4.0.2",
"resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz",

View File

@ -9,6 +9,7 @@
"build": "cd ui && npm run build"
},
"dependencies": {
"apache-arrow": "^21.1.0",
"cors": "^2.8.5",
"dotenv": "^16.0.0",
"express": "^4.18.2",

View File

@ -1,4 +1,5 @@
const express = require('express');
const { tableFromJSON, RecordBatchStreamWriter, RecordBatch } = require('apache-arrow');
const { applyTokens, buildWhere, buildExcludeClause, buildSetClause, esc } = require('../lib/sql_generator');
const { fcTable } = require('../lib/utils');
@ -61,16 +62,56 @@ module.exports = function(pool) {
return true;
}
// fetch all rows for a version (all iters including reference)
// stream all rows for a version as Arrow IPC (all iters including reference)
router.get('/versions/:id/data', async (req, res) => {
const versionId = parseInt(req.params.id);
let client, committed = false;
try {
const ctx = await getContext(parseInt(req.params.id), 'get_data');
const sql = applyTokens(ctx.sql, { fc_table: ctx.table });
const result = await runSQL(sql);
res.json(result.rows);
const verResult = await pool.query(
`SELECT v.*, s.tname FROM pf.version v JOIN pf.source s ON s.id = v.source_id WHERE v.id = $1`,
[versionId]
);
if (!verResult.rows.length) {
const err = new Error('Version not found'); err.status = 404; throw err;
}
const tbl = fcTable(verResult.rows[0].tname, versionId);
const { rows: [{ count }] } = await pool.query(`SELECT COUNT(*) FROM ${tbl}`);
const rowCount = parseInt(count);
res.setHeader('Content-Type', 'application/vnd.apache.arrow.stream');
res.setHeader('X-Row-Count', String(rowCount));
if (rowCount === 0) { res.end(); return; }
client = await pool.connect();
await client.query('BEGIN');
await client.query(`DECLARE pf_cur CURSOR FOR SELECT * FROM ${tbl}`);
const writer = RecordBatchStreamWriter.throughNode();
writer.pipe(res);
let schema = null;
while (true) {
const { rows } = await client.query('FETCH 10000 FROM pf_cur');
if (!rows.length) break;
const t = tableFromJSON(rows);
if (!schema) { schema = t.schema; writer.write(schema); }
for (const rb of t.batches) writer.write(new RecordBatch(schema, rb.data));
}
writer.end();
await client.query('COMMIT');
committed = true;
} catch (err) {
console.error(err);
res.status(err.status || 500).json({ error: err.message });
if (!res.headersSent) res.status(err.status || 500).json({ error: err.message });
else res.destroy();
} finally {
if (client) {
if (!committed) try { await client.query('ROLLBACK'); } catch {}
client.release();
}
}
});

View File

@ -33,8 +33,9 @@ export default function Forecast() {
const [sourceId, setSourceId] = useState('')
const [versions, setVersions] = useState([])
const [versionId, setVersionId] = useState('')
const [loading, setLoading] = useState(false)
const [msg, setMsg] = useState(null)
const [loading, setLoading] = useState(false)
const [largeDataset, setLargeDataset] = useState(false)
const [msg, setMsg] = useState(null)
// layouts
const [layouts, setLayouts] = useState([])
@ -146,23 +147,37 @@ export default function Forecast() {
const viewer = viewerRef.current
if (!viewer) return
setLoading(true)
setLargeDataset(false)
setSlice({})
expandDepthRef.current = null
try {
const [perspective, rows, meta] = await Promise.all([
const [perspective, dataResult, meta] = await Promise.all([
loadPerspective(),
fetch(`/api/versions/${vid}/data`).then(r => r.json()),
fetch(`/api/versions/${vid}/data`).then(async r => {
if (!r.ok) { const { error } = await r.json(); throw new Error(error || 'Failed to load data') }
const rowCount = parseInt(r.headers.get('X-Row-Count') || '0')
const buffer = await r.arrayBuffer()
return { buffer, rowCount }
}),
fetch(`/api/sources/${sid}/cols`).then(r => r.json()),
])
const { buffer, rowCount } = dataResult
colMetaRef.current = meta
const validCols = new Set(rows.length ? Object.keys(rows[0]) : [])
const validCols = new Set([
...meta.filter(c => ['dimension','value','units','date'].includes(c.role)).map(c => c.cname),
'pf_id', 'pf_iter', 'pf_logid', 'pf_user', 'created_at',
])
const tableName = `fc_${vid}`
if (rowCount >= 500000) setLargeDataset(true)
if (workerRef.current) { try { workerRef.current.terminate() } catch {} }
const worker = await perspective.worker()
workerRef.current = worker
tableRef.current = await worker.table(rows, { name: tableName })
tableRef.current = rowCount > 0
? await worker.table(buffer, { name: tableName })
: await worker.table([], { name: tableName })
await viewer.load(worker)
@ -200,6 +215,7 @@ export default function Forecast() {
if (Object.keys(s).length > 0) setSlice(s)
}
viewer.addEventListener('perspective-click', viewer._pspClick)
setLargeDataset(false)
} catch (err) {
flash(err.message, 'error')
@ -565,6 +581,11 @@ export default function Forecast() {
<span className="text-sm text-gray-400">Loading</span>
</div>
)}
{!loading && largeDataset && (
<div className="absolute top-2 left-1/2 -translate-x-1/2 z-10 bg-amber-50 border border-amber-200 text-amber-800 text-xs px-3 py-1.5 rounded shadow-sm">
Large dataset pivot may take a moment to render
</div>
)}
<perspective-viewer ref={viewerRef} style={{ position: 'absolute', inset: 0 }} />
</div>