The segment form is now one component rendered in either 'view' or 'edit' mode — the expanded segment row in the list and the add/edit form below share the same layout, view mode just disables the inputs. Edit and View are visually identical so toggling between them feels like enabling fields, not switching tools. Filters become groups (conditions AND-ed inside, groups OR-ed between) with + AND condition and + Add OR group affordances. The compiled WHERE renders live below the groups so you can see what's being built. A "Switch to manual SQL" toggle flips to a textarea seeded with the compiled clause; backend baseline POST/PUT and reference POST accept raw_where alongside filters and store whichever arrived in pf.log.params for round-tripping. The Add form is hidden until you click "+ Add segment" at the bottom of the segments table; Edit also opens it. Cancel/Close returns the table to its compact state. /versions/:id/log now also returns value_total, units_total, and the column names so the segments table can show row count and value sum inline (header uses the source's actual value column name). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
533 lines
23 KiB
JavaScript
533 lines
23 KiB
JavaScript
const express = require('express');
|
|
const { tableFromJSON, tableToIPC } = require('apache-arrow');
|
|
const { applyTokens, buildWhere, buildExcludeClause, buildSetClause, esc } = require('../lib/sql_generator');
|
|
const { fcTable } = require('../lib/utils');
|
|
|
|
module.exports = function(pool) {
|
|
const router = express.Router();
|
|
|
|
/**
 * Echo a SQL statement to stdout, then execute it on the shared pool.
 *
 * @param {string} sql - Complete SQL text to run (no bind parameters).
 * @returns {Promise<object>} Whatever `pool.query` resolves to.
 */
async function runSQL(sql) {
  // Printed in full so the generated statement can be inspected from the server log.
  const logParts = ['--- SQL ---\n', sql, '\n--- END SQL ---'];
  console.log(...logParts);
  const outcome = await pool.query(sql);
  return outcome;
}
|
|
|
|
/**
 * Fetch everything needed to execute an operation against a version:
 * the version row joined with its source, the source's column metadata,
 * the forecast table name, and the stored SQL for the operation.
 *
 * @param {number} versionId - pf.version id (callers pass a parseInt result).
 * @param {string} operation - Operation name used to look up pf.sql.
 * @returns {Promise<{version: object, table: string, colMeta: object[],
 *   dimCols: string[], valueCol: (string|undefined),
 *   unitsCol: (string|undefined), sql: string}>}
 * @throws {Error} status 400 when versionId is NaN or no SQL is stored for
 *   the operation; status 404 when the version does not exist.
 */
async function getContext(versionId, operation) {
  // parseInt() on a malformed route param yields NaN. Reject it here with a
  // client error instead of letting the database fail the integer cast.
  if (Number.isNaN(versionId)) {
    const err = new Error('Invalid version id'); err.status = 400; throw err;
  }

  const verResult = await pool.query(`
    SELECT v.*, s.schema, s.tname, s.id AS source_id
    FROM pf.version v
    JOIN pf.source s ON s.id = v.source_id
    WHERE v.id = $1
  `, [versionId]);
  if (verResult.rows.length === 0) {
    const err = new Error('Version not found'); err.status = 404; throw err;
  }
  const version = verResult.rows[0];

  // Both lookups depend only on the source id, so run them concurrently.
  const [colResult, sqlResult] = await Promise.all([
    pool.query(
      `SELECT * FROM pf.col_meta WHERE source_id = $1 ORDER BY opos`,
      [version.source_id]
    ),
    pool.query(
      `SELECT sql FROM pf.sql WHERE source_id = $1 AND operation = $2`,
      [version.source_id, operation]
    ),
  ]);

  if (sqlResult.rows.length === 0) {
    const err = new Error(`No generated SQL for operation "${operation}" — run generate-sql first`);
    err.status = 400; throw err;
  }

  const colMeta = colResult.rows;
  const dimCols = colMeta.filter(c => c.role === 'dimension').map(c => c.cname);
  // Either may be undefined when the source has no column with that role.
  const valueCol = colMeta.find(c => c.role === 'value')?.cname;
  const unitsCol = colMeta.find(c => c.role === 'units')?.cname;

  return {
    version,
    table: fcTable(version.tname, version.id),
    colMeta,
    dimCols,
    valueCol,
    unitsCol,
    sql: sqlResult.rows[0].sql
  };
}
|
|
|
|
/**
 * Refuse to continue when a version is closed.
 *
 * @param {object} version - Version row; only `status` is inspected.
 * @param {object} res - Express response, used to send the 403 when closed.
 * @returns {boolean} true when the caller may proceed; false when a 403
 *   response has already been sent.
 */
function guardOpen(version, res) {
  const isClosed = version.status === 'closed';
  if (!isClosed) return true;

  res.status(403).json({ error: 'Version is closed' });
  return false;
}
|
|
|
|
// Send every row of a version's forecast table as one Arrow IPC stream.
// Each row is joined to its pf.log entry so the note and operation travel
// with it (pf_note / pf_op). The X-Row-Count header carries the row count;
// an empty table yields an empty body.
router.get('/versions/:id/data', async (req, res) => {
  const versionId = parseInt(req.params.id);
  let client;
  let committed = false;

  try {
    const { rows: versionRows } = await pool.query(
      `SELECT v.*, s.tname FROM pf.version v JOIN pf.source s ON s.id = v.source_id WHERE v.id = $1`,
      [versionId]
    );
    if (versionRows.length === 0) {
      const notFound = new Error('Version not found');
      notFound.status = 404;
      throw notFound;
    }
    const tbl = fcTable(versionRows[0].tname, versionId);

    const countResult = await pool.query(`SELECT COUNT(*) FROM ${tbl}`);
    const rowCount = parseInt(countResult.rows[0].count);

    res.setHeader('Content-Type', 'application/vnd.apache.arrow.stream');
    res.setHeader('X-Row-Count', String(rowCount));

    if (rowCount === 0) {
      res.end();
      return;
    }

    // A cursor needs a transaction, so take a dedicated client for it.
    client = await pool.connect();
    await client.query('BEGIN');
    await client.query(`
      DECLARE pf_cur CURSOR FOR
      SELECT f.*, l.note AS pf_note, l.operation AS pf_op
      FROM ${tbl} f
      LEFT JOIN pf.log l ON l.id = f.pf_logid
    `);

    // Drain the cursor into memory and emit a single Arrow record batch.
    // Building a table per fetch would give each batch its own dictionaries,
    // forcing the writer to emit dictionary REPLACEMENT messages between
    // batches — Perspective's WASM Arrow reader crashes on those
    // (memory access out of bounds).
    const allRows = [];
    let batch;
    do {
      ({ rows: batch } = await client.query('FETCH 10000 FROM pf_cur'));
      batch.forEach((row) => { allRows.push(row); });
    } while (batch.length > 0);

    await client.query('COMMIT');
    committed = true;

    const buf = tableToIPC(tableFromJSON(allRows), 'stream');
    res.setHeader('Content-Length', String(buf.byteLength));
    res.end(Buffer.from(buf.buffer, buf.byteOffset, buf.byteLength));
  } catch (err) {
    console.error(err);
    if (res.headersSent) {
      // Too late for a JSON error body; drop the connection instead.
      res.destroy();
    } else {
      res.status(err.status || 500).json({ error: err.message });
    }
  } finally {
    if (client) {
      if (!committed) {
        // Best-effort cleanup: a failed ROLLBACK is deliberately ignored.
        try { await client.query('ROLLBACK'); } catch {}
      }
      client.release();
    }
  }
});
|
|
|
|
// Load baseline rows from the source table — additive, nothing is deleted.
//
// Body: { where_clause, raw_where, filters, date_offset, pf_user, note }
//   raw_where wins over where_clause; an empty filter becomes TRUE.
//   Whichever of raw_where / filters arrived is stored in the params JSON
//   so the form can be restored later.
// Responds with the first row returned by the stored baseline SQL, or
// 400 when the filter is not a string.
router.post('/versions/:id/baseline', async (req, res) => {
  try {
    // Parse inside the try: a missing body or a non-string filter used to
    // throw before the try block, leaving the request without a response.
    const { where_clause, date_offset, pf_user, note, filters, raw_where } = req.body ?? {};
    const requestedFilter = raw_where || where_clause || '';
    if (typeof requestedFilter !== 'string') {
      return res.status(400).json({ error: 'raw_where and where_clause must be strings' });
    }
    const filterClause = requestedFilter.trim() || 'TRUE';
    const dateOffset = date_offset || '0 days';

    const ctx = await getContext(Number.parseInt(req.params.id, 10), 'baseline');
    if (!guardOpen(ctx.version, res)) return;

    const paramsJson = JSON.stringify({
      where_clause: filterClause,
      date_offset: dateOffset,
      ...(raw_where ? { raw_where } : (filters ? { filters } : {}))
    });
    const sql = applyTokens(ctx.sql, {
      fc_table: ctx.table,
      version_id: ctx.version.id,
      pf_user: esc(pf_user || ''),
      note: esc(note || ''),
      params: esc(paramsJson),
      // NOTE(review): unlike the other tokens this one is not passed through
      // esc(); it is caller-supplied SQL (manual-SQL mode) and is presumably
      // substituted as-is. Confirm this route is only reachable by trusted users.
      filter_clause: filterClause,
      date_offset: esc(dateOffset)
    });

    const result = await runSQL(sql);
    res.json(result.rows[0]);
  } catch (err) {
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  }
});
|
|
|
|
// Edit a baseline or reference segment in place — only allowed before any
// scale/recode/clone has been applied on this version, since those would
// have been calibrated against the old segment's totals.
//
// The old segment's rows and log entry are deleted and the stored SQL for
// the same operation is re-run with the new filter, all in one transaction.
//
// Body: { where_clause, raw_where, filters, date_offset, pf_user, note }
// Responds with { rows_deleted, pf_ids, rows_affected }.
// 400 for bad ids / non-string filter / non-segment log entry, 404 when the
// log entry is not on this version, 409 when forecast operations exist.
router.put('/versions/:id/baseline/:logid', async (req, res) => {
  let client;
  let inTransaction = false;
  try {
    const versionId = Number.parseInt(req.params.id, 10);
    const logid = Number.parseInt(req.params.logid, 10);
    if (Number.isNaN(versionId) || Number.isNaN(logid)) {
      return res.status(400).json({ error: 'Invalid version or log id' });
    }

    // Parsed inside the try so a missing body or non-string filter produces
    // a response instead of an unhandled rejection.
    const { where_clause, date_offset, pf_user, note, filters, raw_where } = req.body ?? {};
    const requestedFilter = raw_where || where_clause || '';
    if (typeof requestedFilter !== 'string') {
      return res.status(400).json({ error: 'raw_where and where_clause must be strings' });
    }
    const filterClause = requestedFilter.trim() || 'TRUE';
    const dateOffset = date_offset || '0 days';

    // Acquired inside the try so a connection failure reaches the catch below.
    client = await pool.connect();

    const logResult = await client.query(
      `SELECT * FROM pf.log WHERE id = $1 AND version_id = $2`,
      [logid, versionId]
    );
    if (logResult.rows.length === 0) {
      return res.status(404).json({ error: 'Log entry not found' });
    }
    const oldLog = logResult.rows[0];
    if (!['baseline', 'reference'].includes(oldLog.operation)) {
      return res.status(400).json({ error: 'Only baseline or reference segments can be edited' });
    }

    const opsResult = await client.query(
      `SELECT COUNT(*)::int AS n FROM pf.log
       WHERE version_id = $1 AND operation IN ('scale', 'recode', 'clone')`,
      [versionId]
    );
    if (opsResult.rows[0].n > 0) {
      return res.status(409).json({
        error: 'Cannot edit segments after forecast operations have been applied. Undo the operations first.'
      });
    }

    const ctx = await getContext(versionId, oldLog.operation);
    if (!guardOpen(ctx.version, res)) return;

    // date_offset is only sent for baseline segments, both in the stored
    // params and in the SQL tokens.
    const isBaseline = oldLog.operation === 'baseline';
    const paramsJson = JSON.stringify({
      where_clause: filterClause,
      ...(isBaseline ? { date_offset: dateOffset } : {}),
      ...(raw_where ? { raw_where } : (filters ? { filters } : {}))
    });
    const tokens = {
      fc_table: ctx.table,
      version_id: ctx.version.id,
      pf_user: esc(pf_user || ''),
      note: esc(note || ''),
      params: esc(paramsJson),
      // NOTE(review): caller-supplied SQL, not passed through esc() — confirm
      // this route is only reachable by trusted users.
      filter_clause: filterClause,
      ...(isBaseline ? { date_offset: esc(dateOffset) } : {})
    };
    const sql = applyTokens(ctx.sql, tokens);

    await client.query('BEGIN');
    inTransaction = true;
    const delRows = await client.query(
      `DELETE FROM ${ctx.table} WHERE pf_logid = $1 RETURNING pf_id`,
      [logid]
    );
    await client.query(`DELETE FROM pf.log WHERE id = $1`, [logid]);
    const insResult = await client.query(sql);
    await client.query('COMMIT');
    inTransaction = false;

    res.json({
      rows_deleted: delRows.rowCount,
      pf_ids: delRows.rows.map(r => r.pf_id),
      rows_affected: insResult.rows[0]?.rows_affected ?? 0
    });
  } catch (err) {
    if (inTransaction) {
      // Roll back only when a transaction was opened; a rollback failure is
      // logged but must not hide the error that caused it.
      try { await client.query('ROLLBACK'); } catch (rollbackErr) { console.error(rollbackErr); }
    }
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  } finally {
    if (client) client.release();
  }
});
|
|
|
|
// Delete all baseline rows and baseline log entries for a version, in one
// transaction.
//
// Responds with { rows_deleted, log_entries_deleted, pf_ids }.
// Errors from getContext (400/404) and the closed-version 403 pass through.
router.delete('/versions/:id/baseline', async (req, res) => {
  const versionId = Number.parseInt(req.params.id, 10);
  let client;
  let inTransaction = false;
  try {
    const ctx = await getContext(versionId, 'baseline');
    if (!guardOpen(ctx.version, res)) return;

    client = await pool.connect();
    await client.query('BEGIN');
    inTransaction = true;
    const delRows = await client.query(
      `DELETE FROM ${ctx.table} WHERE pf_iter = 'baseline' RETURNING pf_id`
    );
    const delLog = await client.query(
      `DELETE FROM pf.log WHERE version_id = $1 AND operation = 'baseline'`,
      [versionId]
    );
    await client.query('COMMIT');
    inTransaction = false;

    res.json({
      rows_deleted: delRows.rowCount,
      log_entries_deleted: delLog.rowCount,
      pf_ids: delRows.rows.map(r => r.pf_id)
    });
  } catch (err) {
    if (inTransaction) {
      // A failing ROLLBACK used to replace the original error; log it and
      // keep reporting the error that actually caused the failure.
      try { await client.query('ROLLBACK'); } catch (rollbackErr) { console.error(rollbackErr); }
    }
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  } finally {
    if (client) client.release();
  }
});
|
|
|
|
// Load reference rows from the source table (additive — does not clear
// prior reference rows).
//
// Body: { where_clause, raw_where, filters, pf_user, note }
//   raw_where wins over where_clause; an empty filter becomes TRUE.
//   Whichever of raw_where / filters arrived is stored in the params JSON.
// Responds with the first row returned by the stored reference SQL, or
// 400 when the filter is not a string.
router.post('/versions/:id/reference', async (req, res) => {
  try {
    // Parse inside the try: a missing body or a non-string filter used to
    // throw before the try block, leaving the request without a response.
    const { where_clause, pf_user, note, filters, raw_where } = req.body ?? {};
    const requestedFilter = raw_where || where_clause || '';
    if (typeof requestedFilter !== 'string') {
      return res.status(400).json({ error: 'raw_where and where_clause must be strings' });
    }
    const filterClause = requestedFilter.trim() || 'TRUE';

    const ctx = await getContext(Number.parseInt(req.params.id, 10), 'reference');
    if (!guardOpen(ctx.version, res)) return;

    const paramsJson = JSON.stringify({
      where_clause: filterClause,
      ...(raw_where ? { raw_where } : (filters ? { filters } : {}))
    });
    const sql = applyTokens(ctx.sql, {
      fc_table: ctx.table,
      version_id: ctx.version.id,
      pf_user: esc(pf_user || ''),
      note: esc(note || ''),
      params: esc(paramsJson),
      // NOTE(review): caller-supplied SQL, not passed through esc() — confirm
      // this route is only reachable by trusted users.
      filter_clause: filterClause
    });

    const result = await runSQL(sql);
    res.json(result.rows[0]);
  } catch (err) {
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  }
});
|
|
|
|
// Scale a slice — adjust value and/or units by an absolute amount, or by a
// percentage of the slice's current totals when `pct` is set.
//
// Body: { pf_user, note, slice, value_incr, units_incr, pct }
// Responds with { rows, rows_affected }; each row is tagged with pf_note
// and pf_op = 'scale'.
// 400 when slice is empty, an increment is not a finite number, or both
// increments resolve to zero.
router.post('/versions/:id/scale', async (req, res) => {
  // Accept JSON numbers or numeric strings. Anything else becomes NaN so it
  // is rejected below instead of being placed into SQL text.
  const toIncrement = (raw) => {
    if (raw == null || raw === '') return 0;
    if (typeof raw === 'number') return raw;
    if (typeof raw === 'string') return Number(raw);
    return NaN;
  };

  try {
    const { pf_user, note, slice, value_incr, units_incr, pct } = req.body ?? {};
    if (!slice || Object.keys(slice).length === 0) {
      return res.status(400).json({ error: 'slice is required' });
    }

    // The increments are handed to applyTokens without esc(), so they must
    // be real numbers before they get anywhere near the statement.
    const valueIncr = toIncrement(value_incr);
    const unitsIncr = toIncrement(units_incr);
    if (!Number.isFinite(valueIncr) || !Number.isFinite(unitsIncr)) {
      return res.status(400).json({ error: 'value_incr and units_incr must be finite numbers' });
    }

    const ctx = await getContext(Number.parseInt(req.params.id, 10), 'scale');
    if (!guardOpen(ctx.version, res)) return;

    const whereClause = buildWhere(slice, ctx.dimCols);
    const excludeClause = buildExcludeClause(ctx.version.exclude_iters);

    let absValueIncr = valueIncr;
    let absUnitsIncr = unitsIncr;

    // pct mode: run a quick totals query, convert percentages to absolutes
    if (pct && (valueIncr !== 0 || unitsIncr !== 0)) {
      // A source may lack a value or units column; select NULL for the
      // missing one rather than referencing a column named "undefined".
      const sumOrNull = (col, alias) => (col ? `sum("${col}") AS ${alias}` : `NULL AS ${alias}`);
      const totals = await pool.query(`
        SELECT
          ${sumOrNull(ctx.valueCol, 'total_value')},
          ${sumOrNull(ctx.unitsCol, 'total_units')}
        FROM ${ctx.table}
        WHERE ${whereClause}
        ${excludeClause}
      `);
      const { total_value, total_units } = totals.rows[0];
      if (valueIncr !== 0) absValueIncr = (parseFloat(total_value) || 0) * valueIncr / 100;
      if (unitsIncr !== 0) absUnitsIncr = (parseFloat(total_units) || 0) * unitsIncr / 100;
    }

    if (absValueIncr === 0 && absUnitsIncr === 0) {
      return res.status(400).json({ error: 'value_incr and/or units_incr must be non-zero' });
    }

    const sql = applyTokens(ctx.sql, {
      fc_table: ctx.table,
      version_id: ctx.version.id,
      pf_user: esc(pf_user || ''),
      note: esc(note || ''),
      // The request's own values are logged, not the derived absolutes.
      params: esc(JSON.stringify({ slice, value_incr, units_incr, pct })),
      slice: esc(JSON.stringify(slice)),
      where_clause: whereClause,
      exclude_clause: excludeClause,
      value_incr: absValueIncr,
      units_incr: absUnitsIncr
    });

    const result = await runSQL(sql);
    const rows = result.rows.map(r => ({ ...r, pf_note: note || null, pf_op: 'scale' }));
    res.json({ rows, rows_affected: rows.length });
  } catch (err) {
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  }
});
|
|
|
|
// Recode dimension values on a slice.
// Per the stored SQL's contract, negative rows zero out the original and
// positive rows carry the new dimension values.
//
// Body: { pf_user, note, slice, set } — slice and set must be non-empty.
// Responds with { rows, rows_affected }; each row is tagged with pf_note
// and pf_op = 'recode'.
router.post('/versions/:id/recode', async (req, res) => {
  const { pf_user, note, slice, set } = req.body;
  const isBlank = (obj) => !obj || Object.keys(obj).length === 0;

  if (isBlank(slice)) {
    return res.status(400).json({ error: 'slice is required' });
  }
  if (isBlank(set)) {
    return res.status(400).json({ error: 'set is required' });
  }

  try {
    const ctx = await getContext(parseInt(req.params.id), 'recode');
    const mayProceed = guardOpen(ctx.version, res);
    if (!mayProceed) return;

    // Clauses are built before the token map, in the same order as before.
    const whereClause = buildWhere(slice, ctx.dimCols);
    const excludeClause = buildExcludeClause(ctx.version.exclude_iters);
    const setClause = buildSetClause(ctx.dimCols, set);

    const tokens = {
      fc_table: ctx.table,
      version_id: ctx.version.id,
      pf_user: esc(pf_user || ''),
      note: esc(note || ''),
      params: esc(JSON.stringify({ slice, set })),
      slice: esc(JSON.stringify(slice)),
      where_clause: whereClause,
      exclude_clause: excludeClause,
      set_clause: setClause
    };
    const statement = applyTokens(ctx.sql, tokens);

    const { rows: inserted } = await runSQL(statement);
    const rows = inserted.map((row) => {
      return { ...row, pf_note: note || null, pf_op: 'recode' };
    });
    res.json({ rows, rows_affected: rows.length });
  } catch (err) {
    console.error(err);
    const status = err.status || 500;
    res.status(status).json({ error: err.message });
  }
});
|
|
|
|
// Clone a slice as new business under new dimension values.
// Does not offset the original slice.
//
// Body: { pf_user, note, slice, set, scale } — slice and set must be
// non-empty; scale defaults to 1.0 when omitted.
// Responds with { rows, rows_affected }; each row is tagged with pf_note
// and pf_op = 'clone'.
// 400 when slice/set is empty or scale is not a finite number.
router.post('/versions/:id/clone', async (req, res) => {
  try {
    const { pf_user, note, slice, set, scale } = req.body ?? {};
    if (!slice || Object.keys(slice).length === 0) return res.status(400).json({ error: 'slice is required' });
    if (!set || Object.keys(set).length === 0) return res.status(400).json({ error: 'set is required' });

    // scale_factor is handed to applyTokens without esc(). A non-numeric
    // scale used to become NaN, which broke the statement while being
    // logged as null in params — reject it up front instead.
    const scaleFactor = (scale != null) ? parseFloat(scale) : 1.0;
    if (!Number.isFinite(scaleFactor)) {
      return res.status(400).json({ error: 'scale must be a finite number' });
    }

    const ctx = await getContext(Number.parseInt(req.params.id, 10), 'clone');
    if (!guardOpen(ctx.version, res)) return;

    const whereClause = buildWhere(slice, ctx.dimCols);
    const excludeClause = buildExcludeClause(ctx.version.exclude_iters);
    const setClause = buildSetClause(ctx.dimCols, set);

    const sql = applyTokens(ctx.sql, {
      fc_table: ctx.table,
      version_id: ctx.version.id,
      pf_user: esc(pf_user || ''),
      note: esc(note || ''),
      params: esc(JSON.stringify({ slice, set, scale: scaleFactor })),
      slice: esc(JSON.stringify(slice)),
      where_clause: whereClause,
      exclude_clause: excludeClause,
      set_clause: setClause,
      scale_factor: scaleFactor
    });

    const result = await runSQL(sql);
    const rows = result.rows.map(r => ({ ...r, pf_note: note || null, pf_op: 'clone' }));
    res.json({ rows, rows_affected: rows.length });
  } catch (err) {
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  }
});
|
|
|
|
// List a version's log entries, newest first. Each entry carries the number
// of forecast rows it produced (row_count), the sums of the source's value
// and units columns over those rows (value_total / units_total, NULL when
// the source has no such column), and the column names themselves
// (value_col / units_col).
router.get('/versions/:id/log', async (req, res) => {
  const versionId = parseInt(req.params.id);
  try {
    const { rows: versions } = await pool.query(
      `SELECT v.*, s.tname, s.id AS source_id FROM pf.version v JOIN pf.source s ON s.id = v.source_id WHERE v.id = $1`,
      [versionId]
    );
    if (versions.length === 0) {
      return res.status(404).json({ error: 'Version not found' });
    }
    const [{ tname, source_id }] = versions;
    const table = fcTable(tname, versionId);

    const { rows: measureCols } = await pool.query(
      `SELECT cname, role FROM pf.col_meta WHERE source_id = $1 AND role IN ('value', 'units')`,
      [source_id]
    );
    const cnameForRole = (role) => measureCols.find((c) => c.role === role)?.cname;
    const valueCol = cnameForRole('value');
    const unitsCol = cnameForRole('units');

    // Sum the column when the source has one, otherwise emit a typed NULL so
    // the result shape stays the same.
    const totalExpr = (col, alias) => {
      if (col) return `sum(f."${col}")::float8 AS ${alias}`;
      return `NULL::float8 AS ${alias}`;
    };
    const aggCols = [
      `count(f.pf_id)::int AS row_count`,
      totalExpr(valueCol, 'value_total'),
      totalExpr(unitsCol, 'units_total')
    ].join(', ');

    const queryText = `
      SELECT l.*, ${aggCols},
             $2::text AS value_col,
             $3::text AS units_col
      FROM pf.log l
      LEFT JOIN ${table} f ON f.pf_logid = l.id
      WHERE l.version_id = $1
      GROUP BY l.id
      ORDER BY l.id DESC
    `;
    const { rows: entries } = await pool.query(
      queryText,
      [versionId, valueCol || null, unitsCol || null]
    );
    res.json(entries);
  } catch (err) {
    console.error(err);
    const status = err.status || 500;
    res.status(status).json({ error: err.message });
  }
});
|
|
|
|
// Undo a log entry — delete all forecast rows with this logid, then delete
// the log entry itself, in one transaction.
//
// Responds with { rows_deleted, pf_ids }.
// 404 when the log entry does not exist, 403 when its version is closed.
router.delete('/log/:logid', async (req, res) => {
  const logId = Number.parseInt(req.params.logid, 10);
  let client;
  let inTransaction = false;
  try {
    const logResult = await pool.query(`
      SELECT l.*, v.status, s.tname, v.id AS version_id
      FROM pf.log l
      JOIN pf.version v ON v.id = l.version_id
      JOIN pf.source s ON s.id = v.source_id
      WHERE l.id = $1
    `, [logId]);
    if (!logResult.rows.length) return res.status(404).json({ error: 'Log entry not found' });
    const log = logResult.rows[0];
    if (log.status === 'closed') return res.status(403).json({ error: 'Version is closed' });
    const table = fcTable(log.tname, log.version_id);

    client = await pool.connect();
    await client.query('BEGIN');
    inTransaction = true;
    const deleted = await client.query(
      `DELETE FROM ${table} WHERE pf_logid = $1 RETURNING pf_id`, [logId]
    );
    await client.query('DELETE FROM pf.log WHERE id = $1', [logId]);
    await client.query('COMMIT');
    inTransaction = false;

    res.json({
      rows_deleted: deleted.rowCount,
      pf_ids: deleted.rows.map(r => r.pf_id)
    });
  } catch (err) {
    if (inTransaction) {
      // A failing ROLLBACK used to replace the original error; log it and
      // keep reporting the error that actually caused the failure.
      try { await client.query('ROLLBACK'); } catch (rollbackErr) { console.error(rollbackErr); }
    }
    console.error(err);
    res.status(err.status || 500).json({ error: err.message });
  } finally {
    if (client) client.release();
  }
});
|
|
|
|
// Replace the note on a log entry.
//
// Body: { note }
// Responds with the updated pf.log row, or 404 when no entry has that id.
router.patch('/log/:logid', async (req, res) => {
  const logId = parseInt(req.params.logid);
  const { note } = req.body;

  try {
    const { rows: updated } = await pool.query(
      `UPDATE pf.log SET note = $1 WHERE id = $2 RETURNING *`, [note, logId]
    );
    if (updated.length === 0) {
      return res.status(404).json({ error: 'Log entry not found' });
    }
    const [entry] = updated;
    res.json(entry);
  } catch (err) {
    console.error(err);
    const status = err.status || 500;
    res.status(status).json({ error: err.message });
  }
});
|
|
|
|
return router;
|
|
};
|