pf_app/routes/sources.js
Paul Trowbridge 08dc415bfd Initial commit — pivot forecast application
Node.js/Express + PostgreSQL forecasting app with AG Grid Enterprise pivot UI.
Supports baseline, scale, recode, clone operations on configurable source tables.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-01 07:59:05 -04:00

244 lines
9.2 KiB
JavaScript

const express = require('express');
const { generateSQL } = require('../lib/sql_generator');
module.exports = function(pool) {
const router = express.Router();
// list all registered sources
router.get('/sources', async (req, res) => {
try {
const result = await pool.query(
`SELECT * FROM pf.source ORDER BY schema, tname`
);
res.json(result.rows);
} catch (err) {
console.error(err);
res.status(500).json({ error: err.message });
}
});
// register a source table
// auto-populates col_meta from information_schema with role='ignore'
router.post('/sources', async (req, res) => {
const { schema, tname, label, created_by } = req.body;
if (!schema || !tname) {
return res.status(400).json({ error: 'schema and tname are required' });
}
if (!/^\w+$/.test(schema) || !/^\w+$/.test(tname)) {
return res.status(400).json({ error: 'Invalid schema or table name' });
}
const client = await pool.connect();
try {
await client.query('BEGIN');
const src = await client.query(
`INSERT INTO pf.source (schema, tname, label, created_by)
VALUES ($1, $2, $3, $4)
RETURNING *`,
[schema, tname, label || null, created_by || null]
);
const source = src.rows[0];
// seed col_meta from information_schema
await client.query(`
INSERT INTO pf.col_meta (source_id, cname, role, opos)
SELECT $1, column_name, 'ignore', ordinal_position
FROM information_schema.columns
WHERE table_schema = $2 AND table_name = $3
ORDER BY ordinal_position
ON CONFLICT (source_id, cname) DO NOTHING
`, [source.id, schema, tname]);
await client.query('COMMIT');
res.status(201).json(source);
} catch (err) {
await client.query('ROLLBACK');
console.error(err);
if (err.code === '23505') {
return res.status(409).json({ error: 'Source already registered' });
}
res.status(500).json({ error: err.message });
} finally {
client.release();
}
});
// get col_meta for a source
router.get('/sources/:id/cols', async (req, res) => {
try {
const result = await pool.query(
`SELECT * FROM pf.col_meta WHERE source_id = $1 ORDER BY opos`,
[req.params.id]
);
res.json(result.rows);
} catch (err) {
console.error(err);
res.status(500).json({ error: err.message });
}
});
// save col_meta — accepts full array, upserts each row
router.put('/sources/:id/cols', async (req, res) => {
const sourceId = parseInt(req.params.id);
const cols = req.body;
if (!Array.isArray(cols)) {
return res.status(400).json({ error: 'body must be an array' });
}
const client = await pool.connect();
try {
await client.query('BEGIN');
for (const col of cols) {
await client.query(`
INSERT INTO pf.col_meta (source_id, cname, label, role, is_key, opos)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (source_id, cname) DO UPDATE SET
label = EXCLUDED.label,
role = EXCLUDED.role,
is_key = EXCLUDED.is_key,
opos = EXCLUDED.opos
`, [
sourceId,
col.cname,
col.label || null,
col.role || 'ignore',
col.is_key || false,
col.opos || null
]);
}
await client.query('COMMIT');
const result = await pool.query(
`SELECT * FROM pf.col_meta WHERE source_id = $1 ORDER BY opos`,
[sourceId]
);
res.json(result.rows);
} catch (err) {
await client.query('ROLLBACK');
console.error(err);
res.status(500).json({ error: err.message });
} finally {
client.release();
}
});
// generate SQL for all operations from current col_meta and store in pf.sql
router.post('/sources/:id/generate-sql', async (req, res) => {
const sourceId = parseInt(req.params.id);
try {
const srcResult = await pool.query(
`SELECT * FROM pf.source WHERE id = $1`, [sourceId]
);
if (srcResult.rows.length === 0) {
return res.status(404).json({ error: 'Source not found' });
}
const colResult = await pool.query(
`SELECT * FROM pf.col_meta WHERE source_id = $1 ORDER BY opos`,
[sourceId]
);
// validate required roles
const colMeta = colResult.rows;
const roles = new Set(colMeta.map(c => c.role));
const missing = ['value', 'units', 'date'].filter(r => !roles.has(r));
if (missing.length > 0) {
return res.status(400).json({
error: `col_meta is missing required roles: ${missing.join(', ')}`
});
}
if (!colMeta.some(c => c.role === 'dimension')) {
return res.status(400).json({ error: 'col_meta has no dimension columns' });
}
const sqls = generateSQL(srcResult.rows[0], colMeta);
const client = await pool.connect();
try {
await client.query('BEGIN');
for (const [operation, sql] of Object.entries(sqls)) {
await client.query(`
INSERT INTO pf.sql (source_id, operation, sql, generated_at)
VALUES ($1, $2, $3, now())
ON CONFLICT (source_id, operation) DO UPDATE SET
sql = EXCLUDED.sql,
generated_at = EXCLUDED.generated_at
`, [sourceId, operation, sql]);
}
await client.query('COMMIT');
} catch (err) {
await client.query('ROLLBACK');
throw err;
} finally {
client.release();
}
res.json({ message: 'SQL generated', operations: Object.keys(sqls) });
} catch (err) {
console.error(err);
res.status(500).json({ error: err.message });
}
});
// view generated SQL for a source (for inspection / debug)
router.get('/sources/:id/sql', async (req, res) => {
try {
const result = await pool.query(
`SELECT operation, sql, generated_at
FROM pf.sql WHERE source_id = $1 ORDER BY operation`,
[req.params.id]
);
res.json(result.rows);
} catch (err) {
console.error(err);
res.status(500).json({ error: err.message });
}
});
// get distinct values for a key column (used to populate operation panel dropdowns)
router.get('/sources/:id/values/:col', async (req, res) => {
const col = req.params.col;
try {
const srcResult = await pool.query(
`SELECT schema, tname FROM pf.source WHERE id = $1`, [req.params.id]
);
if (srcResult.rows.length === 0) return res.status(404).json({ error: 'Source not found' });
// validate col is a key dimension on this source
const metaResult = await pool.query(
`SELECT 1 FROM pf.col_meta WHERE source_id = $1 AND cname = $2 AND is_key = true`,
[req.params.id, col]
);
if (metaResult.rows.length === 0) {
return res.status(400).json({ error: `"${col}" is not a key column` });
}
const { schema, tname } = srcResult.rows[0];
const result = await pool.query(
`SELECT DISTINCT "${col}" AS val FROM ${schema}.${tname}
WHERE "${col}" IS NOT NULL ORDER BY "${col}"`
);
res.json(result.rows.map(r => r.val));
} catch (err) {
console.error(err);
res.status(500).json({ error: err.message });
}
});
// deregister a source — does not drop existing forecast tables
router.delete('/sources/:id', async (req, res) => {
try {
const result = await pool.query(
`DELETE FROM pf.source WHERE id = $1 RETURNING *`,
[req.params.id]
);
if (result.rows.length === 0) {
return res.status(404).json({ error: 'Source not found' });
}
res.json({ message: 'Source deregistered', source: result.rows[0] });
} catch (err) {
console.error(err);
res.status(500).json({ error: err.message });
}
});
return router;
};