--
-- Stacks queries
-- All SQL for api/routes/stacks.js
--

SET search_path TO dataflow, public;

------------------------------------------------------
-- Tables
------------------------------------------------------

CREATE TABLE IF NOT EXISTS dataflow.stacks (
    name            TEXT PRIMARY KEY,
    label           TEXT,
    -- Ordered canonical field definitions: [{name, label, type}]
    -- type: 'text' | 'numeric' | 'date'
    fields          JSONB NOT NULL DEFAULT '[]',
    -- Running balance config
    amount_field    TEXT,               -- canonical field to sum for running balance
    date_field      TEXT,               -- canonical field to order by
    -- NOTE(review): generate_stack_view only applies the per-source offsets in
    -- stack_sources.balance_offset; this stack-level value is stored and
    -- returned but not added to net_balance there — confirm intent.
    balance_offset  NUMERIC DEFAULT 0,  -- added to running sum (calibration)
    created_at      TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS dataflow.stack_sources (
    id              SERIAL PRIMARY KEY,
    stack_name      TEXT NOT NULL REFERENCES dataflow.stacks(name) ON DELETE CASCADE,
    source_name     TEXT NOT NULL REFERENCES dataflow.sources(name) ON DELETE CASCADE,
    -- Maps other canonical field names → source view column names (not amount/date — those are explicit)
    field_map       JSONB NOT NULL DEFAULT '{}',
    -- Which column in dfv.{source} is the amount, and its sign (+1/-1)
    amount_field    TEXT,
    amount_sign     INTEGER NOT NULL DEFAULT 1,
    -- Which column in dfv.{source} is the date
    date_field      TEXT,
    -- Calibration offset added to this source's running balance
    balance_offset  NUMERIC NOT NULL DEFAULT 0,
    UNIQUE (stack_name, source_name)
);

-- Migrations: add columns that may be missing from earlier deploys.
-- stacks.view_generated_at is read and cleared by generate_stack_view
-- (cascade-stale detection), so it must exist for that function to run.
ALTER TABLE dataflow.stacks        ADD COLUMN IF NOT EXISTS view_generated_at TIMESTAMPTZ;
ALTER TABLE dataflow.stack_sources ADD COLUMN IF NOT EXISTS balance_offset NUMERIC NOT NULL DEFAULT 0;
ALTER TABLE dataflow.stack_sources ADD COLUMN IF NOT EXISTS amount_field TEXT;
ALTER TABLE dataflow.stack_sources ADD COLUMN IF NOT EXISTS date_field TEXT;
-- Position of a source within its stack (get_stack and generate_stack_view
-- order by it); 0 means "not sequenced yet".
ALTER TABLE dataflow.stack_sources ADD COLUMN IF NOT EXISTS seq INTEGER NOT NULL DEFAULT 0;

-- Seed seq for rows that are not sequenced yet (seq = 0), in insertion (id)
-- order. Numbering continues after the highest seq already present in the same
-- stack, so re-running this file never hands out a position that is taken.
-- On a first run every seq is 0, so base_seq is 0 and rows get 1..n.
UPDATE dataflow.stack_sources AS ss
SET seq = sub.base_seq + sub.rn
FROM (
    SELECT
        w.id,
        w.base_seq,
        ROW_NUMBER() OVER (PARTITION BY w.stack_name ORDER BY w.id) AS rn
    FROM (
        SELECT
            id,
            stack_name,
            seq,
            MAX(seq) OVER (PARTITION BY stack_name) AS base_seq
        FROM dataflow.stack_sources
    ) AS w
    WHERE w.seq = 0
) AS sub
WHERE ss.id = sub.id
  AND ss.seq = 0;

-- Drop old signatures before recreating
DROP FUNCTION IF EXISTS calibrate_balance(TEXT, DATE, NUMERIC);
DROP FUNCTION IF EXISTS upsert_stack_source(TEXT, TEXT, JSONB, INTEGER, NUMERIC);
DROP FUNCTION IF EXISTS generate_stack_view(TEXT);

------------------------------------------------------
-- Function: list_stacks
------------------------------------------------------
-- One row per stack, ordered by name, with the number of attached sources.
CREATE OR REPLACE FUNCTION list_stacks()
RETURNS TABLE (
    name            TEXT,
    label           TEXT,
    fields          JSONB,
    amount_field    TEXT,
    date_field      TEXT,
    balance_offset  NUMERIC,
    source_count    BIGINT,
    created_at      TIMESTAMPTZ
) AS $$
    SELECT
        s.name,
        s.label,
        s.fields,
        s.amount_field,
        s.date_field,
        s.balance_offset,
        count(ss.id) AS source_count,
        s.created_at
    FROM dataflow.stacks s
    LEFT JOIN dataflow.stack_sources ss ON ss.stack_name = s.name
    GROUP BY s.name, s.label, s.fields, s.amount_field, s.date_field, s.balance_offset, s.created_at
    ORDER BY s.name;
$$ LANGUAGE sql STABLE;

------------------------------------------------------
-- Function: get_stack
------------------------------------------------------
-- One stack plus its sources as a JSONB array ordered by (seq, id);
-- the array is '[]' when the stack has no sources.
CREATE OR REPLACE FUNCTION get_stack(p_name TEXT)
RETURNS TABLE (
    name            TEXT,
    label           TEXT,
    fields          JSONB,
    amount_field    TEXT,
    date_field      TEXT,
    balance_offset  NUMERIC,
    created_at      TIMESTAMPTZ,
    sources         JSONB
) AS $$
    SELECT
        s.name,
        s.label,
        s.fields,
        s.amount_field,
        s.date_field,
        s.balance_offset,
        s.created_at,
        COALESCE(
            jsonb_agg(
                jsonb_build_object(
                    'id',             ss.id,
                    'source_name',    ss.source_name,
                    'field_map',      ss.field_map,
                    'amount_field',   ss.amount_field,
                    'amount_sign',    ss.amount_sign,
                    'date_field',     ss.date_field,
                    'balance_offset', ss.balance_offset,
                    'seq',            ss.seq
                ) ORDER BY ss.seq, ss.id
            ) FILTER (WHERE ss.id IS NOT NULL),
            '[]'
        )
    FROM dataflow.stacks s
    LEFT JOIN dataflow.stack_sources ss ON ss.stack_name = s.name
    WHERE s.name = p_name
    GROUP BY s.name, s.label, s.fields, s.amount_field, s.date_field, s.balance_offset, s.created_at;
$$ LANGUAGE sql STABLE;
------------------------------------------------------
-- Function: create_stack
------------------------------------------------------
-- Insert a new stack and return the inserted row.
-- An explicit NULL for p_fields / p_balance_offset falls back to the declared
-- defaults ('[]' / 0) instead of violating the NOT NULL constraint on fields.
CREATE OR REPLACE FUNCTION create_stack(
    p_name           TEXT,
    p_label          TEXT    DEFAULT NULL,
    p_fields         JSONB   DEFAULT '[]',
    p_amount_field   TEXT    DEFAULT NULL,
    p_date_field     TEXT    DEFAULT NULL,
    p_balance_offset NUMERIC DEFAULT 0
)
RETURNS dataflow.stacks AS $$
    INSERT INTO dataflow.stacks (name, label, fields, amount_field, date_field, balance_offset)
    VALUES (
        p_name,
        p_label,
        COALESCE(p_fields, '[]'::jsonb),
        p_amount_field,
        p_date_field,
        COALESCE(p_balance_offset, 0)
    )
    RETURNING *;
$$ LANGUAGE sql;

------------------------------------------------------
-- Function: update_stack
------------------------------------------------------
-- Partial update: every NULL argument leaves its column unchanged, so this
-- function cannot clear a column back to NULL. Returns the updated row, or a
-- NULL row when no stack has that name.
CREATE OR REPLACE FUNCTION update_stack(
    p_name           TEXT,
    p_label          TEXT    DEFAULT NULL,
    p_fields         JSONB   DEFAULT NULL,
    p_amount_field   TEXT    DEFAULT NULL,
    p_date_field     TEXT    DEFAULT NULL,
    p_balance_offset NUMERIC DEFAULT NULL
)
RETURNS dataflow.stacks AS $$
    UPDATE dataflow.stacks
    SET label          = COALESCE(p_label, label),
        fields         = COALESCE(p_fields, fields),
        amount_field   = COALESCE(p_amount_field, amount_field),
        date_field     = COALESCE(p_date_field, date_field),
        balance_offset = COALESCE(p_balance_offset, balance_offset)
    WHERE name = p_name
    RETURNING *;
$$ LANGUAGE sql;

------------------------------------------------------
-- Function: delete_stack
------------------------------------------------------
-- Delete a stack (its stack_sources rows go with it via ON DELETE CASCADE).
-- Returns the deleted name, or no rows if it did not exist.
CREATE OR REPLACE FUNCTION delete_stack(p_name TEXT)
RETURNS TABLE (name TEXT) AS $$
    DELETE FROM dataflow.stacks
    WHERE name = p_name
    RETURNING name;
$$ LANGUAGE sql;

------------------------------------------------------
-- Function: upsert_stack_source
------------------------------------------------------
-- Attach a source to a stack, or update its mapping if already attached.
-- New rows are appended at the end of the stack (MAX(seq) + 1); an update
-- keeps the existing seq so a manual reorder is preserved.
-- Explicit NULLs for field_map / amount_sign / balance_offset fall back to the
-- declared defaults instead of violating the NOT NULL columns.
CREATE OR REPLACE FUNCTION upsert_stack_source(
    p_stack_name     TEXT,
    p_source_name    TEXT,
    p_field_map      JSONB   DEFAULT '{}',
    p_amount_sign    INTEGER DEFAULT 1,
    p_balance_offset NUMERIC DEFAULT 0,
    p_amount_field   TEXT    DEFAULT NULL,
    p_date_field     TEXT    DEFAULT NULL
)
RETURNS dataflow.stack_sources AS $$
    INSERT INTO dataflow.stack_sources
        (stack_name, source_name, field_map, amount_sign, balance_offset, amount_field, date_field, seq)
    VALUES (
        p_stack_name,
        p_source_name,
        COALESCE(p_field_map, '{}'::jsonb),
        COALESCE(p_amount_sign, 1),
        COALESCE(p_balance_offset, 0),
        p_amount_field,
        p_date_field,
        (SELECT COALESCE(MAX(seq), 0) + 1
         FROM dataflow.stack_sources
         WHERE stack_name = p_stack_name)
    )
    ON CONFLICT (stack_name, source_name) DO UPDATE
    SET field_map      = EXCLUDED.field_map,
        amount_sign    = EXCLUDED.amount_sign,
        balance_offset = EXCLUDED.balance_offset,
        amount_field   = EXCLUDED.amount_field,
        date_field     = EXCLUDED.date_field
    RETURNING *;
$$ LANGUAGE sql;

------------------------------------------------------
-- Function: remove_stack_source
------------------------------------------------------
-- Detach a source from a stack. Returns the removed source name, or no rows.
CREATE OR REPLACE FUNCTION remove_stack_source(p_stack_name TEXT, p_source_name TEXT)
RETURNS TABLE (source_name TEXT) AS $$
    DELETE FROM dataflow.stack_sources
    WHERE stack_name = p_stack_name
      AND source_name = p_source_name
    RETURNING source_name;
$$ LANGUAGE sql;

------------------------------------------------------
-- Function: calibrate_balance
-- Queries dfv.{source} directly using per-source amount/date fields.
-- No stack view required.
------------------------------------------------------
-- Returns JSON:
--   success=false + error  when the source is not in the stack, its amount/date
--                          fields are unset, or the source view / its columns
--                          are missing;
--   otherwise success=true with computed_sum (signed sum of the amount field,
--   through the end of p_as_of_date, or over all rows when p_as_of_date is
--   NULL) and suggested_offset = p_known_balance - computed_sum.
CREATE OR REPLACE FUNCTION calibrate_balance(
    p_stack_name    TEXT,
    p_source_name   TEXT,
    p_as_of_date    DATE,
    p_known_balance NUMERIC
) RETURNS JSON AS $$
DECLARE
    v_src     dataflow.stack_sources%ROWTYPE;
    v_running NUMERIC;
    v_sql     TEXT;
BEGIN
    SELECT * INTO v_src
    FROM dataflow.stack_sources
    WHERE stack_name = p_stack_name
      AND source_name = p_source_name;

    IF NOT FOUND THEN
        RETURN json_build_object('success', false, 'error', 'Source not in stack');
    END IF;

    IF v_src.amount_field IS NULL OR v_src.date_field IS NULL THEN
        RETURN json_build_object('success', false, 'error', 'Set amount and date fields on this source first');
    END IF;

    BEGIN
        IF p_as_of_date IS NULL THEN
            v_sql := format(
                'SELECT COALESCE(SUM(%I * %s), 0) FROM dfv.%I',
                v_src.amount_field, v_src.amount_sign, p_source_name
            );
        ELSE
            -- Half-open bound "before the start of the next day": identical to
            -- "<= date" for DATE columns, but also keeps same-day rows when the
            -- source column is a timestamp.
            v_sql := format(
                'SELECT COALESCE(SUM(%I * %s), 0) FROM dfv.%I WHERE %I < (%L::date + 1)',
                v_src.amount_field, v_src.amount_sign, p_source_name,
                v_src.date_field, p_as_of_date
            );
        END IF;
        EXECUTE v_sql INTO v_running;
    EXCEPTION
        WHEN undefined_table THEN
            RETURN json_build_object('success', false, 'error', 'Source view not found — generate the source view first');
        WHEN undefined_column THEN
            RETURN json_build_object('success', false, 'error', 'Amount or date field not found in source view');
    END;

    RETURN json_build_object(
        'success',          true,
        'source',           p_source_name,
        'as_of_date',       p_as_of_date,
        'known_balance',    p_known_balance,
        'computed_sum',     v_running,
        'suggested_offset', p_known_balance - v_running
    );
END;
$$ LANGUAGE plpgsql STABLE;

------------------------------------------------------
-- Function: generate_stack_view
-- Builds a WITH ... UNION ALL view in dfv schema from existing dfv source views.
-- Each source CTE maps the canonical fields and applies amount_sign; the outer
-- SELECT adds a running balance per source (<source>_balance) and net_balance
-- across all sources.
------------------------------------------------------
-- p_dry_run = true returns the generated SQL without executing it.
-- Running sums use an explicit ROWS frame ordered by (date, _id, _source):
-- with the default RANGE frame, rows from different sources sharing a date and
-- an id were peers and each one's balance included the other's amount.
CREATE OR REPLACE FUNCTION generate_stack_view(p_stack_name TEXT, p_dry_run BOOLEAN DEFAULT false)
RETURNS JSON AS $$
DECLARE
    v_stack         dataflow.stacks%ROWTYPE;
    v_src           dataflow.stack_sources%ROWTYPE;
    v_field         JSONB;
    v_ctes          TEXT[] := '{}';
    v_cte_names     TEXT[] := '{}';
    v_select        TEXT;
    v_col           TEXT;
    v_null_type     TEXT;
    v_src_field     TEXT;
    v_view          TEXT;
    v_sql           TEXT;
    v_has_bal       BOOLEAN;
    v_canon_cols    TEXT;
    v_src_bal_cols  TEXT;
    v_window        TEXT;
    v_total_offset  NUMERIC := 0;
    v_cascade_stale TEXT[];
BEGIN
    SELECT * INTO v_stack FROM dataflow.stacks WHERE name = p_stack_name;
    IF NOT FOUND THEN
        RETURN json_build_object('success', false, 'error', 'Stack not found');
    END IF;

    v_has_bal := v_stack.amount_field IS NOT NULL AND v_stack.date_field IS NOT NULL;

    IF v_has_bal THEN
        -- Shared by every running-sum column so they all see the same row order.
        v_window := format(
            'ORDER BY %I ASC, _id ASC, _source ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW',
            v_stack.date_field
        );
    END IF;

    -- Build one CTE per source querying dfv.{source} directly
    FOR v_src IN
        SELECT * FROM dataflow.stack_sources
        WHERE stack_name = p_stack_name
        ORDER BY seq, id
    LOOP
        v_select := format('SELECT %L AS _source, id AS _id', v_src.source_name);

        FOR v_field IN SELECT * FROM jsonb_array_elements(v_stack.fields) LOOP
            v_col := v_field->>'name';
            -- Type for NULL placeholders: the declared field type, validated via
            -- to_regtype (never interpolated raw); falls back to text. Using the
            -- declared type keeps UNION ALL compatible with sources that have it.
            v_null_type := COALESCE(to_regtype(v_field->>'type')::text, 'text');

            IF v_has_bal AND v_col = v_stack.amount_field THEN
                -- Use per-source amount_field with sign applied
                IF v_src.amount_field IS NULL THEN
                    v_select := v_select || format(', NULL::%s AS %I', v_null_type, v_col);
                ELSE
                    v_select := v_select || format(', %I * %s AS %I', v_src.amount_field, v_src.amount_sign, v_col);
                END IF;
            ELSIF v_has_bal AND v_col = v_stack.date_field THEN
                -- Use per-source date_field
                IF v_src.date_field IS NULL THEN
                    v_select := v_select || format(', NULL::date AS %I', v_col);
                ELSE
                    v_select := v_select || format(', %I AS %I', v_src.date_field, v_col);
                END IF;
            ELSE
                -- Other canonical fields: field_map entry or same name; NULL if the column doesn't exist
                v_src_field := COALESCE(v_src.field_map->>v_col, v_col);
                IF EXISTS (
                    SELECT 1
                    FROM information_schema.columns
                    WHERE table_schema = 'dfv'
                      AND table_name = v_src.source_name
                      AND column_name = v_src_field
                ) THEN
                    v_select := v_select || format(', %I AS %I', v_src_field, v_col);
                ELSE
                    v_select := v_select || format(', NULL::%s AS %I', v_null_type, v_col);
                END IF;
            END IF;
        END LOOP;

        v_select := v_select || format(' FROM dfv.%I', v_src.source_name);
        v_ctes := v_ctes || format('%I AS (%s)', v_src.source_name, v_select);
        v_cte_names := v_cte_names || quote_ident(v_src.source_name);

        -- Accumulate carried-forward source balance column and total offset
        IF v_has_bal THEN
            v_src_bal_cols := concat_ws(', ', v_src_bal_cols, format(
                'SUM(CASE WHEN _source = %L THEN %I END) OVER (%s) + %s AS %I',
                v_src.source_name,
                v_stack.amount_field,
                v_window,
                v_src.balance_offset,
                v_src.source_name || '_balance'
            ));
            v_total_offset := v_total_offset + v_src.balance_offset;
        END IF;
    END LOOP;

    IF array_length(v_ctes, 1) IS NULL THEN
        RETURN json_build_object('success', false, 'error', 'Stack has no sources');
    END IF;

    v_canon_cols := (
        SELECT string_agg(quote_ident(f.elem->>'name'), ', ' ORDER BY f.ord)
        FROM jsonb_array_elements(v_stack.fields) WITH ORDINALITY AS f(elem, ord)
    );

    -- Without canonical fields the outer SELECT list would be syntactically invalid.
    IF v_canon_cols IS NULL THEN
        RETURN json_build_object('success', false, 'error', 'Stack has no fields');
    END IF;

    v_view := 'dfv.' || quote_ident(p_stack_name);

    IF v_has_bal THEN
        v_sql := format(
            'CREATE VIEW %s AS '
            'WITH %s, _stacked AS (SELECT * FROM %s) '
            'SELECT _source, _id, %s, '
            '%s, '
            'SUM(%I) OVER (%s) + %s AS net_balance '
            'FROM _stacked ORDER BY %I DESC, _id DESC, _source DESC',
            v_view,
            array_to_string(v_ctes, ', '),
            array_to_string(v_cte_names, ' UNION ALL SELECT * FROM '),
            v_canon_cols,
            v_src_bal_cols,
            v_stack.amount_field,
            v_window,
            v_total_offset,
            v_stack.date_field
        );
    ELSE
        v_sql := format(
            'CREATE VIEW %s AS '
            'WITH %s, _stacked AS (SELECT * FROM %s) '
            'SELECT _source, _id, %s FROM _stacked',
            v_view,
            array_to_string(v_ctes, ', '),
            array_to_string(v_cte_names, ' UNION ALL SELECT * FROM '),
            v_canon_cols
        );
    END IF;

    IF NOT p_dry_run THEN
        CREATE SCHEMA IF NOT EXISTS dfv;
        EXECUTE format('DROP VIEW IF EXISTS %s CASCADE', v_view);
        EXECUTE v_sql;

        -- Record the successful generation so later cascades can mark it stale.
        UPDATE dataflow.stacks
        SET view_generated_at = CURRENT_TIMESTAMP
        WHERE name = p_stack_name;

        -- Detect stacks whose views were dropped by CASCADE and mark them stale
        SELECT array_agg(s.name) INTO v_cascade_stale
        FROM dataflow.stacks s
        WHERE s.name != p_stack_name
          AND s.view_generated_at IS NOT NULL
          AND NOT EXISTS (
              SELECT 1 FROM pg_views v
              WHERE v.schemaname = 'dfv'
                AND v.viewname = s.name
          );

        UPDATE dataflow.stacks
        SET view_generated_at = NULL
        WHERE name = ANY(v_cascade_stale);
    END IF;

    RETURN json_build_object(
        'success',       true,
        'view',          v_view,
        'sql',           v_sql,
        'cascade_stale', COALESCE(to_json(v_cascade_stale), '[]'::json)
    );
END;
$$ LANGUAGE plpgsql;

------------------------------------------------------
-- Function: get_stack_balance
-- Returns the current running balance (last row of the generated view)
------------------------------------------------------
-- "Last row" uses the exact reverse of the view's window order
-- (date, _id, _source), so the picked row's net_balance covers every row.
CREATE OR REPLACE FUNCTION get_stack_balance(p_stack_name TEXT)
RETURNS JSON AS $$
DECLARE
    v_stack   dataflow.stacks%ROWTYPE;
    v_balance NUMERIC;
    v_view    TEXT;
    v_sql     TEXT;
BEGIN
    SELECT * INTO v_stack FROM dataflow.stacks WHERE name = p_stack_name;
    IF NOT FOUND THEN
        RETURN json_build_object('success', false, 'error', 'Stack not found');
    END IF;

    IF v_stack.amount_field IS NULL OR v_stack.date_field IS NULL THEN
        RETURN json_build_object('success', false, 'error', 'amount_field and date_field must be set');
    END IF;

    v_view := 'dfv.' || quote_ident(p_stack_name);

    BEGIN
        v_sql := format(
            'SELECT net_balance FROM %s ORDER BY %I DESC, _id DESC, _source DESC LIMIT 1',
            v_view, v_stack.date_field
        );
        EXECUTE v_sql INTO v_balance;
    EXCEPTION
        WHEN undefined_table THEN
            RETURN json_build_object('success', false, 'error', 'View not generated yet — click Generate first');
        -- e.g. the view was generated before amount/date fields were configured
        WHEN undefined_column THEN
            RETURN json_build_object('success', false, 'error', 'View is out of date — click Generate to rebuild it');
    END;

    RETURN json_build_object('success', true, 'balance', v_balance);
END;
$$ LANGUAGE plpgsql STABLE;

COMMENT ON FUNCTION generate_stack_view(TEXT, BOOLEAN) IS
    'Generate a UNION ALL view in dfv schema combining multiple sources with optional running balance; p_dry_run=true returns SQL without executing';
COMMENT ON FUNCTION calibrate_balance(TEXT, TEXT, DATE, NUMERIC) IS
    'Given a known good balance at a date, compute the offset to add to balance_offset';
COMMENT ON FUNCTION get_stack_balance(TEXT) IS
    'Return the current running balance (last row) from the generated dfv view';

------------------------------------------------------
-- Function: reorder_stack_sources
------------------------------------------------------
-- Set each listed source's seq to its 1-based position in p_source_names.
-- Names not attached to the stack are ignored. If a name appears twice, its
-- last position wins (as with the previous per-element loop). An empty or NULL
-- array is a no-op; the old FOR loop raised on a NULL upper bound instead.
CREATE OR REPLACE FUNCTION reorder_stack_sources(p_stack_name TEXT, p_source_names TEXT[])
RETURNS VOID AS $$
    UPDATE dataflow.stack_sources AS ss
    SET seq = pos.ord
    FROM (
        SELECT o.source_name, MAX(o.ord) AS ord
        FROM unnest(p_source_names) WITH ORDINALITY AS o(source_name, ord)
        GROUP BY o.source_name
    ) AS pos
    WHERE ss.stack_name = p_stack_name
      AND ss.source_name = pos.source_name;
$$ LANGUAGE sql;