diff --git a/api/routes/stacks.js b/api/routes/stacks.js
index e4b56e3..75f1b66 100644
--- a/api/routes/stacks.js
+++ b/api/routes/stacks.js
@@ -67,9 +67,10 @@ module.exports = (pool) => {
// Add or update a source in a stack
router.put('/:name/sources/:source', async (req, res, next) => {
try {
- const { field_map, amount_sign, balance_offset } = req.body;
+ const { field_map, amount_sign, balance_offset, amount_field, date_field } = req.body;
+ const n = v => v != null ? lit(v) : 'NULL';
const result = await pool.query(
- `SELECT * FROM upsert_stack_source(${lit(req.params.name)}, ${lit(req.params.source)}, ${lit(JSON.stringify(field_map || {}))}, ${lit(amount_sign ?? 1)}, ${lit(balance_offset ?? 0)})`
+ `SELECT * FROM upsert_stack_source(${lit(req.params.name)}, ${lit(req.params.source)}, ${lit(JSON.stringify(field_map || {}))}, ${lit(amount_sign ?? 1)}, ${lit(balance_offset ?? 0)}, ${n(amount_field)}, ${n(date_field)})`
);
res.json(result.rows[0]);
} catch (err) {
diff --git a/database/queries/stacks.sql b/database/queries/stacks.sql
index d805060..8185319 100644
--- a/database/queries/stacks.sql
+++ b/database/queries/stacks.sql
@@ -26,20 +26,26 @@ CREATE TABLE IF NOT EXISTS dataflow.stack_sources (
id SERIAL PRIMARY KEY,
stack_name TEXT NOT NULL REFERENCES dataflow.stacks(name) ON DELETE CASCADE,
source_name TEXT NOT NULL REFERENCES dataflow.sources(name) ON DELETE CASCADE,
- -- Maps canonical field name → field name in records.transformed
+ -- Maps other canonical field names → source view column names (not amount/date — those are explicit)
field_map JSONB NOT NULL DEFAULT '{}',
- -- Multiply amount by this before summing (1 = as-is, -1 = flip sign)
+ -- Which column in dfv.{source} is the amount, and its sign (+1/-1)
+ amount_field TEXT,
amount_sign INTEGER NOT NULL DEFAULT 1,
- -- Seed balance for this source — added as a constant to the combined running total
+ -- Which column in dfv.{source} is the date
+ date_field TEXT,
+ -- Calibration offset added to this source's running balance
balance_offset NUMERIC NOT NULL DEFAULT 0,
UNIQUE (stack_name, source_name)
);
-- Migrations: add columns that may be missing from earlier deploys
ALTER TABLE dataflow.stack_sources ADD COLUMN IF NOT EXISTS balance_offset NUMERIC NOT NULL DEFAULT 0;
+ALTER TABLE dataflow.stack_sources ADD COLUMN IF NOT EXISTS amount_field TEXT;
+ALTER TABLE dataflow.stack_sources ADD COLUMN IF NOT EXISTS date_field TEXT;
--- Drop old 3-arg calibrate_balance signature if it exists before recreating with 4 args
+-- Drop old signatures before recreating
DROP FUNCTION IF EXISTS calibrate_balance(TEXT, DATE, NUMERIC);
+DROP FUNCTION IF EXISTS upsert_stack_source(TEXT, TEXT, JSONB, INTEGER, NUMERIC);
------------------------------------------------------
-- Function: list_stacks
@@ -89,7 +95,9 @@ RETURNS TABLE (
'id', ss.id,
'source_name', ss.source_name,
'field_map', ss.field_map,
+ 'amount_field', ss.amount_field,
'amount_sign', ss.amount_sign,
+ 'date_field', ss.date_field,
'balance_offset', ss.balance_offset
) ORDER BY ss.source_name
) FILTER (WHERE ss.id IS NOT NULL), '[]')
@@ -152,14 +160,18 @@ CREATE OR REPLACE FUNCTION upsert_stack_source(
p_source_name TEXT,
p_field_map JSONB DEFAULT '{}',
p_amount_sign INTEGER DEFAULT 1,
- p_balance_offset NUMERIC DEFAULT 0
+ p_balance_offset NUMERIC DEFAULT 0,
+ p_amount_field TEXT DEFAULT NULL,
+ p_date_field TEXT DEFAULT NULL
) RETURNS dataflow.stack_sources AS $$
- INSERT INTO dataflow.stack_sources (stack_name, source_name, field_map, amount_sign, balance_offset)
- VALUES (p_stack_name, p_source_name, p_field_map, p_amount_sign, p_balance_offset)
+ INSERT INTO dataflow.stack_sources (stack_name, source_name, field_map, amount_sign, balance_offset, amount_field, date_field)
+ VALUES (p_stack_name, p_source_name, p_field_map, p_amount_sign, p_balance_offset, p_amount_field, p_date_field)
ON CONFLICT (stack_name, source_name) DO UPDATE SET
field_map = EXCLUDED.field_map,
amount_sign = EXCLUDED.amount_sign,
- balance_offset = EXCLUDED.balance_offset
+ balance_offset = EXCLUDED.balance_offset,
+ amount_field = EXCLUDED.amount_field,
+ date_field = EXCLUDED.date_field
RETURNING *;
$$ LANGUAGE sql;
@@ -175,47 +187,40 @@ $$ LANGUAGE sql;
------------------------------------------------------
-- Function: calibrate_balance
--- Given a known good balance at a specific date, compute the offset needed.
--- Returns: {computed_at_date, known_balance, suggested_offset}
+-- Queries dfv.{source} directly using per-source amount/date fields.
+-- No stack view required.
------------------------------------------------------
CREATE OR REPLACE FUNCTION calibrate_balance(
p_stack_name TEXT,
- p_source_name TEXT, -- specific source to calibrate, or NULL for combined
+ p_source_name TEXT,
p_as_of_date DATE,
p_known_balance NUMERIC
) RETURNS JSON AS $$
DECLARE
- v_stack dataflow.stacks%ROWTYPE;
- v_running NUMERIC := 0;
- v_other_offsets NUMERIC := 0;
+ v_src dataflow.stack_sources%ROWTYPE;
+ v_running NUMERIC;
+ v_sql TEXT;
BEGIN
- SELECT * INTO v_stack FROM dataflow.stacks WHERE name = p_stack_name;
+ SELECT * INTO v_src
+ FROM dataflow.stack_sources
+ WHERE stack_name = p_stack_name AND source_name = p_source_name;
+
IF NOT FOUND THEN
- RETURN json_build_object('success', false, 'error', 'Stack not found');
+ RETURN json_build_object('success', false, 'error', 'Source not in stack');
END IF;
- IF v_stack.amount_field IS NULL OR v_stack.date_field IS NULL THEN
- RETURN json_build_object('success', false, 'error', 'Stack must have amount_field and date_field set');
+ IF v_src.amount_field IS NULL OR v_src.date_field IS NULL THEN
+ RETURN json_build_object('success', false, 'error', 'Set amount and date fields on this source first');
END IF;
- -- Sum amount * sign for the target source(s) up to as_of_date
- SELECT COALESCE(SUM(
- (rec.transformed ->> (ss.field_map ->> v_stack.amount_field))::numeric
- * ss.amount_sign
- ), 0)
- INTO v_running
- FROM dataflow.stack_sources ss
- JOIN dataflow.records rec ON rec.source_name = ss.source_name
- WHERE ss.stack_name = p_stack_name
- AND (p_source_name IS NULL OR ss.source_name = p_source_name)
- AND rec.transformed IS NOT NULL
- AND (rec.transformed ->> (ss.field_map ->> v_stack.date_field))::date <= p_as_of_date;
-
- -- For combined calibration, include existing offsets from other sources
- IF p_source_name IS NOT NULL THEN
- SELECT COALESCE(SUM(balance_offset), 0) INTO v_other_offsets
- FROM dataflow.stack_sources
- WHERE stack_name = p_stack_name AND source_name != p_source_name;
- END IF;
+ BEGIN
+ v_sql := format(
+ 'SELECT COALESCE(SUM(%I * %s), 0) FROM dfv.%I WHERE %I <= %L::date',
+ v_src.amount_field, v_src.amount_sign, p_source_name, v_src.date_field, p_as_of_date
+ );
+ EXECUTE v_sql INTO v_running;
+ EXCEPTION WHEN undefined_table THEN
+ RETURN json_build_object('success', false, 'error', 'Source view not found — generate the source view first');
+ END;
RETURN json_build_object(
'success', true,
@@ -223,7 +228,6 @@ BEGIN
'as_of_date', p_as_of_date,
'known_balance', p_known_balance,
'computed_sum', v_running,
- 'other_offsets', v_other_offsets,
'suggested_offset', p_known_balance - v_running
);
END;
@@ -231,22 +235,27 @@ $$ LANGUAGE plpgsql STABLE;
------------------------------------------------------
-- Function: generate_stack_view
--- Builds a UNION ALL view in dfv schema from all member sources.
--- Includes running_balance if amount_field and date_field are set.
+-- Builds a WITH ... UNION ALL view in dfv schema from existing dfv source views.
+-- Each source CTE applies amount_sign and computes a per-source running balance.
+-- Outer SELECT adds net_balance across all sources.
------------------------------------------------------
CREATE OR REPLACE FUNCTION generate_stack_view(p_stack_name TEXT)
RETURNS JSON AS $$
DECLARE
- v_stack dataflow.stacks%ROWTYPE;
- v_src dataflow.stack_sources%ROWTYPE;
- v_field JSONB;
- v_parts TEXT[] := '{}';
- v_select TEXT;
- v_col TEXT;
- v_src_field TEXT;
- v_view TEXT;
- v_sql TEXT;
- v_has_bal BOOLEAN;
+ v_stack dataflow.stacks%ROWTYPE;
+ v_src dataflow.stack_sources%ROWTYPE;
+ v_field JSONB;
+ v_ctes TEXT[] := '{}';
+ v_cte_names TEXT[] := '{}';
+ v_select TEXT;
+ v_col TEXT;
+ v_src_field TEXT;
+ v_amt_src TEXT;
+ v_date_src TEXT;
+ v_view TEXT;
+ v_sql TEXT;
+ v_has_bal BOOLEAN;
+ v_canon_cols TEXT;
BEGIN
SELECT * INTO v_stack FROM dataflow.stacks WHERE name = p_stack_name;
IF NOT FOUND THEN
@@ -255,40 +264,63 @@ BEGIN
v_has_bal := v_stack.amount_field IS NOT NULL AND v_stack.date_field IS NOT NULL;
- -- Build one SELECT per source
+ -- Build one CTE per source querying dfv.{source} directly
FOR v_src IN
SELECT * FROM dataflow.stack_sources WHERE stack_name = p_stack_name ORDER BY source_name
LOOP
- v_select := format('SELECT %L AS _source, rec.id AS _id', v_src.source_name);
+ v_select := format('SELECT %L AS _source, id AS _id', v_src.source_name);
- -- Canonical fields, mapped to source field names
FOR v_field IN SELECT * FROM jsonb_array_elements(v_stack.fields)
LOOP
- v_col := v_field->>'name';
- v_src_field := COALESCE(v_src.field_map->>v_col, v_col);
+ v_col := v_field->>'name';
- v_select := v_select || ', ' || CASE v_field->>'type'
- WHEN 'numeric' THEN
- format('(rec.transformed->>%L)::numeric AS %I', v_src_field, v_col)
- WHEN 'date' THEN
- format('(rec.transformed->>%L)::date AS %I', v_src_field, v_col)
+ IF v_has_bal AND v_col = v_stack.amount_field THEN
+ -- Use per-source amount_field with sign applied
+ IF v_src.amount_field IS NULL THEN
+ v_select := v_select || format(', NULL::%s AS %I', v_field->>'type', v_col);
ELSE
- format('rec.transformed->>%L AS %I', v_src_field, v_col)
- END;
+ v_select := v_select || format(', %I * %s AS %I', v_src.amount_field, v_src.amount_sign, v_col);
+ END IF;
+ ELSIF v_has_bal AND v_col = v_stack.date_field THEN
+ -- Use per-source date_field
+ IF v_src.date_field IS NULL THEN
+ v_select := v_select || format(', NULL::date AS %I', v_col);
+ ELSE
+ v_select := v_select || format(', %I AS %I', v_src.date_field, v_col);
+ END IF;
+ ELSE
+ -- Other canonical fields: use field_map or same name, NULL if column doesn't exist
+ v_src_field := COALESCE(v_src.field_map->>v_col, v_col);
+ IF EXISTS (
+ SELECT 1 FROM information_schema.columns
+ WHERE table_schema = 'dfv'
+ AND table_name = v_src.source_name
+ AND column_name = v_src_field
+ ) THEN
+ v_select := v_select || format(', %I AS %I', v_src_field, v_col);
+ ELSE
+ v_select := v_select || format(', NULL::text AS %I', v_col);
+ END IF;
+ END IF;
END LOOP;
- -- amount_sign column for running balance
- v_select := v_select || format(', %s::integer AS _sign, %s::numeric AS _src_offset', v_src.amount_sign, v_src.balance_offset);
+ -- Per-source running balance with calibration offset baked in
+ IF v_has_bal AND v_src.amount_field IS NOT NULL AND v_src.date_field IS NOT NULL THEN
+ v_select := v_select || format(
+ ', SUM(%I * %s) OVER (ORDER BY %I ASC, id ASC) + %s AS source_balance',
+ v_src.amount_field, v_src.amount_sign, v_src.date_field, v_src.balance_offset
+ );
+ ELSE
+ v_select := v_select || ', NULL::numeric AS source_balance';
+ END IF;
- v_select := v_select || format(
- ' FROM dataflow.records rec WHERE rec.source_name = %L AND rec.transformed IS NOT NULL',
- v_src.source_name
- );
+ v_select := v_select || format(' FROM dfv.%I', v_src.source_name);
- v_parts := v_parts || v_select;
+ v_ctes := v_ctes || format('%I AS (%s)', v_src.source_name, v_select);
+ v_cte_names := v_cte_names || quote_ident(v_src.source_name);
END LOOP;
- IF array_length(v_parts, 1) IS NULL THEN
+ IF array_length(v_ctes, 1) IS NULL THEN
RETURN json_build_object('success', false, 'error', 'Stack has no sources');
END IF;
@@ -296,38 +328,36 @@ BEGIN
v_view := 'dfv.' || quote_ident(p_stack_name);
EXECUTE format('DROP VIEW IF EXISTS %s', v_view);
- -- Wrap in outer SELECT that adds running_balance if configured
+ v_canon_cols := (
+ SELECT string_agg(quote_ident(f->>'name'), ', ')
+ FROM jsonb_array_elements(v_stack.fields) f
+ );
+
IF v_has_bal THEN
- -- running_balance = cumulative sum of (amount * sign) + per-source seed offsets + stack-level offset
- -- Each row's _src_offset is that source's balance_offset, summed cumulatively so it's added once
- -- per source in the window rather than per row. We use a trick: sum(_src_offset / count_of_source_rows)
- -- is complex, so instead we add SUM(DISTINCT _src_offset per _source) as a constant via subquery.
- -- Simpler: treat each source offset as a lump added to its first row only via ROW_NUMBER trick.
- -- Cleanest: add total of all source offsets as a constant (valid when each source is calibrated
- -- relative to its own transaction history, not to each other).
+ -- net_balance: cumulative signed amount across all sources + stack-level offset
v_sql := format(
'CREATE VIEW %s AS '
- 'SELECT _source, _id, %s, '
- 'SUM((%I)::numeric * _sign) OVER (ORDER BY %I ASC, _id ASC) '
- '+ (SELECT COALESCE(SUM(balance_offset),0) FROM dataflow.stack_sources WHERE stack_name = %L) '
- '+ %s AS running_balance '
- 'FROM (%s) _stacked',
+ 'WITH %s, _stacked AS (SELECT * FROM %s) '
+ 'SELECT _source, _id, %s, source_balance, '
+ 'SUM(%I) OVER (ORDER BY %I ASC, _id ASC) + %s AS net_balance '
+ 'FROM _stacked',
v_view,
- (SELECT string_agg(quote_ident(f->>'name'), ', ')
- FROM jsonb_array_elements(v_stack.fields) f),
+ array_to_string(v_ctes, ', '),
+ array_to_string(v_cte_names, ' UNION ALL SELECT * FROM '),
+ v_canon_cols,
v_stack.amount_field,
v_stack.date_field,
- p_stack_name,
- v_stack.balance_offset,
- array_to_string(v_parts, ' UNION ALL ')
+ v_stack.balance_offset
);
ELSE
v_sql := format(
- 'CREATE VIEW %s AS SELECT _source, _id, %s FROM (%s) _stacked',
+ 'CREATE VIEW %s AS '
+ 'WITH %s, _stacked AS (SELECT * FROM %s) '
+ 'SELECT _source, _id, %s FROM _stacked',
v_view,
- (SELECT string_agg(quote_ident(f->>'name'), ', ')
- FROM jsonb_array_elements(v_stack.fields) f),
- array_to_string(v_parts, ' UNION ALL ')
+ array_to_string(v_ctes, ', '),
+ array_to_string(v_cte_names, ' UNION ALL SELECT * FROM '),
+ v_canon_cols
);
END IF;
@@ -361,7 +391,7 @@ BEGIN
BEGIN
v_sql := format(
- 'SELECT running_balance FROM %s ORDER BY %I DESC, _id DESC LIMIT 1',
+ 'SELECT net_balance FROM %s ORDER BY %I DESC, _id DESC LIMIT 1',
v_view, v_stack.date_field
);
EXECUTE v_sql INTO v_balance;
diff --git a/ui/src/pages/Stacks.jsx b/ui/src/pages/Stacks.jsx
index 990059a..fc6e4ad 100644
--- a/ui/src/pages/Stacks.jsx
+++ b/ui/src/pages/Stacks.jsx
@@ -24,14 +24,14 @@ function CalibrateModal({ stack, sourceName, onClose, onApply }) {
}
return (
-
+
{ if (e.target === e.currentTarget) onClose() }}>
e.stopPropagation()}>
- Calibrate balance
+ Calibrate — {sourceName}
- Enter a known good balance at a specific date. The system will compute the offset needed.
+ Enter a known good balance at a specific date. The system will compute the offset needed to make the running balance match.