Fix forecast data load and add byte-progress UI

pg now returns bigint/numeric as JS numbers so Arrow infers Int/Float64
instead of Dictionary<Utf8>. /data accumulates rows and emits a single
record batch to avoid dictionary REPLACEMENT messages that crash
Perspective's WASM reader. Forecast view streams the response body and
shows received/total bytes while loading. Drops stale public/ static
middleware that was shadowing the React build at /.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Paul Trowbridge 2026-04-28 19:51:39 -04:00
parent a6e6efd36e
commit a9ca58a845
4 changed files with 58 additions and 17 deletions

1
package-lock.json generated
View File

@@ -1105,7 +1105,6 @@
"resolved": "https://registry.npmjs.org/pg/-/pg-8.20.0.tgz", "resolved": "https://registry.npmjs.org/pg/-/pg-8.20.0.tgz",
"integrity": "sha512-ldhMxz2r8fl/6QkXnBD3CR9/xg694oT6DZQ2s6c/RI28OjtSOpxnPrUCGOBJ46RCUxcWdx3p6kw/xnDHjKvaRA==", "integrity": "sha512-ldhMxz2r8fl/6QkXnBD3CR9/xg694oT6DZQ2s6c/RI28OjtSOpxnPrUCGOBJ46RCUxcWdx3p6kw/xnDHjKvaRA==",
"license": "MIT", "license": "MIT",
"peer": true,
"dependencies": { "dependencies": {
"pg-connection-string": "^2.12.0", "pg-connection-string": "^2.12.0",
"pg-pool": "^3.13.0", "pg-pool": "^3.13.0",

View File

@@ -1,5 +1,5 @@
const express = require('express'); const express = require('express');
const { tableFromJSON, RecordBatchStreamWriter, RecordBatch } = require('apache-arrow'); const { tableFromJSON, tableToIPC } = require('apache-arrow');
const { applyTokens, buildWhere, buildExcludeClause, buildSetClause, esc } = require('../lib/sql_generator'); const { applyTokens, buildWhere, buildExcludeClause, buildSetClause, esc } = require('../lib/sql_generator');
const { fcTable } = require('../lib/utils'); const { fcTable } = require('../lib/utils');
@@ -88,21 +88,22 @@ module.exports = function(pool) {
await client.query('BEGIN'); await client.query('BEGIN');
await client.query(`DECLARE pf_cur CURSOR FOR SELECT * FROM ${tbl}`); await client.query(`DECLARE pf_cur CURSOR FOR SELECT * FROM ${tbl}`);
const writer = RecordBatchStreamWriter.throughNode(); // Accumulate rows from the cursor, then emit a single Arrow record batch.
writer.pipe(res); // Per-batch tableFromJSON() builds independent dictionaries, which forces the
// writer to emit dictionary REPLACEMENT messages between batches — Perspective's
let schema = null; // WASM Arrow reader crashes on those (memory access out of bounds).
const allRows = [];
while (true) { while (true) {
const { rows } = await client.query('FETCH 10000 FROM pf_cur'); const { rows } = await client.query('FETCH 10000 FROM pf_cur');
if (!rows.length) break; if (!rows.length) break;
const t = tableFromJSON(rows); for (const r of rows) allRows.push(r);
if (!schema) { schema = t.schema; writer.write(schema); }
for (const rb of t.batches) writer.write(new RecordBatch(schema, rb.data));
} }
writer.end();
await client.query('COMMIT'); await client.query('COMMIT');
committed = true; committed = true;
const buf = tableToIPC(tableFromJSON(allRows), 'stream');
res.setHeader('Content-Length', String(buf.byteLength));
res.end(Buffer.from(buf.buffer, buf.byteOffset, buf.byteLength));
} catch (err) { } catch (err) {
console.error(err); console.error(err);
if (!res.headersSent) res.status(err.status || 500).json({ error: err.message }); if (!res.headersSent) res.status(err.status || 500).json({ error: err.message });

View File

@@ -1,14 +1,17 @@
require('dotenv').config(); require('dotenv').config();
const express = require('express'); const express = require('express');
const cors = require('cors'); const cors = require('cors');
const { Pool } = require('pg'); const { Pool, types } = require('pg');
// Return bigint (oid 20) and numeric (oid 1700) as JS numbers instead of strings,
// so apache-arrow's tableFromJSON infers Int/Float64 rather than Dictionary<Utf8>.
types.setTypeParser(20, v => v === null ? null : Number(v));
types.setTypeParser(1700, v => v === null ? null : Number(v));
const app = express(); const app = express();
app.use(cors()); app.use(cors());
app.use(express.json()); app.use(express.json());
app.use(express.static('public'));
app.use(express.static('public/app')); app.use(express.static('public/app'));
app.get('/', (req, res) => res.sendFile(__dirname + '/public/app/index.html'));
const pool = new Pool({ const pool = new Pool({
host: process.env.DB_HOST, host: process.env.DB_HOST,

View File

@@ -35,6 +35,7 @@ export default function Forecast() {
const [versionId, setVersionId] = useState('') const [versionId, setVersionId] = useState('')
const [loading, setLoading] = useState(false) const [loading, setLoading] = useState(false)
const [largeDataset, setLargeDataset] = useState(false) const [largeDataset, setLargeDataset] = useState(false)
const [loadProgress, setLoadProgress] = useState(null) // { received, total }
const [msg, setMsg] = useState(null) const [msg, setMsg] = useState(null)
// layouts // layouts
@@ -148,6 +149,7 @@ export default function Forecast() {
if (!viewer) return if (!viewer) return
setLoading(true) setLoading(true)
setLargeDataset(false) setLargeDataset(false)
setLoadProgress(null)
setSlice({}) setSlice({})
expandDepthRef.current = null expandDepthRef.current = null
try { try {
@@ -156,8 +158,22 @@ export default function Forecast() {
fetch(`/api/versions/${vid}/data`).then(async r => { fetch(`/api/versions/${vid}/data`).then(async r => {
if (!r.ok) { const { error } = await r.json(); throw new Error(error || 'Failed to load data') } if (!r.ok) { const { error } = await r.json(); throw new Error(error || 'Failed to load data') }
const rowCount = parseInt(r.headers.get('X-Row-Count') || '0') const rowCount = parseInt(r.headers.get('X-Row-Count') || '0')
const buffer = await r.arrayBuffer() const total = parseInt(r.headers.get('Content-Length') || '0') || null
return { buffer, rowCount } const reader = r.body.getReader()
const chunks = []
let received = 0
setLoadProgress({ received: 0, total })
while (true) {
const { done, value } = await reader.read()
if (done) break
chunks.push(value)
received += value.byteLength
setLoadProgress({ received, total })
}
const merged = new Uint8Array(received)
let pos = 0
for (const c of chunks) { merged.set(c, pos); pos += c.byteLength }
return { buffer: merged.buffer, rowCount }
}), }),
fetch(`/api/sources/${sid}/cols`).then(r => r.json()), fetch(`/api/sources/${sid}/cols`).then(r => r.json()),
]) ])
@@ -577,8 +593,24 @@ export default function Forecast() {
{/* Perspective viewer */} {/* Perspective viewer */}
<div className="relative flex-1 min-w-0"> <div className="relative flex-1 min-w-0">
{loading && ( {loading && (
<div className="absolute inset-0 flex items-center justify-center bg-gray-50 z-10"> <div className="absolute inset-0 flex flex-col items-center justify-center bg-gray-50 z-10 gap-2">
<span className="text-sm text-gray-400">Loading</span> <span className="text-sm text-gray-400">Loading</span>
{loadProgress && (
<>
<span className="text-xs text-gray-400 font-mono">
{fmtBytes(loadProgress.received)}
{loadProgress.total ? ` / ${fmtBytes(loadProgress.total)}` : ''}
</span>
{loadProgress.total > 0 && (
<div className="w-48 h-1 bg-gray-200 rounded overflow-hidden">
<div
className="h-full bg-blue-400 transition-all"
style={{ width: `${Math.min(100, (loadProgress.received / loadProgress.total) * 100)}%` }}
/>
</div>
)}
</>
)}
</div> </div>
)} )}
{!loading && largeDataset && ( {!loading && largeDataset && (
@@ -706,6 +738,12 @@ export default function Forecast() {
const inp = 'border border-gray-200 rounded px-2 py-1 text-xs flex-1 bg-white min-w-0' const inp = 'border border-gray-200 rounded px-2 py-1 text-xs flex-1 bg-white min-w-0'
function fmtBytes(n) {
if (n < 1024) return `${n} B`
if (n < 1048576) return `${(n / 1024).toFixed(1)} KB`
return `${(n / 1048576).toFixed(1)} MB`
}
function fmtStamp(stamp) { function fmtStamp(stamp) {
return new Date(stamp).toLocaleString(undefined, { month: 'short', day: 'numeric', hour: 'numeric', minute: '2-digit' }) return new Date(stamp).toLocaleString(undefined, { month: 'short', day: 'numeric', hour: 'numeric', minute: '2-digit' })
} }