const express = require('express'); const { generateSQL } = require('../lib/sql_generator'); module.exports = function(pool) { const router = express.Router(); // list all registered sources router.get('/sources', async (req, res) => { try { const result = await pool.query( `SELECT * FROM pf.source ORDER BY schema, tname` ); res.json(result.rows); } catch (err) { console.error(err); res.status(500).json({ error: err.message }); } }); // register a source table // auto-populates col_meta from information_schema with role='ignore' router.post('/sources', async (req, res) => { const { schema, tname, label, created_by } = req.body; if (!schema || !tname) { return res.status(400).json({ error: 'schema and tname are required' }); } if (!/^\w+$/.test(schema) || !/^\w+$/.test(tname)) { return res.status(400).json({ error: 'Invalid schema or table name' }); } const client = await pool.connect(); try { await client.query('BEGIN'); const src = await client.query( `INSERT INTO pf.source (schema, tname, label, created_by) VALUES ($1, $2, $3, $4) RETURNING *`, [schema, tname, label || null, created_by || null] ); const source = src.rows[0]; // seed col_meta from information_schema await client.query(` INSERT INTO pf.col_meta (source_id, cname, role, opos) SELECT $1, column_name, 'ignore', ordinal_position FROM information_schema.columns WHERE table_schema = $2 AND table_name = $3 ORDER BY ordinal_position ON CONFLICT (source_id, cname) DO NOTHING `, [source.id, schema, tname]); await client.query('COMMIT'); res.status(201).json(source); } catch (err) { await client.query('ROLLBACK'); console.error(err); if (err.code === '23505') { return res.status(409).json({ error: 'Source already registered' }); } res.status(500).json({ error: err.message }); } finally { client.release(); } }); // get col_meta for a source router.get('/sources/:id/cols', async (req, res) => { try { const result = await pool.query( `SELECT * FROM pf.col_meta WHERE source_id = $1 ORDER BY opos`, [req.params.id] ); res.json(result.rows); } catch (err) { console.error(err); res.status(500).json({ error: err.message }); } }); // save col_meta — accepts full array, upserts each row router.put('/sources/:id/cols', async (req, res) => { const sourceId = parseInt(req.params.id); const cols = req.body; if (!Array.isArray(cols)) { return res.status(400).json({ error: 'body must be an array' }); } const client = await pool.connect(); try { await client.query('BEGIN'); for (const col of cols) { await client.query(` INSERT INTO pf.col_meta (source_id, cname, label, role, is_key, opos) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (source_id, cname) DO UPDATE SET label = EXCLUDED.label, role = EXCLUDED.role, is_key = EXCLUDED.is_key, opos = EXCLUDED.opos `, [ sourceId, col.cname, col.label || null, col.role || 'ignore', col.is_key || false, col.opos || null ]); } await client.query('COMMIT'); const result = await pool.query( `SELECT * FROM pf.col_meta WHERE source_id = $1 ORDER BY opos`, [sourceId] ); res.json(result.rows); } catch (err) { await client.query('ROLLBACK'); console.error(err); res.status(500).json({ error: err.message }); } finally { client.release(); } }); // generate SQL for all operations from current col_meta and store in pf.sql router.post('/sources/:id/generate-sql', async (req, res) => { const sourceId = parseInt(req.params.id); try { const srcResult = await pool.query( `SELECT * FROM pf.source WHERE id = $1`, [sourceId] ); if (srcResult.rows.length === 0) { return res.status(404).json({ error: 'Source not found' }); } const colResult = await pool.query( `SELECT * FROM pf.col_meta WHERE source_id = $1 ORDER BY opos`, [sourceId] ); // validate required roles const colMeta = colResult.rows; const roles = new Set(colMeta.map(c => c.role)); const missing = ['value', 'units', 'date'].filter(r => !roles.has(r)); if (missing.length > 0) { return res.status(400).json({ error: `col_meta is missing required roles: ${missing.join(', ')}` }); } if (!colMeta.some(c => c.role === 'dimension')) { return res.status(400).json({ error: 'col_meta has no dimension columns' }); } const sqls = generateSQL(srcResult.rows[0], colMeta); const client = await pool.connect(); try { await client.query('BEGIN'); for (const [operation, sql] of Object.entries(sqls)) { await client.query(` INSERT INTO pf.sql (source_id, operation, sql, generated_at) VALUES ($1, $2, $3, now()) ON CONFLICT (source_id, operation) DO UPDATE SET sql = EXCLUDED.sql, generated_at = EXCLUDED.generated_at `, [sourceId, operation, sql]); } await client.query('COMMIT'); } catch (err) { await client.query('ROLLBACK'); throw err; } finally { client.release(); } res.json({ message: 'SQL generated', operations: Object.keys(sqls) }); } catch (err) { console.error(err); res.status(500).json({ error: err.message }); } }); // view generated SQL for a source (for inspection / debug) router.get('/sources/:id/sql', async (req, res) => { try { const result = await pool.query( `SELECT operation, sql, generated_at FROM pf.sql WHERE source_id = $1 ORDER BY operation`, [req.params.id] ); res.json(result.rows); } catch (err) { console.error(err); res.status(500).json({ error: err.message }); } }); // get distinct values for a key column (used to populate operation panel dropdowns) router.get('/sources/:id/values/:col', async (req, res) => { const col = req.params.col; try { const srcResult = await pool.query( `SELECT schema, tname FROM pf.source WHERE id = $1`, [req.params.id] ); if (srcResult.rows.length === 0) return res.status(404).json({ error: 'Source not found' }); // validate col is a key dimension on this source const metaResult = await pool.query( `SELECT 1 FROM pf.col_meta WHERE source_id = $1 AND cname = $2 AND is_key = true`, [req.params.id, col] ); if (metaResult.rows.length === 0) { return res.status(400).json({ error: `"${col}" is not a key column` }); } const { schema, tname } = srcResult.rows[0]; const result = await pool.query( `SELECT DISTINCT "${col}" AS val FROM ${schema}.${tname} WHERE "${col}" IS NOT NULL ORDER BY "${col}"` ); res.json(result.rows.map(r => r.val)); } catch (err) { console.error(err); res.status(500).json({ error: err.message }); } }); // deregister a source — does not drop existing forecast tables router.delete('/sources/:id', async (req, res) => { try { const result = await pool.query( `DELETE FROM pf.source WHERE id = $1 RETURNING *`, [req.params.id] ); if (result.rows.length === 0) { return res.status(404).json({ error: 'Source not found' }); } res.json({ message: 'Source deregistered', source: result.rows[0] }); } catch (err) { console.error(err); res.status(500).json({ error: err.message }); } }); return router; };