Adds PUT /versions/:id/baseline/:logid that, in one transaction, drops the segment's rows and log entry and replays the baseline or reference SQL with new params. The endpoint refuses (409) if any scale, recode, or clone has been applied — those operations were calibrated against the old totals and would silently misreconcile. Baseline view gets an Edit button on each segment (hidden once forecast operations exist), populating the form with the original filters, offset, and note. Submit issues PUT in edit mode, POST otherwise. POST baseline and POST reference now also persist the structured filters in pf.log.params so edit can reload them. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
516 lines
22 KiB
JavaScript
516 lines
22 KiB
JavaScript
const express = require('express');
|
|
const { tableFromJSON, tableToIPC } = require('apache-arrow');
|
|
const { applyTokens, buildWhere, buildExcludeClause, buildSetClause, esc } = require('../lib/sql_generator');
|
|
const { fcTable } = require('../lib/utils');
|
|
|
|
module.exports = function(pool) {
|
|
const router = express.Router();
|
|
|
|
// Echo a statement to stdout between marker lines, then execute it on the
// shared pool and resolve with the driver's result object.
async function runSQL(sql) {
  const logParts = ['--- SQL ---\n', sql, '\n--- END SQL ---'];
  console.log(...logParts);
  const outcome = await pool.query(sql);
  return outcome;
}
|
|
|
|
// Fetch everything needed to execute an operation against a version:
// the version row joined with its source, the source's column metadata,
// the forecast table name, and the stored SQL template for `operation`.
//
// Resolves to { version, table, colMeta, dimCols, valueCol, unitsCol, sql }.
// valueCol / unitsCol are undefined when no column carries that role.
// Throws an Error with status 404 when the version does not exist, and with
// status 400 when no SQL has been generated for the operation.
async function getContext(versionId, operation) {
  const { rows: versionRows } = await pool.query(`
    SELECT v.*, s.schema, s.tname, s.id AS source_id
    FROM pf.version v
    JOIN pf.source s ON s.id = v.source_id
    WHERE v.id = $1
  `, [versionId]);
  if (versionRows.length === 0) {
    const err = new Error('Version not found');
    err.status = 404;
    throw err;
  }
  const [version] = versionRows;

  // Both lookups depend only on the source id, so issue them together
  // instead of paying two sequential round trips.
  const [colResult, sqlResult] = await Promise.all([
    pool.query(
      `SELECT * FROM pf.col_meta WHERE source_id = $1 ORDER BY opos`,
      [version.source_id]
    ),
    pool.query(
      `SELECT sql FROM pf.sql WHERE source_id = $1 AND operation = $2`,
      [version.source_id, operation]
    ),
  ]);

  if (sqlResult.rows.length === 0) {
    const err = new Error(`No generated SQL for operation "${operation}" — run generate-sql first`);
    err.status = 400;
    throw err;
  }

  const colMeta = colResult.rows;
  const firstWithRole = (role) => colMeta.find((c) => c.role === role)?.cname;

  return {
    version,
    table: fcTable(version.tname, version.id),
    colMeta,
    dimCols: colMeta.filter((c) => c.role === 'dimension').map((c) => c.cname),
    valueCol: firstWithRole('value'),
    unitsCol: firstWithRole('units'),
    sql: sqlResult.rows[0].sql,
  };
}
|
|
|
|
// Gate for mutating handlers: when the version is closed, answer the request
// with 403 and report false so the caller stops; otherwise report true and
// leave the response untouched.
function guardOpen(version, res) {
  const isClosed = version.status === 'closed';
  if (isClosed) {
    res.status(403).json({ error: 'Version is closed' });
  }
  return !isClosed;
}
|
|
|
|
// Send all rows for a version as one Arrow IPC stream (all iters including
// reference). Each row is joined with its log entry's note and operation
// (pf_note / pf_op).
//
// Responses: 400 for a non-numeric id, 404 when the version does not exist,
// otherwise an Arrow stream with an X-Row-Count header. An empty table yields
// an empty body.
router.get('/versions/:id/data', async (req, res) => {
  const versionId = parseInt(req.params.id, 10);
  if (Number.isNaN(versionId)) {
    return res.status(400).json({ error: 'Invalid version id' });
  }

  let client;
  let committed = false;
  try {
    const verResult = await pool.query(
      `SELECT v.*, s.tname FROM pf.version v JOIN pf.source s ON s.id = v.source_id WHERE v.id = $1`,
      [versionId]
    );
    if (!verResult.rows.length) {
      const err = new Error('Version not found');
      err.status = 404;
      throw err;
    }
    const tbl = fcTable(verResult.rows[0].tname, versionId);

    // Cursors only live inside a transaction, so hold one dedicated client.
    client = await pool.connect();
    await client.query('BEGIN');
    await client.query(`
      DECLARE pf_cur CURSOR FOR
      SELECT f.*, l.note AS pf_note, l.operation AS pf_op
      FROM ${tbl} f
      LEFT JOIN pf.log l ON l.id = f.pf_logid
    `);

    // Accumulate rows from the cursor, then emit a single Arrow record batch.
    // Per-batch tableFromJSON() builds independent dictionaries, which forces the
    // writer to emit dictionary REPLACEMENT messages between batches — Perspective's
    // WASM Arrow reader crashes on those (memory access out of bounds).
    const allRows = [];
    for (;;) {
      const { rows } = await client.query('FETCH 10000 FROM pf_cur');
      if (rows.length === 0) break;
      for (const row of rows) allRows.push(row);
    }
    await client.query('COMMIT');
    committed = true;

    // Report the number of rows actually read inside the transaction. A
    // separate COUNT(*) costs an extra scan and can disagree with the payload
    // when rows change between the two statements.
    res.setHeader('Content-Type', 'application/vnd.apache.arrow.stream');
    res.setHeader('X-Row-Count', String(allRows.length));

    if (allRows.length === 0) {
      res.end();
      return;
    }

    const buf = tableToIPC(tableFromJSON(allRows), 'stream');
    res.setHeader('Content-Length', String(buf.byteLength));
    res.end(Buffer.from(buf.buffer, buf.byteOffset, buf.byteLength));
  } catch (err) {
    console.error(err);
    if (!res.headersSent) res.status(err.status || 500).json({ error: err.message });
    else res.destroy();
  } finally {
    if (client) {
      if (!committed) {
        // Best-effort cleanup; the original failure has already been reported.
        try { await client.query('ROLLBACK'); } catch {}
      }
      client.release();
    }
  }
});
|
|
|
|
// Load baseline rows from the source table. Additive: nothing is deleted.
//
// Body fields: where_clause (defaults to TRUE), date_offset (defaults to
// '0 days'), pf_user, note, and optional structured filters that are stored
// alongside the clause in the log params.
// NOTE(review): where_clause is substituted into the SQL template verbatim,
// so this endpoint appears to rely on callers being trusted — confirm.
router.post('/versions/:id/baseline', async (req, res) => {
  const { where_clause, date_offset, pf_user, note, filters } = req.body;
  const filterClause = (where_clause || '').trim() || 'TRUE';
  const dateOffset = date_offset || '0 days';

  try {
    const ctx = await getContext(parseInt(req.params.id), 'baseline');
    if (!guardOpen(ctx.version, res)) return;

    // Persisted in the log entry so the segment can be reloaded later.
    const logParams = { where_clause: filterClause, date_offset: dateOffset };
    if (filters) logParams.filters = filters;

    const tokens = {
      fc_table: ctx.table,
      version_id: ctx.version.id,
      pf_user: esc(pf_user || ''),
      note: esc(note || ''),
      params: esc(JSON.stringify(logParams)),
      filter_clause: filterClause,
      date_offset: esc(dateOffset)
    };

    const { rows } = await runSQL(applyTokens(ctx.sql, tokens));
    res.json(rows[0]);
  } catch (err) {
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  }
});
|
|
|
|
// Edit a baseline or reference segment in place — only allowed before any
// scale/recode/clone has been applied on this version, since those would
// have been calibrated against the old segment's totals.
//
// In one transaction: delete the segment's forecast rows and its log entry,
// then replay the stored baseline/reference SQL with the new parameters.
//
// Responses: 400 invalid ids or non-editable operation, 404 unknown log entry,
// 409 forecast operations exist, 403 version closed, otherwise
// { rows_deleted, pf_ids, rows_affected }.
// NOTE(review): the eligibility checks run before BEGIN and take no locks, so
// a concurrent scale/recode/clone could still slip in between the check and
// the replay — confirm whether that matters for this deployment.
router.put('/versions/:id/baseline/:logid', async (req, res) => {
  const versionId = parseInt(req.params.id, 10);
  const logid = parseInt(req.params.logid, 10);
  if (Number.isNaN(versionId) || Number.isNaN(logid)) {
    return res.status(400).json({ error: 'Invalid version or log id' });
  }

  const { where_clause, date_offset, pf_user, note, filters } = req.body;
  const dateOffset = date_offset || '0 days';
  const filterClause = (where_clause || '').trim() || 'TRUE';

  let client;
  let inTransaction = false;
  try {
    // Acquire inside the try so a connection failure is reported as a 500
    // instead of escaping as an unhandled rejection.
    client = await pool.connect();

    const logResult = await client.query(
      `SELECT * FROM pf.log WHERE id = $1 AND version_id = $2`,
      [logid, versionId]
    );
    if (logResult.rows.length === 0) {
      return res.status(404).json({ error: 'Log entry not found' });
    }
    const oldLog = logResult.rows[0];
    if (!['baseline', 'reference'].includes(oldLog.operation)) {
      return res.status(400).json({ error: 'Only baseline or reference segments can be edited' });
    }

    const opsResult = await client.query(
      `SELECT COUNT(*)::int AS n FROM pf.log
       WHERE version_id = $1 AND operation IN ('scale', 'recode', 'clone')`,
      [versionId]
    );
    if (opsResult.rows[0].n > 0) {
      return res.status(409).json({
        error: 'Cannot edit segments after forecast operations have been applied. Undo the operations first.'
      });
    }

    const ctx = await getContext(versionId, oldLog.operation);
    if (!guardOpen(ctx.version, res)) return;

    // Only baseline segments carry a date offset; reference segments do not.
    const isBaseline = oldLog.operation === 'baseline';
    const paramsJson = JSON.stringify({
      where_clause: filterClause,
      ...(isBaseline ? { date_offset: dateOffset } : {}),
      ...(filters ? { filters } : {})
    });
    const tokens = {
      fc_table: ctx.table,
      version_id: ctx.version.id,
      pf_user: esc(pf_user || ''),
      note: esc(note || ''),
      params: esc(paramsJson),
      filter_clause: filterClause,
      ...(isBaseline ? { date_offset: esc(dateOffset) } : {})
    };
    const sql = applyTokens(ctx.sql, tokens);

    await client.query('BEGIN');
    inTransaction = true;
    const delRows = await client.query(
      `DELETE FROM ${ctx.table} WHERE pf_logid = $1 RETURNING pf_id`,
      [logid]
    );
    await client.query(`DELETE FROM pf.log WHERE id = $1`, [logid]);
    const insResult = await client.query(sql);
    await client.query('COMMIT');
    inTransaction = false;

    res.json({
      rows_deleted: delRows.rowCount,
      pf_ids: delRows.rows.map((r) => r.pf_id),
      rows_affected: insResult.rows[0]?.rows_affected ?? 0
    });
  } catch (err) {
    if (inTransaction) {
      // A failed ROLLBACK must not hide the error that triggered it.
      try {
        await client.query('ROLLBACK');
      } catch (rollbackErr) {
        console.error(rollbackErr);
      }
    }
    console.error(err);
    if (!res.headersSent) res.status(err.status || 500).json({ error: err.message });
  } finally {
    if (client) client.release();
  }
});
|
|
|
|
// Delete all baseline rows and baseline log entries for a version, in one
// transaction.
//
// Responds with { rows_deleted, log_entries_deleted, pf_ids }. Errors from
// getContext (404 unknown version, 400 no generated baseline SQL) and the
// closed-version guard (403) apply before anything is deleted.
router.delete('/versions/:id/baseline', async (req, res) => {
  const versionId = parseInt(req.params.id, 10);
  try {
    const ctx = await getContext(versionId, 'baseline');
    if (!guardOpen(ctx.version, res)) return;

    const client = await pool.connect();
    try {
      await client.query('BEGIN');
      const delRows = await client.query(
        `DELETE FROM ${ctx.table} WHERE pf_iter = 'baseline' RETURNING pf_id`
      );
      const delLog = await client.query(
        `DELETE FROM pf.log WHERE version_id = $1 AND operation = 'baseline'`,
        [versionId]
      );
      await client.query('COMMIT');
      res.json({
        rows_deleted: delRows.rowCount,
        log_entries_deleted: delLog.rowCount,
        pf_ids: delRows.rows.map((r) => r.pf_id)
      });
    } catch (err) {
      // If ROLLBACK itself fails (e.g. the connection dropped), log that and
      // still surface the original error rather than the rollback failure.
      try {
        await client.query('ROLLBACK');
      } catch (rollbackErr) {
        console.error(rollbackErr);
      }
      throw err;
    } finally {
      client.release();
    }
  } catch (err) {
    console.error(err);
    if (!res.headersSent) res.status(err.status || 500).json({ error: err.message });
  }
});
|
|
|
|
// Load reference rows from the source table (additive — does not clear prior
// reference rows).
//
// Body fields: where_clause (defaults to TRUE), pf_user, note, and optional
// structured filters that are stored with the clause in the log params.
// NOTE(review): where_clause is substituted into the SQL template verbatim,
// so this endpoint appears to rely on callers being trusted — confirm.
router.post('/versions/:id/reference', async (req, res) => {
  const { where_clause, pf_user, note, filters } = req.body;
  const filterClause = (where_clause || '').trim() || 'TRUE';

  try {
    const ctx = await getContext(parseInt(req.params.id), 'reference');
    if (!guardOpen(ctx.version, res)) return;

    // Persisted in the log entry so the segment can be reloaded later.
    const logParams = { where_clause: filterClause };
    if (filters) logParams.filters = filters;

    const tokens = {
      fc_table: ctx.table,
      version_id: ctx.version.id,
      pf_user: esc(pf_user || ''),
      note: esc(note || ''),
      params: esc(JSON.stringify(logParams)),
      filter_clause: filterClause
    };

    const { rows } = await runSQL(applyTokens(ctx.sql, tokens));
    res.json(rows[0]);
  } catch (err) {
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  }
});
|
|
|
|
// Scale a slice — adjust value and/or units by an absolute amount, or by a
// percentage of the slice's current totals when `pct` is set.
//
// Body fields: slice (required, non-empty), value_incr, units_incr, pct,
// pf_user, note.
// Responses: 400 when slice is missing, an increment is not a finite number,
// or both resulting increments are zero; otherwise { rows, rows_affected }
// with each returned row tagged with pf_note and pf_op.
router.post('/versions/:id/scale', async (req, res) => {
  const { pf_user, note, slice, value_incr, units_incr, pct } = req.body;
  if (!slice || Object.keys(slice).length === 0) {
    return res.status(400).json({ error: 'slice is required' });
  }

  // The increments are substituted into the SQL template unquoted, so they
  // must be real numbers: anything else would be injected as raw SQL.
  const toIncrement = (raw) => {
    if (raw == null || raw === '' || raw === false) return 0;
    if (typeof raw !== 'number' && typeof raw !== 'string') return NaN;
    return Number(raw);
  };
  const valueIncr = toIncrement(value_incr);
  const unitsIncr = toIncrement(units_incr);
  if (!Number.isFinite(valueIncr) || !Number.isFinite(unitsIncr)) {
    return res.status(400).json({ error: 'value_incr and units_incr must be finite numbers' });
  }

  try {
    const ctx = await getContext(parseInt(req.params.id, 10), 'scale');
    if (!guardOpen(ctx.version, res)) return;

    const whereClause = buildWhere(slice, ctx.dimCols);
    const excludeClause = buildExcludeClause(ctx.version.exclude_iters);

    let absValueIncr = valueIncr;
    let absUnitsIncr = unitsIncr;

    // pct mode: run a quick totals query, convert percentages to absolutes
    if (pct && (valueIncr !== 0 || unitsIncr !== 0)) {
      // A source may lack a value or units column; sum a typed NULL then so
      // the query stays a valid single-row aggregate and the total reads as 0.
      const sumOf = (col) => (col ? `sum("${col}")` : 'sum(NULL::numeric)');
      const totals = await pool.query(`
        SELECT
          ${sumOf(ctx.valueCol)} AS total_value,
          ${sumOf(ctx.unitsCol)} AS total_units
        FROM ${ctx.table}
        WHERE ${whereClause}
        ${excludeClause}
      `);
      const { total_value, total_units } = totals.rows[0];
      if (valueIncr !== 0) absValueIncr = (parseFloat(total_value) || 0) * valueIncr / 100;
      if (unitsIncr !== 0) absUnitsIncr = (parseFloat(total_units) || 0) * unitsIncr / 100;
    }

    if (absValueIncr === 0 && absUnitsIncr === 0) {
      return res.status(400).json({ error: 'value_incr and/or units_incr must be non-zero' });
    }
    if (!Number.isFinite(absValueIncr) || !Number.isFinite(absUnitsIncr)) {
      return res.status(400).json({ error: 'value_incr and units_incr must be finite numbers' });
    }

    const sql = applyTokens(ctx.sql, {
      fc_table: ctx.table,
      version_id: ctx.version.id,
      pf_user: esc(pf_user || ''),
      note: esc(note || ''),
      // The log keeps the request exactly as submitted (escaped as a literal).
      params: esc(JSON.stringify({ slice, value_incr, units_incr, pct })),
      slice: esc(JSON.stringify(slice)),
      where_clause: whereClause,
      exclude_clause: excludeClause,
      value_incr: absValueIncr,
      units_incr: absUnitsIncr
    });

    const result = await runSQL(sql);
    const rows = result.rows.map((r) => ({ ...r, pf_note: note || null, pf_op: 'scale' }));
    res.json({ rows, rows_affected: rows.length });
  } catch (err) {
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  }
});
|
|
|
|
// Recode dimension values on a slice.
// The stored recode SQL is described as inserting negative rows that zero out
// the original plus positive rows carrying the new dimension values.
//
// Body fields: slice and set (both required, non-empty), pf_user, note.
// Responds with { rows, rows_affected }, each row tagged with pf_note / pf_op.
router.post('/versions/:id/recode', async (req, res) => {
  const { pf_user, note, slice, set } = req.body;
  const isBlank = (obj) => !obj || Object.keys(obj).length === 0;
  if (isBlank(slice)) {
    return res.status(400).json({ error: 'slice is required' });
  }
  if (isBlank(set)) {
    return res.status(400).json({ error: 'set is required' });
  }

  try {
    const ctx = await getContext(parseInt(req.params.id), 'recode');
    if (!guardOpen(ctx.version, res)) return;

    const whereClause = buildWhere(slice, ctx.dimCols);
    const excludeClause = buildExcludeClause(ctx.version.exclude_iters);
    const setClause = buildSetClause(ctx.dimCols, set);

    const tokens = {
      fc_table: ctx.table,
      version_id: ctx.version.id,
      pf_user: esc(pf_user || ''),
      note: esc(note || ''),
      params: esc(JSON.stringify({ slice, set })),
      slice: esc(JSON.stringify(slice)),
      where_clause: whereClause,
      exclude_clause: excludeClause,
      set_clause: setClause
    };

    const { rows: inserted } = await runSQL(applyTokens(ctx.sql, tokens));
    const pfNote = note || null;
    const rows = inserted.map((row) => ({ ...row, pf_note: pfNote, pf_op: 'recode' }));
    res.json({ rows, rows_affected: rows.length });
  } catch (err) {
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  }
});
|
|
|
|
// Clone a slice as new business under new dimension values.
// Does not offset the original slice.
//
// Body fields: slice and set (both required, non-empty), optional scale
// (multiplier, defaults to 1.0), pf_user, note.
// Responses: 400 when slice/set is missing or scale is not a finite number;
// otherwise { rows, rows_affected }, each row tagged with pf_note / pf_op.
router.post('/versions/:id/clone', async (req, res) => {
  const { pf_user, note, slice, set, scale } = req.body;
  if (!slice || Object.keys(slice).length === 0) return res.status(400).json({ error: 'slice is required' });
  if (!set || Object.keys(set).length === 0) return res.status(400).json({ error: 'set is required' });

  // scale_factor is substituted into the SQL template unquoted, so reject
  // anything that does not parse to a finite number instead of emitting
  // NaN/Infinity into the statement.
  const scaleFactor = (scale != null) ? parseFloat(scale) : 1.0;
  if (!Number.isFinite(scaleFactor)) {
    return res.status(400).json({ error: 'scale must be a finite number' });
  }

  try {
    const ctx = await getContext(parseInt(req.params.id, 10), 'clone');
    if (!guardOpen(ctx.version, res)) return;

    const whereClause = buildWhere(slice, ctx.dimCols);
    const excludeClause = buildExcludeClause(ctx.version.exclude_iters);
    const setClause = buildSetClause(ctx.dimCols, set);

    const sql = applyTokens(ctx.sql, {
      fc_table: ctx.table,
      version_id: ctx.version.id,
      pf_user: esc(pf_user || ''),
      note: esc(note || ''),
      params: esc(JSON.stringify({ slice, set, scale: scaleFactor })),
      slice: esc(JSON.stringify(slice)),
      where_clause: whereClause,
      exclude_clause: excludeClause,
      set_clause: setClause,
      scale_factor: scaleFactor
    });

    const result = await runSQL(sql);
    const rows = result.rows.map((r) => ({ ...r, pf_note: note || null, pf_op: 'clone' }));
    res.json({ rows, rows_affected: rows.length });
  } catch (err) {
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  }
});
|
|
|
|
// List the log entries of a version, newest first. Each entry carries
// row_count: the number of forecast rows whose pf_logid points at it.
// Responds 404 when the version does not exist.
router.get('/versions/:id/log', async (req, res) => {
  const versionId = parseInt(req.params.id);
  try {
    const { rows: versions } = await pool.query(
      `SELECT v.*, s.tname FROM pf.version v JOIN pf.source s ON s.id = v.source_id WHERE v.id = $1`,
      [versionId]
    );
    if (versions.length === 0) {
      return res.status(404).json({ error: 'Version not found' });
    }

    const table = fcTable(versions[0].tname, versionId);
    // LEFT JOIN keeps entries that have no forecast rows (row_count 0).
    const { rows: entries } = await pool.query(`
      SELECT l.*, count(f.pf_id)::int AS row_count
      FROM pf.log l
      LEFT JOIN ${table} f ON f.pf_logid = l.id
      WHERE l.version_id = $1
      GROUP BY l.id
      ORDER BY l.id DESC
    `, [versionId]);
    res.json(entries);
  } catch (err) {
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  }
});
|
|
|
|
// Undo a log entry — in one transaction, delete all forecast rows with this
// logid, then delete the log entry itself.
//
// Responses: 404 unknown log entry, 403 when its version is closed,
// otherwise { rows_deleted, pf_ids }.
router.delete('/log/:logid', async (req, res) => {
  const logId = parseInt(req.params.logid, 10);
  try {
    const logResult = await pool.query(`
      SELECT l.*, v.status, s.tname, v.id AS version_id
      FROM pf.log l
      JOIN pf.version v ON v.id = l.version_id
      JOIN pf.source s ON s.id = v.source_id
      WHERE l.id = $1
    `, [logId]);
    if (!logResult.rows.length) return res.status(404).json({ error: 'Log entry not found' });

    const log = logResult.rows[0];
    if (log.status === 'closed') return res.status(403).json({ error: 'Version is closed' });

    const table = fcTable(log.tname, log.version_id);
    const client = await pool.connect();
    try {
      await client.query('BEGIN');
      const deleted = await client.query(
        `DELETE FROM ${table} WHERE pf_logid = $1 RETURNING pf_id`, [logId]
      );
      await client.query('DELETE FROM pf.log WHERE id = $1', [logId]);
      await client.query('COMMIT');
      res.json({
        rows_deleted: deleted.rowCount,
        pf_ids: deleted.rows.map((r) => r.pf_id)
      });
    } catch (err) {
      // If ROLLBACK itself fails (e.g. the connection dropped), log that and
      // still surface the original error rather than the rollback failure.
      try {
        await client.query('ROLLBACK');
      } catch (rollbackErr) {
        console.error(rollbackErr);
      }
      throw err;
    } finally {
      client.release();
    }
  } catch (err) {
    console.error(err);
    if (!res.headersSent) res.status(err.status || 500).json({ error: err.message });
  }
});
|
|
|
|
// Replace the note on a log entry and respond with the updated row;
// 404 when no entry has that id.
router.patch('/log/:logid', async (req, res) => {
  const logId = parseInt(req.params.logid);
  const { note } = req.body;
  try {
    const { rows } = await pool.query(
      `UPDATE pf.log SET note = $1 WHERE id = $2 RETURNING *`,
      [note, logId]
    );
    if (rows.length === 0) {
      return res.status(404).json({ error: 'Log entry not found' });
    }
    res.json(rows[0]);
  } catch (err) {
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  }
});
|
|
|
|
return router;
|
|
};
|