pf_app/routes/operations.js
Paul Trowbridge 8d26629f32 Consolidate duplicate log routes into routes/log.js
GET /versions/:id/log, DELETE /log/:logid, and PATCH /log/:logid were
defined in both routes/operations.js and routes/log.js. operations.js is
registered first, so its handlers shadowed log.js entirely (dead code).

Move the authoritative implementations (value/units totals in GET,
closed-version 403 guard in DELETE) into log.js and remove the duplicates
from operations.js, keeping operations.js focused on the forecast ops.
No behavior change — the served handlers were already the operations.js
versions; they are now defined once.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-17 21:50:21 -04:00

438 lines
18 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

const express = require('express');
const { tableFromArrays, tableToIPC } = require('apache-arrow');
const { applyTokens, buildWhere, buildExcludeClause, buildSetClause, esc } = require('../lib/sql_generator');
const { fcTable } = require('../lib/utils');
module.exports = function(pool) {
const router = express.Router();
async function runSQL(sql) {
console.log('--- SQL ---\n', sql, '\n--- END SQL ---');
return pool.query(sql);
}
// fetch everything needed to execute an operation:
// version + source info, col_meta, fc_table name, stored SQL
async function getContext(versionId, operation) {
const verResult = await pool.query(`
SELECT v.*, s.schema, s.tname, s.id AS source_id
FROM pf.version v
JOIN pf.source s ON s.id = v.source_id
WHERE v.id = $1
`, [versionId]);
if (verResult.rows.length === 0) {
const err = new Error('Version not found'); err.status = 404; throw err;
}
const version = verResult.rows[0];
const colResult = await pool.query(
`SELECT * FROM pf.col_meta WHERE source_id = $1 ORDER BY opos`,
[version.source_id]
);
const colMeta = colResult.rows;
const dimCols = colMeta.filter(c => c.role === 'dimension').map(c => c.cname);
const dateCols = colMeta.filter(c => c.role === 'date').map(c => c.cname);
const valueCol = colMeta.find(c => c.role === 'value')?.cname;
const unitsCol = colMeta.find(c => c.role === 'units')?.cname;
const sqlResult = await pool.query(
`SELECT sql FROM pf.sql WHERE source_id = $1 AND operation = $2`,
[version.source_id, operation]
);
if (sqlResult.rows.length === 0) {
const err = new Error(`No generated SQL for operation "${operation}" — run generate-sql first`);
err.status = 400; throw err;
}
return {
version,
table: fcTable(version.tname, version.id),
colMeta,
dimCols,
dateCols,
filterCols: [...dimCols, ...dateCols],
valueCol,
unitsCol,
sql: sqlResult.rows[0].sql
};
}
function guardOpen(version, res) {
if (version.status === 'closed') {
res.status(403).json({ error: 'Version is closed' });
return false;
}
return true;
}
// stream all rows for a version as Arrow IPC (all iters including reference)
router.get('/versions/:id/data', async (req, res) => {
const versionId = parseInt(req.params.id);
let client, committed = false;
try {
const verResult = await pool.query(
`SELECT v.*, s.tname FROM pf.version v JOIN pf.source s ON s.id = v.source_id WHERE v.id = $1`,
[versionId]
);
if (!verResult.rows.length) {
const err = new Error('Version not found'); err.status = 404; throw err;
}
const tbl = fcTable(verResult.rows[0].tname, versionId);
const { rows: [{ count }] } = await pool.query(`SELECT COUNT(*) FROM ${tbl}`);
const rowCount = parseInt(count);
res.setHeader('Content-Type', 'application/vnd.apache.arrow.stream');
res.setHeader('X-Row-Count', String(rowCount));
if (rowCount === 0) { res.end(); return; }
client = await pool.connect();
await client.query('BEGIN');
await client.query(`
DECLARE pf_cur CURSOR FOR
SELECT * FROM ${tbl}
`);
// Accumulate into column arrays (not row objects) to avoid allocating one JS
// object per row — cuts peak heap by ~3-5× for large datasets.
// Still emits a single Arrow record batch so Perspective WASM never sees
// dictionary REPLACEMENT messages (which crash its Arrow reader).
let colArrays = null;
while (true) {
const { rows } = await client.query('FETCH 10000 FROM pf_cur');
if (!rows.length) break;
if (!colArrays) {
colArrays = Object.fromEntries(Object.keys(rows[0]).map(k => [k, []]));
}
for (const row of rows) {
for (const k of Object.keys(colArrays)) colArrays[k].push(row[k]);
}
}
await client.query('COMMIT');
committed = true;
const buf = tableToIPC(tableFromArrays(colArrays || {}), 'stream');
res.setHeader('Content-Length', String(buf.byteLength));
res.end(Buffer.from(buf.buffer, buf.byteOffset, buf.byteLength));
} catch (err) {
console.error(err);
if (!res.headersSent) res.status(err.status || 500).json({ error: err.message });
else res.destroy();
} finally {
if (client) {
if (!committed) try { await client.query('ROLLBACK'); } catch {}
client.release();
}
}
});
// load baseline rows from source table — additive, no delete
router.post('/versions/:id/baseline', async (req, res) => {
const { where_clause, date_offset, pf_user, note, filters, raw_where } = req.body;
const dateOffset = date_offset || '0 days';
const filterClause = (raw_where || where_clause || '').trim() || 'TRUE';
try {
const ctx = await getContext(parseInt(req.params.id), 'baseline');
if (!guardOpen(ctx.version, res)) return;
const paramsJson = JSON.stringify({
where_clause: filterClause,
date_offset: dateOffset,
...(raw_where ? { raw_where } : (filters ? { filters } : {}))
});
const sql = applyTokens(ctx.sql, {
fc_table: ctx.table,
version_id: ctx.version.id,
pf_user: esc(pf_user || ''),
note: esc(note || ''),
params: esc(paramsJson),
filter_clause: filterClause,
date_offset: esc(dateOffset)
});
const result = await runSQL(sql);
res.json(result.rows[0]);
} catch (err) {
console.error(err);
res.status(err.status || 500).json({ error: err.message });
}
});
// edit a baseline or reference segment in place — only allowed before any
// scale/recode/clone has been applied on this version, since those would
// have been calibrated against the old segment's totals.
router.put('/versions/:id/baseline/:logid', async (req, res) => {
const versionId = parseInt(req.params.id);
const logid = parseInt(req.params.logid);
const { where_clause, date_offset, pf_user, note, filters, raw_where } = req.body;
const dateOffset = date_offset || '0 days';
const filterClause = (raw_where || where_clause || '').trim() || 'TRUE';
const client = await pool.connect();
try {
const logResult = await client.query(
`SELECT * FROM pf.log WHERE id = $1 AND version_id = $2`,
[logid, versionId]
);
if (logResult.rows.length === 0) {
return res.status(404).json({ error: 'Log entry not found' });
}
const oldLog = logResult.rows[0];
if (!['baseline', 'reference'].includes(oldLog.operation)) {
return res.status(400).json({ error: 'Only baseline or reference segments can be edited' });
}
const opsResult = await client.query(
`SELECT COUNT(*)::int AS n FROM pf.log
WHERE version_id = $1 AND operation IN ('scale', 'recode', 'clone')`,
[versionId]
);
if (opsResult.rows[0].n > 0) {
return res.status(409).json({
error: 'Cannot edit segments after forecast operations have been applied. Undo the operations first.'
});
}
const ctx = await getContext(versionId, oldLog.operation);
if (!guardOpen(ctx.version, res)) return;
const paramsJson = JSON.stringify({
where_clause: filterClause,
date_offset: dateOffset,
...(raw_where ? { raw_where } : (filters ? { filters } : {}))
});
const sql = applyTokens(ctx.sql, {
fc_table: ctx.table,
version_id: ctx.version.id,
pf_user: esc(pf_user || ''),
note: esc(note || ''),
params: esc(paramsJson),
filter_clause: filterClause,
date_offset: esc(dateOffset)
});
await client.query('BEGIN');
const delRows = await client.query(
`DELETE FROM ${ctx.table} WHERE pf_logid = $1 RETURNING pf_id`,
[logid]
);
await client.query(`DELETE FROM pf.log WHERE id = $1`, [logid]);
const insResult = await client.query(sql);
await client.query('COMMIT');
res.json({
rows_deleted: delRows.rowCount,
pf_ids: delRows.rows.map(r => r.pf_id),
rows_affected: insResult.rows[0]?.rows_affected ?? 0
});
} catch (err) {
try { await client.query('ROLLBACK'); } catch {}
console.error(err);
res.status(err.status || 500).json({ error: err.message });
} finally {
client.release();
}
});
// delete all baseline rows and log entries for a version
router.delete('/versions/:id/baseline', async (req, res) => {
const versionId = parseInt(req.params.id);
try {
const ctx = await getContext(versionId, 'baseline');
if (!guardOpen(ctx.version, res)) return;
const client = await pool.connect();
try {
await client.query('BEGIN');
const delRows = await client.query(
`DELETE FROM ${ctx.table} WHERE pf_iter = 'baseline' RETURNING pf_id`
);
const delLog = await client.query(
`DELETE FROM pf.log WHERE version_id = $1 AND operation = 'baseline'`,
[versionId]
);
await client.query('COMMIT');
res.json({
rows_deleted: delRows.rowCount,
log_entries_deleted: delLog.rowCount,
pf_ids: delRows.rows.map(r => r.pf_id)
});
} catch (err) {
await client.query('ROLLBACK');
throw err;
} finally {
client.release();
}
} catch (err) {
console.error(err);
res.status(err.status || 500).json({ error: err.message });
}
});
// load reference rows from source table (additive — does not clear prior reference rows)
router.post('/versions/:id/reference', async (req, res) => {
const { where_clause, date_offset, pf_user, note, filters, raw_where } = req.body;
const dateOffset = date_offset || '0 days';
const filterClause = (raw_where || where_clause || '').trim() || 'TRUE';
try {
const ctx = await getContext(parseInt(req.params.id), 'reference');
if (!guardOpen(ctx.version, res)) return;
const paramsJson = JSON.stringify({
where_clause: filterClause,
date_offset: dateOffset,
...(raw_where ? { raw_where } : (filters ? { filters } : {}))
});
const sql = applyTokens(ctx.sql, {
fc_table: ctx.table,
version_id: ctx.version.id,
pf_user: esc(pf_user || ''),
note: esc(note || ''),
params: esc(paramsJson),
filter_clause: filterClause,
date_offset: esc(dateOffset)
});
const result = await runSQL(sql);
res.json(result.rows[0]);
} catch (err) {
console.error(err);
res.status(err.status || 500).json({ error: err.message });
}
});
// scale a slice — adjust value and/or units by absolute amount or percentage
router.post('/versions/:id/scale', async (req, res) => {
const { pf_user, note, slice, value_incr, units_incr, pct } = req.body;
if (!slice || Object.keys(slice).length === 0) {
return res.status(400).json({ error: 'slice is required' });
}
try {
const ctx = await getContext(parseInt(req.params.id), 'scale');
if (!guardOpen(ctx.version, res)) return;
const whereClause = buildWhere(slice, ctx.filterCols);
const excludeClause = buildExcludeClause(ctx.version.exclude_iters);
let absValueIncr = value_incr || 0;
let absUnitsIncr = units_incr || 0;
// pct mode: run a quick totals query, convert percentages to absolutes
if (pct && (value_incr || units_incr)) {
const totals = await pool.query(`
SELECT
sum("${ctx.valueCol}") AS total_value,
sum("${ctx.unitsCol}") AS total_units
FROM ${ctx.table}
WHERE ${whereClause}
${excludeClause}
`);
const { total_value, total_units } = totals.rows[0];
if (value_incr) absValueIncr = (parseFloat(total_value) || 0) * value_incr / 100;
if (units_incr) absUnitsIncr = (parseFloat(total_units) || 0) * units_incr / 100;
}
if (absValueIncr === 0 && absUnitsIncr === 0) {
return res.status(400).json({ error: 'value_incr and/or units_incr must be non-zero' });
}
const sql = applyTokens(ctx.sql, {
fc_table: ctx.table,
version_id: ctx.version.id,
pf_user: esc(pf_user || ''),
note: esc(note || ''),
params: esc(JSON.stringify({ slice, value_incr, units_incr, pct })),
slice: esc(JSON.stringify(slice)),
where_clause: whereClause,
exclude_clause: excludeClause,
value_incr: absValueIncr,
units_incr: absUnitsIncr
});
const result = await runSQL(sql);
const rows = result.rows.map(r => ({ ...r, pf_note: note || null, pf_op: 'scale' }));
res.json({ rows, rows_affected: rows.length });
} catch (err) {
console.error(err);
res.status(err.status || 500).json({ error: err.message });
}
});
// recode dimension values on a slice
// inserts negative rows to zero out the original, positive rows with new dimension values
router.post('/versions/:id/recode', async (req, res) => {
const { pf_user, note, slice, set } = req.body;
if (!slice || Object.keys(slice).length === 0) return res.status(400).json({ error: 'slice is required' });
if (!set || Object.keys(set).length === 0) return res.status(400).json({ error: 'set is required' });
try {
const ctx = await getContext(parseInt(req.params.id), 'recode');
if (!guardOpen(ctx.version, res)) return;
const whereClause = buildWhere(slice, ctx.filterCols);
const excludeClause = buildExcludeClause(ctx.version.exclude_iters);
const setClause = buildSetClause(ctx.dimCols, set);
const sql = applyTokens(ctx.sql, {
fc_table: ctx.table,
version_id: ctx.version.id,
pf_user: esc(pf_user || ''),
note: esc(note || ''),
params: esc(JSON.stringify({ slice, set })),
slice: esc(JSON.stringify(slice)),
where_clause: whereClause,
exclude_clause: excludeClause,
set_clause: setClause
});
const result = await runSQL(sql);
const rows = result.rows.map(r => ({ ...r, pf_note: note || null, pf_op: 'recode' }));
res.json({ rows, rows_affected: rows.length });
} catch (err) {
console.error(err);
res.status(err.status || 500).json({ error: err.message });
}
});
// clone a slice as new business under new dimension values
// does not offset the original slice
router.post('/versions/:id/clone', async (req, res) => {
const { pf_user, note, slice, set, scale } = req.body;
if (!slice || Object.keys(slice).length === 0) return res.status(400).json({ error: 'slice is required' });
if (!set || Object.keys(set).length === 0) return res.status(400).json({ error: 'set is required' });
try {
const ctx = await getContext(parseInt(req.params.id), 'clone');
if (!guardOpen(ctx.version, res)) return;
const scaleFactor = (scale != null) ? parseFloat(scale) : 1.0;
const whereClause = buildWhere(slice, ctx.filterCols);
const excludeClause = buildExcludeClause(ctx.version.exclude_iters);
const setClause = buildSetClause(ctx.dimCols, set);
const sql = applyTokens(ctx.sql, {
fc_table: ctx.table,
version_id: ctx.version.id,
pf_user: esc(pf_user || ''),
note: esc(note || ''),
params: esc(JSON.stringify({ slice, set, scale: scaleFactor })),
slice: esc(JSON.stringify(slice)),
where_clause: whereClause,
exclude_clause: excludeClause,
set_clause: setClause,
scale_factor: scaleFactor
});
const result = await runSQL(sql);
const rows = result.rows.map(r => ({ ...r, pf_note: note || null, pf_op: 'clone' }));
res.json({ rows, rows_affected: rows.length });
} catch (err) {
console.error(err);
res.status(err.status || 500).json({ error: err.message });
}
});
// log routes (GET /versions/:id/log, DELETE /log/:logid, PATCH /log/:logid)
// live in routes/log.js — see that file.
return router;
};