diff --git a/routes/operations.js b/routes/operations.js index ca4cb6e..662eede 100644 --- a/routes/operations.js +++ b/routes/operations.js @@ -1,5 +1,5 @@ const express = require('express'); -const { tableFromJSON, tableToIPC } = require('apache-arrow'); +const { tableFromArrays, tableToIPC } = require('apache-arrow'); const { applyTokens, buildWhere, buildExcludeClause, buildSetClause, esc } = require('../lib/sql_generator'); const { fcTable } = require('../lib/utils'); @@ -88,25 +88,28 @@ module.exports = function(pool) { await client.query('BEGIN'); await client.query(` DECLARE pf_cur CURSOR FOR - SELECT f.*, l.note AS pf_note, l.operation AS pf_op - FROM ${tbl} f - LEFT JOIN pf.log l ON l.id = f.pf_logid + SELECT * FROM ${tbl} `); - // Accumulate rows from the cursor, then emit a single Arrow record batch. - // Per-batch tableFromJSON() builds independent dictionaries, which forces the - // writer to emit dictionary REPLACEMENT messages between batches — Perspective's - // WASM Arrow reader crashes on those (memory access out of bounds). - const allRows = []; + // Accumulate into column arrays (not row objects) to avoid allocating one JS + // object per row — cuts peak heap by ~3-5× for large datasets. + // Still emits a single Arrow record batch so Perspective WASM never sees + // dictionary REPLACEMENT messages (which crash its Arrow reader). + let colArrays = null; while (true) { const { rows } = await client.query('FETCH 10000 FROM pf_cur'); if (!rows.length) break; - for (const r of rows) allRows.push(r); + if (!colArrays) { + colArrays = Object.fromEntries(Object.keys(rows[0]).map(k => [k, []])); + } + for (const row of rows) { + for (const k of Object.keys(colArrays)) colArrays[k].push(row[k]); + } } await client.query('COMMIT'); committed = true; - const buf = tableToIPC(tableFromJSON(allRows), 'stream'); + const buf = tableToIPC(tableFromArrays(colArrays || {}), 'stream'); res.setHeader('Content-Length', String(buf.byteLength)); res.end(Buffer.from(buf.buffer, buf.byteOffset, buf.byteLength)); } catch (err) { diff --git a/ui/src/views/Forecast.jsx b/ui/src/views/Forecast.jsx index 3fffda6..82d2ab3 100644 --- a/ui/src/views/Forecast.jsx +++ b/ui/src/views/Forecast.jsx @@ -169,14 +169,20 @@ export default function Forecast({ sources = [], sourceId, versionId, refreshSou const reader = r.body.getReader() const chunks = [] let received = 0 + let lastUpdate = 0 setLoadProgress({ received: 0, total }) while (true) { const { done, value } = await reader.read() if (done) break chunks.push(value) received += value.byteLength - setLoadProgress({ received, total }) + const now = Date.now() + if (now - lastUpdate >= 100) { + setLoadProgress({ received, total }) + lastUpdate = now + } } + setLoadProgress({ received, total }) const merged = new Uint8Array(received) let pos = 0 for (const c of chunks) { merged.set(c, pos); pos += c.byteLength } @@ -200,26 +206,19 @@ export default function Forecast({ sources = [], sourceId, versionId, refreshSou if (!workerRef.current) workerRef.current = await perspective.worker() const worker = workerRef.current + // Clean up the previous table — by JS reference first, then by name in the + // worker registry (covers the case where the ref was lost or delete failed). if (tableRef.current) { try { await tableRef.current.delete() } catch {} tableRef.current = null } + try { + const stale = await worker.open_table(tableName) + if (stale) await stale.delete() + } catch {} const opts = { name: tableName, index: 'pf_id' } - const makeTable = async () => rowCount > 0 ? worker.table(buffer, opts) : worker.table([], opts) - try { - tableRef.current = await makeTable() - } catch (err) { - if (/already exists/i.test(String(err?.message || err))) { - try { - const existing = await worker.open_table(tableName) - if (existing) await existing.delete() - } catch {} - tableRef.current = await makeTable() - } else { - throw err - } - } + tableRef.current = await (rowCount > 0 ? worker.table(buffer, opts) : worker.table([], opts)) if (myId !== initIdRef.current) { try { await tableRef.current.delete() } catch {} @@ -227,27 +226,30 @@ export default function Forecast({ sources = [], sourceId, versionId, refreshSou return } - await viewer.load(worker) + // Load by direct table reference — avoids "No Table attached" on large datasets + // that occurs when viewer.load(worker) + restore({ table: name }) can't resolve + // the named table in time. + await viewer.load(tableRef.current) viewer.setAttribute('theme', dark ? 'Pro Dark' : 'Pro Light') // restore last-used layout or build default + // Strip cfg.table — table is already loaded by reference above; a stale name + // in a saved config would cause Perspective to fail the name lookup. const saved = localStorage.getItem(LAYOUT_KEY(vid)) if (saved) { - const cfg = cleanLayout(JSON.parse(saved), validCols) - cfg.plugin_config = { edit_mode: 'SELECT_REGION', ...(cfg.plugin_config || {}) } + const { table: _t, ...rest } = cleanLayout(JSON.parse(saved), validCols) + const cfg = { ...rest, plugin_config: { edit_mode: 'SELECT_REGION', ...(rest.plugin_config || {}) } } await viewer.restore(cfg) if (cfg.expand_depth != null) await applyDepth(cfg.expand_depth) } else { const sourceDefault = sources.find(s => String(s.id) === String(sid))?.default_layout let cfg if (sourceDefault && Object.keys(sourceDefault).length > 0) { - cfg = cleanLayout(sourceDefault, validCols) - cfg.table = tableName - cfg.plugin_config = { edit_mode: 'SELECT_REGION', ...(cfg.plugin_config || {}) } + const { table: _t, ...rest } = cleanLayout(sourceDefault, validCols) + cfg = { ...rest, plugin_config: { edit_mode: 'SELECT_REGION', ...(rest.plugin_config || {}) } } } else { const valueCol = meta.find(c => c.role === 'value')?.cname cfg = { - table: tableName, settings: false, group_by: ['pf_iter'], columns: valueCol ? [valueCol] : [], @@ -285,7 +287,8 @@ export default function Forecast({ sources = [], sourceId, versionId, refreshSou setLargeDataset(false) } catch (err) { - flash(err.message, 'error') + console.error('[initViewer]', err) + flash(err.message || String(err), 'error') } finally { setLoading(false) } @@ -432,7 +435,7 @@ export default function Forecast({ sources = [], sourceId, versionId, refreshSou function flash(text, type = 'ok') { setMsg({ text, type }) - setTimeout(() => setMsg(null), 3000) + if (type !== 'error') setTimeout(() => setMsg(null), 3000) } async function openLog() { @@ -557,8 +560,11 @@ export default function Forecast({ sources = [], sourceId, versionId, refreshSou {msg && ( - + {msg.text} + {msg.type === 'error' && ( + + )} )}