diff --git a/api/routes/stacks.js b/api/routes/stacks.js
new file mode 100644
index 0000000..58ee927
--- /dev/null
+++ b/api/routes/stacks.js
@@ -0,0 +1,115 @@
+/**
+ * Stacks Routes
+ * Named unions of multiple sources with field mappings and running balance
+ */
+
+const express = require('express');
+const { lit } = require('../lib/sql');
+
+module.exports = (pool) => {
+ const router = express.Router();
+
+ // List all stacks
+ router.get('/', async (req, res, next) => {
+ try {
+ const result = await pool.query('SELECT * FROM list_stacks()');
+ res.json(result.rows);
+ } catch (err) { next(err); }
+ });
+
+ // Get single stack with sources
+ router.get('/:name', async (req, res, next) => {
+ try {
+ const result = await pool.query(`SELECT * FROM get_stack(${lit(req.params.name)})`);
+ if (!result.rows.length) return res.status(404).json({ error: 'Stack not found' });
+ res.json(result.rows[0]);
+ } catch (err) { next(err); }
+ });
+
+ // Create stack
+ router.post('/', async (req, res, next) => {
+ try {
+ const { name, label, fields, amount_field, date_field, balance_offset } = req.body;
+ if (!name) return res.status(400).json({ error: 'name is required' });
+ const result = await pool.query(
+ `SELECT * FROM create_stack(${lit(name)}, ${lit(label || null)}, ${lit(JSON.stringify(fields || []))}, ${lit(amount_field || null)}, ${lit(date_field || null)}, ${lit(balance_offset ?? 0)})`
+ );
+ res.status(201).json(result.rows[0]);
+ } catch (err) {
+ if (err.code === '23505') return res.status(409).json({ error: 'Stack already exists' });
+ next(err);
+ }
+ });
+
+ // Update stack
+ router.put('/:name', async (req, res, next) => {
+ try {
+ const { label, fields, amount_field, date_field, balance_offset } = req.body;
+ const n = v => v !== undefined ? lit(v) : 'NULL';
+ const f = v => v !== undefined ? lit(JSON.stringify(v)) : 'NULL';
+ const result = await pool.query(
+ `SELECT * FROM update_stack(${lit(req.params.name)}, ${n(label)}, ${f(fields)}, ${n(amount_field)}, ${n(date_field)}, ${n(balance_offset)})`
+ );
+ if (!result.rows.length) return res.status(404).json({ error: 'Stack not found' });
+ res.json(result.rows[0]);
+ } catch (err) { next(err); }
+ });
+
+ // Delete stack
+ router.delete('/:name', async (req, res, next) => {
+ try {
+ const result = await pool.query(`SELECT * FROM delete_stack(${lit(req.params.name)})`);
+ if (!result.rows.length) return res.status(404).json({ error: 'Stack not found' });
+ res.json({ success: true, deleted: req.params.name });
+ } catch (err) { next(err); }
+ });
+
+ // Add or update a source in a stack
+ router.put('/:name/sources/:source', async (req, res, next) => {
+ try {
+ const { field_map, amount_sign, balance_offset } = req.body;
+ const result = await pool.query(
+ `SELECT * FROM upsert_stack_source(${lit(req.params.name)}, ${lit(req.params.source)}, ${lit(JSON.stringify(field_map || {}))}, ${lit(amount_sign ?? 1)}, ${lit(balance_offset ?? 0)})`
+ );
+ res.json(result.rows[0]);
+ } catch (err) {
+ if (err.code === '23503') return res.status(404).json({ error: 'Stack or source not found' });
+ next(err);
+ }
+ });
+
+ // Remove a source from a stack
+ router.delete('/:name/sources/:source', async (req, res, next) => {
+ try {
+ const result = await pool.query(
+ `SELECT * FROM remove_stack_source(${lit(req.params.name)}, ${lit(req.params.source)})`
+ );
+ if (!result.rows.length) return res.status(404).json({ error: 'Source not in stack' });
+ res.json({ success: true, removed: req.params.source });
+ } catch (err) { next(err); }
+ });
+
+ // Generate / refresh the dfv view
+ router.post('/:name/view', async (req, res, next) => {
+ try {
+ const result = await pool.query(`SELECT generate_stack_view(${lit(req.params.name)}) AS result`);
+ res.json(result.rows[0].result);
+ } catch (err) { next(err); }
+ });
+
+ // Calibrate balance offset given a known good balance at a specific date
+ router.post('/:name/calibrate', async (req, res, next) => {
+ try {
+ const { as_of_date, known_balance, source_name } = req.body;
+ if (!as_of_date || known_balance === undefined) {
+ return res.status(400).json({ error: 'as_of_date and known_balance are required' });
+ }
+ const result = await pool.query(
+ `SELECT calibrate_balance(${lit(req.params.name)}, ${source_name ? lit(source_name) : 'NULL'}, ${lit(as_of_date)}::date, ${lit(known_balance)}::numeric) AS result`
+ );
+ res.json(result.rows[0].result);
+ } catch (err) { next(err); }
+ });
+
+ return router;
+};
diff --git a/api/server.js b/api/server.js
index 5f42c0a..60a7982 100644
--- a/api/server.js
+++ b/api/server.js
@@ -54,12 +54,14 @@ const sourcesRoutes = require('./routes/sources');
const rulesRoutes = require('./routes/rules');
const mappingsRoutes = require('./routes/mappings');
const recordsRoutes = require('./routes/records');
+const stacksRoutes = require('./routes/stacks');
// Mount routes
app.use('/api/sources', sourcesRoutes(pool));
app.use('/api/rules', rulesRoutes(pool));
app.use('/api/mappings', mappingsRoutes(pool));
app.use('/api/records', recordsRoutes(pool));
+app.use('/api/stacks', stacksRoutes(pool));
// Health check
app.get('/health', (req, res) => {
diff --git a/database/queries/stacks.sql b/database/queries/stacks.sql
new file mode 100644
index 0000000..4002b50
--- /dev/null
+++ b/database/queries/stacks.sql
@@ -0,0 +1,335 @@
+--
+-- Stacks queries
+-- All SQL for api/routes/stacks.js
+--
+
+SET search_path TO dataflow, public;
+
+------------------------------------------------------
+-- Tables
+------------------------------------------------------
+
+CREATE TABLE IF NOT EXISTS dataflow.stacks (
+ name TEXT PRIMARY KEY,
+ label TEXT,
+ -- Ordered canonical field definitions: [{name, label, type}]
+ -- type: 'text' | 'numeric' | 'date'
+ fields JSONB NOT NULL DEFAULT '[]',
+ -- Running balance config
+ amount_field TEXT, -- canonical field to sum for running balance
+ date_field TEXT, -- canonical field to order by
+ balance_offset NUMERIC DEFAULT 0, -- added to running sum (calibration)
+ created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
+);
+
+CREATE TABLE IF NOT EXISTS dataflow.stack_sources (
+ id SERIAL PRIMARY KEY,
+ stack_name TEXT NOT NULL REFERENCES dataflow.stacks(name) ON DELETE CASCADE,
+ source_name TEXT NOT NULL REFERENCES dataflow.sources(name) ON DELETE CASCADE,
+ -- Maps canonical field name → field name in records.transformed
+ field_map JSONB NOT NULL DEFAULT '{}',
+ -- Multiply amount by this before summing (1 = as-is, -1 = flip sign)
+ amount_sign INTEGER NOT NULL DEFAULT 1,
+ -- Seed balance for this source — added as a constant to the combined running total
+ balance_offset NUMERIC NOT NULL DEFAULT 0,
+ UNIQUE (stack_name, source_name)
+);
+
+------------------------------------------------------
+-- Function: list_stacks
+------------------------------------------------------
+CREATE OR REPLACE FUNCTION list_stacks()
+RETURNS TABLE (
+ name TEXT,
+ label TEXT,
+ fields JSONB,
+ amount_field TEXT,
+ date_field TEXT,
+ balance_offset NUMERIC,
+ source_count BIGINT,
+ created_at TIMESTAMPTZ
+) AS $$
+ SELECT
+ s.name, s.label, s.fields,
+ s.amount_field, s.date_field, s.balance_offset,
+ count(ss.id) AS source_count,
+ s.created_at
+ FROM dataflow.stacks s
+ LEFT JOIN dataflow.stack_sources ss ON ss.stack_name = s.name
+ GROUP BY s.name, s.label, s.fields, s.amount_field, s.date_field, s.balance_offset, s.created_at
+ ORDER BY s.name;
+$$ LANGUAGE sql STABLE;
+
+------------------------------------------------------
+-- Function: get_stack
+------------------------------------------------------
+CREATE OR REPLACE FUNCTION get_stack(p_name TEXT)
+RETURNS TABLE (
+ name TEXT,
+ label TEXT,
+ fields JSONB,
+ amount_field TEXT,
+ date_field TEXT,
+ balance_offset NUMERIC,
+ created_at TIMESTAMPTZ,
+ sources JSONB
+) AS $$
+ SELECT
+ s.name, s.label, s.fields,
+ s.amount_field, s.date_field, s.balance_offset,
+ s.created_at,
+ COALESCE(jsonb_agg(
+ jsonb_build_object(
+ 'id', ss.id,
+ 'source_name', ss.source_name,
+ 'field_map', ss.field_map,
+ 'amount_sign', ss.amount_sign,
+ 'balance_offset', ss.balance_offset
+ ) ORDER BY ss.source_name
+ ) FILTER (WHERE ss.id IS NOT NULL), '[]')
+ FROM dataflow.stacks s
+ LEFT JOIN dataflow.stack_sources ss ON ss.stack_name = s.name
+ WHERE s.name = p_name
+ GROUP BY s.name, s.label, s.fields, s.amount_field, s.date_field, s.balance_offset, s.created_at;
+$$ LANGUAGE sql STABLE;
+
+------------------------------------------------------
+-- Function: create_stack
+------------------------------------------------------
+CREATE OR REPLACE FUNCTION create_stack(
+ p_name TEXT,
+ p_label TEXT DEFAULT NULL,
+ p_fields JSONB DEFAULT '[]',
+ p_amount_field TEXT DEFAULT NULL,
+ p_date_field TEXT DEFAULT NULL,
+ p_balance_offset NUMERIC DEFAULT 0
+) RETURNS dataflow.stacks AS $$
+ INSERT INTO dataflow.stacks (name, label, fields, amount_field, date_field, balance_offset)
+ VALUES (p_name, p_label, p_fields, p_amount_field, p_date_field, p_balance_offset)
+ RETURNING *;
+$$ LANGUAGE sql;
+
+------------------------------------------------------
+-- Function: update_stack
+------------------------------------------------------
+CREATE OR REPLACE FUNCTION update_stack(
+ p_name TEXT,
+ p_label TEXT DEFAULT NULL,
+ p_fields JSONB DEFAULT NULL,
+ p_amount_field TEXT DEFAULT NULL,
+ p_date_field TEXT DEFAULT NULL,
+ p_balance_offset NUMERIC DEFAULT NULL
+) RETURNS dataflow.stacks AS $$
+ UPDATE dataflow.stacks SET
+ label = COALESCE(p_label, label),
+ fields = COALESCE(p_fields, fields),
+ amount_field = COALESCE(p_amount_field, amount_field),
+ date_field = COALESCE(p_date_field, date_field),
+ balance_offset = COALESCE(p_balance_offset, balance_offset)
+ WHERE name = p_name
+ RETURNING *;
+$$ LANGUAGE sql;
+
+------------------------------------------------------
+-- Function: delete_stack
+------------------------------------------------------
+CREATE OR REPLACE FUNCTION delete_stack(p_name TEXT)
+RETURNS TABLE (name TEXT) AS $$
+ DELETE FROM dataflow.stacks WHERE name = p_name RETURNING name;
+$$ LANGUAGE sql;
+
+------------------------------------------------------
+-- Function: upsert_stack_source
+------------------------------------------------------
+CREATE OR REPLACE FUNCTION upsert_stack_source(
+ p_stack_name TEXT,
+ p_source_name TEXT,
+ p_field_map JSONB DEFAULT '{}',
+ p_amount_sign INTEGER DEFAULT 1,
+ p_balance_offset NUMERIC DEFAULT 0
+) RETURNS dataflow.stack_sources AS $$
+ INSERT INTO dataflow.stack_sources (stack_name, source_name, field_map, amount_sign, balance_offset)
+ VALUES (p_stack_name, p_source_name, p_field_map, p_amount_sign, p_balance_offset)
+ ON CONFLICT (stack_name, source_name) DO UPDATE SET
+ field_map = EXCLUDED.field_map,
+ amount_sign = EXCLUDED.amount_sign,
+ balance_offset = EXCLUDED.balance_offset
+ RETURNING *;
+$$ LANGUAGE sql;
+
+------------------------------------------------------
+-- Function: remove_stack_source
+------------------------------------------------------
+CREATE OR REPLACE FUNCTION remove_stack_source(p_stack_name TEXT, p_source_name TEXT)
+RETURNS TABLE (source_name TEXT) AS $$
+ DELETE FROM dataflow.stack_sources
+ WHERE stack_name = p_stack_name AND source_name = p_source_name
+ RETURNING source_name;
+$$ LANGUAGE sql;
+
+------------------------------------------------------
+-- Function: calibrate_balance
+-- Given a known good balance at a specific date, compute the offset needed.
+-- Returns: {computed_at_date, known_balance, suggested_offset}
+------------------------------------------------------
+CREATE OR REPLACE FUNCTION calibrate_balance(
+ p_stack_name TEXT,
+ p_source_name TEXT, -- specific source to calibrate, or NULL for combined
+ p_as_of_date DATE,
+ p_known_balance NUMERIC
+) RETURNS JSON AS $$
+DECLARE
+ v_stack dataflow.stacks%ROWTYPE;
+ v_running NUMERIC := 0;
+ v_other_offsets NUMERIC := 0;
+BEGIN
+ SELECT * INTO v_stack FROM dataflow.stacks WHERE name = p_stack_name;
+ IF NOT FOUND THEN
+ RETURN json_build_object('success', false, 'error', 'Stack not found');
+ END IF;
+ IF v_stack.amount_field IS NULL OR v_stack.date_field IS NULL THEN
+ RETURN json_build_object('success', false, 'error', 'Stack must have amount_field and date_field set');
+ END IF;
+
+ -- Sum amount * sign for the target source(s) up to as_of_date
+ SELECT COALESCE(SUM(
+ (rec.transformed ->> (ss.field_map ->> v_stack.amount_field))::numeric
+ * ss.amount_sign
+ ), 0)
+ INTO v_running
+ FROM dataflow.stack_sources ss
+ JOIN dataflow.records rec ON rec.source_name = ss.source_name
+ WHERE ss.stack_name = p_stack_name
+ AND (p_source_name IS NULL OR ss.source_name = p_source_name)
+ AND rec.transformed IS NOT NULL
+ AND (rec.transformed ->> (ss.field_map ->> v_stack.date_field))::date <= p_as_of_date;
+
+ -- For combined calibration, include existing offsets from other sources
+ IF p_source_name IS NOT NULL THEN
+ SELECT COALESCE(SUM(balance_offset), 0) INTO v_other_offsets
+ FROM dataflow.stack_sources
+ WHERE stack_name = p_stack_name AND source_name != p_source_name;
+ END IF;
+
+ RETURN json_build_object(
+ 'success', true,
+ 'source', p_source_name,
+ 'as_of_date', p_as_of_date,
+ 'known_balance', p_known_balance,
+ 'computed_sum', v_running,
+ 'other_offsets', v_other_offsets,
+ 'suggested_offset', p_known_balance - v_running
+ );
+END;
+$$ LANGUAGE plpgsql STABLE;
+
+------------------------------------------------------
+-- Function: generate_stack_view
+-- Builds a UNION ALL view in dfv schema from all member sources.
+-- Includes running_balance if amount_field and date_field are set.
+------------------------------------------------------
+CREATE OR REPLACE FUNCTION generate_stack_view(p_stack_name TEXT)
+RETURNS JSON AS $$
+DECLARE
+ v_stack dataflow.stacks%ROWTYPE;
+ v_src dataflow.stack_sources%ROWTYPE;
+ v_field JSONB;
+ v_parts TEXT[] := '{}';
+ v_select TEXT;
+ v_col TEXT;
+ v_src_field TEXT;
+ v_view TEXT;
+ v_sql TEXT;
+ v_has_bal BOOLEAN;
+BEGIN
+ SELECT * INTO v_stack FROM dataflow.stacks WHERE name = p_stack_name;
+ IF NOT FOUND THEN
+ RETURN json_build_object('success', false, 'error', 'Stack not found');
+ END IF;
+
+ v_has_bal := v_stack.amount_field IS NOT NULL AND v_stack.date_field IS NOT NULL;
+
+ -- Build one SELECT per source
+ FOR v_src IN
+ SELECT * FROM dataflow.stack_sources WHERE stack_name = p_stack_name ORDER BY source_name
+ LOOP
+ v_select := format('SELECT %L AS _source, rec.id AS _id', v_src.source_name);
+
+ -- Canonical fields, mapped to source field names
+ FOR v_field IN SELECT * FROM jsonb_array_elements(v_stack.fields)
+ LOOP
+ v_col := v_field->>'name';
+ v_src_field := COALESCE(v_src.field_map->>v_col, v_col);
+
+ v_select := v_select || ', ' || CASE v_field->>'type'
+ WHEN 'numeric' THEN
+ format('(rec.transformed->>%L)::numeric AS %I', v_src_field, v_col)
+ WHEN 'date' THEN
+ format('(rec.transformed->>%L)::date AS %I', v_src_field, v_col)
+ ELSE
+ format('rec.transformed->>%L AS %I', v_src_field, v_col)
+ END;
+ END LOOP;
+
+ -- amount_sign column for running balance
+ v_select := v_select || format(', %s::integer AS _sign, %s::numeric AS _src_offset', v_src.amount_sign, v_src.balance_offset);
+
+ v_select := v_select || format(
+ ' FROM dataflow.records rec WHERE rec.source_name = %L AND rec.transformed IS NOT NULL',
+ v_src.source_name
+ );
+
+ v_parts := v_parts || v_select;
+ END LOOP;
+
+ IF array_length(v_parts, 1) IS NULL THEN
+ RETURN json_build_object('success', false, 'error', 'Stack has no sources');
+ END IF;
+
+ CREATE SCHEMA IF NOT EXISTS dfv;
+ v_view := 'dfv.' || quote_ident(p_stack_name);
+ EXECUTE format('DROP VIEW IF EXISTS %s', v_view);
+
+ -- Wrap in outer SELECT that adds running_balance if configured
+ IF v_has_bal THEN
+ -- running_balance = cumulative sum of (amount * sign) + per-source seed offsets + stack-level offset
+ -- Each row's _src_offset is that source's balance_offset, summed cumulatively so it's added once
+ -- per source in the window rather than per row. We use a trick: sum(_src_offset / count_of_source_rows)
+ -- is complex, so instead we add SUM(DISTINCT _src_offset per _source) as a constant via subquery.
+ -- Simpler: treat each source offset as a lump added to its first row only via ROW_NUMBER trick.
+ -- Cleanest: add total of all source offsets as a constant (valid when each source is calibrated
+ -- relative to its own transaction history, not to each other).
+ v_sql := format(
+ 'CREATE VIEW %s AS '
+ 'SELECT _source, _id, %s, '
+ 'SUM((%I)::numeric * _sign) OVER (ORDER BY %I ASC, _id ASC) '
+ '+ (SELECT COALESCE(SUM(balance_offset),0) FROM dataflow.stack_sources WHERE stack_name = %L) '
+ '+ %s AS running_balance '
+ 'FROM (%s) _stacked',
+ v_view,
+ (SELECT string_agg(quote_ident(f->>'name'), ', ')
+ FROM jsonb_array_elements(v_stack.fields) f),
+ v_stack.amount_field,
+ v_stack.date_field,
+ p_stack_name,
+ v_stack.balance_offset,
+ array_to_string(v_parts, ' UNION ALL ')
+ );
+ ELSE
+ v_sql := format(
+ 'CREATE VIEW %s AS SELECT _source, _id, %s FROM (%s) _stacked',
+ v_view,
+ (SELECT string_agg(quote_ident(f->>'name'), ', ')
+ FROM jsonb_array_elements(v_stack.fields) f),
+ array_to_string(v_parts, ' UNION ALL ')
+ );
+ END IF;
+
+ EXECUTE v_sql;
+
+ RETURN json_build_object('success', true, 'view', v_view, 'sql', v_sql);
+END;
+$$ LANGUAGE plpgsql;
+
+COMMENT ON FUNCTION generate_stack_view IS 'Generate a UNION ALL view in dfv schema combining multiple sources with optional running balance';
+COMMENT ON FUNCTION calibrate_balance IS 'Given a known good balance at a date, compute the offset to add to balance_offset';
diff --git a/ui/src/App.jsx b/ui/src/App.jsx
index 66e34b7..d3323d0 100644
--- a/ui/src/App.jsx
+++ b/ui/src/App.jsx
@@ -10,6 +10,7 @@ import Records from './pages/Records'
import Log from './pages/Log'
import Pivot from './pages/Pivot'
import Remap from './pages/Remap'
+import Stacks from './pages/Stacks'
const NAV = [
{ to: '/sources', label: 'Sources' },
@@ -19,6 +20,7 @@ const NAV = [
{ to: '/remap', label: 'Remap' },
{ to: '/records', label: 'Records' },
{ to: '/pivot', label: 'Pivot' },
+ { to: '/stacks', label: 'Stacks' },
{ to: '/log', label: 'Log' },
]
@@ -149,6 +151,7 @@ export default function App() {
+ Enter a known good balance at a specific date. The system will compute the offset needed. +
+{error}
} + {result && ( +{error}
} + +View created: {viewResult.view}
+ :{viewResult.error}
+ } + {viewResult.sql && ( +{viewResult.sql}
+ )}
+ {error}
} +No stacks yet.
+ )} +Select a stack or create one.
+ )} +