pf_app/routes/operations.js
Paul Trowbridge 953ae2709f Reference offsets, edit visual cues, todo updates
Reference segments can now apply a date offset just like baselines.
SQL template gains the {{date_offset}} token; both POST /reference and
PUT baseline/:logid pass it through. Existing sources need to
regenerate SQL to pick up the new template — old stored reference SQL
ignores the token (preserving prior verbatim behavior). The Baseline
form drops the "dates land verbatim" hint and shows the offset
control for both segment types.

Editing a segment now color-codes the source row amber with a ring
and tints the form border + header amber so the active connection is
visually obvious. Header label reads "Edit segment #3 — baseline —
note" instead of just "#43" (the internal log id).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-29 11:08:49 -04:00

526 lines
22 KiB
JavaScript

const express = require('express');
const { tableFromJSON, tableToIPC } = require('apache-arrow');
const { applyTokens, buildWhere, buildExcludeClause, buildSetClause, esc } = require('../lib/sql_generator');
const { fcTable } = require('../lib/utils');
module.exports = function(pool) {
const router = express.Router();
/**
 * Echo a fully rendered SQL statement to stdout, then execute it on the pool.
 *
 * @param {string} sql - Statement text, ready to run as-is.
 * @returns {Promise<object>} Whatever `pool.query` resolves to.
 */
async function runSQL(sql) {
  const logParts = ['--- SQL ---\n', sql, '\n--- END SQL ---'];
  console.log(...logParts);
  const outcome = await pool.query(sql);
  return outcome;
}
/**
 * Collect everything an operation needs before it can run against a version:
 * the version row joined to its source, the source's column metadata
 * (ordered by `opos`), the forecast table name, and the stored SQL template.
 *
 * @param {number} versionId - id of the pf.version row.
 * @param {string} operation - operation name used to look up the stored SQL.
 * @returns {Promise<object>} `{ version, table, colMeta, dimCols, valueCol, unitsCol, sql }`.
 * @throws {Error} with `status` 404 when the version does not exist, or
 *   `status` 400 when no SQL has been stored for `operation`.
 */
async function getContext(versionId, operation) {
  const versionSql = [
    '',
    'SELECT v.*, s.schema, s.tname, s.id AS source_id',
    'FROM pf.version v',
    'JOIN pf.source s ON s.id = v.source_id',
    'WHERE v.id = $1',
    '',
  ].join('\n');
  const { rows: versionRows } = await pool.query(versionSql, [versionId]);
  if (versionRows.length === 0) {
    throw Object.assign(new Error('Version not found'), { status: 404 });
  }
  const [version] = versionRows;

  const { rows: colMeta } = await pool.query(
    `SELECT * FROM pf.col_meta WHERE source_id = $1 ORDER BY opos`,
    [version.source_id]
  );
  const dimCols = [];
  for (const col of colMeta) {
    if (col.role === 'dimension') dimCols.push(col.cname);
  }
  // First column carrying the given role, or undefined when none does.
  const firstWithRole = (role) => colMeta.find((col) => col.role === role)?.cname;
  const valueCol = firstWithRole('value');
  const unitsCol = firstWithRole('units');

  const { rows: templateRows } = await pool.query(
    `SELECT sql FROM pf.sql WHERE source_id = $1 AND operation = $2`,
    [version.source_id, operation]
  );
  if (templateRows.length === 0) {
    throw Object.assign(
      new Error(`No generated SQL for operation "${operation}" — run generate-sql first`),
      { status: 400 }
    );
  }

  return {
    version,
    table: fcTable(version.tname, version.id),
    colMeta,
    dimCols,
    valueCol,
    unitsCol,
    sql: templateRows[0].sql
  };
}
/**
 * Refuse to proceed against a closed version.
 *
 * When `version.status` is 'closed' a 403 JSON error is written to `res`.
 *
 * @param {object} version - version row; only `status` is read.
 * @param {object} res - Express response.
 * @returns {boolean} true when the caller may continue, false when the 403
 *   has already been sent and the caller should stop.
 */
function guardOpen(version, res) {
  const isClosed = version.status === 'closed';
  if (isClosed) {
    res.status(403).json({ error: 'Version is closed' });
  }
  return !isClosed;
}
// Send every row of a version's forecast table (all iters, reference included)
// as a single Arrow IPC stream, each row joined to its log entry's note/operation.
router.get('/versions/:id/data', async (req, res) => {
  const versionId = parseInt(req.params.id);
  let conn = null;
  // True from the moment a client is checked out until COMMIT succeeds.
  let rollbackPending = false;
  try {
    const lookup = await pool.query(
      `SELECT v.*, s.tname FROM pf.version v JOIN pf.source s ON s.id = v.source_id WHERE v.id = $1`,
      [versionId]
    );
    if (lookup.rows.length === 0) {
      throw Object.assign(new Error('Version not found'), { status: 404 });
    }
    const tbl = fcTable(lookup.rows[0].tname, versionId);

    const counted = await pool.query(`SELECT COUNT(*) FROM ${tbl}`);
    const rowCount = parseInt(counted.rows[0].count);
    res.setHeader('Content-Type', 'application/vnd.apache.arrow.stream');
    res.setHeader('X-Row-Count', String(rowCount));
    if (rowCount === 0) {
      // Empty table: headers only, no Arrow payload.
      res.end();
      return;
    }

    conn = await pool.connect();
    rollbackPending = true;
    await conn.query('BEGIN');
    const declareCursor = [
      '',
      'DECLARE pf_cur CURSOR FOR',
      'SELECT f.*, l.note AS pf_note, l.operation AS pf_op',
      `FROM ${tbl} f`,
      'LEFT JOIN pf.log l ON l.id = f.pf_logid',
      '',
    ].join('\n');
    await conn.query(declareCursor);

    // Drain the cursor fully and emit ONE Arrow record batch. Building a table
    // per fetch would give each batch its own dictionaries, making the writer
    // emit dictionary REPLACEMENT messages between batches — Perspective's
    // WASM Arrow reader crashes on those (memory access out of bounds).
    const collected = [];
    let page = await conn.query('FETCH 10000 FROM pf_cur');
    while (page.rows.length > 0) {
      page.rows.forEach((row) => {
        collected.push(row);
      });
      page = await conn.query('FETCH 10000 FROM pf_cur');
    }
    await conn.query('COMMIT');
    rollbackPending = false;

    const ipc = tableToIPC(tableFromJSON(collected), 'stream');
    res.setHeader('Content-Length', String(ipc.byteLength));
    res.end(Buffer.from(ipc.buffer, ipc.byteOffset, ipc.byteLength));
  } catch (err) {
    console.error(err);
    if (res.headersSent) {
      // Too late for a JSON error body; tear the connection down instead.
      res.destroy();
    } else {
      res.status(err.status || 500).json({ error: err.message });
    }
  } finally {
    if (conn) {
      if (rollbackPending) {
        try { await conn.query('ROLLBACK'); } catch {}
      }
      conn.release();
    }
  }
});
// Load baseline rows from the source table — additive, nothing is deleted.
//
// Body fields: `raw_where` or `where_clause` (filter text; raw_where wins,
// empty means TRUE), `date_offset` (defaults to '0 days'), `pf_user`, `note`,
// and `filters`, which is only recorded in the logged params when raw_where
// is absent.
//
// NOTE(security): the filter text is substituted into the stored SQL template
// as-is (it is not passed through esc), so callers of this route must be trusted.
router.post('/versions/:id/baseline', async (req, res) => {
  const { where_clause, date_offset, pf_user, note, filters, raw_where } = req.body ?? {};
  const dateOffset = date_offset || '0 days';
  // Validate before trimming: calling .trim() on a number/object/array would
  // throw outside the try block and surface as an unhandled promise rejection
  // instead of an HTTP error.
  const rawFilter = raw_where || where_clause || '';
  if (typeof rawFilter !== 'string') {
    return res.status(400).json({ error: 'where_clause and raw_where must be strings' });
  }
  const filterClause = rawFilter.trim() || 'TRUE';
  try {
    const ctx = await getContext(Number.parseInt(req.params.id, 10), 'baseline');
    if (!guardOpen(ctx.version, res)) return;
    const paramsJson = JSON.stringify({
      where_clause: filterClause,
      date_offset: dateOffset,
      ...(raw_where ? { raw_where } : (filters ? { filters } : {}))
    });
    const sql = applyTokens(ctx.sql, {
      fc_table: ctx.table,
      version_id: ctx.version.id,
      pf_user: esc(pf_user || ''),
      note: esc(note || ''),
      params: esc(paramsJson),
      filter_clause: filterClause,
      date_offset: esc(dateOffset)
    });
    const result = await runSQL(sql);
    res.json(result.rows[0]);
  } catch (err) {
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  }
});
// Edit a baseline or reference segment in place — only allowed before any
// scale/recode/clone has been applied on this version, since those would
// have been calibrated against the old segment's totals.
//
// The old segment's rows and log entry are deleted and the segment SQL is
// re-run inside one transaction, so a failure leaves the old segment intact.
// Responds with { rows_deleted, pf_ids, rows_affected }.
//
// NOTE(security): the filter text is substituted into the stored SQL template
// as-is (it is not passed through esc), so callers of this route must be trusted.
router.put('/versions/:id/baseline/:logid', async (req, res) => {
  const versionId = Number.parseInt(req.params.id, 10);
  const logid = Number.parseInt(req.params.logid, 10);
  const { where_clause, date_offset, pf_user, note, filters, raw_where } = req.body ?? {};
  const dateOffset = date_offset || '0 days';
  // Validate before trimming: .trim() on a non-string would throw outside the
  // try block and become an unhandled promise rejection.
  const rawFilter = raw_where || where_clause || '';
  if (typeof rawFilter !== 'string') {
    return res.status(400).json({ error: 'where_clause and raw_where must be strings' });
  }
  const filterClause = rawFilter.trim() || 'TRUE';
  let client;
  let inTransaction = false;
  try {
    // Acquire the client inside the try so a connection failure is reported
    // as an HTTP error rather than an unhandled rejection.
    client = await pool.connect();
    const logResult = await client.query(
      `SELECT * FROM pf.log WHERE id = $1 AND version_id = $2`,
      [logid, versionId]
    );
    if (logResult.rows.length === 0) {
      return res.status(404).json({ error: 'Log entry not found' });
    }
    const oldLog = logResult.rows[0];
    if (!['baseline', 'reference'].includes(oldLog.operation)) {
      return res.status(400).json({ error: 'Only baseline or reference segments can be edited' });
    }
    const opsResult = await client.query(
      `SELECT COUNT(*)::int AS n FROM pf.log
       WHERE version_id = $1 AND operation IN ('scale', 'recode', 'clone')`,
      [versionId]
    );
    if (opsResult.rows[0].n > 0) {
      return res.status(409).json({
        error: 'Cannot edit segments after forecast operations have been applied. Undo the operations first.'
      });
    }
    // Re-use the SQL template of whichever operation created the segment.
    const ctx = await getContext(versionId, oldLog.operation);
    if (!guardOpen(ctx.version, res)) return;
    const paramsJson = JSON.stringify({
      where_clause: filterClause,
      date_offset: dateOffset,
      ...(raw_where ? { raw_where } : (filters ? { filters } : {}))
    });
    const sql = applyTokens(ctx.sql, {
      fc_table: ctx.table,
      version_id: ctx.version.id,
      pf_user: esc(pf_user || ''),
      note: esc(note || ''),
      params: esc(paramsJson),
      filter_clause: filterClause,
      date_offset: esc(dateOffset)
    });
    await client.query('BEGIN');
    inTransaction = true;
    const delRows = await client.query(
      `DELETE FROM ${ctx.table} WHERE pf_logid = $1 RETURNING pf_id`,
      [logid]
    );
    await client.query(`DELETE FROM pf.log WHERE id = $1`, [logid]);
    const insResult = await client.query(sql);
    await client.query('COMMIT');
    inTransaction = false;
    res.json({
      rows_deleted: delRows.rowCount,
      pf_ids: delRows.rows.map(r => r.pf_id),
      rows_affected: insResult.rows[0]?.rows_affected ?? 0
    });
  } catch (err) {
    // Only roll back when a transaction is actually open; a failed rollback
    // is logged but must not hide the original error.
    if (inTransaction) {
      try {
        await client.query('ROLLBACK');
      } catch (rollbackErr) {
        console.error('ROLLBACK failed:', rollbackErr);
      }
    }
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  } finally {
    if (client) client.release();
  }
});
// Delete all baseline rows and baseline log entries for a version.
// Both deletes run in one transaction. Responds with
// { rows_deleted, log_entries_deleted, pf_ids }.
router.delete('/versions/:id/baseline', async (req, res) => {
  const versionId = Number.parseInt(req.params.id, 10);
  try {
    const ctx = await getContext(versionId, 'baseline');
    if (!guardOpen(ctx.version, res)) return;
    const client = await pool.connect();
    try {
      await client.query('BEGIN');
      const delRows = await client.query(
        `DELETE FROM ${ctx.table} WHERE pf_iter = 'baseline' RETURNING pf_id`
      );
      const delLog = await client.query(
        `DELETE FROM pf.log WHERE version_id = $1 AND operation = 'baseline'`,
        [versionId]
      );
      await client.query('COMMIT');
      res.json({
        rows_deleted: delRows.rowCount,
        log_entries_deleted: delLog.rowCount,
        pf_ids: delRows.rows.map(r => r.pf_id)
      });
    } catch (err) {
      // Best-effort rollback: if ROLLBACK itself fails (e.g. the connection
      // dropped) log it, but still rethrow the ORIGINAL error so the client
      // sees the real cause.
      try {
        await client.query('ROLLBACK');
      } catch (rollbackErr) {
        console.error('ROLLBACK failed:', rollbackErr);
      }
      throw err;
    } finally {
      client.release();
    }
  } catch (err) {
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  }
});
// Load reference rows from the source table (additive — does not clear prior
// reference rows).
//
// Body fields: `raw_where` or `where_clause` (filter text; raw_where wins,
// empty means TRUE), `date_offset` (defaults to '0 days'), `pf_user`, `note`,
// and `filters`, which is only recorded in the logged params when raw_where
// is absent.
//
// NOTE(security): the filter text is substituted into the stored SQL template
// as-is (it is not passed through esc), so callers of this route must be trusted.
router.post('/versions/:id/reference', async (req, res) => {
  const { where_clause, date_offset, pf_user, note, filters, raw_where } = req.body ?? {};
  const dateOffset = date_offset || '0 days';
  // Validate before trimming: calling .trim() on a number/object/array would
  // throw outside the try block and surface as an unhandled promise rejection
  // instead of an HTTP error.
  const rawFilter = raw_where || where_clause || '';
  if (typeof rawFilter !== 'string') {
    return res.status(400).json({ error: 'where_clause and raw_where must be strings' });
  }
  const filterClause = rawFilter.trim() || 'TRUE';
  try {
    const ctx = await getContext(Number.parseInt(req.params.id, 10), 'reference');
    if (!guardOpen(ctx.version, res)) return;
    const paramsJson = JSON.stringify({
      where_clause: filterClause,
      date_offset: dateOffset,
      ...(raw_where ? { raw_where } : (filters ? { filters } : {}))
    });
    const sql = applyTokens(ctx.sql, {
      fc_table: ctx.table,
      version_id: ctx.version.id,
      pf_user: esc(pf_user || ''),
      note: esc(note || ''),
      params: esc(paramsJson),
      filter_clause: filterClause,
      date_offset: esc(dateOffset)
    });
    const result = await runSQL(sql);
    res.json(result.rows[0]);
  } catch (err) {
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  }
});
// Scale a slice — adjust value and/or units by an absolute amount or, when
// `pct` is truthy, by a percentage of the slice's current totals.
//
// Body fields: `slice` (required, non-empty), `value_incr`, `units_incr`,
// `pct`, `pf_user`, `note`. Responds with { rows, rows_affected }, where each
// row is tagged with pf_note and pf_op = 'scale'.
router.post('/versions/:id/scale', async (req, res) => {
  const { pf_user, note, slice, value_incr, units_incr, pct } = req.body ?? {};
  if (!slice || Object.keys(slice).length === 0) {
    return res.status(400).json({ error: 'slice is required' });
  }
  // The increments end up in the SQL text as bare numeric tokens (they are not
  // passed through esc), so they must be real finite numbers. A string such as
  // "1; DROP ..." would otherwise be spliced into the statement verbatim.
  const toIncrement = (raw) => {
    if (raw == null || raw === '') return 0;
    return typeof raw === 'number' || typeof raw === 'string' ? Number(raw) : NaN;
  };
  const valueIncr = toIncrement(value_incr);
  const unitsIncr = toIncrement(units_incr);
  if (!Number.isFinite(valueIncr) || !Number.isFinite(unitsIncr)) {
    return res.status(400).json({ error: 'value_incr and units_incr must be finite numbers' });
  }
  try {
    const ctx = await getContext(Number.parseInt(req.params.id, 10), 'scale');
    if (!guardOpen(ctx.version, res)) return;
    const whereClause = buildWhere(slice, ctx.dimCols);
    const excludeClause = buildExcludeClause(ctx.version.exclude_iters);
    let absValueIncr = valueIncr;
    let absUnitsIncr = unitsIncr;
    // pct mode: run a quick totals query, convert percentages to absolutes
    if (pct && (valueIncr !== 0 || unitsIncr !== 0)) {
      // A source may have no value or units column configured; sum NULL then
      // rather than referencing a column literally named "undefined".
      const sumOrNull = (col) => (col ? `sum("${col}")` : 'NULL::numeric');
      const totals = await pool.query(`
        SELECT
          ${sumOrNull(ctx.valueCol)} AS total_value,
          ${sumOrNull(ctx.unitsCol)} AS total_units
        FROM ${ctx.table}
        WHERE ${whereClause}
        ${excludeClause}
      `);
      const { total_value, total_units } = totals.rows[0];
      if (valueIncr !== 0) absValueIncr = (parseFloat(total_value) || 0) * valueIncr / 100;
      if (unitsIncr !== 0) absUnitsIncr = (parseFloat(total_units) || 0) * unitsIncr / 100;
    }
    if (absValueIncr === 0 && absUnitsIncr === 0) {
      return res.status(400).json({ error: 'value_incr and/or units_incr must be non-zero' });
    }
    const sql = applyTokens(ctx.sql, {
      fc_table: ctx.table,
      version_id: ctx.version.id,
      pf_user: esc(pf_user || ''),
      note: esc(note || ''),
      // The log keeps the request values exactly as received.
      params: esc(JSON.stringify({ slice, value_incr, units_incr, pct })),
      slice: esc(JSON.stringify(slice)),
      where_clause: whereClause,
      exclude_clause: excludeClause,
      value_incr: absValueIncr,
      units_incr: absUnitsIncr
    });
    const result = await runSQL(sql);
    const rows = result.rows.map(r => ({ ...r, pf_note: note || null, pf_op: 'scale' }));
    res.json({ rows, rows_affected: rows.length });
  } catch (err) {
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  }
});
// Recode dimension values on a slice: negative rows zero out the original,
// positive rows carry the new dimension values.
// Requires non-empty `slice` and `set`; responds with { rows, rows_affected }.
router.post('/versions/:id/recode', async (req, res) => {
  const { pf_user, note, slice, set } = req.body;
  const isBlank = (obj) => !obj || Object.keys(obj).length === 0;
  if (isBlank(slice)) {
    return res.status(400).json({ error: 'slice is required' });
  }
  if (isBlank(set)) {
    return res.status(400).json({ error: 'set is required' });
  }
  try {
    const ctx = await getContext(parseInt(req.params.id), 'recode');
    const mayProceed = guardOpen(ctx.version, res);
    if (!mayProceed) return;

    const tokens = {
      fc_table: ctx.table,
      version_id: ctx.version.id,
      pf_user: esc(pf_user || ''),
      note: esc(note || ''),
      params: esc(JSON.stringify({ slice, set })),
      slice: esc(JSON.stringify(slice)),
      where_clause: buildWhere(slice, ctx.dimCols),
      exclude_clause: buildExcludeClause(ctx.version.exclude_iters),
      set_clause: buildSetClause(ctx.dimCols, set)
    };
    const { rows: inserted } = await runSQL(applyTokens(ctx.sql, tokens));

    // Tag each returned row with the note and operation for the client.
    const tag = { pf_note: note || null, pf_op: 'recode' };
    const rows = inserted.map((row) => Object.assign({}, row, tag));
    res.json({ rows, rows_affected: rows.length });
  } catch (err) {
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  }
});
// Clone a slice as new business under new dimension values.
// Does not offset the original slice.
//
// Body fields: `slice` and `set` (both required, non-empty), optional `scale`
// (multiplier, defaults to 1.0), `pf_user`, `note`. Responds with
// { rows, rows_affected }, each row tagged with pf_note and pf_op = 'clone'.
router.post('/versions/:id/clone', async (req, res) => {
  const { pf_user, note, slice, set, scale } = req.body ?? {};
  if (!slice || Object.keys(slice).length === 0) return res.status(400).json({ error: 'slice is required' });
  if (!set || Object.keys(set).length === 0) return res.status(400).json({ error: 'set is required' });
  // The factor is written into the SQL text as a bare numeric token. NaN or
  // Infinity (e.g. scale: "abc") would produce invalid SQL and be logged as
  // null in params, so reject it up front with a clear 400.
  const scaleFactor = (scale != null) ? parseFloat(scale) : 1.0;
  if (!Number.isFinite(scaleFactor)) {
    return res.status(400).json({ error: 'scale must be a finite number' });
  }
  try {
    const ctx = await getContext(Number.parseInt(req.params.id, 10), 'clone');
    if (!guardOpen(ctx.version, res)) return;
    const whereClause = buildWhere(slice, ctx.dimCols);
    const excludeClause = buildExcludeClause(ctx.version.exclude_iters);
    const setClause = buildSetClause(ctx.dimCols, set);
    const sql = applyTokens(ctx.sql, {
      fc_table: ctx.table,
      version_id: ctx.version.id,
      pf_user: esc(pf_user || ''),
      note: esc(note || ''),
      params: esc(JSON.stringify({ slice, set, scale: scaleFactor })),
      slice: esc(JSON.stringify(slice)),
      where_clause: whereClause,
      exclude_clause: excludeClause,
      set_clause: setClause,
      scale_factor: scaleFactor
    });
    const result = await runSQL(sql);
    const rows = result.rows.map(r => ({ ...r, pf_note: note || null, pf_op: 'clone' }));
    res.json({ rows, rows_affected: rows.length });
  } catch (err) {
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  }
});
// List a version's log entries, newest first. Each entry carries the number
// of forecast rows it produced plus value/units totals (NULL when the source
// has no column with that role) and the names of those columns.
router.get('/versions/:id/log', async (req, res) => {
  const versionId = parseInt(req.params.id);
  try {
    const { rows: versionRows } = await pool.query(
      `SELECT v.*, s.tname, s.id AS source_id FROM pf.version v JOIN pf.source s ON s.id = v.source_id WHERE v.id = $1`,
      [versionId]
    );
    if (versionRows.length === 0) {
      res.status(404).json({ error: 'Version not found' });
      return;
    }
    const found = versionRows[0];
    const table = fcTable(found.tname, versionId);

    const { rows: measureCols } = await pool.query(
      `SELECT cname, role FROM pf.col_meta WHERE source_id = $1 AND role IN ('value', 'units')`,
      [found.source_id]
    );
    const columnFor = (role) => measureCols.find((col) => col.role === role)?.cname;
    const valueCol = columnFor('value');
    const unitsCol = columnFor('units');

    // Sum the measure when it exists, otherwise emit a typed NULL under the same alias.
    const totalExpr = (col, alias) => (
      col ? `sum(f."${col}")::float8 AS ${alias}` : `NULL::float8 AS ${alias}`
    );
    const aggCols = [
      `count(f.pf_id)::int AS row_count`,
      totalExpr(valueCol, 'value_total'),
      totalExpr(unitsCol, 'units_total')
    ].join(', ');

    const listSql = [
      '',
      `SELECT l.*, ${aggCols},`,
      '$2::text AS value_col,',
      '$3::text AS units_col',
      'FROM pf.log l',
      `LEFT JOIN ${table} f ON f.pf_logid = l.id`,
      'WHERE l.version_id = $1',
      'GROUP BY l.id',
      'ORDER BY l.id DESC',
      '',
    ].join('\n');
    const { rows: entries } = await pool.query(
      listSql,
      [versionId, valueCol || null, unitsCol || null]
    );
    res.json(entries);
  } catch (err) {
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  }
});
// Undo a log entry — delete all fc rows with this logid, then delete the log
// entry, both in one transaction. Refused with 403 when the owning version is
// closed. Responds with { rows_deleted, pf_ids }.
router.delete('/log/:logid', async (req, res) => {
  const logId = Number.parseInt(req.params.logid, 10);
  try {
    const logResult = await pool.query(`
      SELECT l.*, v.status, s.tname, v.id AS version_id
      FROM pf.log l
      JOIN pf.version v ON v.id = l.version_id
      JOIN pf.source s ON s.id = v.source_id
      WHERE l.id = $1
    `, [logId]);
    if (!logResult.rows.length) return res.status(404).json({ error: 'Log entry not found' });
    const log = logResult.rows[0];
    if (log.status === 'closed') return res.status(403).json({ error: 'Version is closed' });
    const table = fcTable(log.tname, log.version_id);
    const client = await pool.connect();
    try {
      await client.query('BEGIN');
      const deleted = await client.query(
        `DELETE FROM ${table} WHERE pf_logid = $1 RETURNING pf_id`, [logId]
      );
      await client.query('DELETE FROM pf.log WHERE id = $1', [logId]);
      await client.query('COMMIT');
      res.json({
        rows_deleted: deleted.rowCount,
        pf_ids: deleted.rows.map(r => r.pf_id)
      });
    } catch (err) {
      // Best-effort rollback: if ROLLBACK itself fails (e.g. the connection
      // dropped) log it, but still rethrow the ORIGINAL error so the client
      // sees the real cause.
      try {
        await client.query('ROLLBACK');
      } catch (rollbackErr) {
        console.error('ROLLBACK failed:', rollbackErr);
      }
      throw err;
    } finally {
      client.release();
    }
  } catch (err) {
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  }
});
// Replace the note stored on a single log entry and return the updated row;
// 404 when no log entry has that id.
router.patch('/log/:logid', async (req, res) => {
  const { note } = req.body;
  const logId = parseInt(req.params.logid);
  try {
    const { rows: updated } = await pool.query(
      `UPDATE pf.log SET note = $1 WHERE id = $2 RETURNING *`,
      [note, logId]
    );
    if (updated.length === 0) {
      return res.status(404).json({ error: 'Log entry not found' });
    }
    res.json(updated[0]);
  } catch (err) {
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  }
});
return router;
};