From a9ca58a845c246ea2aec1cba6a279c2be327f8a1 Mon Sep 17 00:00:00 2001 From: Paul Trowbridge Date: Tue, 28 Apr 2026 19:51:39 -0400 Subject: [PATCH] Fix forecast data load and add byte-progress UI pg now returns bigint/numeric as JS numbers so Arrow infers Int/Float64 instead of Dictionary. /data accumulates rows and emits a single record batch to avoid dictionary REPLACEMENT messages that crash Perspective's WASM reader. Forecast view streams the response body and shows received/total bytes while loading. Drops stale public/ static middleware that was shadowing the React build at /. Co-Authored-By: Claude Opus 4.7 --- package-lock.json | 1 - routes/operations.js | 21 ++++++++++--------- server.js | 9 +++++--- ui/src/views/Forecast.jsx | 44 ++++++++++++++++++++++++++++++++++++--- 4 files changed, 58 insertions(+), 17 deletions(-) diff --git a/package-lock.json b/package-lock.json index 43f238f..e85dee9 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1105,7 +1105,6 @@ "resolved": "https://registry.npmjs.org/pg/-/pg-8.20.0.tgz", "integrity": "sha512-ldhMxz2r8fl/6QkXnBD3CR9/xg694oT6DZQ2s6c/RI28OjtSOpxnPrUCGOBJ46RCUxcWdx3p6kw/xnDHjKvaRA==", "license": "MIT", - "peer": true, "dependencies": { "pg-connection-string": "^2.12.0", "pg-pool": "^3.13.0", diff --git a/routes/operations.js b/routes/operations.js index eead120..adaefd3 100644 --- a/routes/operations.js +++ b/routes/operations.js @@ -1,5 +1,5 @@ const express = require('express'); -const { tableFromJSON, RecordBatchStreamWriter, RecordBatch } = require('apache-arrow'); +const { tableFromJSON, tableToIPC } = require('apache-arrow'); const { applyTokens, buildWhere, buildExcludeClause, buildSetClause, esc } = require('../lib/sql_generator'); const { fcTable } = require('../lib/utils'); @@ -88,21 +88,22 @@ module.exports = function(pool) { await client.query('BEGIN'); await client.query(`DECLARE pf_cur CURSOR FOR SELECT * FROM ${tbl}`); - const writer = RecordBatchStreamWriter.throughNode(); - writer.pipe(res); - - let schema = null; + // Accumulate rows from the cursor, then emit a single Arrow record batch. + // Per-batch tableFromJSON() builds independent dictionaries, which forces the + // writer to emit dictionary REPLACEMENT messages between batches — Perspective's + // WASM Arrow reader crashes on those (memory access out of bounds). + const allRows = []; while (true) { const { rows } = await client.query('FETCH 10000 FROM pf_cur'); if (!rows.length) break; - const t = tableFromJSON(rows); - if (!schema) { schema = t.schema; writer.write(schema); } - for (const rb of t.batches) writer.write(new RecordBatch(schema, rb.data)); + for (const r of rows) allRows.push(r); } - - writer.end(); await client.query('COMMIT'); committed = true; + + const buf = tableToIPC(tableFromJSON(allRows), 'stream'); + res.setHeader('Content-Length', String(buf.byteLength)); + res.end(Buffer.from(buf.buffer, buf.byteOffset, buf.byteLength)); } catch (err) { console.error(err); if (!res.headersSent) res.status(err.status || 500).json({ error: err.message }); diff --git a/server.js b/server.js index a8ea2fb..f6776d5 100644 --- a/server.js +++ b/server.js @@ -1,14 +1,17 @@ require('dotenv').config(); const express = require('express'); const cors = require('cors'); -const { Pool } = require('pg'); +const { Pool, types } = require('pg'); + +// Return bigint (oid 20) and numeric (oid 1700) as JS numbers instead of strings, +// so apache-arrow's tableFromJSON infers Int/Float64 rather than Dictionary. +types.setTypeParser(20, v => v === null ? null : Number(v)); +types.setTypeParser(1700, v => v === null ? null : Number(v)); const app = express(); app.use(cors()); app.use(express.json()); -app.use(express.static('public')); app.use(express.static('public/app')); -app.get('/', (req, res) => res.sendFile(__dirname + '/public/app/index.html')); const pool = new Pool({ host: process.env.DB_HOST, diff --git a/ui/src/views/Forecast.jsx b/ui/src/views/Forecast.jsx index 3b76e6d..bf3719e 100644 --- a/ui/src/views/Forecast.jsx +++ b/ui/src/views/Forecast.jsx @@ -35,6 +35,7 @@ export default function Forecast() { const [versionId, setVersionId] = useState('') const [loading, setLoading] = useState(false) const [largeDataset, setLargeDataset] = useState(false) + const [loadProgress, setLoadProgress] = useState(null) // { received, total } const [msg, setMsg] = useState(null) // layouts @@ -148,6 +149,7 @@ export default function Forecast() { if (!viewer) return setLoading(true) setLargeDataset(false) + setLoadProgress(null) setSlice({}) expandDepthRef.current = null try { @@ -156,8 +158,22 @@ export default function Forecast() { fetch(`/api/versions/${vid}/data`).then(async r => { if (!r.ok) { const { error } = await r.json(); throw new Error(error || 'Failed to load data') } const rowCount = parseInt(r.headers.get('X-Row-Count') || '0') - const buffer = await r.arrayBuffer() - return { buffer, rowCount } + const total = parseInt(r.headers.get('Content-Length') || '0') || null + const reader = r.body.getReader() + const chunks = [] + let received = 0 + setLoadProgress({ received: 0, total }) + while (true) { + const { done, value } = await reader.read() + if (done) break + chunks.push(value) + received += value.byteLength + setLoadProgress({ received, total }) + } + const merged = new Uint8Array(received) + let pos = 0 + for (const c of chunks) { merged.set(c, pos); pos += c.byteLength } + return { buffer: merged.buffer, rowCount } }), fetch(`/api/sources/${sid}/cols`).then(r => r.json()), ]) @@ -577,8 +593,24 @@ export default function Forecast() { {/* Perspective viewer */}
{loading && ( -
+
Loading… + {loadProgress && ( + <> + + {fmtBytes(loadProgress.received)} + {loadProgress.total ? ` / ${fmtBytes(loadProgress.total)}` : ''} + + {loadProgress.total > 0 && ( +
+
+
+ )} + + )}
)} {!loading && largeDataset && ( @@ -706,6 +738,12 @@ export default function Forecast() { const inp = 'border border-gray-200 rounded px-2 py-1 text-xs flex-1 bg-white min-w-0' +function fmtBytes(n) { + if (n < 1024) return `${n} B` + if (n < 1048576) return `${(n / 1024).toFixed(1)} KB` + return `${(n / 1048576).toFixed(1)} MB` +} + function fmtStamp(stamp) { return new Date(stamp).toLocaleString(undefined, { month: 'short', day: 'numeric', hour: 'numeric', minute: '2-digit' }) }