/** * Stacks Routes * Named unions of multiple sources with field mappings and running balance */ const express = require('express'); const { lit } = require('../lib/sql'); module.exports = (pool) => { const router = express.Router(); // List all stacks router.get('/', async (req, res, next) => { try { const result = await pool.query('SELECT * FROM list_stacks()'); res.json(result.rows); } catch (err) { next(err); } }); // Get single stack with sources router.get('/:name', async (req, res, next) => { try { const result = await pool.query(`SELECT * FROM get_stack(${lit(req.params.name)})`); if (!result.rows.length) return res.status(404).json({ error: 'Stack not found' }); res.json(result.rows[0]); } catch (err) { next(err); } }); // Create stack router.post('/', async (req, res, next) => { try { const { name, label, fields, amount_field, date_field, balance_offset } = req.body; if (!name) return res.status(400).json({ error: 'name is required' }); const result = await pool.query( `SELECT * FROM create_stack(${lit(name)}, ${lit(label || null)}, ${lit(JSON.stringify(fields || []))}, ${lit(amount_field || null)}, ${lit(date_field || null)}, ${lit(balance_offset ?? 0)})` ); res.status(201).json(result.rows[0]); } catch (err) { if (err.code === '23505') return res.status(409).json({ error: 'Stack already exists' }); next(err); } }); // Update stack router.put('/:name', async (req, res, next) => { try { const { label, fields, amount_field, date_field, balance_offset } = req.body; const n = v => v !== undefined ? lit(v) : 'NULL'; const f = v => v !== undefined ? lit(JSON.stringify(v)) : 'NULL'; const result = await pool.query( `SELECT * FROM update_stack(${lit(req.params.name)}, ${n(label)}, ${f(fields)}, ${n(amount_field)}, ${n(date_field)}, ${n(balance_offset)})` ); if (!result.rows.length) return res.status(404).json({ error: 'Stack not found' }); res.json(result.rows[0]); } catch (err) { next(err); } }); // Delete stack router.delete('/:name', async (req, res, next) => { try { const result = await pool.query(`SELECT * FROM delete_stack(${lit(req.params.name)})`); if (!result.rows.length) return res.status(404).json({ error: 'Stack not found' }); res.json({ success: true, deleted: req.params.name }); } catch (err) { next(err); } }); // Add or update a source in a stack router.put('/:name/sources/:source', async (req, res, next) => { try { const { field_map, amount_sign, balance_offset, amount_field, date_field } = req.body; const n = v => v != null ? lit(v) : 'NULL'; const result = await pool.query( `SELECT * FROM upsert_stack_source(${lit(req.params.name)}, ${lit(req.params.source)}, ${lit(JSON.stringify(field_map || {}))}, ${lit(amount_sign ?? 1)}, ${lit(balance_offset ?? 0)}, ${n(amount_field)}, ${n(date_field)})` ); res.json(result.rows[0]); } catch (err) { if (err.code === '23503') return res.status(404).json({ error: 'Stack or source not found' }); next(err); } }); // Remove a source from a stack router.delete('/:name/sources/:source', async (req, res, next) => { try { const result = await pool.query( `SELECT * FROM remove_stack_source(${lit(req.params.name)}, ${lit(req.params.source)})` ); if (!result.rows.length) return res.status(404).json({ error: 'Source not in stack' }); res.json({ success: true, removed: req.params.source }); } catch (err) { next(err); } }); // Get current running balance from the generated view router.get('/:name/balance', async (req, res, next) => { try { const result = await pool.query(`SELECT get_stack_balance(${lit(req.params.name)}) AS result`); res.json(result.rows[0].result); } catch (err) { next(err); } }); // Preview the SQL that would be generated (dry run — does not create the view) router.get('/:name/view-sql', async (req, res, next) => { try { const result = await pool.query(`SELECT generate_stack_view(${lit(req.params.name)}, true) AS result`); res.json(result.rows[0].result); } catch (err) { next(err); } }); // Generate / refresh the dfv view router.post('/:name/view', async (req, res, next) => { try { const result = await pool.query(`SELECT generate_stack_view(${lit(req.params.name)}) AS result`); const data = result.rows[0].result; if (data && data.success) { await pool.query(`UPDATE dataflow.stacks SET view_generated_at = NOW() WHERE name = ${lit(req.params.name)}`); } res.json(data); } catch (err) { next(err); } }); // Execute custom SQL for the view (user-edited SQL) router.post('/:name/exec-sql', async (req, res, next) => { try { const { sql } = req.body; if (!sql) return res.status(400).json({ success: false, error: 'sql is required' }); await pool.query(`DROP VIEW IF EXISTS dfv.${req.params.name} CASCADE`); await pool.query(sql); await pool.query(`UPDATE dataflow.stacks SET view_generated_at = NOW() WHERE name = ${lit(req.params.name)}`); // Detect stacks whose views were dropped by CASCADE const staleResult = await pool.query(` SELECT array_agg(name) AS names FROM dataflow.stacks WHERE name != ${lit(req.params.name)} AND view_generated_at IS NOT NULL AND NOT EXISTS ( SELECT 1 FROM pg_views WHERE schemaname = 'dfv' AND viewname = name ) `); const cascadeStale = staleResult.rows[0].names || []; if (cascadeStale.length) { await pool.query(`UPDATE dataflow.stacks SET view_generated_at = NULL WHERE name = ANY($1)`, [cascadeStale]); } res.json({ success: true, cascade_stale: cascadeStale }); } catch (err) { res.json({ success: false, error: err.message }); } }); // Calibrate balance offset given a known good balance at a specific date router.post('/:name/calibrate', async (req, res, next) => { try { const { as_of_date, known_balance, source_name } = req.body; if (known_balance === undefined) { return res.status(400).json({ error: 'known_balance is required' }); } const dateExpr = as_of_date ? `${lit(as_of_date)}::date` : 'NULL'; const result = await pool.query( `SELECT calibrate_balance(${lit(req.params.name)}, ${source_name ? lit(source_name) : 'NULL'}, ${dateExpr}, ${lit(known_balance)}::numeric) AS result` ); res.json(result.rows[0].result); } catch (err) { next(err); } }); return router; };