-- dataflow/database/queries/stacks.sql
-- 2026-04-18 16:20:49 -04:00
--
-- 379 lines
-- 15 KiB
-- PL/PgSQL

--
-- Stacks queries
-- All SQL for api/routes/stacks.js
--
SET search_path TO dataflow, public;
------------------------------------------------------
-- Tables
------------------------------------------------------
-- One row per stack: a named group of sources that generate_stack_view()
-- exposes together as a single view in the dfv schema.
CREATE TABLE IF NOT EXISTS dataflow.stacks (
-- Stack identifier; also used (quoted) as the view name: dfv.<name>.
name TEXT PRIMARY KEY,
-- Optional label (presumably a human-readable display name; confirm against the API/UI).
label TEXT,
-- Ordered canonical field definitions: [{name, label, type}]
-- type: 'text' | 'numeric' | 'date'
-- generate_stack_view() casts 'numeric' and 'date' fields; any other type is left as text.
fields JSONB NOT NULL DEFAULT '[]',
-- Running balance config
-- Both amount_field and date_field must be non-NULL for a running balance to be produced.
amount_field TEXT, -- canonical field to sum for running balance
date_field TEXT, -- canonical field to order by
-- NOTE(review): this column is nullable, unlike stack_sources.balance_offset.
balance_offset NUMERIC DEFAULT 0, -- added to running sum (calibration)
created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);
-- Membership of a source in a stack, plus the per-source settings applied
-- when the stack's rows are combined. At most one row per (stack_name, source_name).
CREATE TABLE IF NOT EXISTS dataflow.stack_sources (
id SERIAL PRIMARY KEY,
-- Deleting a stack or a source removes its membership rows (ON DELETE CASCADE).
stack_name TEXT NOT NULL REFERENCES dataflow.stacks(name) ON DELETE CASCADE,
source_name TEXT NOT NULL REFERENCES dataflow.sources(name) ON DELETE CASCADE,
-- Maps canonical field name → field name in records.transformed
-- generate_stack_view() falls back to the canonical name when a field has no entry here.
field_map JSONB NOT NULL DEFAULT '{}',
-- Multiply amount by this before summing (1 = as-is, -1 = flip sign)
-- NOTE(review): no CHECK constraint restricts this to 1 / -1.
amount_sign INTEGER NOT NULL DEFAULT 1,
-- Seed balance for this source — added as a constant to the combined running total
balance_offset NUMERIC NOT NULL DEFAULT 0,
-- A source can be attached to a given stack only once; upsert_stack_source()
-- uses this constraint as its ON CONFLICT target.
UNIQUE (stack_name, source_name)
);
-- Migrations: add columns that may be missing from earlier deploys
-- (CREATE TABLE IF NOT EXISTS above does nothing when the table already exists,
-- so columns added after the first deploy have to be added explicitly.)
ALTER TABLE dataflow.stack_sources ADD COLUMN IF NOT EXISTS balance_offset NUMERIC NOT NULL DEFAULT 0;
-- Drop old 3-arg calibrate_balance signature if it exists before recreating with 4 args
-- (CREATE OR REPLACE with a different argument list would add a second overload
-- instead of replacing the old function.)
DROP FUNCTION IF EXISTS calibrate_balance(TEXT, DATE, NUMERIC);
------------------------------------------------------
-- Function: list_stacks
-- Returns every stack, ordered by name, together with the number of
-- sources attached to it (0 for a stack with no sources).
------------------------------------------------------
CREATE OR REPLACE FUNCTION list_stacks()
RETURNS TABLE (
    name            TEXT,
    label           TEXT,
    fields          JSONB,
    amount_field    TEXT,
    date_field      TEXT,
    balance_offset  NUMERIC,
    source_count    BIGINT,
    created_at      TIMESTAMPTZ
) AS $$
    -- Count members per stack first, then attach the counts to the stacks.
    WITH member_counts AS (
        SELECT
            m.stack_name,
            count(*) AS n_sources
        FROM dataflow.stack_sources AS m
        GROUP BY m.stack_name
    )
    SELECT
        st.name,
        st.label,
        st.fields,
        st.amount_field,
        st.date_field,
        st.balance_offset,
        -- Stacks without any member have no row in member_counts.
        COALESCE(mc.n_sources, 0) AS source_count,
        st.created_at
    FROM dataflow.stacks AS st
    LEFT JOIN member_counts AS mc
        ON mc.stack_name = st.name
    ORDER BY st.name;
$$ LANGUAGE sql STABLE;
------------------------------------------------------
-- Function: get_stack
-- Fetches one stack by name together with its member sources.
-- Returns zero rows when the stack does not exist; otherwise exactly one row
-- whose "sources" column is a JSONB array of
-- {id, source_name, field_map, amount_sign, balance_offset} objects ordered
-- by source_name ('[]' when the stack has no sources).
------------------------------------------------------
CREATE OR REPLACE FUNCTION get_stack(p_name TEXT)
RETURNS TABLE (
    name            TEXT,
    label           TEXT,
    fields          JSONB,
    amount_field    TEXT,
    date_field      TEXT,
    balance_offset  NUMERIC,
    created_at      TIMESTAMPTZ,
    sources         JSONB
) AS $$
    WITH member_list AS (
        -- An aggregate without GROUP BY always yields exactly one row, so the
        -- CROSS JOIN below never drops the stack; member_json is NULL when
        -- the stack has no sources.
        SELECT
            jsonb_agg(
                jsonb_build_object(
                    'id',             m.id,
                    'source_name',    m.source_name,
                    'field_map',      m.field_map,
                    'amount_sign',    m.amount_sign,
                    'balance_offset', m.balance_offset
                )
                ORDER BY m.source_name
            ) AS member_json
        FROM dataflow.stack_sources AS m
        WHERE m.stack_name = p_name
    )
    SELECT
        st.name,
        st.label,
        st.fields,
        st.amount_field,
        st.date_field,
        st.balance_offset,
        st.created_at,
        COALESCE(ml.member_json, '[]'::jsonb)
    FROM dataflow.stacks AS st
    CROSS JOIN member_list AS ml
    WHERE st.name = p_name;
$$ LANGUAGE sql STABLE;
------------------------------------------------------
-- Function: create_stack
-- Inserts a new stack and returns the stored row.
--   p_name            stack identifier (primary key; a duplicate raises unique_violation)
--   p_label           optional label
--   p_fields          canonical field definitions; NULL is treated as '[]'
--   p_amount_field    canonical field summed for the running balance (optional)
--   p_date_field      canonical field the running balance is ordered by (optional)
--   p_balance_offset  stack-level calibration offset; NULL is treated as 0
------------------------------------------------------
CREATE OR REPLACE FUNCTION create_stack(
    p_name            TEXT,
    p_label           TEXT    DEFAULT NULL,
    p_fields          JSONB   DEFAULT '[]',
    p_amount_field    TEXT    DEFAULT NULL,
    p_date_field      TEXT    DEFAULT NULL,
    p_balance_offset  NUMERIC DEFAULT 0
) RETURNS dataflow.stacks AS $$
    INSERT INTO dataflow.stacks (name, label, fields, amount_field, date_field, balance_offset)
    VALUES (
        p_name,
        p_label,
        -- Parameter defaults only apply when an argument is omitted. A caller
        -- passing an explicit NULL would otherwise hit the NOT NULL constraint
        -- on fields, or store a NULL balance_offset that the running-balance
        -- arithmetic cannot use.
        COALESCE(p_fields, '[]'::jsonb),
        p_amount_field,
        p_date_field,
        COALESCE(p_balance_offset, 0)
    )
    RETURNING *;
$$ LANGUAGE sql;
------------------------------------------------------
-- Function: update_stack
-- Partially updates the stack named p_name and returns the updated row
-- (no row when the stack does not exist).
-- A NULL argument means "leave this column unchanged", so this function
-- cannot reset a column back to NULL.
------------------------------------------------------
CREATE OR REPLACE FUNCTION update_stack(
    p_name            TEXT,
    p_label           TEXT    DEFAULT NULL,
    p_fields          JSONB   DEFAULT NULL,
    p_amount_field    TEXT    DEFAULT NULL,
    p_date_field      TEXT    DEFAULT NULL,
    p_balance_offset  NUMERIC DEFAULT NULL
) RETURNS dataflow.stacks AS $$
    UPDATE dataflow.stacks AS st
    SET (label, fields, amount_field, date_field, balance_offset) = (
        COALESCE(p_label,          st.label),
        COALESCE(p_fields,         st.fields),
        COALESCE(p_amount_field,   st.amount_field),
        COALESCE(p_date_field,     st.date_field),
        COALESCE(p_balance_offset, st.balance_offset)
    )
    WHERE st.name = p_name
    RETURNING st.*;
$$ LANGUAGE sql;
------------------------------------------------------
-- Function: delete_stack
-- Deletes the stack named p_name and returns its name; returns no rows when
-- no such stack exists. Membership rows in stack_sources are removed by the
-- ON DELETE CASCADE foreign key.
------------------------------------------------------
CREATE OR REPLACE FUNCTION delete_stack(p_name TEXT)
RETURNS TABLE (name TEXT) AS $$
    DELETE FROM dataflow.stacks AS st
    WHERE st.name = p_name
    RETURNING st.name;
$$ LANGUAGE sql;
------------------------------------------------------
-- Function: upsert_stack_source
-- Attaches a source to a stack, or, when the (stack, source) pair already
-- exists, overwrites its field_map, amount_sign and balance_offset.
-- Returns the stored membership row.
------------------------------------------------------
CREATE OR REPLACE FUNCTION upsert_stack_source(
    p_stack_name      TEXT,
    p_source_name     TEXT,
    p_field_map       JSONB   DEFAULT '{}',
    p_amount_sign     INTEGER DEFAULT 1,
    p_balance_offset  NUMERIC DEFAULT 0
) RETURNS dataflow.stack_sources AS $$
    INSERT INTO dataflow.stack_sources (
        stack_name,
        source_name,
        field_map,
        amount_sign,
        balance_offset
    )
    VALUES (
        p_stack_name,
        p_source_name,
        p_field_map,
        p_amount_sign,
        p_balance_offset
    )
    -- The pair is covered by UNIQUE (stack_name, source_name).
    ON CONFLICT (stack_name, source_name) DO UPDATE
    SET (field_map, amount_sign, balance_offset) = (
        EXCLUDED.field_map,
        EXCLUDED.amount_sign,
        EXCLUDED.balance_offset
    )
    RETURNING *;
$$ LANGUAGE sql;
------------------------------------------------------
-- Function: remove_stack_source
-- Detaches one source from one stack and returns the removed source name;
-- returns no rows when the pair was not attached.
------------------------------------------------------
CREATE OR REPLACE FUNCTION remove_stack_source(p_stack_name TEXT, p_source_name TEXT)
RETURNS TABLE (source_name TEXT) AS $$
    DELETE FROM dataflow.stack_sources AS ss
    WHERE ss.stack_name = p_stack_name
      AND ss.source_name = p_source_name
    RETURNING ss.source_name;
$$ LANGUAGE sql;
------------------------------------------------------
-- Function: calibrate_balance
-- Given a known good balance at a specific date, compute the offset needed.
-- Read-only: the suggested offset is returned, not stored.
--
--   p_stack_name     stack to calibrate
--   p_source_name    one member source, or NULL to sum all sources of the stack
--   p_as_of_date     records dated on or before this day are summed
--   p_known_balance  the balance known to be correct at p_as_of_date
--
-- Returns on success:
--   {success: true, source, as_of_date, known_balance, computed_sum,
--    other_offsets, suggested_offset}
-- Returns on failure: {success: false, error} when the stack does not exist
-- or has no amount_field / date_field configured.
------------------------------------------------------
CREATE OR REPLACE FUNCTION calibrate_balance(
    p_stack_name     TEXT,
    p_source_name    TEXT,     -- specific source to calibrate, or NULL for combined
    p_as_of_date     DATE,
    p_known_balance  NUMERIC
) RETURNS JSON AS $$
DECLARE
    v_stack          dataflow.stacks%ROWTYPE;
    v_running        NUMERIC := 0;
    v_other_offsets  NUMERIC := 0;
BEGIN
    SELECT * INTO v_stack FROM dataflow.stacks WHERE name = p_stack_name;
    IF NOT FOUND THEN
        RETURN json_build_object('success', false, 'error', 'Stack not found');
    END IF;

    IF v_stack.amount_field IS NULL OR v_stack.date_field IS NULL THEN
        RETURN json_build_object('success', false, 'error', 'Stack must have amount_field and date_field set');
    END IF;

    -- Sum amount * sign for the target source(s) up to as_of_date.
    -- A canonical field without an entry in field_map is read under its
    -- canonical name, the same fallback generate_stack_view applies, so the
    -- sum computed here matches what the generated view adds up.
    SELECT COALESCE(SUM(
               (rec.transformed ->> COALESCE(ss.field_map ->> v_stack.amount_field,
                                             v_stack.amount_field))::numeric
               * ss.amount_sign
           ), 0)
    INTO v_running
    FROM dataflow.stack_sources ss
    JOIN dataflow.records rec ON rec.source_name = ss.source_name
    WHERE ss.stack_name = p_stack_name
      AND (p_source_name IS NULL OR ss.source_name = p_source_name)
      AND rec.transformed IS NOT NULL
      AND (rec.transformed ->> COALESCE(ss.field_map ->> v_stack.date_field,
                                        v_stack.date_field))::date <= p_as_of_date;

    -- When a single source is being calibrated, also report the offsets
    -- already configured on the stack's other sources. This value is
    -- informational only: it is returned as other_offsets and is not part of
    -- suggested_offset.
    IF p_source_name IS NOT NULL THEN
        SELECT COALESCE(SUM(balance_offset), 0) INTO v_other_offsets
        FROM dataflow.stack_sources
        WHERE stack_name = p_stack_name AND source_name <> p_source_name;
    END IF;

    RETURN json_build_object(
        'success', true,
        'source', p_source_name,
        'as_of_date', p_as_of_date,
        'known_balance', p_known_balance,
        'computed_sum', v_running,
        'other_offsets', v_other_offsets,
        'suggested_offset', p_known_balance - v_running
    );
END;
$$ LANGUAGE plpgsql STABLE;
------------------------------------------------------
-- Function: generate_stack_view
-- Builds a UNION ALL view in dfv schema from all member sources.
-- Includes running_balance if amount_field and date_field are set.
--
-- The view dfv.<stack name> is dropped and recreated on every call. Its
-- columns are _source, _id, one column per canonical field (cast according to
-- the field's type) and, when configured, running_balance.
--
-- Returns {success: true, view, sql} on success, or {success: false, error}
-- when the stack does not exist, has no sources, or names an amount_field /
-- date_field that is not one of its canonical fields.
------------------------------------------------------
CREATE OR REPLACE FUNCTION generate_stack_view(p_stack_name TEXT)
RETURNS JSON AS $$
DECLARE
    v_stack      dataflow.stacks%ROWTYPE;
    v_src        dataflow.stack_sources%ROWTYPE;
    v_field      JSONB;
    v_parts      TEXT[] := '{}';
    v_select     TEXT;
    v_col        TEXT;
    v_src_field  TEXT;
    v_view       TEXT;
    v_sql        TEXT;
    v_has_bal    BOOLEAN;
    v_cols       TEXT;   -- outer column list, each entry prefixed with ', '
BEGIN
    SELECT * INTO v_stack FROM dataflow.stacks WHERE name = p_stack_name;
    IF NOT FOUND THEN
        RETURN json_build_object('success', false, 'error', 'Stack not found');
    END IF;

    v_has_bal := v_stack.amount_field IS NOT NULL AND v_stack.date_field IS NOT NULL;

    -- Build one SELECT per source
    FOR v_src IN
        SELECT * FROM dataflow.stack_sources WHERE stack_name = p_stack_name ORDER BY source_name
    LOOP
        v_select := format('SELECT %L AS _source, rec.id AS _id', v_src.source_name);

        -- Canonical fields, mapped to source field names (falling back to the
        -- canonical name when the source has no mapping for a field)
        FOR v_field IN SELECT * FROM jsonb_array_elements(v_stack.fields)
        LOOP
            v_col := v_field->>'name';
            v_src_field := COALESCE(v_src.field_map->>v_col, v_col);
            v_select := v_select || ', ' || CASE v_field->>'type'
                WHEN 'numeric' THEN
                    format('(rec.transformed->>%L)::numeric AS %I', v_src_field, v_col)
                WHEN 'date' THEN
                    format('(rec.transformed->>%L)::date AS %I', v_src_field, v_col)
                ELSE
                    format('rec.transformed->>%L AS %I', v_src_field, v_col)
            END;
        END LOOP;

        -- Per-source sign and seed offset, embedded as quoted literals
        v_select := v_select || format(
            ', %L::integer AS _sign, %L::numeric AS _src_offset',
            v_src.amount_sign, v_src.balance_offset
        );
        v_select := v_select || format(
            ' FROM dataflow.records rec WHERE rec.source_name = %L AND rec.transformed IS NOT NULL',
            v_src.source_name
        );
        v_parts := v_parts || v_select;
    END LOOP;

    IF array_length(v_parts, 1) IS NULL THEN
        RETURN json_build_object('success', false, 'error', 'Stack has no sources');
    END IF;

    -- The running balance reads the amount and date columns of the stacked
    -- rows, so both must be canonical fields. Report that as a normal error
    -- instead of letting CREATE VIEW fail with "column does not exist".
    IF v_has_bal AND (
        NOT EXISTS (
            SELECT 1 FROM jsonb_array_elements(v_stack.fields) f
            WHERE f->>'name' = v_stack.amount_field
        )
        OR NOT EXISTS (
            SELECT 1 FROM jsonb_array_elements(v_stack.fields) f
            WHERE f->>'name' = v_stack.date_field
        )
    ) THEN
        RETURN json_build_object(
            'success', false,
            'error', 'amount_field and date_field must be defined in the stack fields'
        );
    END IF;

    -- Outer column list in array order. Every entry carries its own leading
    -- comma so a stack with no canonical fields still yields valid SQL
    -- ("SELECT _source, _id FROM ...").
    SELECT COALESCE(string_agg(', ' || quote_ident(f.elem->>'name'), '' ORDER BY f.ord), '')
    INTO v_cols
    FROM jsonb_array_elements(v_stack.fields) WITH ORDINALITY AS f(elem, ord);

    CREATE SCHEMA IF NOT EXISTS dfv;
    v_view := 'dfv.' || quote_ident(p_stack_name);
    EXECUTE format('DROP VIEW IF EXISTS %s', v_view);

    -- Wrap in outer SELECT that adds running_balance if configured
    IF v_has_bal THEN
        -- running_balance = cumulative sum of (amount * sign)
        --                 + total of all per-source seed offsets
        --                 + stack-level offset
        -- The per-source offsets are added as one constant (valid when each
        -- source is calibrated relative to its own transaction history, not to
        -- each other). That total is read from stack_sources when the view is
        -- queried; the stack-level offset is fixed at generation time.
        -- A NULL stack offset counts as 0 so the generated SQL stays valid.
        v_sql := format(
            'CREATE VIEW %s AS '
            'SELECT _source, _id%s, '
            'SUM((%I)::numeric * _sign) OVER (ORDER BY %I ASC, _id ASC) '
            '+ (SELECT COALESCE(SUM(balance_offset),0) FROM dataflow.stack_sources WHERE stack_name = %L) '
            '+ %L::numeric AS running_balance '
            'FROM (%s) _stacked',
            v_view,
            v_cols,
            v_stack.amount_field,
            v_stack.date_field,
            p_stack_name,
            COALESCE(v_stack.balance_offset, 0),
            array_to_string(v_parts, ' UNION ALL ')
        );
    ELSE
        v_sql := format(
            'CREATE VIEW %s AS SELECT _source, _id%s FROM (%s) _stacked',
            v_view,
            v_cols,
            array_to_string(v_parts, ' UNION ALL ')
        );
    END IF;

    EXECUTE v_sql;
    RETURN json_build_object('success', true, 'view', v_view, 'sql', v_sql);
END;
$$ LANGUAGE plpgsql;
------------------------------------------------------
-- Function: get_stack_balance
-- Returns the current running balance (last row of the generated view)
--
-- "Last" means the greatest date_field value, ties broken by the greatest _id.
-- Returns {success: true, balance}; balance is null when the view has no rows.
-- Returns {success: false, error} when the stack does not exist, has no
-- amount_field / date_field, or the view is missing or out of date.
------------------------------------------------------
CREATE OR REPLACE FUNCTION get_stack_balance(p_stack_name TEXT)
RETURNS JSON AS $$
DECLARE
    v_stack    dataflow.stacks%ROWTYPE;
    v_balance  NUMERIC;
    v_view     TEXT;
    v_sql      TEXT;
BEGIN
    SELECT * INTO v_stack FROM dataflow.stacks WHERE name = p_stack_name;
    IF NOT FOUND THEN
        RETURN json_build_object('success', false, 'error', 'Stack not found');
    END IF;

    IF v_stack.amount_field IS NULL OR v_stack.date_field IS NULL THEN
        RETURN json_build_object('success', false, 'error', 'amount_field and date_field must be set');
    END IF;

    v_view := 'dfv.' || quote_ident(p_stack_name);

    BEGIN
        v_sql := format(
            'SELECT running_balance FROM %s ORDER BY %I DESC, _id DESC LIMIT 1',
            v_view, v_stack.date_field
        );
        EXECUTE v_sql INTO v_balance;
    EXCEPTION
        WHEN undefined_table THEN
            RETURN json_build_object('success', false, 'error', 'View not generated yet — click Generate first');
        WHEN undefined_column THEN
            -- The view exists but was generated before amount_field /
            -- date_field were set or changed, so it lacks running_balance or
            -- the current date column.
            RETURN json_build_object('success', false, 'error', 'View is out of date — click Generate again');
    END;

    RETURN json_build_object('success', true, 'balance', v_balance);
END;
$$ LANGUAGE plpgsql STABLE;
-- Catalog descriptions for the non-trivial functions.
-- Argument types are spelled out so each statement names exactly one function:
-- without them, COMMENT ON FUNCTION fails if another overload with the same
-- name exists (for example one left behind by an earlier deploy).
COMMENT ON FUNCTION generate_stack_view(TEXT)
    IS 'Generate a UNION ALL view in dfv schema combining multiple sources with optional running balance';
COMMENT ON FUNCTION calibrate_balance(TEXT, TEXT, DATE, NUMERIC)
    IS 'Given a known good balance at a date, compute the offset to add to balance_offset';
COMMENT ON FUNCTION get_stack_balance(TEXT)
    IS 'Return the current running balance (last row) from the generated dfv view';