From a6e6efd36ed7986c8d3da95e5021fc1ad6d33886 Mon Sep 17 00:00:00 2001 From: Paul Trowbridge Date: Mon, 27 Apr 2026 22:57:47 -0400 Subject: [PATCH] Switch /data endpoint to Arrow IPC stream with pg cursor batching MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Server streams rows from a pg cursor in 10k-row batches, building Arrow record batches incrementally and piping them as chunked HTTP response — Node.js heap stays bounded regardless of dataset size. Client fetches as arrayBuffer() and loads directly into Perspective worker (native Arrow path, no JSON deserialization). X-Row-Count header drives a non-blocking banner for datasets >= 500k rows. validCols now derived from col_meta rather than from row keys. Co-Authored-By: Claude Sonnet 4.6 --- package-lock.json | 263 ++++++++++++++++++++++++++++++++++++++ package.json | 1 + routes/operations.js | 53 +++++++- ui/src/views/Forecast.jsx | 33 ++++- 4 files changed, 338 insertions(+), 12 deletions(-) diff --git a/package-lock.json b/package-lock.json index 15b6290..43f238f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8,6 +8,7 @@ "name": "pf_app", "version": "1.0.0", "dependencies": { + "apache-arrow": "^21.1.0", "cors": "^2.8.5", "dotenv": "^16.0.0", "express": "^4.18.2", @@ -17,6 +18,36 @@ "nodemon": "^3.0.0" } }, + "node_modules/@swc/helpers": { + "version": "0.5.21", + "resolved": "https://registry.npmjs.org/@swc/helpers/-/helpers-0.5.21.tgz", + "integrity": "sha512-jI/VAmtdjB/RnI8GTnokyX7Ug8c+g+ffD6QRLa6XQewtnGyukKkKSk3wLTM3b5cjt1jNh9x0jfVlagdN2gDKQg==", + "license": "Apache-2.0", + "dependencies": { + "tslib": "^2.8.0" + } + }, + "node_modules/@types/command-line-args": { + "version": "5.2.3", + "resolved": "https://registry.npmjs.org/@types/command-line-args/-/command-line-args-5.2.3.tgz", + "integrity": "sha512-uv0aG6R0Y8WHZLTamZwtfsDLVRnOa+n+n5rEvFWL5Na5gZ8V2Teab/duDPFzIIIhs9qizDpcavCusCLJZu62Kw==", + "license": "MIT" + }, + "node_modules/@types/command-line-usage": { + "version": "5.0.4", + "resolved": "https://registry.npmjs.org/@types/command-line-usage/-/command-line-usage-5.0.4.tgz", + "integrity": "sha512-BwR5KP3Es/CSht0xqBcUXS3qCAUVXwpRKsV2+arxeb65atasuXG9LykC9Ab10Cw3s2raH92ZqOeILaQbsB2ACg==", + "license": "MIT" + }, + "node_modules/@types/node": { + "version": "24.12.2", + "resolved": "https://registry.npmjs.org/@types/node/-/node-24.12.2.tgz", + "integrity": "sha512-A1sre26ke7HDIuY/M23nd9gfB+nrmhtYyMINbjI1zHJxYteKR6qSMX56FsmjMcDb3SMcjJg5BiRRgOCC/yBD0g==", + "license": "MIT", + "dependencies": { + "undici-types": "~7.16.0" + } + }, "node_modules/accepts": { "version": "1.3.8", "resolved": "https://registry.npmjs.org/accepts/-/accepts-1.3.8.tgz", @@ -30,6 +61,21 @@ "node": ">= 0.6" } }, + "node_modules/ansi-styles": { + "version": "4.3.0", + "resolved": "https://registry.npmjs.org/ansi-styles/-/ansi-styles-4.3.0.tgz", + "integrity": "sha512-zbB9rCJAT1rbjiVDb2hqKFHNYLxgtk8NURxZ3IZwD3F6NtxbXZQCnnSi1Lkx+IDohdPlFp222wVALIheZJQSEg==", + "license": "MIT", + "dependencies": { + "color-convert": "^2.0.1" + }, + "engines": { + "node": ">=8" + }, + "funding": { + "url": "https://github.com/chalk/ansi-styles?sponsor=1" + } + }, "node_modules/anymatch": { "version": "3.1.3", "resolved": "https://registry.npmjs.org/anymatch/-/anymatch-3.1.3.tgz", @@ -44,6 +90,35 @@ "node": ">= 8" } }, + "node_modules/apache-arrow": { + "version": "21.1.0", + "resolved": "https://registry.npmjs.org/apache-arrow/-/apache-arrow-21.1.0.tgz", + "integrity": "sha512-kQrYLxhC+NTVVZ4CCzGF6L/uPVOzJmD1T3XgbiUnP7oTeVFOFgEUu6IKNwCDkpFoBVqDKQivlX4RUFqqnWFlEA==", + "license": "Apache-2.0", + "dependencies": { + "@swc/helpers": "^0.5.11", + "@types/command-line-args": "^5.2.3", + "@types/command-line-usage": "^5.0.4", + "@types/node": "^24.0.3", + "command-line-args": "^6.0.1", + "command-line-usage": "^7.0.1", + "flatbuffers": "^25.1.24", + "json-bignum": "^0.0.3", + "tslib": "^2.6.2" + }, + "bin": { + "arrow2csv": "bin/arrow2csv.js" + } + }, + "node_modules/array-back": { + "version": "6.2.3", + "resolved": "https://registry.npmjs.org/array-back/-/array-back-6.2.3.tgz", + "integrity": "sha512-SGDvmg6QTYiTxCBkYVmThcoa67uLl35pyzRHdpCGBOcqFy6BtwnphoFPk7LhJshD+Yk1Kt35WGWeZPTgwR4Fhw==", + "license": "MIT", + "engines": { + "node": ">=12.17" + } + }, "node_modules/array-flatten": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/array-flatten/-/array-flatten-1.1.1.tgz", @@ -161,6 +236,58 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/chalk": { + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", + "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", + "license": "MIT", + "dependencies": { + "ansi-styles": "^4.1.0", + "supports-color": "^7.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/chalk/chalk?sponsor=1" + } + }, + "node_modules/chalk-template": { + "version": "0.4.0", + "resolved": "https://registry.npmjs.org/chalk-template/-/chalk-template-0.4.0.tgz", + "integrity": "sha512-/ghrgmhfY8RaSdeo43hNXxpoHAtxdbskUHjPpfqUWGttFgycUhYPGx3YZBCnUCvOa7Doivn1IZec3DEGFoMgLg==", + "license": "MIT", + "dependencies": { + "chalk": "^4.1.2" + }, + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/chalk/chalk-template?sponsor=1" + } + }, + "node_modules/chalk/node_modules/has-flag": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/has-flag/-/has-flag-4.0.0.tgz", + "integrity": "sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ==", + "license": "MIT", + "engines": { + "node": ">=8" + } + }, + "node_modules/chalk/node_modules/supports-color": { + "version": "7.2.0", + "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-7.2.0.tgz", + "integrity": "sha512-qpCAvRl9stuOHveKsn7HncJRvv501qIacKzQlO/+Lwxc9+0q2wLyv4Dfvt80/DPn2pqOBsJdDiogXGR9+OvwRw==", + "license": "MIT", + "dependencies": { + "has-flag": "^4.0.0" + }, + "engines": { + "node": ">=8" + } + }, "node_modules/chokidar": { "version": "3.6.0", "resolved": "https://registry.npmjs.org/chokidar/-/chokidar-3.6.0.tgz", @@ -186,6 +313,62 @@ "fsevents": "~2.3.2" } }, + "node_modules/color-convert": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/color-convert/-/color-convert-2.0.1.tgz", + "integrity": "sha512-RRECPsj7iu/xb5oKYcsFHSppFNnsj/52OVTRKb4zP5onXwVF3zVmmToNcOfGC+CRDpfK/U584fMg38ZHCaElKQ==", + "license": "MIT", + "dependencies": { + "color-name": "~1.1.4" + }, + "engines": { + "node": ">=7.0.0" + } + }, + "node_modules/color-name": { + "version": "1.1.4", + "resolved": "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz", + "integrity": "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==", + "license": "MIT" + }, + "node_modules/command-line-args": { + "version": "6.0.2", + "resolved": "https://registry.npmjs.org/command-line-args/-/command-line-args-6.0.2.tgz", + "integrity": "sha512-AIjYVxrV9X752LmPDLbVYv8aMCuHPSLZJXEo2qo/xJfv+NYhaZ4sMSF01rM+gHPaMgvPM0l5D/F+Qx+i2WfSmQ==", + "license": "MIT", + "dependencies": { + "array-back": "^6.2.3", + "find-replace": "^5.0.2", + "lodash.camelcase": "^4.3.0", + "typical": "^7.3.0" + }, + "engines": { + "node": ">=12.20" + }, + "peerDependencies": { + "@75lb/nature": "latest" + }, + "peerDependenciesMeta": { + "@75lb/nature": { + "optional": true + } + } + }, + "node_modules/command-line-usage": { + "version": "7.0.4", + "resolved": "https://registry.npmjs.org/command-line-usage/-/command-line-usage-7.0.4.tgz", + "integrity": "sha512-85UdvzTNx/+s5CkSgBm/0hzP80RFHAa7PsfeADE5ezZF3uHz3/Tqj9gIKGT9PTtpycc3Ua64T0oVulGfKxzfqg==", + "license": "MIT", + "dependencies": { + "array-back": "^6.2.2", + "chalk-template": "^0.4.0", + "table-layout": "^4.1.1", + "typical": "^7.3.0" + }, + "engines": { + "node": ">=12.20.0" + } + }, "node_modules/content-disposition": { "version": "0.5.4", "resolved": "https://registry.npmjs.org/content-disposition/-/content-disposition-0.5.4.tgz", @@ -430,6 +613,29 @@ "node": ">= 0.8" } }, + "node_modules/find-replace": { + "version": "5.0.2", + "resolved": "https://registry.npmjs.org/find-replace/-/find-replace-5.0.2.tgz", + "integrity": "sha512-Y45BAiE3mz2QsrN2fb5QEtO4qb44NcS7en/0y9PEVsg351HsLeVclP8QPMH79Le9sH3rs5RSwJu99W0WPZO43Q==", + "license": "MIT", + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@75lb/nature": "latest" + }, + "peerDependenciesMeta": { + "@75lb/nature": { + "optional": true + } + } + }, + "node_modules/flatbuffers": { + "version": "25.9.23", + "resolved": "https://registry.npmjs.org/flatbuffers/-/flatbuffers-25.9.23.tgz", + "integrity": "sha512-MI1qs7Lo4Syw0EOzUl0xjs2lsoeqFku44KpngfIduHBYvzm8h2+7K8YMQh1JtVVVrUvhLpNwqVi4DERegUJhPQ==", + "license": "Apache-2.0" + }, "node_modules/forwarded": { "version": "0.2.0", "resolved": "https://registry.npmjs.org/forwarded/-/forwarded-0.2.0.tgz", @@ -668,6 +874,20 @@ "node": ">=0.12.0" } }, + "node_modules/json-bignum": { + "version": "0.0.3", + "resolved": "https://registry.npmjs.org/json-bignum/-/json-bignum-0.0.3.tgz", + "integrity": "sha512-2WHyXj3OfHSgNyuzDbSxI1w2jgw5gkWSWhS7Qg4bWXx1nLk3jnbwfUeS0PSba3IzpTUWdHxBieELUzXRjQB2zg==", + "engines": { + "node": ">=0.8" + } + }, + "node_modules/lodash.camelcase": { + "version": "4.3.0", + "resolved": "https://registry.npmjs.org/lodash.camelcase/-/lodash.camelcase-4.3.0.tgz", + "integrity": "sha512-TwuEnCnxbc3rAvhf/LbG7tJUDzhqXyFnv3dtzLOPgCG/hODL7WFnsbwktkD7yUV0RrreP/l1PALq/YSg6VvjlA==", + "license": "MIT" + }, "node_modules/math-intrinsics": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/math-intrinsics/-/math-intrinsics-1.1.0.tgz", @@ -1300,6 +1520,19 @@ "node": ">=4" } }, + "node_modules/table-layout": { + "version": "4.1.1", + "resolved": "https://registry.npmjs.org/table-layout/-/table-layout-4.1.1.tgz", + "integrity": "sha512-iK5/YhZxq5GO5z8wb0bY1317uDF3Zjpha0QFFLA8/trAoiLbQD0HUbMesEaxyzUgDxi2QlcbM8IvqOlEjgoXBA==", + "license": "MIT", + "dependencies": { + "array-back": "^6.2.2", + "wordwrapjs": "^5.1.0" + }, + "engines": { + "node": ">=12.17" + } + }, "node_modules/to-regex-range": { "version": "5.0.1", "resolved": "https://registry.npmjs.org/to-regex-range/-/to-regex-range-5.0.1.tgz", @@ -1332,6 +1565,12 @@ "nodetouch": "bin/nodetouch.js" } }, + "node_modules/tslib": { + "version": "2.8.1", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.8.1.tgz", + "integrity": "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w==", + "license": "0BSD" + }, "node_modules/type-is": { "version": "1.6.18", "resolved": "https://registry.npmjs.org/type-is/-/type-is-1.6.18.tgz", @@ -1345,6 +1584,15 @@ "node": ">= 0.6" } }, + "node_modules/typical": { + "version": "7.3.0", + "resolved": "https://registry.npmjs.org/typical/-/typical-7.3.0.tgz", + "integrity": "sha512-ya4mg/30vm+DOWfBg4YK3j2WD6TWtRkCbasOJr40CseYENzCUby/7rIvXA99JGsQHeNxLbnXdyLLxKSv3tauFw==", + "license": "MIT", + "engines": { + "node": ">=12.17" + } + }, "node_modules/undefsafe": { "version": "2.0.5", "resolved": "https://registry.npmjs.org/undefsafe/-/undefsafe-2.0.5.tgz", @@ -1352,6 +1600,12 @@ "dev": true, "license": "MIT" }, + "node_modules/undici-types": { + "version": "7.16.0", + "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-7.16.0.tgz", + "integrity": "sha512-Zz+aZWSj8LE6zoxD+xrjh4VfkIG8Ya6LvYkZqtUQGJPZjYl53ypCaUwWqo7eI0x66KBGeRo+mlBEkMSeSZ38Nw==", + "license": "MIT" + }, "node_modules/unpipe": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/unpipe/-/unpipe-1.0.0.tgz", @@ -1379,6 +1633,15 @@ "node": ">= 0.8" } }, + "node_modules/wordwrapjs": { + "version": "5.1.1", + "resolved": "https://registry.npmjs.org/wordwrapjs/-/wordwrapjs-5.1.1.tgz", + "integrity": "sha512-0yweIbkINJodk27gX9LBGMzyQdBDan3s/dEAiwBOj+Mf0PPyWL6/rikalkv8EeD0E8jm4o5RXEOrFTP3NXbhJg==", + "license": "MIT", + "engines": { + "node": ">=12.17" + } + }, "node_modules/xtend": { "version": "4.0.2", "resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz", diff --git a/package.json b/package.json index 705772e..7e028cb 100644 --- a/package.json +++ b/package.json @@ -9,6 +9,7 @@ "build": "cd ui && npm run build" }, "dependencies": { + "apache-arrow": "^21.1.0", "cors": "^2.8.5", "dotenv": "^16.0.0", "express": "^4.18.2", diff --git a/routes/operations.js b/routes/operations.js index 38c1426..eead120 100644 --- a/routes/operations.js +++ b/routes/operations.js @@ -1,4 +1,5 @@ const express = require('express'); +const { tableFromJSON, RecordBatchStreamWriter, RecordBatch } = require('apache-arrow'); const { applyTokens, buildWhere, buildExcludeClause, buildSetClause, esc } = require('../lib/sql_generator'); const { fcTable } = require('../lib/utils'); @@ -61,16 +62,56 @@ module.exports = function(pool) { return true; } - // fetch all rows for a version (all iters including reference) + // stream all rows for a version as Arrow IPC (all iters including reference) router.get('/versions/:id/data', async (req, res) => { + const versionId = parseInt(req.params.id); + let client, committed = false; try { - const ctx = await getContext(parseInt(req.params.id), 'get_data'); - const sql = applyTokens(ctx.sql, { fc_table: ctx.table }); - const result = await runSQL(sql); - res.json(result.rows); + const verResult = await pool.query( + `SELECT v.*, s.tname FROM pf.version v JOIN pf.source s ON s.id = v.source_id WHERE v.id = $1`, + [versionId] + ); + if (!verResult.rows.length) { + const err = new Error('Version not found'); err.status = 404; throw err; + } + const tbl = fcTable(verResult.rows[0].tname, versionId); + + const { rows: [{ count }] } = await pool.query(`SELECT COUNT(*) FROM ${tbl}`); + const rowCount = parseInt(count); + + res.setHeader('Content-Type', 'application/vnd.apache.arrow.stream'); + res.setHeader('X-Row-Count', String(rowCount)); + + if (rowCount === 0) { res.end(); return; } + + client = await pool.connect(); + await client.query('BEGIN'); + await client.query(`DECLARE pf_cur CURSOR FOR SELECT * FROM ${tbl}`); + + const writer = RecordBatchStreamWriter.throughNode(); + writer.pipe(res); + + let schema = null; + while (true) { + const { rows } = await client.query('FETCH 10000 FROM pf_cur'); + if (!rows.length) break; + const t = tableFromJSON(rows); + if (!schema) { schema = t.schema; writer.write(schema); } + for (const rb of t.batches) writer.write(new RecordBatch(schema, rb.data)); + } + + writer.end(); + await client.query('COMMIT'); + committed = true; } catch (err) { console.error(err); - res.status(err.status || 500).json({ error: err.message }); + if (!res.headersSent) res.status(err.status || 500).json({ error: err.message }); + else res.destroy(); + } finally { + if (client) { + if (!committed) try { await client.query('ROLLBACK'); } catch {} + client.release(); + } } }); diff --git a/ui/src/views/Forecast.jsx b/ui/src/views/Forecast.jsx index 99b9fb8..3b76e6d 100644 --- a/ui/src/views/Forecast.jsx +++ b/ui/src/views/Forecast.jsx @@ -33,8 +33,9 @@ export default function Forecast() { const [sourceId, setSourceId] = useState('') const [versions, setVersions] = useState([]) const [versionId, setVersionId] = useState('') - const [loading, setLoading] = useState(false) - const [msg, setMsg] = useState(null) + const [loading, setLoading] = useState(false) + const [largeDataset, setLargeDataset] = useState(false) + const [msg, setMsg] = useState(null) // layouts const [layouts, setLayouts] = useState([]) @@ -146,23 +147,37 @@ export default function Forecast() { const viewer = viewerRef.current if (!viewer) return setLoading(true) + setLargeDataset(false) setSlice({}) expandDepthRef.current = null try { - const [perspective, rows, meta] = await Promise.all([ + const [perspective, dataResult, meta] = await Promise.all([ loadPerspective(), - fetch(`/api/versions/${vid}/data`).then(r => r.json()), + fetch(`/api/versions/${vid}/data`).then(async r => { + if (!r.ok) { const { error } = await r.json(); throw new Error(error || 'Failed to load data') } + const rowCount = parseInt(r.headers.get('X-Row-Count') || '0') + const buffer = await r.arrayBuffer() + return { buffer, rowCount } + }), fetch(`/api/sources/${sid}/cols`).then(r => r.json()), ]) + const { buffer, rowCount } = dataResult colMetaRef.current = meta - const validCols = new Set(rows.length ? Object.keys(rows[0]) : []) + const validCols = new Set([ + ...meta.filter(c => ['dimension','value','units','date'].includes(c.role)).map(c => c.cname), + 'pf_id', 'pf_iter', 'pf_logid', 'pf_user', 'created_at', + ]) const tableName = `fc_${vid}` + if (rowCount >= 500000) setLargeDataset(true) + if (workerRef.current) { try { workerRef.current.terminate() } catch {} } const worker = await perspective.worker() workerRef.current = worker - tableRef.current = await worker.table(rows, { name: tableName }) + tableRef.current = rowCount > 0 + ? await worker.table(buffer, { name: tableName }) + : await worker.table([], { name: tableName }) await viewer.load(worker) @@ -200,6 +215,7 @@ export default function Forecast() { if (Object.keys(s).length > 0) setSlice(s) } viewer.addEventListener('perspective-click', viewer._pspClick) + setLargeDataset(false) } catch (err) { flash(err.message, 'error') @@ -565,6 +581,11 @@ export default function Forecast() { Loading… )} + {!loading && largeDataset && ( +
+ Large dataset — pivot may take a moment to render +
+ )}