--
-- Stacks queries
-- All SQL for api/routes/stacks.js
--

SET search_path TO dataflow, public;

------------------------------------------------------
-- Tables
------------------------------------------------------

-- One row per stack: a named list of canonical fields plus the optional
-- running-balance configuration.
CREATE TABLE IF NOT EXISTS dataflow.stacks (
    name            TEXT PRIMARY KEY,
    label           TEXT,
    -- Ordered canonical field definitions: [{name, label, type}]
    -- type: 'text' | 'numeric' | 'date'
    fields          JSONB NOT NULL DEFAULT '[]',
    -- Running balance config
    amount_field    TEXT,                -- canonical field to sum for running balance
    date_field      TEXT,                -- canonical field to order by
    balance_offset  NUMERIC DEFAULT 0,   -- added to running sum (calibration)
    created_at      TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);

-- One row per (stack, source) membership; removed automatically when either
-- the stack or the source is deleted.
CREATE TABLE IF NOT EXISTS dataflow.stack_sources (
    id              SERIAL PRIMARY KEY,
    stack_name      TEXT NOT NULL REFERENCES dataflow.stacks(name) ON DELETE CASCADE,
    source_name     TEXT NOT NULL REFERENCES dataflow.sources(name) ON DELETE CASCADE,
    -- Maps canonical field name → field name in records.transformed
    field_map       JSONB NOT NULL DEFAULT '{}',
    -- Multiply amount by this before summing (1 = as-is, -1 = flip sign)
    amount_sign     INTEGER NOT NULL DEFAULT 1,
    -- Seed balance for this source — added as a constant to the combined running total
    balance_offset  NUMERIC NOT NULL DEFAULT 0,
    UNIQUE (stack_name, source_name)
);

------------------------------------------------------
-- Function: list_stacks
-- Returns every stack with its configuration and the number of
-- stack_sources rows attached to it (0 when none), ordered by name.
------------------------------------------------------
CREATE OR REPLACE FUNCTION list_stacks()
RETURNS TABLE (
    name            TEXT,
    label           TEXT,
    fields          JSONB,
    amount_field    TEXT,
    date_field      TEXT,
    balance_offset  NUMERIC,
    source_count    BIGINT,
    created_at      TIMESTAMPTZ
) AS $$
    -- Count members per stack once, then attach the count to each stack row.
    WITH member_counts AS (
        SELECT
            ss.stack_name,
            count(*) AS n
        FROM dataflow.stack_sources AS ss
        GROUP BY ss.stack_name
    )
    SELECT
        s.name,
        s.label,
        s.fields,
        s.amount_field,
        s.date_field,
        s.balance_offset,
        COALESCE(mc.n, 0) AS source_count,   -- stacks without members have no count row
        s.created_at
    FROM dataflow.stacks AS s
    LEFT JOIN member_counts AS mc
        ON mc.stack_name = s.name
    ORDER BY s.name;
$$ LANGUAGE sql STABLE;
------------------------------------------------------
-- Function: get_stack
-- Returns the stack named p_name (zero rows if it does not exist) together
-- with a JSONB array of its member sources, sorted by source_name.
-- The array is '[]' when the stack has no sources.
------------------------------------------------------
CREATE OR REPLACE FUNCTION get_stack(p_name TEXT)
RETURNS TABLE (
    name            TEXT,
    label           TEXT,
    fields          JSONB,
    amount_field    TEXT,
    date_field      TEXT,
    balance_offset  NUMERIC,
    created_at      TIMESTAMPTZ,
    sources         JSONB
) AS $$
    SELECT
        s.name,
        s.label,
        s.fields,
        s.amount_field,
        s.date_field,
        s.balance_offset,
        s.created_at,
        (
            -- jsonb_agg over zero rows yields NULL, hence the COALESCE.
            SELECT COALESCE(
                jsonb_agg(
                    jsonb_build_object(
                        'id',             ss.id,
                        'source_name',    ss.source_name,
                        'field_map',      ss.field_map,
                        'amount_sign',    ss.amount_sign,
                        'balance_offset', ss.balance_offset
                    )
                    ORDER BY ss.source_name
                ),
                '[]'
            )
            FROM dataflow.stack_sources AS ss
            WHERE ss.stack_name = s.name
        )
    FROM dataflow.stacks AS s
    WHERE s.name = p_name;
$$ LANGUAGE sql STABLE;

------------------------------------------------------
-- Function: create_stack
-- Inserts a new stack and returns the stored row.
------------------------------------------------------
CREATE OR REPLACE FUNCTION create_stack(
    p_name            TEXT,
    p_label           TEXT    DEFAULT NULL,
    p_fields          JSONB   DEFAULT '[]',
    p_amount_field    TEXT    DEFAULT NULL,
    p_date_field      TEXT    DEFAULT NULL,
    p_balance_offset  NUMERIC DEFAULT 0
) RETURNS dataflow.stacks AS $$
    INSERT INTO dataflow.stacks
        (name, label, fields, amount_field, date_field, balance_offset)
    VALUES
        (p_name, p_label, p_fields, p_amount_field, p_date_field, p_balance_offset)
    RETURNING *;
$$ LANGUAGE sql;

------------------------------------------------------
-- Function: update_stack
-- Partial update of the stack named p_name; returns the updated row.
-- A NULL argument leaves the corresponding column unchanged, so a column
-- cannot be reset to NULL through this function.
------------------------------------------------------
CREATE OR REPLACE FUNCTION update_stack(
    p_name            TEXT,
    p_label           TEXT    DEFAULT NULL,
    p_fields          JSONB   DEFAULT NULL,
    p_amount_field    TEXT    DEFAULT NULL,
    p_date_field      TEXT    DEFAULT NULL,
    p_balance_offset  NUMERIC DEFAULT NULL
) RETURNS dataflow.stacks AS $$
    UPDATE dataflow.stacks AS s
    SET
        label          = COALESCE(p_label,          s.label),
        fields         = COALESCE(p_fields,         s.fields),
        amount_field   = COALESCE(p_amount_field,   s.amount_field),
        date_field     = COALESCE(p_date_field,     s.date_field),
        balance_offset = COALESCE(p_balance_offset, s.balance_offset)
    WHERE s.name = p_name
    RETURNING s.*;
$$ LANGUAGE sql;

------------------------------------------------------
-- Function: delete_stack
-- Deletes the stack named p_name and returns its name (zero rows if absent).
------------------------------------------------------
CREATE OR REPLACE FUNCTION delete_stack(p_name TEXT)
RETURNS TABLE (name TEXT) AS $$
    DELETE FROM dataflow.stacks AS s
    WHERE s.name = p_name
    RETURNING s.name;
$$ LANGUAGE sql;

------------------------------------------------------
-- Function: upsert_stack_source
-- Adds a source to a stack, or overwrites field_map, amount_sign and
-- balance_offset when the (stack, source) pair already exists.
-- Returns the stored row.
------------------------------------------------------
CREATE OR REPLACE FUNCTION upsert_stack_source(
    p_stack_name      TEXT,
    p_source_name     TEXT,
    p_field_map       JSONB   DEFAULT '{}',
    p_amount_sign     INTEGER DEFAULT 1,
    p_balance_offset  NUMERIC DEFAULT 0
) RETURNS dataflow.stack_sources AS $$
    INSERT INTO dataflow.stack_sources
        (stack_name, source_name, field_map, amount_sign, balance_offset)
    VALUES
        (p_stack_name, p_source_name, p_field_map, p_amount_sign, p_balance_offset)
    ON CONFLICT (stack_name, source_name) DO UPDATE
    SET
        field_map      = EXCLUDED.field_map,
        amount_sign    = EXCLUDED.amount_sign,
        balance_offset = EXCLUDED.balance_offset
    RETURNING *;
$$ LANGUAGE sql;

------------------------------------------------------
-- Function: remove_stack_source
-- Detaches a source from a stack and returns the removed source_name
-- (zero rows if the pair did not exist).
------------------------------------------------------
CREATE OR REPLACE FUNCTION remove_stack_source(p_stack_name TEXT, p_source_name TEXT)
RETURNS TABLE (source_name TEXT) AS $$
    DELETE FROM dataflow.stack_sources AS ss
    WHERE ss.stack_name  = p_stack_name
      AND ss.source_name = p_source_name
    RETURNING ss.source_name;
$$ LANGUAGE sql;

------------------------------------------------------
-- Function: calibrate_balance
-- Given a known good balance at a specific date, compute the offset needed.
-- Returns JSON:
--   on success: {success: true, source, as_of_date, known_balance,
--                computed_sum, other_offsets, suggested_offset}
--   on failure: {success: false, error}  (stack not found, or the stack has
--                no amount_field / date_field configured)
--
-- computed_sum     = SUM(amount * amount_sign) over the selected source(s),
--                    for records dated on or before p_as_of_date.
-- suggested_offset = p_known_balance - computed_sum.
-- other_offsets    = sum of balance_offset of the stack's other sources; only
--                    computed when a single source is calibrated, and reported
--                    for information (it is not subtracted from the suggestion).
--
-- A canonical field that is missing from a source's field_map is read under
-- its canonical name, matching the fallback used by generate_stack_view.
------------------------------------------------------
CREATE OR REPLACE FUNCTION calibrate_balance(
    p_stack_name     TEXT,
    p_source_name    TEXT,      -- specific source to calibrate, or NULL for combined
    p_as_of_date     DATE,
    p_known_balance  NUMERIC
) RETURNS JSON AS $$
DECLARE
    v_stack          dataflow.stacks%ROWTYPE;
    v_running        NUMERIC := 0;
    v_other_offsets  NUMERIC := 0;
BEGIN
    SELECT * INTO v_stack FROM dataflow.stacks WHERE name = p_stack_name;
    IF NOT FOUND THEN
        RETURN json_build_object('success', false, 'error', 'Stack not found');
    END IF;

    IF v_stack.amount_field IS NULL OR v_stack.date_field IS NULL THEN
        RETURN json_build_object('success', false, 'error', 'Stack must have amount_field and date_field set');
    END IF;

    -- Sum amount * sign for the target source(s) up to as_of_date.
    -- COALESCE(..., canonical name): an unmapped field is looked up under its
    -- canonical name, exactly as the generated view does. Without it, a source
    -- with the default '{}' map would silently contribute nothing here while
    -- still contributing to the view's running balance.
    SELECT COALESCE(SUM(
               (rec.transformed ->> COALESCE(ss.field_map ->> v_stack.amount_field,
                                             v_stack.amount_field))::numeric
               * ss.amount_sign
           ), 0)
    INTO v_running
    FROM dataflow.stack_sources AS ss
    INNER JOIN dataflow.records AS rec
        ON rec.source_name = ss.source_name
    WHERE ss.stack_name = p_stack_name
      AND (p_source_name IS NULL OR ss.source_name = p_source_name)
      AND rec.transformed IS NOT NULL
      AND (rec.transformed ->> COALESCE(ss.field_map ->> v_stack.date_field,
                                        v_stack.date_field))::date <= p_as_of_date;

    -- When a single source is being calibrated, also report the offsets already
    -- stored on the stack's other sources.
    IF p_source_name IS NOT NULL THEN
        SELECT COALESCE(SUM(ss.balance_offset), 0)
        INTO v_other_offsets
        FROM dataflow.stack_sources AS ss
        WHERE ss.stack_name = p_stack_name
          AND ss.source_name <> p_source_name;
    END IF;

    RETURN json_build_object(
        'success',          true,
        'source',           p_source_name,
        'as_of_date',       p_as_of_date,
        'known_balance',    p_known_balance,
        'computed_sum',     v_running,
        'other_offsets',    v_other_offsets,
        'suggested_offset', p_known_balance - v_running
    );
END;
$$ LANGUAGE plpgsql STABLE;

------------------------------------------------------
-- Function: generate_stack_view
-- Builds a UNION ALL view in dfv schema from all member sources.
-- Includes running_balance if amount_field and date_field are set.
--
-- Returns JSON:
--   on success: {success: true, view, sql}  (qualified view name and the
--                CREATE VIEW statement that was executed)
--   on failure: {success: false, error}  (stack not found, or no sources)
--
-- Side effects: creates schema dfv if missing, then drops and recreates the
-- view dfv.<stack name>.
------------------------------------------------------
CREATE OR REPLACE FUNCTION generate_stack_view(p_stack_name TEXT)
RETURNS JSON AS $$
DECLARE
    v_stack      dataflow.stacks%ROWTYPE;
    v_src        dataflow.stack_sources%ROWTYPE;
    v_field      JSONB;
    v_parts      TEXT[] := '{}';
    v_select     TEXT;
    v_col        TEXT;
    v_src_field  TEXT;
    v_view       TEXT;
    v_sql        TEXT;
    v_has_bal    BOOLEAN;
    v_cols       TEXT;      -- ", col1, col2, ..." for the outer SELECT; '' when no fields
BEGIN
    SELECT * INTO v_stack FROM dataflow.stacks WHERE name = p_stack_name;
    IF NOT FOUND THEN
        RETURN json_build_object('success', false, 'error', 'Stack not found');
    END IF;

    v_has_bal := v_stack.amount_field IS NOT NULL AND v_stack.date_field IS NOT NULL;

    -- Build one SELECT per source
    FOR v_src IN
        SELECT * FROM dataflow.stack_sources
        WHERE stack_name = p_stack_name
        ORDER BY source_name
    LOOP
        v_select := format('SELECT %L AS _source, rec.id AS _id', v_src.source_name);

        -- Canonical fields, mapped to source field names.
        -- A field missing from field_map is read under its canonical name.
        FOR v_field IN
            SELECT t.elem FROM jsonb_array_elements(v_stack.fields) AS t(elem)
        LOOP
            v_col       := v_field->>'name';
            v_src_field := COALESCE(v_src.field_map->>v_col, v_col);
            v_select := v_select || ', ' || CASE v_field->>'type'
                WHEN 'numeric' THEN format('(rec.transformed->>%L)::numeric AS %I', v_src_field, v_col)
                WHEN 'date'    THEN format('(rec.transformed->>%L)::date AS %I',    v_src_field, v_col)
                ELSE                format('rec.transformed->>%L AS %I',            v_src_field, v_col)
            END;
        END LOOP;

        -- Per-source sign and seed offset. Only _sign is referenced by the
        -- outer query; _src_offset is carried along but not used there.
        v_select := v_select || format(', %s::integer AS _sign, %s::numeric AS _src_offset',
            v_src.amount_sign, v_src.balance_offset);

        v_select := v_select || format(
            ' FROM dataflow.records rec WHERE rec.source_name = %L AND rec.transformed IS NOT NULL',
            v_src.source_name
        );

        v_parts := array_append(v_parts, v_select);
    END LOOP;

    IF array_length(v_parts, 1) IS NULL THEN
        RETURN json_build_object('success', false, 'error', 'Stack has no sources');
    END IF;

    -- Outer column list, each entry prefixed with ', ' so that an empty
    -- fields array (the column default) yields '' instead of a dangling comma.
    SELECT COALESCE(string_agg(', ' || quote_ident(t.elem->>'name'), '' ORDER BY t.ord), '')
    INTO v_cols
    FROM jsonb_array_elements(v_stack.fields) WITH ORDINALITY AS t(elem, ord);

    CREATE SCHEMA IF NOT EXISTS dfv;

    v_view := 'dfv.' || quote_ident(p_stack_name);

    EXECUTE format('DROP VIEW IF EXISTS %s', v_view);

    -- Wrap in outer SELECT that adds running_balance if configured
    IF v_has_bal THEN
        -- running_balance =
        --     cumulative SUM(amount * _sign), ordered by date_field then _id
        --   + the sum of all per-source balance_offset values for this stack
        --     (a subquery inside the view, so it is evaluated when the view is
        --     queried)
        --   + the stack-level balance_offset (embedded as a literal when the
        --     view is generated; regenerate the view after changing it).
        --
        -- The frame is spelled out as ROWS so each row adds exactly its own
        -- amount; the default RANGE frame would lump together rows that tie on
        -- (date_field, _id). Results are identical when _id is unique.
        --
        -- COALESCE on the stack offset: the column is nullable, and a NULL
        -- passed to %s formats as an empty string, which would produce
        -- invalid SQL.
        v_sql := format(
            'CREATE VIEW %s AS '
            || 'SELECT _source, _id%s, '
            || 'SUM((%I)::numeric * _sign) OVER (ORDER BY %I ASC, _id ASC '
            || 'ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) '
            || '+ (SELECT COALESCE(SUM(balance_offset),0) FROM dataflow.stack_sources WHERE stack_name = %L) '
            || '+ %s AS running_balance '
            || 'FROM (%s) _stacked',
            v_view,
            v_cols,
            v_stack.amount_field,
            v_stack.date_field,
            p_stack_name,
            COALESCE(v_stack.balance_offset, 0),
            array_to_string(v_parts, ' UNION ALL ')
        );
    ELSE
        v_sql := format(
            'CREATE VIEW %s AS SELECT _source, _id%s FROM (%s) _stacked',
            v_view,
            v_cols,
            array_to_string(v_parts, ' UNION ALL ')
        );
    END IF;

    EXECUTE v_sql;

    RETURN json_build_object('success', true, 'view', v_view, 'sql', v_sql);
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION generate_stack_view IS 'Generate a UNION ALL view in dfv schema combining multiple sources with optional running balance';
COMMENT ON FUNCTION calibrate_balance IS 'Given a known good balance at a date, compute the offset to add to balance_offset';