- Two-column layout: config left, SQL panel right (equal halves) - SQL panel shows formatted SQL (sql-formatter, 4-space indent) - Live preview: SQL updates 400ms after any field/source/mapping change - Run button executes edited SQL directly via new exec-sql endpoint - generate_stack_view gains p_dry_run mode for preview without executing - CASCADE drop detects dependent stacks, marks them stale in DB and status bar - net_balance moved to last column in generated view - Backfill 458 missing dcard rows and 123 missing chase rows from TPS migration bug Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
436 lines
17 KiB
PL/PgSQL
436 lines
17 KiB
PL/PgSQL
--
|
|
-- Stacks queries
|
|
-- All SQL for api/routes/stacks.js
|
|
--
|
|
|
|
SET search_path TO dataflow, public;
|
|
|
|
------------------------------------------------------
|
|
-- Tables
|
|
------------------------------------------------------
|
|
|
|
CREATE TABLE IF NOT EXISTS dataflow.stacks (
|
|
name TEXT PRIMARY KEY,
|
|
label TEXT,
|
|
-- Ordered canonical field definitions: [{name, label, type}]
|
|
-- type: 'text' | 'numeric' | 'date'
|
|
fields JSONB NOT NULL DEFAULT '[]',
|
|
-- Running balance config
|
|
amount_field TEXT, -- canonical field to sum for running balance
|
|
date_field TEXT, -- canonical field to order by
|
|
balance_offset NUMERIC DEFAULT 0, -- added to running sum (calibration)
|
|
created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS dataflow.stack_sources (
|
|
id SERIAL PRIMARY KEY,
|
|
stack_name TEXT NOT NULL REFERENCES dataflow.stacks(name) ON DELETE CASCADE,
|
|
source_name TEXT NOT NULL REFERENCES dataflow.sources(name) ON DELETE CASCADE,
|
|
-- Maps other canonical field names → source view column names (not amount/date — those are explicit)
|
|
field_map JSONB NOT NULL DEFAULT '{}',
|
|
-- Which column in dfv.{source} is the amount, and its sign (+1/-1)
|
|
amount_field TEXT,
|
|
amount_sign INTEGER NOT NULL DEFAULT 1,
|
|
-- Which column in dfv.{source} is the date
|
|
date_field TEXT,
|
|
-- Calibration offset added to this source's running balance
|
|
balance_offset NUMERIC NOT NULL DEFAULT 0,
|
|
UNIQUE (stack_name, source_name)
|
|
);
|
|
|
|
-- Migrations: add columns that may be missing from earlier deploys
|
|
ALTER TABLE dataflow.stack_sources ADD COLUMN IF NOT EXISTS balance_offset NUMERIC NOT NULL DEFAULT 0;
|
|
ALTER TABLE dataflow.stack_sources ADD COLUMN IF NOT EXISTS amount_field TEXT;
|
|
ALTER TABLE dataflow.stack_sources ADD COLUMN IF NOT EXISTS date_field TEXT;
|
|
|
|
-- Drop old signatures before recreating
|
|
DROP FUNCTION IF EXISTS calibrate_balance(TEXT, DATE, NUMERIC);
|
|
DROP FUNCTION IF EXISTS upsert_stack_source(TEXT, TEXT, JSONB, INTEGER, NUMERIC);
|
|
DROP FUNCTION IF EXISTS generate_stack_view(TEXT);
|
|
|
|
------------------------------------------------------
|
|
-- Function: list_stacks
|
|
------------------------------------------------------
|
|
CREATE OR REPLACE FUNCTION list_stacks()
|
|
RETURNS TABLE (
|
|
name TEXT,
|
|
label TEXT,
|
|
fields JSONB,
|
|
amount_field TEXT,
|
|
date_field TEXT,
|
|
balance_offset NUMERIC,
|
|
source_count BIGINT,
|
|
created_at TIMESTAMPTZ
|
|
) AS $$
|
|
SELECT
|
|
s.name, s.label, s.fields,
|
|
s.amount_field, s.date_field, s.balance_offset,
|
|
count(ss.id) AS source_count,
|
|
s.created_at
|
|
FROM dataflow.stacks s
|
|
LEFT JOIN dataflow.stack_sources ss ON ss.stack_name = s.name
|
|
GROUP BY s.name, s.label, s.fields, s.amount_field, s.date_field, s.balance_offset, s.created_at
|
|
ORDER BY s.name;
|
|
$$ LANGUAGE sql STABLE;
|
|
|
|
------------------------------------------------------
|
|
-- Function: get_stack
|
|
------------------------------------------------------
|
|
CREATE OR REPLACE FUNCTION get_stack(p_name TEXT)
|
|
RETURNS TABLE (
|
|
name TEXT,
|
|
label TEXT,
|
|
fields JSONB,
|
|
amount_field TEXT,
|
|
date_field TEXT,
|
|
balance_offset NUMERIC,
|
|
created_at TIMESTAMPTZ,
|
|
sources JSONB
|
|
) AS $$
|
|
SELECT
|
|
s.name, s.label, s.fields,
|
|
s.amount_field, s.date_field, s.balance_offset,
|
|
s.created_at,
|
|
COALESCE(jsonb_agg(
|
|
jsonb_build_object(
|
|
'id', ss.id,
|
|
'source_name', ss.source_name,
|
|
'field_map', ss.field_map,
|
|
'amount_field', ss.amount_field,
|
|
'amount_sign', ss.amount_sign,
|
|
'date_field', ss.date_field,
|
|
'balance_offset', ss.balance_offset
|
|
) ORDER BY ss.source_name
|
|
) FILTER (WHERE ss.id IS NOT NULL), '[]')
|
|
FROM dataflow.stacks s
|
|
LEFT JOIN dataflow.stack_sources ss ON ss.stack_name = s.name
|
|
WHERE s.name = p_name
|
|
GROUP BY s.name, s.label, s.fields, s.amount_field, s.date_field, s.balance_offset, s.created_at;
|
|
$$ LANGUAGE sql STABLE;
|
|
|
|
------------------------------------------------------
|
|
-- Function: create_stack
|
|
------------------------------------------------------
|
|
CREATE OR REPLACE FUNCTION create_stack(
|
|
p_name TEXT,
|
|
p_label TEXT DEFAULT NULL,
|
|
p_fields JSONB DEFAULT '[]',
|
|
p_amount_field TEXT DEFAULT NULL,
|
|
p_date_field TEXT DEFAULT NULL,
|
|
p_balance_offset NUMERIC DEFAULT 0
|
|
) RETURNS dataflow.stacks AS $$
|
|
INSERT INTO dataflow.stacks (name, label, fields, amount_field, date_field, balance_offset)
|
|
VALUES (p_name, p_label, p_fields, p_amount_field, p_date_field, p_balance_offset)
|
|
RETURNING *;
|
|
$$ LANGUAGE sql;
|
|
|
|
------------------------------------------------------
|
|
-- Function: update_stack
|
|
------------------------------------------------------
|
|
CREATE OR REPLACE FUNCTION update_stack(
|
|
p_name TEXT,
|
|
p_label TEXT DEFAULT NULL,
|
|
p_fields JSONB DEFAULT NULL,
|
|
p_amount_field TEXT DEFAULT NULL,
|
|
p_date_field TEXT DEFAULT NULL,
|
|
p_balance_offset NUMERIC DEFAULT NULL
|
|
) RETURNS dataflow.stacks AS $$
|
|
UPDATE dataflow.stacks SET
|
|
label = COALESCE(p_label, label),
|
|
fields = COALESCE(p_fields, fields),
|
|
amount_field = COALESCE(p_amount_field, amount_field),
|
|
date_field = COALESCE(p_date_field, date_field),
|
|
balance_offset = COALESCE(p_balance_offset, balance_offset)
|
|
WHERE name = p_name
|
|
RETURNING *;
|
|
$$ LANGUAGE sql;
|
|
|
|
------------------------------------------------------
|
|
-- Function: delete_stack
|
|
------------------------------------------------------
|
|
CREATE OR REPLACE FUNCTION delete_stack(p_name TEXT)
|
|
RETURNS TABLE (name TEXT) AS $$
|
|
DELETE FROM dataflow.stacks WHERE name = p_name RETURNING name;
|
|
$$ LANGUAGE sql;
|
|
|
|
------------------------------------------------------
|
|
-- Function: upsert_stack_source
|
|
------------------------------------------------------
|
|
CREATE OR REPLACE FUNCTION upsert_stack_source(
|
|
p_stack_name TEXT,
|
|
p_source_name TEXT,
|
|
p_field_map JSONB DEFAULT '{}',
|
|
p_amount_sign INTEGER DEFAULT 1,
|
|
p_balance_offset NUMERIC DEFAULT 0,
|
|
p_amount_field TEXT DEFAULT NULL,
|
|
p_date_field TEXT DEFAULT NULL
|
|
) RETURNS dataflow.stack_sources AS $$
|
|
INSERT INTO dataflow.stack_sources (stack_name, source_name, field_map, amount_sign, balance_offset, amount_field, date_field)
|
|
VALUES (p_stack_name, p_source_name, p_field_map, p_amount_sign, p_balance_offset, p_amount_field, p_date_field)
|
|
ON CONFLICT (stack_name, source_name) DO UPDATE SET
|
|
field_map = EXCLUDED.field_map,
|
|
amount_sign = EXCLUDED.amount_sign,
|
|
balance_offset = EXCLUDED.balance_offset,
|
|
amount_field = EXCLUDED.amount_field,
|
|
date_field = EXCLUDED.date_field
|
|
RETURNING *;
|
|
$$ LANGUAGE sql;
|
|
|
|
------------------------------------------------------
|
|
-- Function: remove_stack_source
|
|
------------------------------------------------------
|
|
CREATE OR REPLACE FUNCTION remove_stack_source(p_stack_name TEXT, p_source_name TEXT)
|
|
RETURNS TABLE (source_name TEXT) AS $$
|
|
DELETE FROM dataflow.stack_sources
|
|
WHERE stack_name = p_stack_name AND source_name = p_source_name
|
|
RETURNING source_name;
|
|
$$ LANGUAGE sql;
|
|
|
|
------------------------------------------------------
|
|
-- Function: calibrate_balance
|
|
-- Queries dfv.{source} directly using per-source amount/date fields.
|
|
-- No stack view required.
|
|
------------------------------------------------------
|
|
CREATE OR REPLACE FUNCTION calibrate_balance(
|
|
p_stack_name TEXT,
|
|
p_source_name TEXT,
|
|
p_as_of_date DATE,
|
|
p_known_balance NUMERIC
|
|
) RETURNS JSON AS $$
|
|
DECLARE
|
|
v_src dataflow.stack_sources%ROWTYPE;
|
|
v_running NUMERIC;
|
|
v_sql TEXT;
|
|
BEGIN
|
|
SELECT * INTO v_src
|
|
FROM dataflow.stack_sources
|
|
WHERE stack_name = p_stack_name AND source_name = p_source_name;
|
|
|
|
IF NOT FOUND THEN
|
|
RETURN json_build_object('success', false, 'error', 'Source not in stack');
|
|
END IF;
|
|
IF v_src.amount_field IS NULL OR v_src.date_field IS NULL THEN
|
|
RETURN json_build_object('success', false, 'error', 'Set amount and date fields on this source first');
|
|
END IF;
|
|
|
|
BEGIN
|
|
v_sql := format(
|
|
'SELECT COALESCE(SUM(%I * %s), 0) FROM dfv.%I WHERE %I <= %L::date',
|
|
v_src.amount_field, v_src.amount_sign, p_source_name, v_src.date_field, p_as_of_date
|
|
);
|
|
EXECUTE v_sql INTO v_running;
|
|
EXCEPTION WHEN undefined_table THEN
|
|
RETURN json_build_object('success', false, 'error', 'Source view not found — generate the source view first');
|
|
END;
|
|
|
|
RETURN json_build_object(
|
|
'success', true,
|
|
'source', p_source_name,
|
|
'as_of_date', p_as_of_date,
|
|
'known_balance', p_known_balance,
|
|
'computed_sum', v_running,
|
|
'suggested_offset', p_known_balance - v_running
|
|
);
|
|
END;
|
|
$$ LANGUAGE plpgsql STABLE;
|
|
|
|
------------------------------------------------------
|
|
-- Function: generate_stack_view
|
|
-- Builds a WITH ... UNION ALL view in dfv schema from existing dfv source views.
|
|
-- Each source CTE applies amount_sign and computes a per-source running balance.
|
|
-- Outer SELECT adds net_balance across all sources.
|
|
------------------------------------------------------
|
|
CREATE OR REPLACE FUNCTION generate_stack_view(p_stack_name TEXT, p_dry_run BOOLEAN DEFAULT false)
|
|
RETURNS JSON AS $$
|
|
DECLARE
|
|
v_stack dataflow.stacks%ROWTYPE;
|
|
v_src dataflow.stack_sources%ROWTYPE;
|
|
v_field JSONB;
|
|
v_ctes TEXT[] := '{}';
|
|
v_cte_names TEXT[] := '{}';
|
|
v_select TEXT;
|
|
v_col TEXT;
|
|
v_src_field TEXT;
|
|
v_amt_src TEXT;
|
|
v_date_src TEXT;
|
|
v_view TEXT;
|
|
v_sql TEXT;
|
|
v_has_bal BOOLEAN;
|
|
v_canon_cols TEXT;
|
|
v_src_bal_cols TEXT;
|
|
v_total_offset NUMERIC := 0;
|
|
v_cascade_stale TEXT[];
|
|
BEGIN
|
|
SELECT * INTO v_stack FROM dataflow.stacks WHERE name = p_stack_name;
|
|
IF NOT FOUND THEN
|
|
RETURN json_build_object('success', false, 'error', 'Stack not found');
|
|
END IF;
|
|
|
|
v_has_bal := v_stack.amount_field IS NOT NULL AND v_stack.date_field IS NOT NULL;
|
|
|
|
-- Build one CTE per source querying dfv.{source} directly
|
|
FOR v_src IN
|
|
SELECT * FROM dataflow.stack_sources WHERE stack_name = p_stack_name ORDER BY source_name
|
|
LOOP
|
|
v_select := format('SELECT %L AS _source, id AS _id', v_src.source_name);
|
|
|
|
FOR v_field IN SELECT * FROM jsonb_array_elements(v_stack.fields)
|
|
LOOP
|
|
v_col := v_field->>'name';
|
|
|
|
IF v_has_bal AND v_col = v_stack.amount_field THEN
|
|
-- Use per-source amount_field with sign applied
|
|
IF v_src.amount_field IS NULL THEN
|
|
v_select := v_select || format(', NULL::%s AS %I', v_field->>'type', v_col);
|
|
ELSE
|
|
v_select := v_select || format(', %I * %s AS %I', v_src.amount_field, v_src.amount_sign, v_col);
|
|
END IF;
|
|
ELSIF v_has_bal AND v_col = v_stack.date_field THEN
|
|
-- Use per-source date_field
|
|
IF v_src.date_field IS NULL THEN
|
|
v_select := v_select || format(', NULL::date AS %I', v_col);
|
|
ELSE
|
|
v_select := v_select || format(', %I AS %I', v_src.date_field, v_col);
|
|
END IF;
|
|
ELSE
|
|
-- Other canonical fields: use field_map or same name, NULL if column doesn't exist
|
|
v_src_field := COALESCE(v_src.field_map->>v_col, v_col);
|
|
IF EXISTS (
|
|
SELECT 1 FROM information_schema.columns
|
|
WHERE table_schema = 'dfv'
|
|
AND table_name = v_src.source_name
|
|
AND column_name = v_src_field
|
|
) THEN
|
|
v_select := v_select || format(', %I AS %I', v_src_field, v_col);
|
|
ELSE
|
|
v_select := v_select || format(', NULL::text AS %I', v_col);
|
|
END IF;
|
|
END IF;
|
|
END LOOP;
|
|
|
|
v_select := v_select || format(' FROM dfv.%I', v_src.source_name);
|
|
|
|
v_ctes := v_ctes || format('%I AS (%s)', v_src.source_name, v_select);
|
|
v_cte_names := v_cte_names || quote_ident(v_src.source_name);
|
|
|
|
-- Accumulate carried-forward source balance column and total offset
|
|
IF v_has_bal THEN
|
|
IF v_src_bal_cols IS NOT NULL THEN v_src_bal_cols := v_src_bal_cols || ', '; END IF;
|
|
v_src_bal_cols := COALESCE(v_src_bal_cols, '') || format(
|
|
'SUM(CASE WHEN _source = %L THEN %I END) OVER (ORDER BY %I ASC, _id ASC) + %s AS %I',
|
|
v_src.source_name, v_stack.amount_field, v_stack.date_field,
|
|
v_src.balance_offset, v_src.source_name || '_balance'
|
|
);
|
|
v_total_offset := v_total_offset + v_src.balance_offset;
|
|
END IF;
|
|
END LOOP;
|
|
|
|
IF array_length(v_ctes, 1) IS NULL THEN
|
|
RETURN json_build_object('success', false, 'error', 'Stack has no sources');
|
|
END IF;
|
|
|
|
v_view := 'dfv.' || quote_ident(p_stack_name);
|
|
|
|
v_canon_cols := (
|
|
SELECT string_agg(quote_ident(f->>'name'), ', ')
|
|
FROM jsonb_array_elements(v_stack.fields) f
|
|
);
|
|
|
|
IF v_has_bal THEN
|
|
v_sql := format(
|
|
'CREATE VIEW %s AS '
|
|
'WITH %s, _stacked AS (SELECT * FROM %s) '
|
|
'SELECT _source, _id, %s, '
|
|
'%s, '
|
|
'SUM(%I) OVER (ORDER BY %I ASC, _id ASC) + %s AS net_balance '
|
|
'FROM _stacked ORDER BY %I DESC, _id DESC',
|
|
v_view,
|
|
array_to_string(v_ctes, ', '),
|
|
array_to_string(v_cte_names, ' UNION ALL SELECT * FROM '),
|
|
v_canon_cols,
|
|
v_src_bal_cols,
|
|
v_stack.amount_field,
|
|
v_stack.date_field,
|
|
v_total_offset,
|
|
v_stack.date_field
|
|
);
|
|
ELSE
|
|
v_sql := format(
|
|
'CREATE VIEW %s AS '
|
|
'WITH %s, _stacked AS (SELECT * FROM %s) '
|
|
'SELECT _source, _id, %s FROM _stacked',
|
|
v_view,
|
|
array_to_string(v_ctes, ', '),
|
|
array_to_string(v_cte_names, ' UNION ALL SELECT * FROM '),
|
|
v_canon_cols
|
|
);
|
|
END IF;
|
|
|
|
IF NOT p_dry_run THEN
|
|
CREATE SCHEMA IF NOT EXISTS dfv;
|
|
EXECUTE format('DROP VIEW IF EXISTS %s CASCADE', v_view);
|
|
EXECUTE v_sql;
|
|
|
|
-- Detect stacks whose views were dropped by CASCADE and mark them stale
|
|
SELECT array_agg(s.name) INTO v_cascade_stale
|
|
FROM dataflow.stacks s
|
|
WHERE s.name != p_stack_name
|
|
AND s.view_generated_at IS NOT NULL
|
|
AND NOT EXISTS (
|
|
SELECT 1 FROM pg_views v
|
|
WHERE v.schemaname = 'dfv' AND v.viewname = s.name
|
|
);
|
|
|
|
UPDATE dataflow.stacks SET view_generated_at = NULL
|
|
WHERE name = ANY(v_cascade_stale);
|
|
END IF;
|
|
|
|
RETURN json_build_object(
|
|
'success', true,
|
|
'view', v_view,
|
|
'sql', v_sql,
|
|
'cascade_stale', COALESCE(to_json(v_cascade_stale), '[]'::json)
|
|
);
|
|
END;
|
|
$$ LANGUAGE plpgsql;
|
|
|
|
------------------------------------------------------
|
|
-- Function: get_stack_balance
|
|
-- Returns the current running balance (last row of the generated view)
|
|
------------------------------------------------------
|
|
CREATE OR REPLACE FUNCTION get_stack_balance(p_stack_name TEXT)
|
|
RETURNS JSON AS $$
|
|
DECLARE
|
|
v_stack dataflow.stacks%ROWTYPE;
|
|
v_balance NUMERIC;
|
|
v_view TEXT;
|
|
v_sql TEXT;
|
|
BEGIN
|
|
SELECT * INTO v_stack FROM dataflow.stacks WHERE name = p_stack_name;
|
|
IF NOT FOUND THEN
|
|
RETURN json_build_object('success', false, 'error', 'Stack not found');
|
|
END IF;
|
|
IF v_stack.amount_field IS NULL OR v_stack.date_field IS NULL THEN
|
|
RETURN json_build_object('success', false, 'error', 'amount_field and date_field must be set');
|
|
END IF;
|
|
|
|
v_view := 'dfv.' || quote_ident(p_stack_name);
|
|
|
|
BEGIN
|
|
v_sql := format(
|
|
'SELECT net_balance FROM %s ORDER BY %I DESC, _id DESC LIMIT 1',
|
|
v_view, v_stack.date_field
|
|
);
|
|
EXECUTE v_sql INTO v_balance;
|
|
EXCEPTION WHEN undefined_table THEN
|
|
RETURN json_build_object('success', false, 'error', 'View not generated yet — click Generate first');
|
|
END;
|
|
|
|
RETURN json_build_object('success', true, 'balance', v_balance);
|
|
END;
|
|
$$ LANGUAGE plpgsql STABLE;
|
|
|
|
COMMENT ON FUNCTION generate_stack_view(TEXT, BOOLEAN) IS 'Generate a UNION ALL view in dfv schema combining multiple sources with optional running balance; p_dry_run=true returns SQL without executing';
|
|
COMMENT ON FUNCTION calibrate_balance IS 'Given a known good balance at a date, compute the offset to add to balance_offset';
|
|
COMMENT ON FUNCTION get_stack_balance IS 'Return the current running balance (last row) from the generated dfv view';
|