pf_app/routes/operations.js
Paul Trowbridge 2ee0d18f2e Fix large dataset loading in Forecast view
- Switch server Arrow encoding from tableFromJSON (row objects) to
  tableFromArrays (column arrays) — cuts peak Node heap 3-5x for large
  datasets by avoiding one JS object per row
- Remove unused pf.log JOIN from data endpoint; forecast rows only
- Load Perspective viewer with direct table reference instead of worker
  Server object — fixes "No Table attached" error on large datasets where
  named-table registry lookup raced against WASM initialization
- Pre-emptively clean up stale named table in worker registry before
  creating, eliminating the "already exists" retry path that silently
  swallowed errors (finally ran but flash never fired)
- Strip cfg.table from restore configs since table is loaded by reference
- Throttle progress bar updates to 100ms intervals (was every chunk)
- Persist load errors until dismissed; add console.error for devtools

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-05-21 20:52:47 -04:00

529 lines
22 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

const express = require('express');
const { tableFromArrays, tableToIPC } = require('apache-arrow');
const { applyTokens, buildWhere, buildExcludeClause, buildSetClause, esc } = require('../lib/sql_generator');
const { fcTable } = require('../lib/utils');
module.exports = function(pool) {
const router = express.Router();
// Echo a SQL statement to stdout between marker lines, then execute it on the
// shared pool.
//   sql     - complete SQL text (no bind parameters are passed)
//   returns - whatever pool.query(sql) resolves to; rejections propagate
async function runSQL(sql) {
  console.log('--- SQL ---\n', sql, '\n--- END SQL ---');
  const result = await pool.query(sql);
  return result;
}
// Collect what an operation handler needs for one version:
//   - the pf.version row joined with its pf.source (schema, tname, source_id)
//   - the source's pf.col_meta rows (ordered by opos) plus the dimension,
//     value and units column names derived from their roles
//   - the forecast table name produced by fcTable(tname, version id)
//   - the SQL stored in pf.sql for (source, operation)
// Throws an Error with status 404 when the version does not exist, and with
// status 400 when no SQL is stored for the requested operation.
async function getContext(versionId, operation) {
  const failWith = (status, message) => {
    const failure = new Error(message);
    failure.status = status;
    throw failure;
  };

  const versionLookup = await pool.query(`
SELECT v.*, s.schema, s.tname, s.id AS source_id
FROM pf.version v
JOIN pf.source s ON s.id = v.source_id
WHERE v.id = $1
`, [versionId]);
  if (versionLookup.rows.length === 0) {
    failWith(404, 'Version not found');
  }
  const [version] = versionLookup.rows;

  const { rows: colMeta } = await pool.query(
    `SELECT * FROM pf.col_meta WHERE source_id = $1 ORDER BY opos`,
    [version.source_id]
  );

  // every dimension column, in col_meta order
  const dimCols = [];
  for (const col of colMeta) {
    if (col.role === 'dimension') dimCols.push(col.cname);
  }
  // first column carrying the given role, or undefined when there is none
  const cnameForRole = (role) => colMeta.find((col) => col.role === role)?.cname;

  const storedSql = await pool.query(
    `SELECT sql FROM pf.sql WHERE source_id = $1 AND operation = $2`,
    [version.source_id, operation]
  );
  if (storedSql.rows.length === 0) {
    failWith(400, `No generated SQL for operation "${operation}" — run generate-sql first`);
  }

  return {
    version,
    table: fcTable(version.tname, version.id),
    colMeta,
    dimCols,
    valueCol: cnameForRole('value'),
    unitsCol: cnameForRole('units'),
    sql: storedSql.rows[0].sql
  };
}
// Gate for mutating handlers: when the version's status is 'closed', answer
// 403 on `res` and return false; otherwise return true and leave `res` alone.
function guardOpen(version, res) {
  const isClosed = version.status === 'closed';
  if (isClosed) {
    res.status(403).json({ error: 'Version is closed' });
  }
  return !isClosed;
}
// Send all rows for a version as Arrow IPC (all iters including reference).
//
// Response: Content-Type application/vnd.apache.arrow.stream, an X-Row-Count
// header taken from a COUNT(*) issued before the cursor is opened, and a body
// holding one Arrow stream (empty body when the table has no rows).
// NOTE(review): the COUNT(*) runs outside the cursor's transaction, so the
// header can differ from the rows actually sent if the table changes between
// the two statements.
// Errors: 400 for a non-numeric id, 404 for an unknown version, otherwise
// err.status || 500. If the failure happens after headers went out, the
// socket is destroyed instead.
router.get('/versions/:id/data', async (req, res) => {
  const versionId = Number.parseInt(req.params.id, 10);
  let client;
  let committed = false;
  try {
    // Reject garbage ids here instead of letting Postgres fail on a NaN bind.
    if (Number.isNaN(versionId)) {
      const err = new Error('Invalid version id'); err.status = 400; throw err;
    }
    const verResult = await pool.query(
      `SELECT v.*, s.tname FROM pf.version v JOIN pf.source s ON s.id = v.source_id WHERE v.id = $1`,
      [versionId]
    );
    if (!verResult.rows.length) {
      const err = new Error('Version not found'); err.status = 404; throw err;
    }
    const tbl = fcTable(verResult.rows[0].tname, versionId);
    const { rows: [{ count }] } = await pool.query(`SELECT COUNT(*) FROM ${tbl}`);
    const rowCount = Number.parseInt(count, 10);
    res.setHeader('Content-Type', 'application/vnd.apache.arrow.stream');
    res.setHeader('X-Row-Count', String(rowCount));
    if (rowCount === 0) { res.end(); return; }
    client = await pool.connect();
    // Cursors only live inside a transaction, hence BEGIN ... COMMIT.
    await client.query('BEGIN');
    await client.query(`
DECLARE pf_cur CURSOR FOR
SELECT * FROM ${tbl}
`);
    // Accumulate into column arrays (not row objects) to avoid allocating one JS
    // object per row — cuts peak heap by ~3-5× for large datasets.
    // Still emits a single Arrow record batch so Perspective WASM never sees
    // dictionary REPLACEMENT messages (which crash its Arrow reader).
    let colArrays = null;
    // Column names are captured once from the first row so the per-row loop
    // does not rebuild the key list every iteration.
    let colNames = null;
    while (true) {
      const { rows } = await client.query('FETCH 10000 FROM pf_cur');
      if (!rows.length) break;
      if (!colArrays) {
        colNames = Object.keys(rows[0]);
        colArrays = Object.fromEntries(colNames.map(k => [k, []]));
      }
      for (const row of rows) {
        for (const k of colNames) colArrays[k].push(row[k]);
      }
    }
    await client.query('COMMIT');
    committed = true;
    const buf = tableToIPC(tableFromArrays(colArrays || {}), 'stream');
    res.setHeader('Content-Length', String(buf.byteLength));
    // Wrap the Uint8Array's memory without copying it.
    res.end(Buffer.from(buf.buffer, buf.byteOffset, buf.byteLength));
  } catch (err) {
    console.error(err);
    if (!res.headersSent) res.status(err.status || 500).json({ error: err.message });
    else res.destroy();
  } finally {
    if (client) {
      if (!committed) {
        // Best-effort: a failing ROLLBACK must not prevent the release below.
        try { await client.query('ROLLBACK'); } catch {}
      }
      client.release();
    }
  }
});
// load baseline rows from source table — additive, no delete
//
// Body: { where_clause?, raw_where?, filters?, date_offset?, pf_user?, note? }
//   - the filter is raw_where, else where_clause, else 'TRUE'
//   - date_offset defaults to '0 days'
// Responds with the first row returned by the stored 'baseline' statement.
// Errors: 400 for a non-string filter or missing generated SQL, 403 for a
// closed version, 404 for an unknown version, otherwise err.status || 500.
router.post('/versions/:id/baseline', async (req, res) => {
  try {
    // Everything that can throw lives inside the try: an exception escaping an
    // async handler would otherwise leave the request without a response.
    const { where_clause, date_offset, pf_user, note, filters, raw_where } = req.body ?? {};
    const dateOffset = date_offset || '0 days';
    const requestedClause = raw_where || where_clause || '';
    if (typeof requestedClause !== 'string') {
      return res.status(400).json({ error: 'raw_where / where_clause must be a string' });
    }
    const filterClause = requestedClause.trim() || 'TRUE';
    const ctx = await getContext(Number.parseInt(req.params.id, 10), 'baseline');
    if (!guardOpen(ctx.version, res)) return;
    // Recorded with the operation; raw_where takes precedence over filters.
    const paramsJson = JSON.stringify({
      where_clause: filterClause,
      date_offset: dateOffset,
      ...(raw_where ? { raw_where } : (filters ? { filters } : {}))
    });
    // NOTE(review): filter_clause is caller-supplied SQL and is passed to
    // applyTokens unescaped (unlike the esc()-wrapped values). That looks
    // intentional for raw filters, but it means this endpoint must only be
    // reachable by trusted users — confirm.
    const sql = applyTokens(ctx.sql, {
      fc_table: ctx.table,
      version_id: ctx.version.id,
      pf_user: esc(pf_user || ''),
      note: esc(note || ''),
      params: esc(paramsJson),
      filter_clause: filterClause,
      date_offset: esc(dateOffset)
    });
    const result = await runSQL(sql);
    res.json(result.rows[0]);
  } catch (err) {
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  }
});
// edit a baseline or reference segment in place — only allowed before any
// scale/recode/clone has been applied on this version, since those would
// have been calibrated against the old segment's totals.
//
// Body: { where_clause?, raw_where?, filters?, date_offset?, pf_user?, note? }
// Inside one transaction: delete the segment's forecast rows, delete its log
// entry, then run the stored SQL for the segment's operation.
// Responds with { rows_deleted, pf_ids, rows_affected }.
// Errors: 400 non-string filter / non-editable operation / missing SQL,
// 403 closed version, 404 unknown log entry or version, 409 when forecast
// operations already exist, otherwise err.status || 500.
router.put('/versions/:id/baseline/:logid', async (req, res) => {
  const versionId = Number.parseInt(req.params.id, 10);
  const logid = Number.parseInt(req.params.logid, 10);
  let client;
  // True only between a successful BEGIN and a successful COMMIT, so the
  // catch block rolls back only when there is something to roll back.
  let inTransaction = false;
  try {
    // Body parsing and pool.connect() sit inside the try so that a malformed
    // body or a connection failure produces a response instead of an
    // unhandled rejection.
    const { where_clause, date_offset, pf_user, note, filters, raw_where } = req.body ?? {};
    const dateOffset = date_offset || '0 days';
    const requestedClause = raw_where || where_clause || '';
    if (typeof requestedClause !== 'string') {
      return res.status(400).json({ error: 'raw_where / where_clause must be a string' });
    }
    const filterClause = requestedClause.trim() || 'TRUE';
    client = await pool.connect();
    const logResult = await client.query(
      `SELECT * FROM pf.log WHERE id = $1 AND version_id = $2`,
      [logid, versionId]
    );
    if (logResult.rows.length === 0) {
      return res.status(404).json({ error: 'Log entry not found' });
    }
    const oldLog = logResult.rows[0];
    if (!['baseline', 'reference'].includes(oldLog.operation)) {
      return res.status(400).json({ error: 'Only baseline or reference segments can be edited' });
    }
    const opsResult = await client.query(
      `SELECT COUNT(*)::int AS n FROM pf.log
WHERE version_id = $1 AND operation IN ('scale', 'recode', 'clone')`,
      [versionId]
    );
    if (opsResult.rows[0].n > 0) {
      return res.status(409).json({
        error: 'Cannot edit segments after forecast operations have been applied. Undo the operations first.'
      });
    }
    // Re-run the same operation the segment was created with.
    const ctx = await getContext(versionId, oldLog.operation);
    if (!guardOpen(ctx.version, res)) return;
    const paramsJson = JSON.stringify({
      where_clause: filterClause,
      date_offset: dateOffset,
      ...(raw_where ? { raw_where } : (filters ? { filters } : {}))
    });
    // NOTE(review): filter_clause is caller-supplied SQL passed through
    // unescaped — confirm this endpoint is restricted to trusted users.
    const sql = applyTokens(ctx.sql, {
      fc_table: ctx.table,
      version_id: ctx.version.id,
      pf_user: esc(pf_user || ''),
      note: esc(note || ''),
      params: esc(paramsJson),
      filter_clause: filterClause,
      date_offset: esc(dateOffset)
    });
    await client.query('BEGIN');
    inTransaction = true;
    const delRows = await client.query(
      `DELETE FROM ${ctx.table} WHERE pf_logid = $1 RETURNING pf_id`,
      [logid]
    );
    await client.query(`DELETE FROM pf.log WHERE id = $1`, [logid]);
    const insResult = await client.query(sql);
    await client.query('COMMIT');
    inTransaction = false;
    res.json({
      rows_deleted: delRows.rowCount,
      pf_ids: delRows.rows.map(r => r.pf_id),
      rows_affected: insResult.rows[0]?.rows_affected ?? 0
    });
  } catch (err) {
    if (client && inTransaction) {
      // Best-effort: a failing ROLLBACK must not hide the original error.
      try { await client.query('ROLLBACK'); } catch {}
    }
    console.error(err);
    if (!res.headersSent) res.status(err.status || 500).json({ error: err.message });
  } finally {
    if (client) client.release();
  }
});
// delete all baseline rows and log entries for a version
//
// In one transaction: remove forecast rows whose pf_iter is 'baseline', then
// remove the version's 'baseline' entries from pf.log.
// Responds with { rows_deleted, log_entries_deleted, pf_ids }.
// Errors: whatever getContext raises (404 unknown version, 400 when no
// 'baseline' SQL is stored), 403 for a closed version, otherwise
// err.status || 500.
router.delete('/versions/:id/baseline', async (req, res) => {
  const versionId = Number.parseInt(req.params.id, 10);
  try {
    const ctx = await getContext(versionId, 'baseline');
    if (!guardOpen(ctx.version, res)) return;
    const client = await pool.connect();
    try {
      await client.query('BEGIN');
      const delRows = await client.query(
        `DELETE FROM ${ctx.table} WHERE pf_iter = 'baseline' RETURNING pf_id`
      );
      const delLog = await client.query(
        `DELETE FROM pf.log WHERE version_id = $1 AND operation = 'baseline'`,
        [versionId]
      );
      await client.query('COMMIT');
      res.json({
        rows_deleted: delRows.rowCount,
        log_entries_deleted: delLog.rowCount,
        pf_ids: delRows.rows.map(r => r.pf_id)
      });
    } catch (err) {
      // If ROLLBACK itself fails (e.g. the connection is gone), log that and
      // still rethrow the ORIGINAL error so the client sees the real cause.
      try {
        await client.query('ROLLBACK');
      } catch (rollbackErr) {
        console.error(rollbackErr);
      }
      throw err;
    } finally {
      client.release();
    }
  } catch (err) {
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  }
});
// load reference rows from source table (additive — does not clear prior reference rows)
//
// Body: { where_clause?, raw_where?, filters?, date_offset?, pf_user?, note? }
//   - the filter is raw_where, else where_clause, else 'TRUE'
//   - date_offset defaults to '0 days'
// Responds with the first row returned by the stored 'reference' statement.
// Errors: 400 for a non-string filter or missing generated SQL, 403 for a
// closed version, 404 for an unknown version, otherwise err.status || 500.
router.post('/versions/:id/reference', async (req, res) => {
  try {
    // Everything that can throw lives inside the try: an exception escaping an
    // async handler would otherwise leave the request without a response.
    const { where_clause, date_offset, pf_user, note, filters, raw_where } = req.body ?? {};
    const dateOffset = date_offset || '0 days';
    const requestedClause = raw_where || where_clause || '';
    if (typeof requestedClause !== 'string') {
      return res.status(400).json({ error: 'raw_where / where_clause must be a string' });
    }
    const filterClause = requestedClause.trim() || 'TRUE';
    const ctx = await getContext(Number.parseInt(req.params.id, 10), 'reference');
    if (!guardOpen(ctx.version, res)) return;
    // Recorded with the operation; raw_where takes precedence over filters.
    const paramsJson = JSON.stringify({
      where_clause: filterClause,
      date_offset: dateOffset,
      ...(raw_where ? { raw_where } : (filters ? { filters } : {}))
    });
    // NOTE(review): filter_clause is caller-supplied SQL and is passed to
    // applyTokens unescaped (unlike the esc()-wrapped values). That looks
    // intentional for raw filters, but it means this endpoint must only be
    // reachable by trusted users — confirm.
    const sql = applyTokens(ctx.sql, {
      fc_table: ctx.table,
      version_id: ctx.version.id,
      pf_user: esc(pf_user || ''),
      note: esc(note || ''),
      params: esc(paramsJson),
      filter_clause: filterClause,
      date_offset: esc(dateOffset)
    });
    const result = await runSQL(sql);
    res.json(result.rows[0]);
  } catch (err) {
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  }
});
// scale a slice — adjust value and/or units by absolute amount or percentage
//
// Body: { slice, value_incr?, units_incr?, pct?, pf_user?, note? }
//   - slice is required and must have at least one key
//   - value_incr / units_incr must be numbers (or numeric strings); with pct
//     set they are percentages of the slice's current totals, otherwise
//     absolute amounts
// Responds with { rows, rows_affected }, each row tagged with pf_note/pf_op.
// Errors: 400 for a missing slice, non-numeric increments, increments that
// resolve to zero, or missing generated SQL; 403 closed version; 404 unknown
// version; otherwise err.status || 500.
router.post('/versions/:id/scale', async (req, res) => {
  const { pf_user, note, slice, value_incr, units_incr, pct } = req.body ?? {};
  if (!slice || Object.keys(slice).length === 0) {
    return res.status(400).json({ error: 'slice is required' });
  }
  // The increments end up inside the SQL text unquoted, so they are coerced to
  // real numbers up front; anything that is not a finite number is rejected
  // instead of being spliced into the statement.
  const toIncrement = (raw) => {
    if (!raw) return 0;
    return (typeof raw === 'number' || typeof raw === 'string') ? Number(raw) : NaN;
  };
  const valueIncr = toIncrement(value_incr);
  const unitsIncr = toIncrement(units_incr);
  if (!Number.isFinite(valueIncr) || !Number.isFinite(unitsIncr)) {
    return res.status(400).json({ error: 'value_incr and units_incr must be numeric' });
  }
  try {
    const ctx = await getContext(Number.parseInt(req.params.id, 10), 'scale');
    if (!guardOpen(ctx.version, res)) return;
    const whereClause = buildWhere(slice, ctx.dimCols);
    const excludeClause = buildExcludeClause(ctx.version.exclude_iters);
    let absValueIncr = valueIncr;
    let absUnitsIncr = unitsIncr;
    // pct mode: run a quick totals query, convert percentages to absolutes
    if (pct && (valueIncr !== 0 || unitsIncr !== 0)) {
      // A source may lack a value or units column; select NULL for it rather
      // than referencing a column literally named "undefined".
      const sumOrNull = (col, alias) => (col ? `sum("${col}") AS ${alias}` : `NULL::numeric AS ${alias}`);
      const totals = await pool.query(`
SELECT
${sumOrNull(ctx.valueCol, 'total_value')},
${sumOrNull(ctx.unitsCol, 'total_units')}
FROM ${ctx.table}
WHERE ${whereClause}
${excludeClause}
`);
      const { total_value, total_units } = totals.rows[0];
      // A NULL/unparseable total counts as 0, which makes the increment 0.
      if (valueIncr !== 0) absValueIncr = (parseFloat(total_value) || 0) * valueIncr / 100;
      if (unitsIncr !== 0) absUnitsIncr = (parseFloat(total_units) || 0) * unitsIncr / 100;
    }
    if (absValueIncr === 0 && absUnitsIncr === 0) {
      return res.status(400).json({ error: 'value_incr and/or units_incr must be non-zero' });
    }
    const sql = applyTokens(ctx.sql, {
      fc_table: ctx.table,
      version_id: ctx.version.id,
      pf_user: esc(pf_user || ''),
      note: esc(note || ''),
      // the request values are recorded exactly as the caller sent them
      params: esc(JSON.stringify({ slice, value_incr, units_incr, pct })),
      slice: esc(JSON.stringify(slice)),
      where_clause: whereClause,
      exclude_clause: excludeClause,
      value_incr: absValueIncr,
      units_incr: absUnitsIncr
    });
    const result = await runSQL(sql);
    const rows = result.rows.map(r => ({ ...r, pf_note: note || null, pf_op: 'scale' }));
    res.json({ rows, rows_affected: rows.length });
  } catch (err) {
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  }
});
// recode dimension values on a slice
// inserts negative rows to zero out the original, positive rows with new dimension values
//
// Body: { slice, set, pf_user?, note? } — slice and set must each have at
// least one key. Responds with { rows, rows_affected }, where every row
// returned by the stored 'recode' statement is tagged with pf_note and pf_op.
router.post('/versions/:id/recode', async (req, res) => {
  const { pf_user, note, slice, set } = req.body;
  const isBlank = (obj) => !obj || Object.keys(obj).length === 0;
  if (isBlank(slice)) {
    return res.status(400).json({ error: 'slice is required' });
  }
  if (isBlank(set)) {
    return res.status(400).json({ error: 'set is required' });
  }
  try {
    const versionId = parseInt(req.params.id);
    const ctx = await getContext(versionId, 'recode');
    const versionIsOpen = guardOpen(ctx.version, res);
    if (!versionIsOpen) return;

    const whereClause = buildWhere(slice, ctx.dimCols);
    const excludeClause = buildExcludeClause(ctx.version.exclude_iters);
    const setClause = buildSetClause(ctx.dimCols, set);
    const tokens = {
      fc_table: ctx.table,
      version_id: ctx.version.id,
      pf_user: esc(pf_user || ''),
      note: esc(note || ''),
      params: esc(JSON.stringify({ slice, set })),
      slice: esc(JSON.stringify(slice)),
      where_clause: whereClause,
      exclude_clause: excludeClause,
      set_clause: setClause
    };
    const { rows: inserted } = await runSQL(applyTokens(ctx.sql, tokens));

    // tag each returned row with the note and the operation name
    const pfNote = note || null;
    const rows = [];
    for (const row of inserted) {
      rows.push({ ...row, pf_note: pfNote, pf_op: 'recode' });
    }
    res.json({ rows, rows_affected: rows.length });
  } catch (err) {
    console.error(err);
    const status = err.status || 500;
    res.status(status).json({ error: err.message });
  }
});
// clone a slice as new business under new dimension values
// does not offset the original slice
//
// Body: { slice, set, scale?, pf_user?, note? }
//   - slice and set are required and must each have at least one key
//   - scale is parsed with parseFloat and defaults to 1.0 when null/undefined
// Responds with { rows, rows_affected }, each row tagged with pf_note/pf_op.
// Errors: 400 for a missing slice/set, a scale that is not a finite number,
// or missing generated SQL; 403 closed version; 404 unknown version;
// otherwise err.status || 500.
router.post('/versions/:id/clone', async (req, res) => {
  const { pf_user, note, slice, set, scale } = req.body ?? {};
  if (!slice || Object.keys(slice).length === 0) return res.status(400).json({ error: 'slice is required' });
  if (!set || Object.keys(set).length === 0) return res.status(400).json({ error: 'set is required' });
  const scaleFactor = (scale != null) ? parseFloat(scale) : 1.0;
  // scale_factor is placed in the SQL text unquoted; NaN or Infinity would
  // only surface as an opaque database error, so reject them here.
  if (!Number.isFinite(scaleFactor)) {
    return res.status(400).json({ error: 'scale must be a finite number' });
  }
  try {
    const ctx = await getContext(Number.parseInt(req.params.id, 10), 'clone');
    if (!guardOpen(ctx.version, res)) return;
    const whereClause = buildWhere(slice, ctx.dimCols);
    const excludeClause = buildExcludeClause(ctx.version.exclude_iters);
    const setClause = buildSetClause(ctx.dimCols, set);
    const sql = applyTokens(ctx.sql, {
      fc_table: ctx.table,
      version_id: ctx.version.id,
      pf_user: esc(pf_user || ''),
      note: esc(note || ''),
      // the parsed factor (not the raw input) is what gets recorded
      params: esc(JSON.stringify({ slice, set, scale: scaleFactor })),
      slice: esc(JSON.stringify(slice)),
      where_clause: whereClause,
      exclude_clause: excludeClause,
      set_clause: setClause,
      scale_factor: scaleFactor
    });
    const result = await runSQL(sql);
    const rows = result.rows.map(r => ({ ...r, pf_note: note || null, pf_op: 'clone' }));
    res.json({ rows, rows_affected: rows.length });
  } catch (err) {
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  }
});
// list log entries for a version, newest first, with row counts
//
// Each pf.log row is returned together with row_count, value_total and
// units_total aggregated over the forecast rows carrying that log id, plus
// the names of the value/units columns (null when the source has none).
// Responds 404 when the version does not exist.
router.get('/versions/:id/log', async (req, res) => {
  const versionId = parseInt(req.params.id);
  try {
    const { rows: versionRows } = await pool.query(
      `SELECT v.*, s.tname, s.id AS source_id FROM pf.version v JOIN pf.source s ON s.id = v.source_id WHERE v.id = $1`,
      [versionId]
    );
    if (versionRows.length === 0) {
      return res.status(404).json({ error: 'Version not found' });
    }
    const [{ tname, source_id }] = versionRows;
    const table = fcTable(tname, versionId);

    const { rows: measureCols } = await pool.query(
      `SELECT cname, role FROM pf.col_meta WHERE source_id = $1 AND role IN ('value', 'units')`,
      [source_id]
    );
    const cnameForRole = (role) => measureCols.find((col) => col.role === role)?.cname;
    const valueCol = cnameForRole('value');
    const unitsCol = cnameForRole('units');

    // sum the column when the source has one, otherwise emit a typed NULL
    const totalExpr = (col, alias) => (
      col ? `sum(f."${col}")::float8 AS ${alias}` : `NULL::float8 AS ${alias}`
    );
    const aggCols = `count(f.pf_id)::int AS row_count, ${totalExpr(valueCol, 'value_total')}, ${totalExpr(unitsCol, 'units_total')}`;

    const { rows: logRows } = await pool.query(`
SELECT l.*, ${aggCols},
$2::text AS value_col,
$3::text AS units_col
FROM pf.log l
LEFT JOIN ${table} f ON f.pf_logid = l.id
WHERE l.version_id = $1
GROUP BY l.id
ORDER BY l.id DESC
`, [versionId, valueCol || null, unitsCol || null]);
    res.json(logRows);
  } catch (err) {
    console.error(err);
    const status = err.status || 500;
    res.status(status).json({ error: err.message });
  }
});
// undo a log entry — delete all fc rows with this logid, then delete the log entry
//
// Both deletes run in one transaction. Responds with { rows_deleted, pf_ids }.
// Errors: 404 unknown log entry, 403 when the owning version is closed,
// otherwise err.status || 500.
router.delete('/log/:logid', async (req, res) => {
  const logId = Number.parseInt(req.params.logid, 10);
  try {
    const logResult = await pool.query(`
SELECT l.*, v.status, s.tname, v.id AS version_id
FROM pf.log l
JOIN pf.version v ON v.id = l.version_id
JOIN pf.source s ON s.id = v.source_id
WHERE l.id = $1
`, [logId]);
    if (!logResult.rows.length) return res.status(404).json({ error: 'Log entry not found' });
    const log = logResult.rows[0];
    if (log.status === 'closed') return res.status(403).json({ error: 'Version is closed' });
    const table = fcTable(log.tname, log.version_id);
    const client = await pool.connect();
    try {
      await client.query('BEGIN');
      const deleted = await client.query(
        `DELETE FROM ${table} WHERE pf_logid = $1 RETURNING pf_id`, [logId]
      );
      await client.query('DELETE FROM pf.log WHERE id = $1', [logId]);
      await client.query('COMMIT');
      res.json({
        rows_deleted: deleted.rowCount,
        pf_ids: deleted.rows.map(r => r.pf_id)
      });
    } catch (err) {
      // If ROLLBACK itself fails (e.g. the connection is gone), log that and
      // still rethrow the ORIGINAL error so the client sees the real cause.
      try {
        await client.query('ROLLBACK');
      } catch (rollbackErr) {
        console.error(rollbackErr);
      }
      throw err;
    } finally {
      client.release();
    }
  } catch (err) {
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  }
});
// update the note on a log entry
//
// Body: { note }. Responds with the updated pf.log row, or 404 when no row
// has the given id.
router.patch('/log/:logid', async (req, res) => {
  const logId = parseInt(req.params.logid);
  const { note } = req.body;
  try {
    const { rows: updatedRows } = await pool.query(
      `UPDATE pf.log SET note = $1 WHERE id = $2 RETURNING *`,
      [note, logId]
    );
    if (updatedRows.length === 0) {
      return res.status(404).json({ error: 'Log entry not found' });
    }
    res.json(updatedRows[0]);
  } catch (err) {
    console.error(err);
    const status = err.status || 500;
    res.status(status).json({ error: err.message });
  }
});
return router;
};