dataflow/database/queries/stacks.sql
Paul Trowbridge 9e6d184bd8 Stacks: source ordering via seq field with drag-to-reorder
- Add seq column to stack_sources; existing rows seeded by insertion order
- New sources auto-assigned max(seq)+1 so they always append to the end
- get_stack and generate_stack_view now order by seq instead of source_name
- Add reorder_stack_sources() function and PUT /:name/sources/reorder endpoint
- Source cards have drag handles matching the output columns grid behavior

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-19 17:11:44 -04:00

473 lines
18 KiB
PL/PgSQL

--
-- Stacks queries
-- All SQL for api/routes/stacks.js
--
SET search_path TO dataflow, public;
------------------------------------------------------
-- Tables
------------------------------------------------------
CREATE TABLE IF NOT EXISTS dataflow.stacks (
name TEXT PRIMARY KEY,
label TEXT,
-- Ordered canonical field definitions: [{name, label, type}]
-- type: 'text' | 'numeric' | 'date'
fields JSONB NOT NULL DEFAULT '[]',
-- Running balance config
amount_field TEXT, -- canonical field to sum for running balance
date_field TEXT, -- canonical field to order by
balance_offset NUMERIC DEFAULT 0, -- added to running sum (calibration)
created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS dataflow.stack_sources (
id SERIAL PRIMARY KEY,
stack_name TEXT NOT NULL REFERENCES dataflow.stacks(name) ON DELETE CASCADE,
source_name TEXT NOT NULL REFERENCES dataflow.sources(name) ON DELETE CASCADE,
-- Maps other canonical field names → source view column names (not amount/date — those are explicit)
field_map JSONB NOT NULL DEFAULT '{}',
-- Which column in dfv.{source} is the amount, and its sign (+1/-1)
amount_field TEXT,
amount_sign INTEGER NOT NULL DEFAULT 1,
-- Which column in dfv.{source} is the date
date_field TEXT,
-- Calibration offset added to this source's running balance
balance_offset NUMERIC NOT NULL DEFAULT 0,
UNIQUE (stack_name, source_name)
);
-- Migrations: add columns that may be missing from earlier deploys
ALTER TABLE dataflow.stack_sources ADD COLUMN IF NOT EXISTS balance_offset NUMERIC NOT NULL DEFAULT 0;
ALTER TABLE dataflow.stack_sources ADD COLUMN IF NOT EXISTS amount_field TEXT;
ALTER TABLE dataflow.stack_sources ADD COLUMN IF NOT EXISTS date_field TEXT;
ALTER TABLE dataflow.stack_sources ADD COLUMN IF NOT EXISTS seq INTEGER NOT NULL DEFAULT 0;
-- Seed seq from insertion order for existing rows
UPDATE dataflow.stack_sources ss
SET seq = sub.rn
FROM (
SELECT id, ROW_NUMBER() OVER (PARTITION BY stack_name ORDER BY id) AS rn
FROM dataflow.stack_sources WHERE seq = 0
) sub
WHERE ss.id = sub.id AND ss.seq = 0;
-- Drop old signatures before recreating
DROP FUNCTION IF EXISTS calibrate_balance(TEXT, DATE, NUMERIC);
DROP FUNCTION IF EXISTS upsert_stack_source(TEXT, TEXT, JSONB, INTEGER, NUMERIC);
DROP FUNCTION IF EXISTS generate_stack_view(TEXT);
------------------------------------------------------
-- Function: list_stacks
------------------------------------------------------
CREATE OR REPLACE FUNCTION list_stacks()
RETURNS TABLE (
name TEXT,
label TEXT,
fields JSONB,
amount_field TEXT,
date_field TEXT,
balance_offset NUMERIC,
source_count BIGINT,
created_at TIMESTAMPTZ
) AS $$
SELECT
s.name, s.label, s.fields,
s.amount_field, s.date_field, s.balance_offset,
count(ss.id) AS source_count,
s.created_at
FROM dataflow.stacks s
LEFT JOIN dataflow.stack_sources ss ON ss.stack_name = s.name
GROUP BY s.name, s.label, s.fields, s.amount_field, s.date_field, s.balance_offset, s.created_at
ORDER BY s.name;
$$ LANGUAGE sql STABLE;
------------------------------------------------------
-- Function: get_stack
------------------------------------------------------
CREATE OR REPLACE FUNCTION get_stack(p_name TEXT)
RETURNS TABLE (
name TEXT,
label TEXT,
fields JSONB,
amount_field TEXT,
date_field TEXT,
balance_offset NUMERIC,
created_at TIMESTAMPTZ,
sources JSONB
) AS $$
SELECT
s.name, s.label, s.fields,
s.amount_field, s.date_field, s.balance_offset,
s.created_at,
COALESCE(jsonb_agg(
jsonb_build_object(
'id', ss.id,
'source_name', ss.source_name,
'field_map', ss.field_map,
'amount_field', ss.amount_field,
'amount_sign', ss.amount_sign,
'date_field', ss.date_field,
'balance_offset', ss.balance_offset,
'seq', ss.seq
) ORDER BY ss.seq, ss.id
) FILTER (WHERE ss.id IS NOT NULL), '[]')
FROM dataflow.stacks s
LEFT JOIN dataflow.stack_sources ss ON ss.stack_name = s.name
WHERE s.name = p_name
GROUP BY s.name, s.label, s.fields, s.amount_field, s.date_field, s.balance_offset, s.created_at;
$$ LANGUAGE sql STABLE;
------------------------------------------------------
-- Function: create_stack
------------------------------------------------------
CREATE OR REPLACE FUNCTION create_stack(
p_name TEXT,
p_label TEXT DEFAULT NULL,
p_fields JSONB DEFAULT '[]',
p_amount_field TEXT DEFAULT NULL,
p_date_field TEXT DEFAULT NULL,
p_balance_offset NUMERIC DEFAULT 0
) RETURNS dataflow.stacks AS $$
INSERT INTO dataflow.stacks (name, label, fields, amount_field, date_field, balance_offset)
VALUES (p_name, p_label, p_fields, p_amount_field, p_date_field, p_balance_offset)
RETURNING *;
$$ LANGUAGE sql;
------------------------------------------------------
-- Function: update_stack
------------------------------------------------------
CREATE OR REPLACE FUNCTION update_stack(
p_name TEXT,
p_label TEXT DEFAULT NULL,
p_fields JSONB DEFAULT NULL,
p_amount_field TEXT DEFAULT NULL,
p_date_field TEXT DEFAULT NULL,
p_balance_offset NUMERIC DEFAULT NULL
) RETURNS dataflow.stacks AS $$
UPDATE dataflow.stacks SET
label = COALESCE(p_label, label),
fields = COALESCE(p_fields, fields),
amount_field = COALESCE(p_amount_field, amount_field),
date_field = COALESCE(p_date_field, date_field),
balance_offset = COALESCE(p_balance_offset, balance_offset)
WHERE name = p_name
RETURNING *;
$$ LANGUAGE sql;
------------------------------------------------------
-- Function: delete_stack
------------------------------------------------------
CREATE OR REPLACE FUNCTION delete_stack(p_name TEXT)
RETURNS TABLE (name TEXT) AS $$
DELETE FROM dataflow.stacks WHERE name = p_name RETURNING name;
$$ LANGUAGE sql;
------------------------------------------------------
-- Function: upsert_stack_source
------------------------------------------------------
CREATE OR REPLACE FUNCTION upsert_stack_source(
p_stack_name TEXT,
p_source_name TEXT,
p_field_map JSONB DEFAULT '{}',
p_amount_sign INTEGER DEFAULT 1,
p_balance_offset NUMERIC DEFAULT 0,
p_amount_field TEXT DEFAULT NULL,
p_date_field TEXT DEFAULT NULL
) RETURNS dataflow.stack_sources AS $$
INSERT INTO dataflow.stack_sources (stack_name, source_name, field_map, amount_sign, balance_offset, amount_field, date_field, seq)
VALUES (
p_stack_name, p_source_name, p_field_map, p_amount_sign, p_balance_offset, p_amount_field, p_date_field,
(SELECT COALESCE(MAX(seq), 0) + 1 FROM dataflow.stack_sources WHERE stack_name = p_stack_name)
)
ON CONFLICT (stack_name, source_name) DO UPDATE SET
field_map = EXCLUDED.field_map,
amount_sign = EXCLUDED.amount_sign,
balance_offset = EXCLUDED.balance_offset,
amount_field = EXCLUDED.amount_field,
date_field = EXCLUDED.date_field
RETURNING *;
$$ LANGUAGE sql;
------------------------------------------------------
-- Function: remove_stack_source
------------------------------------------------------
CREATE OR REPLACE FUNCTION remove_stack_source(p_stack_name TEXT, p_source_name TEXT)
RETURNS TABLE (source_name TEXT) AS $$
DELETE FROM dataflow.stack_sources
WHERE stack_name = p_stack_name AND source_name = p_source_name
RETURNING source_name;
$$ LANGUAGE sql;
------------------------------------------------------
-- Function: calibrate_balance
-- Queries dfv.{source} directly using per-source amount/date fields.
-- No stack view required.
------------------------------------------------------
CREATE OR REPLACE FUNCTION calibrate_balance(
p_stack_name TEXT,
p_source_name TEXT,
p_as_of_date DATE,
p_known_balance NUMERIC
) RETURNS JSON AS $$
DECLARE
v_src dataflow.stack_sources%ROWTYPE;
v_running NUMERIC;
v_sql TEXT;
BEGIN
SELECT * INTO v_src
FROM dataflow.stack_sources
WHERE stack_name = p_stack_name AND source_name = p_source_name;
IF NOT FOUND THEN
RETURN json_build_object('success', false, 'error', 'Source not in stack');
END IF;
IF v_src.amount_field IS NULL OR v_src.date_field IS NULL THEN
RETURN json_build_object('success', false, 'error', 'Set amount and date fields on this source first');
END IF;
BEGIN
IF p_as_of_date IS NULL THEN
v_sql := format(
'SELECT COALESCE(SUM(%I * %s), 0) FROM dfv.%I',
v_src.amount_field, v_src.amount_sign, p_source_name
);
ELSE
v_sql := format(
'SELECT COALESCE(SUM(%I * %s), 0) FROM dfv.%I WHERE %I <= %L::date',
v_src.amount_field, v_src.amount_sign, p_source_name, v_src.date_field, p_as_of_date
);
END IF;
EXECUTE v_sql INTO v_running;
EXCEPTION WHEN undefined_table THEN
RETURN json_build_object('success', false, 'error', 'Source view not found — generate the source view first');
END;
RETURN json_build_object(
'success', true,
'source', p_source_name,
'as_of_date', p_as_of_date,
'known_balance', p_known_balance,
'computed_sum', v_running,
'suggested_offset', p_known_balance - v_running
);
END;
$$ LANGUAGE plpgsql STABLE;
------------------------------------------------------
-- Function: generate_stack_view
-- Builds a WITH ... UNION ALL view in dfv schema from existing dfv source views.
-- Each source CTE applies amount_sign and computes a per-source running balance.
-- Outer SELECT adds net_balance across all sources.
------------------------------------------------------
CREATE OR REPLACE FUNCTION generate_stack_view(p_stack_name TEXT, p_dry_run BOOLEAN DEFAULT false)
RETURNS JSON AS $$
DECLARE
v_stack dataflow.stacks%ROWTYPE;
v_src dataflow.stack_sources%ROWTYPE;
v_field JSONB;
v_ctes TEXT[] := '{}';
v_cte_names TEXT[] := '{}';
v_select TEXT;
v_col TEXT;
v_src_field TEXT;
v_amt_src TEXT;
v_date_src TEXT;
v_view TEXT;
v_sql TEXT;
v_has_bal BOOLEAN;
v_canon_cols TEXT;
v_src_bal_cols TEXT;
v_total_offset NUMERIC := 0;
v_cascade_stale TEXT[];
BEGIN
SELECT * INTO v_stack FROM dataflow.stacks WHERE name = p_stack_name;
IF NOT FOUND THEN
RETURN json_build_object('success', false, 'error', 'Stack not found');
END IF;
v_has_bal := v_stack.amount_field IS NOT NULL AND v_stack.date_field IS NOT NULL;
-- Build one CTE per source querying dfv.{source} directly
FOR v_src IN
SELECT * FROM dataflow.stack_sources WHERE stack_name = p_stack_name ORDER BY seq, id
LOOP
v_select := format('SELECT %L AS _source, id AS _id', v_src.source_name);
FOR v_field IN SELECT * FROM jsonb_array_elements(v_stack.fields)
LOOP
v_col := v_field->>'name';
IF v_has_bal AND v_col = v_stack.amount_field THEN
-- Use per-source amount_field with sign applied
IF v_src.amount_field IS NULL THEN
v_select := v_select || format(', NULL::%s AS %I', v_field->>'type', v_col);
ELSE
v_select := v_select || format(', %I * %s AS %I', v_src.amount_field, v_src.amount_sign, v_col);
END IF;
ELSIF v_has_bal AND v_col = v_stack.date_field THEN
-- Use per-source date_field
IF v_src.date_field IS NULL THEN
v_select := v_select || format(', NULL::date AS %I', v_col);
ELSE
v_select := v_select || format(', %I AS %I', v_src.date_field, v_col);
END IF;
ELSE
-- Other canonical fields: use field_map or same name, NULL if column doesn't exist
v_src_field := COALESCE(v_src.field_map->>v_col, v_col);
IF EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_schema = 'dfv'
AND table_name = v_src.source_name
AND column_name = v_src_field
) THEN
v_select := v_select || format(', %I AS %I', v_src_field, v_col);
ELSE
v_select := v_select || format(', NULL::text AS %I', v_col);
END IF;
END IF;
END LOOP;
v_select := v_select || format(' FROM dfv.%I', v_src.source_name);
v_ctes := v_ctes || format('%I AS (%s)', v_src.source_name, v_select);
v_cte_names := v_cte_names || quote_ident(v_src.source_name);
-- Accumulate carried-forward source balance column and total offset
IF v_has_bal THEN
IF v_src_bal_cols IS NOT NULL THEN v_src_bal_cols := v_src_bal_cols || ', '; END IF;
v_src_bal_cols := COALESCE(v_src_bal_cols, '') || format(
'SUM(CASE WHEN _source = %L THEN %I END) OVER (ORDER BY %I ASC, _id ASC) + %s AS %I',
v_src.source_name, v_stack.amount_field, v_stack.date_field,
v_src.balance_offset, v_src.source_name || '_balance'
);
v_total_offset := v_total_offset + v_src.balance_offset;
END IF;
END LOOP;
IF array_length(v_ctes, 1) IS NULL THEN
RETURN json_build_object('success', false, 'error', 'Stack has no sources');
END IF;
v_view := 'dfv.' || quote_ident(p_stack_name);
v_canon_cols := (
SELECT string_agg(quote_ident(f->>'name'), ', ')
FROM jsonb_array_elements(v_stack.fields) f
);
IF v_has_bal THEN
v_sql := format(
'CREATE VIEW %s AS '
'WITH %s, _stacked AS (SELECT * FROM %s) '
'SELECT _source, _id, %s, '
'%s, '
'SUM(%I) OVER (ORDER BY %I ASC, _id ASC) + %s AS net_balance '
'FROM _stacked ORDER BY %I DESC, _id DESC',
v_view,
array_to_string(v_ctes, ', '),
array_to_string(v_cte_names, ' UNION ALL SELECT * FROM '),
v_canon_cols,
v_src_bal_cols,
v_stack.amount_field,
v_stack.date_field,
v_total_offset,
v_stack.date_field
);
ELSE
v_sql := format(
'CREATE VIEW %s AS '
'WITH %s, _stacked AS (SELECT * FROM %s) '
'SELECT _source, _id, %s FROM _stacked',
v_view,
array_to_string(v_ctes, ', '),
array_to_string(v_cte_names, ' UNION ALL SELECT * FROM '),
v_canon_cols
);
END IF;
IF NOT p_dry_run THEN
CREATE SCHEMA IF NOT EXISTS dfv;
EXECUTE format('DROP VIEW IF EXISTS %s CASCADE', v_view);
EXECUTE v_sql;
-- Detect stacks whose views were dropped by CASCADE and mark them stale
SELECT array_agg(s.name) INTO v_cascade_stale
FROM dataflow.stacks s
WHERE s.name != p_stack_name
AND s.view_generated_at IS NOT NULL
AND NOT EXISTS (
SELECT 1 FROM pg_views v
WHERE v.schemaname = 'dfv' AND v.viewname = s.name
);
UPDATE dataflow.stacks SET view_generated_at = NULL
WHERE name = ANY(v_cascade_stale);
END IF;
RETURN json_build_object(
'success', true,
'view', v_view,
'sql', v_sql,
'cascade_stale', COALESCE(to_json(v_cascade_stale), '[]'::json)
);
END;
$$ LANGUAGE plpgsql;
------------------------------------------------------
-- Function: get_stack_balance
-- Returns the current running balance (last row of the generated view)
------------------------------------------------------
CREATE OR REPLACE FUNCTION get_stack_balance(p_stack_name TEXT)
RETURNS JSON AS $$
DECLARE
v_stack dataflow.stacks%ROWTYPE;
v_balance NUMERIC;
v_view TEXT;
v_sql TEXT;
BEGIN
SELECT * INTO v_stack FROM dataflow.stacks WHERE name = p_stack_name;
IF NOT FOUND THEN
RETURN json_build_object('success', false, 'error', 'Stack not found');
END IF;
IF v_stack.amount_field IS NULL OR v_stack.date_field IS NULL THEN
RETURN json_build_object('success', false, 'error', 'amount_field and date_field must be set');
END IF;
v_view := 'dfv.' || quote_ident(p_stack_name);
BEGIN
v_sql := format(
'SELECT net_balance FROM %s ORDER BY %I DESC, _id DESC LIMIT 1',
v_view, v_stack.date_field
);
EXECUTE v_sql INTO v_balance;
EXCEPTION WHEN undefined_table THEN
RETURN json_build_object('success', false, 'error', 'View not generated yet — click Generate first');
END;
RETURN json_build_object('success', true, 'balance', v_balance);
END;
$$ LANGUAGE plpgsql STABLE;
COMMENT ON FUNCTION generate_stack_view(TEXT, BOOLEAN) IS 'Generate a UNION ALL view in dfv schema combining multiple sources with optional running balance; p_dry_run=true returns SQL without executing';
COMMENT ON FUNCTION calibrate_balance IS 'Given a known good balance at a date, compute the offset to add to balance_offset';
COMMENT ON FUNCTION get_stack_balance IS 'Return the current running balance (last row) from the generated dfv view';
------------------------------------------------------
-- Function: reorder_stack_sources
------------------------------------------------------
CREATE OR REPLACE FUNCTION reorder_stack_sources(p_stack_name TEXT, p_source_names TEXT[])
RETURNS VOID AS $$
DECLARE
i INTEGER;
BEGIN
FOR i IN 1..array_length(p_source_names, 1) LOOP
UPDATE dataflow.stack_sources
SET seq = i
WHERE stack_name = p_stack_name AND source_name = p_source_names[i];
END LOOP;
END;
$$ LANGUAGE plpgsql;