Node.js/Express + PostgreSQL forecasting app with AG Grid Enterprise pivot UI. Supports baseline, scale, recode, clone operations on configurable source tables. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
244 lines
9.2 KiB
JavaScript
244 lines
9.2 KiB
JavaScript
const express = require('express');
|
|
const { generateSQL } = require('../lib/sql_generator');
|
|
|
|
module.exports = function(pool) {
|
|
const router = express.Router();
|
|
|
|
// list all registered sources
|
|
router.get('/sources', async (req, res) => {
|
|
try {
|
|
const result = await pool.query(
|
|
`SELECT * FROM pf.source ORDER BY schema, tname`
|
|
);
|
|
res.json(result.rows);
|
|
} catch (err) {
|
|
console.error(err);
|
|
res.status(500).json({ error: err.message });
|
|
}
|
|
});
|
|
|
|
// register a source table
|
|
// auto-populates col_meta from information_schema with role='ignore'
|
|
router.post('/sources', async (req, res) => {
|
|
const { schema, tname, label, created_by } = req.body;
|
|
if (!schema || !tname) {
|
|
return res.status(400).json({ error: 'schema and tname are required' });
|
|
}
|
|
if (!/^\w+$/.test(schema) || !/^\w+$/.test(tname)) {
|
|
return res.status(400).json({ error: 'Invalid schema or table name' });
|
|
}
|
|
const client = await pool.connect();
|
|
try {
|
|
await client.query('BEGIN');
|
|
|
|
const src = await client.query(
|
|
`INSERT INTO pf.source (schema, tname, label, created_by)
|
|
VALUES ($1, $2, $3, $4)
|
|
RETURNING *`,
|
|
[schema, tname, label || null, created_by || null]
|
|
);
|
|
const source = src.rows[0];
|
|
|
|
// seed col_meta from information_schema
|
|
await client.query(`
|
|
INSERT INTO pf.col_meta (source_id, cname, role, opos)
|
|
SELECT $1, column_name, 'ignore', ordinal_position
|
|
FROM information_schema.columns
|
|
WHERE table_schema = $2 AND table_name = $3
|
|
ORDER BY ordinal_position
|
|
ON CONFLICT (source_id, cname) DO NOTHING
|
|
`, [source.id, schema, tname]);
|
|
|
|
await client.query('COMMIT');
|
|
res.status(201).json(source);
|
|
} catch (err) {
|
|
await client.query('ROLLBACK');
|
|
console.error(err);
|
|
if (err.code === '23505') {
|
|
return res.status(409).json({ error: 'Source already registered' });
|
|
}
|
|
res.status(500).json({ error: err.message });
|
|
} finally {
|
|
client.release();
|
|
}
|
|
});
|
|
|
|
// get col_meta for a source
|
|
router.get('/sources/:id/cols', async (req, res) => {
|
|
try {
|
|
const result = await pool.query(
|
|
`SELECT * FROM pf.col_meta WHERE source_id = $1 ORDER BY opos`,
|
|
[req.params.id]
|
|
);
|
|
res.json(result.rows);
|
|
} catch (err) {
|
|
console.error(err);
|
|
res.status(500).json({ error: err.message });
|
|
}
|
|
});
|
|
|
|
// save col_meta — accepts full array, upserts each row
|
|
router.put('/sources/:id/cols', async (req, res) => {
|
|
const sourceId = parseInt(req.params.id);
|
|
const cols = req.body;
|
|
if (!Array.isArray(cols)) {
|
|
return res.status(400).json({ error: 'body must be an array' });
|
|
}
|
|
const client = await pool.connect();
|
|
try {
|
|
await client.query('BEGIN');
|
|
for (const col of cols) {
|
|
await client.query(`
|
|
INSERT INTO pf.col_meta (source_id, cname, label, role, is_key, opos)
|
|
VALUES ($1, $2, $3, $4, $5, $6)
|
|
ON CONFLICT (source_id, cname) DO UPDATE SET
|
|
label = EXCLUDED.label,
|
|
role = EXCLUDED.role,
|
|
is_key = EXCLUDED.is_key,
|
|
opos = EXCLUDED.opos
|
|
`, [
|
|
sourceId,
|
|
col.cname,
|
|
col.label || null,
|
|
col.role || 'ignore',
|
|
col.is_key || false,
|
|
col.opos || null
|
|
]);
|
|
}
|
|
await client.query('COMMIT');
|
|
const result = await pool.query(
|
|
`SELECT * FROM pf.col_meta WHERE source_id = $1 ORDER BY opos`,
|
|
[sourceId]
|
|
);
|
|
res.json(result.rows);
|
|
} catch (err) {
|
|
await client.query('ROLLBACK');
|
|
console.error(err);
|
|
res.status(500).json({ error: err.message });
|
|
} finally {
|
|
client.release();
|
|
}
|
|
});
|
|
|
|
// generate SQL for all operations from current col_meta and store in pf.sql
|
|
router.post('/sources/:id/generate-sql', async (req, res) => {
|
|
const sourceId = parseInt(req.params.id);
|
|
try {
|
|
const srcResult = await pool.query(
|
|
`SELECT * FROM pf.source WHERE id = $1`, [sourceId]
|
|
);
|
|
if (srcResult.rows.length === 0) {
|
|
return res.status(404).json({ error: 'Source not found' });
|
|
}
|
|
|
|
const colResult = await pool.query(
|
|
`SELECT * FROM pf.col_meta WHERE source_id = $1 ORDER BY opos`,
|
|
[sourceId]
|
|
);
|
|
|
|
// validate required roles
|
|
const colMeta = colResult.rows;
|
|
const roles = new Set(colMeta.map(c => c.role));
|
|
const missing = ['value', 'units', 'date'].filter(r => !roles.has(r));
|
|
if (missing.length > 0) {
|
|
return res.status(400).json({
|
|
error: `col_meta is missing required roles: ${missing.join(', ')}`
|
|
});
|
|
}
|
|
if (!colMeta.some(c => c.role === 'dimension')) {
|
|
return res.status(400).json({ error: 'col_meta has no dimension columns' });
|
|
}
|
|
|
|
const sqls = generateSQL(srcResult.rows[0], colMeta);
|
|
const client = await pool.connect();
|
|
try {
|
|
await client.query('BEGIN');
|
|
for (const [operation, sql] of Object.entries(sqls)) {
|
|
await client.query(`
|
|
INSERT INTO pf.sql (source_id, operation, sql, generated_at)
|
|
VALUES ($1, $2, $3, now())
|
|
ON CONFLICT (source_id, operation) DO UPDATE SET
|
|
sql = EXCLUDED.sql,
|
|
generated_at = EXCLUDED.generated_at
|
|
`, [sourceId, operation, sql]);
|
|
}
|
|
await client.query('COMMIT');
|
|
} catch (err) {
|
|
await client.query('ROLLBACK');
|
|
throw err;
|
|
} finally {
|
|
client.release();
|
|
}
|
|
|
|
res.json({ message: 'SQL generated', operations: Object.keys(sqls) });
|
|
} catch (err) {
|
|
console.error(err);
|
|
res.status(500).json({ error: err.message });
|
|
}
|
|
});
|
|
|
|
// view generated SQL for a source (for inspection / debug)
|
|
router.get('/sources/:id/sql', async (req, res) => {
|
|
try {
|
|
const result = await pool.query(
|
|
`SELECT operation, sql, generated_at
|
|
FROM pf.sql WHERE source_id = $1 ORDER BY operation`,
|
|
[req.params.id]
|
|
);
|
|
res.json(result.rows);
|
|
} catch (err) {
|
|
console.error(err);
|
|
res.status(500).json({ error: err.message });
|
|
}
|
|
});
|
|
|
|
// get distinct values for a key column (used to populate operation panel dropdowns)
|
|
router.get('/sources/:id/values/:col', async (req, res) => {
|
|
const col = req.params.col;
|
|
try {
|
|
const srcResult = await pool.query(
|
|
`SELECT schema, tname FROM pf.source WHERE id = $1`, [req.params.id]
|
|
);
|
|
if (srcResult.rows.length === 0) return res.status(404).json({ error: 'Source not found' });
|
|
|
|
// validate col is a key dimension on this source
|
|
const metaResult = await pool.query(
|
|
`SELECT 1 FROM pf.col_meta WHERE source_id = $1 AND cname = $2 AND is_key = true`,
|
|
[req.params.id, col]
|
|
);
|
|
if (metaResult.rows.length === 0) {
|
|
return res.status(400).json({ error: `"${col}" is not a key column` });
|
|
}
|
|
|
|
const { schema, tname } = srcResult.rows[0];
|
|
const result = await pool.query(
|
|
`SELECT DISTINCT "${col}" AS val FROM ${schema}.${tname}
|
|
WHERE "${col}" IS NOT NULL ORDER BY "${col}"`
|
|
);
|
|
res.json(result.rows.map(r => r.val));
|
|
} catch (err) {
|
|
console.error(err);
|
|
res.status(500).json({ error: err.message });
|
|
}
|
|
});
|
|
|
|
// deregister a source — does not drop existing forecast tables
|
|
router.delete('/sources/:id', async (req, res) => {
|
|
try {
|
|
const result = await pool.query(
|
|
`DELETE FROM pf.source WHERE id = $1 RETURNING *`,
|
|
[req.params.id]
|
|
);
|
|
if (result.rows.length === 0) {
|
|
return res.status(404).json({ error: 'Source not found' });
|
|
}
|
|
res.json({ message: 'Source deregistered', source: result.rows[0] });
|
|
} catch (err) {
|
|
console.error(err);
|
|
res.status(500).json({ error: err.message });
|
|
}
|
|
});
|
|
|
|
return router;
|
|
};
|