dataflow/database/queries/stacks.sql
Paul Trowbridge f941c5ae4a Stacks: per-source amount/date fields, mapping grid UI, dfv view generation with source balance CTEs
- SQL: upsert_stack_source gains amount_field/date_field params; calibrate_balance queries dfv.{source} directly (no stack view needed); generate_stack_view builds per-source CTEs with source_balance, outer net_balance; information_schema check for missing columns
- API: pass amount_field/date_field through upsert route; calibrate accepts source_name
- UI: mapping grid table (rows=fields, cols=sources); per-source amount/date/sign in Sources section; auto-populate output columns on first source config; horizontal stack chips above full-width config panel; calibration auto-saves before opening, editable offset input

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-18 21:17:12 -04:00

409 lines
16 KiB
PL/PgSQL

--
-- Stacks queries
-- All SQL for api/routes/stacks.js
--
SET search_path TO dataflow, public;
------------------------------------------------------
-- Tables
------------------------------------------------------
CREATE TABLE IF NOT EXISTS dataflow.stacks (
name TEXT PRIMARY KEY,
label TEXT,
-- Ordered canonical field definitions: [{name, label, type}]
-- type: 'text' | 'numeric' | 'date'
fields JSONB NOT NULL DEFAULT '[]',
-- Running balance config
amount_field TEXT, -- canonical field to sum for running balance
date_field TEXT, -- canonical field to order by
balance_offset NUMERIC DEFAULT 0, -- added to running sum (calibration)
created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS dataflow.stack_sources (
id SERIAL PRIMARY KEY,
stack_name TEXT NOT NULL REFERENCES dataflow.stacks(name) ON DELETE CASCADE,
source_name TEXT NOT NULL REFERENCES dataflow.sources(name) ON DELETE CASCADE,
-- Maps other canonical field names → source view column names (not amount/date — those are explicit)
field_map JSONB NOT NULL DEFAULT '{}',
-- Which column in dfv.{source} is the amount, and its sign (+1/-1)
amount_field TEXT,
amount_sign INTEGER NOT NULL DEFAULT 1,
-- Which column in dfv.{source} is the date
date_field TEXT,
-- Calibration offset added to this source's running balance
balance_offset NUMERIC NOT NULL DEFAULT 0,
UNIQUE (stack_name, source_name)
);
-- Migrations: add columns that may be missing from earlier deploys
ALTER TABLE dataflow.stack_sources ADD COLUMN IF NOT EXISTS balance_offset NUMERIC NOT NULL DEFAULT 0;
ALTER TABLE dataflow.stack_sources ADD COLUMN IF NOT EXISTS amount_field TEXT;
ALTER TABLE dataflow.stack_sources ADD COLUMN IF NOT EXISTS date_field TEXT;
-- Drop old signatures before recreating
DROP FUNCTION IF EXISTS calibrate_balance(TEXT, DATE, NUMERIC);
DROP FUNCTION IF EXISTS upsert_stack_source(TEXT, TEXT, JSONB, INTEGER, NUMERIC);
------------------------------------------------------
-- Function: list_stacks
------------------------------------------------------
CREATE OR REPLACE FUNCTION list_stacks()
RETURNS TABLE (
name TEXT,
label TEXT,
fields JSONB,
amount_field TEXT,
date_field TEXT,
balance_offset NUMERIC,
source_count BIGINT,
created_at TIMESTAMPTZ
) AS $$
SELECT
s.name, s.label, s.fields,
s.amount_field, s.date_field, s.balance_offset,
count(ss.id) AS source_count,
s.created_at
FROM dataflow.stacks s
LEFT JOIN dataflow.stack_sources ss ON ss.stack_name = s.name
GROUP BY s.name, s.label, s.fields, s.amount_field, s.date_field, s.balance_offset, s.created_at
ORDER BY s.name;
$$ LANGUAGE sql STABLE;
------------------------------------------------------
-- Function: get_stack
------------------------------------------------------
CREATE OR REPLACE FUNCTION get_stack(p_name TEXT)
RETURNS TABLE (
name TEXT,
label TEXT,
fields JSONB,
amount_field TEXT,
date_field TEXT,
balance_offset NUMERIC,
created_at TIMESTAMPTZ,
sources JSONB
) AS $$
SELECT
s.name, s.label, s.fields,
s.amount_field, s.date_field, s.balance_offset,
s.created_at,
COALESCE(jsonb_agg(
jsonb_build_object(
'id', ss.id,
'source_name', ss.source_name,
'field_map', ss.field_map,
'amount_field', ss.amount_field,
'amount_sign', ss.amount_sign,
'date_field', ss.date_field,
'balance_offset', ss.balance_offset
) ORDER BY ss.source_name
) FILTER (WHERE ss.id IS NOT NULL), '[]')
FROM dataflow.stacks s
LEFT JOIN dataflow.stack_sources ss ON ss.stack_name = s.name
WHERE s.name = p_name
GROUP BY s.name, s.label, s.fields, s.amount_field, s.date_field, s.balance_offset, s.created_at;
$$ LANGUAGE sql STABLE;
------------------------------------------------------
-- Function: create_stack
------------------------------------------------------
CREATE OR REPLACE FUNCTION create_stack(
p_name TEXT,
p_label TEXT DEFAULT NULL,
p_fields JSONB DEFAULT '[]',
p_amount_field TEXT DEFAULT NULL,
p_date_field TEXT DEFAULT NULL,
p_balance_offset NUMERIC DEFAULT 0
) RETURNS dataflow.stacks AS $$
INSERT INTO dataflow.stacks (name, label, fields, amount_field, date_field, balance_offset)
VALUES (p_name, p_label, p_fields, p_amount_field, p_date_field, p_balance_offset)
RETURNING *;
$$ LANGUAGE sql;
------------------------------------------------------
-- Function: update_stack
------------------------------------------------------
CREATE OR REPLACE FUNCTION update_stack(
p_name TEXT,
p_label TEXT DEFAULT NULL,
p_fields JSONB DEFAULT NULL,
p_amount_field TEXT DEFAULT NULL,
p_date_field TEXT DEFAULT NULL,
p_balance_offset NUMERIC DEFAULT NULL
) RETURNS dataflow.stacks AS $$
UPDATE dataflow.stacks SET
label = COALESCE(p_label, label),
fields = COALESCE(p_fields, fields),
amount_field = COALESCE(p_amount_field, amount_field),
date_field = COALESCE(p_date_field, date_field),
balance_offset = COALESCE(p_balance_offset, balance_offset)
WHERE name = p_name
RETURNING *;
$$ LANGUAGE sql;
------------------------------------------------------
-- Function: delete_stack
------------------------------------------------------
CREATE OR REPLACE FUNCTION delete_stack(p_name TEXT)
RETURNS TABLE (name TEXT) AS $$
DELETE FROM dataflow.stacks WHERE name = p_name RETURNING name;
$$ LANGUAGE sql;
------------------------------------------------------
-- Function: upsert_stack_source
------------------------------------------------------
CREATE OR REPLACE FUNCTION upsert_stack_source(
p_stack_name TEXT,
p_source_name TEXT,
p_field_map JSONB DEFAULT '{}',
p_amount_sign INTEGER DEFAULT 1,
p_balance_offset NUMERIC DEFAULT 0,
p_amount_field TEXT DEFAULT NULL,
p_date_field TEXT DEFAULT NULL
) RETURNS dataflow.stack_sources AS $$
INSERT INTO dataflow.stack_sources (stack_name, source_name, field_map, amount_sign, balance_offset, amount_field, date_field)
VALUES (p_stack_name, p_source_name, p_field_map, p_amount_sign, p_balance_offset, p_amount_field, p_date_field)
ON CONFLICT (stack_name, source_name) DO UPDATE SET
field_map = EXCLUDED.field_map,
amount_sign = EXCLUDED.amount_sign,
balance_offset = EXCLUDED.balance_offset,
amount_field = EXCLUDED.amount_field,
date_field = EXCLUDED.date_field
RETURNING *;
$$ LANGUAGE sql;
------------------------------------------------------
-- Function: remove_stack_source
------------------------------------------------------
CREATE OR REPLACE FUNCTION remove_stack_source(p_stack_name TEXT, p_source_name TEXT)
RETURNS TABLE (source_name TEXT) AS $$
DELETE FROM dataflow.stack_sources
WHERE stack_name = p_stack_name AND source_name = p_source_name
RETURNING source_name;
$$ LANGUAGE sql;
------------------------------------------------------
-- Function: calibrate_balance
-- Queries dfv.{source} directly using per-source amount/date fields.
-- No stack view required.
------------------------------------------------------
CREATE OR REPLACE FUNCTION calibrate_balance(
p_stack_name TEXT,
p_source_name TEXT,
p_as_of_date DATE,
p_known_balance NUMERIC
) RETURNS JSON AS $$
DECLARE
v_src dataflow.stack_sources%ROWTYPE;
v_running NUMERIC;
v_sql TEXT;
BEGIN
SELECT * INTO v_src
FROM dataflow.stack_sources
WHERE stack_name = p_stack_name AND source_name = p_source_name;
IF NOT FOUND THEN
RETURN json_build_object('success', false, 'error', 'Source not in stack');
END IF;
IF v_src.amount_field IS NULL OR v_src.date_field IS NULL THEN
RETURN json_build_object('success', false, 'error', 'Set amount and date fields on this source first');
END IF;
BEGIN
v_sql := format(
'SELECT COALESCE(SUM(%I * %s), 0) FROM dfv.%I WHERE %I <= %L::date',
v_src.amount_field, v_src.amount_sign, p_source_name, v_src.date_field, p_as_of_date
);
EXECUTE v_sql INTO v_running;
EXCEPTION WHEN undefined_table THEN
RETURN json_build_object('success', false, 'error', 'Source view not found — generate the source view first');
END;
RETURN json_build_object(
'success', true,
'source', p_source_name,
'as_of_date', p_as_of_date,
'known_balance', p_known_balance,
'computed_sum', v_running,
'suggested_offset', p_known_balance - v_running
);
END;
$$ LANGUAGE plpgsql STABLE;
------------------------------------------------------
-- Function: generate_stack_view
-- Builds a WITH ... UNION ALL view in dfv schema from existing dfv source views.
-- Each source CTE applies amount_sign and computes a per-source running balance.
-- Outer SELECT adds net_balance across all sources.
------------------------------------------------------
CREATE OR REPLACE FUNCTION generate_stack_view(p_stack_name TEXT)
RETURNS JSON AS $$
DECLARE
v_stack dataflow.stacks%ROWTYPE;
v_src dataflow.stack_sources%ROWTYPE;
v_field JSONB;
v_ctes TEXT[] := '{}';
v_cte_names TEXT[] := '{}';
v_select TEXT;
v_col TEXT;
v_src_field TEXT;
v_amt_src TEXT;
v_date_src TEXT;
v_view TEXT;
v_sql TEXT;
v_has_bal BOOLEAN;
v_canon_cols TEXT;
BEGIN
SELECT * INTO v_stack FROM dataflow.stacks WHERE name = p_stack_name;
IF NOT FOUND THEN
RETURN json_build_object('success', false, 'error', 'Stack not found');
END IF;
v_has_bal := v_stack.amount_field IS NOT NULL AND v_stack.date_field IS NOT NULL;
-- Build one CTE per source querying dfv.{source} directly
FOR v_src IN
SELECT * FROM dataflow.stack_sources WHERE stack_name = p_stack_name ORDER BY source_name
LOOP
v_select := format('SELECT %L AS _source, id AS _id', v_src.source_name);
FOR v_field IN SELECT * FROM jsonb_array_elements(v_stack.fields)
LOOP
v_col := v_field->>'name';
IF v_has_bal AND v_col = v_stack.amount_field THEN
-- Use per-source amount_field with sign applied
IF v_src.amount_field IS NULL THEN
v_select := v_select || format(', NULL::%s AS %I', v_field->>'type', v_col);
ELSE
v_select := v_select || format(', %I * %s AS %I', v_src.amount_field, v_src.amount_sign, v_col);
END IF;
ELSIF v_has_bal AND v_col = v_stack.date_field THEN
-- Use per-source date_field
IF v_src.date_field IS NULL THEN
v_select := v_select || format(', NULL::date AS %I', v_col);
ELSE
v_select := v_select || format(', %I AS %I', v_src.date_field, v_col);
END IF;
ELSE
-- Other canonical fields: use field_map or same name, NULL if column doesn't exist
v_src_field := COALESCE(v_src.field_map->>v_col, v_col);
IF EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_schema = 'dfv'
AND table_name = v_src.source_name
AND column_name = v_src_field
) THEN
v_select := v_select || format(', %I AS %I', v_src_field, v_col);
ELSE
v_select := v_select || format(', NULL::text AS %I', v_col);
END IF;
END IF;
END LOOP;
-- Per-source running balance with calibration offset baked in
IF v_has_bal AND v_src.amount_field IS NOT NULL AND v_src.date_field IS NOT NULL THEN
v_select := v_select || format(
', SUM(%I * %s) OVER (ORDER BY %I ASC, id ASC) + %s AS source_balance',
v_src.amount_field, v_src.amount_sign, v_src.date_field, v_src.balance_offset
);
ELSE
v_select := v_select || ', NULL::numeric AS source_balance';
END IF;
v_select := v_select || format(' FROM dfv.%I', v_src.source_name);
v_ctes := v_ctes || format('%I AS (%s)', v_src.source_name, v_select);
v_cte_names := v_cte_names || quote_ident(v_src.source_name);
END LOOP;
IF array_length(v_ctes, 1) IS NULL THEN
RETURN json_build_object('success', false, 'error', 'Stack has no sources');
END IF;
CREATE SCHEMA IF NOT EXISTS dfv;
v_view := 'dfv.' || quote_ident(p_stack_name);
EXECUTE format('DROP VIEW IF EXISTS %s', v_view);
v_canon_cols := (
SELECT string_agg(quote_ident(f->>'name'), ', ')
FROM jsonb_array_elements(v_stack.fields) f
);
IF v_has_bal THEN
-- net_balance: cumulative signed amount across all sources + stack-level offset
v_sql := format(
'CREATE VIEW %s AS '
'WITH %s, _stacked AS (SELECT * FROM %s) '
'SELECT _source, _id, %s, source_balance, '
'SUM(%I) OVER (ORDER BY %I ASC, _id ASC) + %s AS net_balance '
'FROM _stacked',
v_view,
array_to_string(v_ctes, ', '),
array_to_string(v_cte_names, ' UNION ALL SELECT * FROM '),
v_canon_cols,
v_stack.amount_field,
v_stack.date_field,
v_stack.balance_offset
);
ELSE
v_sql := format(
'CREATE VIEW %s AS '
'WITH %s, _stacked AS (SELECT * FROM %s) '
'SELECT _source, _id, %s FROM _stacked',
v_view,
array_to_string(v_ctes, ', '),
array_to_string(v_cte_names, ' UNION ALL SELECT * FROM '),
v_canon_cols
);
END IF;
EXECUTE v_sql;
RETURN json_build_object('success', true, 'view', v_view, 'sql', v_sql);
END;
$$ LANGUAGE plpgsql;
------------------------------------------------------
-- Function: get_stack_balance
-- Returns the current running balance (last row of the generated view)
------------------------------------------------------
CREATE OR REPLACE FUNCTION get_stack_balance(p_stack_name TEXT)
RETURNS JSON AS $$
DECLARE
v_stack dataflow.stacks%ROWTYPE;
v_balance NUMERIC;
v_view TEXT;
v_sql TEXT;
BEGIN
SELECT * INTO v_stack FROM dataflow.stacks WHERE name = p_stack_name;
IF NOT FOUND THEN
RETURN json_build_object('success', false, 'error', 'Stack not found');
END IF;
IF v_stack.amount_field IS NULL OR v_stack.date_field IS NULL THEN
RETURN json_build_object('success', false, 'error', 'amount_field and date_field must be set');
END IF;
v_view := 'dfv.' || quote_ident(p_stack_name);
BEGIN
v_sql := format(
'SELECT net_balance FROM %s ORDER BY %I DESC, _id DESC LIMIT 1',
v_view, v_stack.date_field
);
EXECUTE v_sql INTO v_balance;
EXCEPTION WHEN undefined_table THEN
RETURN json_build_object('success', false, 'error', 'View not generated yet — click Generate first');
END;
RETURN json_build_object('success', true, 'balance', v_balance);
END;
$$ LANGUAGE plpgsql STABLE;
COMMENT ON FUNCTION generate_stack_view IS 'Generate a UNION ALL view in dfv schema combining multiple sources with optional running balance';
COMMENT ON FUNCTION calibrate_balance IS 'Given a known good balance at a date, compute the offset to add to balance_offset';
COMMENT ON FUNCTION get_stack_balance IS 'Return the current running balance (last row) from the generated dfv view';