From 89a70bdf7e4ec74a2c3bbfc47e5de58a2bbd4ae7 Mon Sep 17 00:00:00 2001 From: Paul Trowbridge Date: Sat, 23 May 2026 11:00:24 -0400 Subject: [PATCH] Split transformed column; add override management; show all override keys in panel - transformed now stores only rule additions (not merged data+overrides) - View dynamically computes data || transformed || overrides at query time - New DB functions: set/clear/bulk_set_record_overrides - Records panel now includes source-wide override keys so party/reason etc. appear even on records that don't have them set yet Co-Authored-By: Claude Sonnet 4.6 --- api/routes/records.js | 24 +- database/functions.sql | 1013 ++++++++++++++++++++++++- database/migrate_overrides_column.sql | 40 + database/schema.sql | 15 +- docs/refactor-transformed-split.md | 62 ++ ui/src/pages/Records.jsx | 62 +- 6 files changed, 1164 insertions(+), 52 deletions(-) create mode 100644 database/migrate_overrides_column.sql create mode 100644 docs/refactor-transformed-split.md diff --git a/api/routes/records.js b/api/routes/records.js index 4241309..3484e0c 100644 --- a/api/routes/records.js +++ b/api/routes/records.js @@ -49,7 +49,7 @@ module.exports = (pool) => { } }); - // Set overrides for all selected records and immediately merge into transformed + // Set overrides for all selected records router.post('/bulk-overrides', async (req, res, next) => { try { const { source_name, record_ids, overrides } = req.body; @@ -57,25 +57,25 @@ module.exports = (pool) => { return res.status(400).json({ error: 'source_name, record_ids array, and overrides object required' }); const idList = record_ids.map(id => parseInt(id)).join(','); const result = await pool.query( - `SELECT bulk_set_record_overrides(${lit(source_name)}, ARRAY[${idList}]::int[], ${lit(overrides)}) as updated` + `SELECT bulk_set_record_overrides(${lit(source_name)}, ARRAY[${idList}]::int[], ${lit(overrides)}) as result` ); - res.json({ updated: Number(result.rows[0].updated) }); + 
res.json(result.rows[0].result); } catch (err) { next(err); } }); - // Set overrides for a record and immediately merge into transformed + // Set overrides for a record router.put('/:id/overrides', async (req, res, next) => { try { const { overrides } = req.body; if (!overrides || typeof overrides !== 'object') return res.status(400).json({ error: 'overrides object required' }); const result = await pool.query( - `SELECT * FROM set_record_overrides(${lit(parseInt(req.params.id))}, ${lit(overrides)})` + `SELECT set_record_overrides(${lit(parseInt(req.params.id))}, ${lit(overrides)}) as rec` ); - if (result.rows.length === 0) return res.status(404).json({ error: 'Record not found' }); - res.json(result.rows[0]); + if (!result.rows[0].rec) return res.status(404).json({ error: 'Record not found' }); + res.json(result.rows[0].rec); } catch (err) { next(err); } @@ -84,13 +84,13 @@ module.exports = (pool) => { // Clear overrides and reprocess that record to restore computed values router.delete('/:id/overrides', async (req, res, next) => { try { - const rec = await pool.query( - `SELECT * FROM clear_record_overrides(${lit(parseInt(req.params.id))})` + const result = await pool.query( + `SELECT clear_record_overrides(${lit(parseInt(req.params.id))}) as rec` ); - if (rec.rows.length === 0) return res.status(404).json({ error: 'Record not found' }); - // Reprocess this record so transformed reflects rules/mappings without overrides + if (!result.rows[0].rec) return res.status(404).json({ error: 'Record not found' }); + const { source_name } = result.rows[0].rec; await pool.query( - `SELECT apply_transformations(${lit(rec.rows[0].source_name)}, ARRAY[${lit(parseInt(req.params.id))}::int], true)` + `SELECT apply_transformations(${lit(source_name)}, ARRAY[${lit(parseInt(req.params.id))}::int], true)` ); const updated = await pool.query(`SELECT * FROM get_record(${lit(parseInt(req.params.id))})`); res.json(updated.rows[0]); diff --git a/database/functions.sql 
b/database/functions.sql index 6d01b09..18bedde 100644 --- a/database/functions.sql +++ b/database/functions.sql @@ -284,7 +284,7 @@ record_additions AS ( -- Update all qualifying records; records with no rule matches get transformed = data updated AS ( UPDATE dataflow.records rec - SET transformed = rec.data || COALESCE(ra.additions, '{}'::jsonb) || COALESCE(rec.overrides, '{}'::jsonb), + SET transformed = COALESCE(ra.additions, '{}'::jsonb), transformed_at = CURRENT_TIMESTAMP FROM qualifying q LEFT JOIN record_additions ra ON ra.id = q.id @@ -473,8 +473,6 @@ BEGIN IF v_cols != '' THEN v_cols := v_cols || ', '; END IF; IF v_field->>'expression' IS NOT NULL THEN - -- Computed expression: substitute {fieldname} refs with (transformed->>'fieldname')::type - -- e.g. "{Amount} * {sign}" → "(transformed->>'Amount')::numeric * (transformed->>'sign')::numeric" DECLARE v_expr TEXT := v_field->>'expression'; v_ref TEXT; @@ -483,20 +481,20 @@ BEGIN WHILE v_expr ~ '\{[^}]+\}' LOOP v_ref := substring(v_expr FROM '\{([^}]+)\}'); v_expr := replace(v_expr, '{' || v_ref || '}', - format('(transformed->>%L)::numeric', v_ref)); + format('(r->>%L)::numeric', v_ref)); END LOOP; v_cols := v_cols || format('%s AS %I', v_expr, v_field->>'name'); END; ELSE CASE v_field->>'type' WHEN 'date' THEN - v_cols := v_cols || format('(transformed->>%L)::date AS %I', + v_cols := v_cols || format('(r->>%L)::date AS %I', v_field->>'name', v_field->>'name'); WHEN 'numeric' THEN - v_cols := v_cols || format('(transformed->>%L)::numeric AS %I', + v_cols := v_cols || format('(r->>%L)::numeric AS %I', v_field->>'name', v_field->>'name'); ELSE - v_cols := v_cols || format('transformed->>%L AS %I', + v_cols := v_cols || format('r->>%L AS %I', v_field->>'name', v_field->>'name'); END CASE; END IF; @@ -509,7 +507,7 @@ BEGIN EXECUTE format('DROP VIEW IF EXISTS %s CASCADE', v_view); v_sql := format( - 'CREATE VIEW %s AS SELECT id, %s FROM dataflow.records WHERE source_name = %L AND transformed IS NOT NULL', + 
'CREATE VIEW %s AS SELECT id, %s FROM (SELECT id, data || COALESCE(transformed, ''{}''::jsonb) || COALESCE(overrides, ''{}''::jsonb) AS r FROM dataflow.records WHERE source_name = %L AND transformed IS NOT NULL) rec', v_view, v_cols, p_source_name ); @@ -521,16 +519,997 @@ $$ LANGUAGE plpgsql; COMMENT ON FUNCTION generate_source_view IS 'Generate a typed flat view in dfv schema from source config.fields'; +------------------------------------------------------ +-- Function: set_record_overrides +-- Save override values for a single record +------------------------------------------------------ +DROP FUNCTION IF EXISTS set_record_overrides(INTEGER, JSONB); +CREATE OR REPLACE FUNCTION set_record_overrides(p_id INTEGER, p_overrides JSONB) +RETURNS JSON AS $$ + WITH updated AS ( + UPDATE dataflow.records + SET overrides = CASE WHEN p_overrides = '{}'::jsonb THEN NULL ELSE p_overrides END + WHERE id = p_id + RETURNING * + ) + SELECT row_to_json(updated) FROM updated; +$$ LANGUAGE sql; + +------------------------------------------------------ +-- Function: clear_record_overrides +-- Remove all overrides for a single record +------------------------------------------------------ +DROP FUNCTION IF EXISTS clear_record_overrides(INTEGER); +CREATE OR REPLACE FUNCTION clear_record_overrides(p_id INTEGER) +RETURNS JSON AS $$ + WITH updated AS ( + UPDATE dataflow.records + SET overrides = NULL + WHERE id = p_id + RETURNING * + ) + SELECT row_to_json(updated) FROM updated; +$$ LANGUAGE sql; + +------------------------------------------------------ +-- Function: bulk_set_record_overrides +-- Apply override values to multiple records +------------------------------------------------------ +DROP FUNCTION IF EXISTS bulk_set_record_overrides(TEXT, INTEGER[], JSONB); +CREATE OR REPLACE FUNCTION bulk_set_record_overrides(p_source_name TEXT, p_ids INTEGER[], p_overrides JSONB) +RETURNS JSON AS $$ + WITH updated AS ( + UPDATE dataflow.records + SET overrides = COALESCE(overrides, 
'{}'::jsonb) || p_overrides + WHERE id = ANY(p_ids) + AND source_name = p_source_name + RETURNING id + ) + SELECT json_build_object('updated', count(*)) FROM updated; +$$ LANGUAGE sql; + +CREATE OR REPLACE FUNCTION dataflow.calibrate_balance(p_stack_name text, p_source_name text, p_as_of_date date, p_known_balance numeric) + RETURNS json + LANGUAGE plpgsql + STABLE +AS $function$ +DECLARE + v_src dataflow.stack_sources%ROWTYPE; + v_running NUMERIC; + v_sql TEXT; +BEGIN + SELECT * INTO v_src + FROM dataflow.stack_sources + WHERE stack_name = p_stack_name AND source_name = p_source_name; + + IF NOT FOUND THEN + RETURN json_build_object('success', false, 'error', 'Source not in stack'); + END IF; + IF v_src.amount_field IS NULL OR v_src.date_field IS NULL THEN + RETURN json_build_object('success', false, 'error', 'Set amount and date fields on this source first'); + END IF; + + BEGIN + IF p_as_of_date IS NULL THEN + v_sql := format( + 'SELECT COALESCE(SUM(%I * %s), 0) FROM dfv.%I', + v_src.amount_field, v_src.amount_sign, p_source_name + ); + ELSE + v_sql := format( + 'SELECT COALESCE(SUM(%I * %s), 0) FROM dfv.%I WHERE %I <= %L::date', + v_src.amount_field, v_src.amount_sign, p_source_name, v_src.date_field, p_as_of_date + ); + END IF; + EXECUTE v_sql INTO v_running; + EXCEPTION WHEN undefined_table THEN + RETURN json_build_object('success', false, 'error', 'Source view not found — generate the source view first'); + END; + + RETURN json_build_object( + 'success', true, + 'source', p_source_name, + 'as_of_date', p_as_of_date, + 'known_balance', p_known_balance, + 'computed_sum', v_running, + 'suggested_offset', p_known_balance - v_running + ); +END; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.create_mapping(p_source_name text, p_rule_name text, p_input_value jsonb, p_output jsonb) + RETURNS dataflow.mappings + LANGUAGE sql +AS $function$ + INSERT INTO dataflow.mappings (source_name, rule_name, input_value, output) + VALUES (p_source_name, p_rule_name, 
p_input_value, p_output) + RETURNING *; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.create_rule(p_source_name text, p_name text, p_field text, p_pattern text, p_output_field text, p_function_type text DEFAULT 'extract'::text, p_flags text DEFAULT ''::text, p_replace_value text DEFAULT ''::text, p_enabled boolean DEFAULT true, p_retain boolean DEFAULT false, p_sequence integer DEFAULT 0) + RETURNS dataflow.rules + LANGUAGE sql +AS $function$ + INSERT INTO dataflow.rules + (source_name, name, field, pattern, output_field, function_type, flags, replace_value, enabled, retain, sequence) + VALUES + (p_source_name, p_name, p_field, p_pattern, p_output_field, p_function_type, p_flags, p_replace_value, p_enabled, p_retain, p_sequence) + RETURNING *; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.create_source(p_name text, p_constraint_fields text[], p_config jsonb DEFAULT '{}'::jsonb, p_global_picklist boolean DEFAULT true) + RETURNS dataflow.sources + LANGUAGE sql +AS $function$ + INSERT INTO dataflow.sources (name, constraint_fields, config, global_picklist) + VALUES (p_name, p_constraint_fields, p_config, p_global_picklist) + RETURNING *; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.create_source(p_name text, p_constraint_fields text[], p_config jsonb DEFAULT '{}'::jsonb) + RETURNS dataflow.sources + LANGUAGE sql +AS $function$ + INSERT INTO dataflow.sources (name, constraint_fields, config) + VALUES (p_name, p_constraint_fields, p_config) + RETURNING *; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.create_stack(p_name text, p_label text DEFAULT NULL::text, p_fields jsonb DEFAULT '[]'::jsonb, p_amount_field text DEFAULT NULL::text, p_date_field text DEFAULT NULL::text, p_balance_offset numeric DEFAULT 0) + RETURNS dataflow.stacks + LANGUAGE sql +AS $function$ + INSERT INTO dataflow.stacks (name, label, fields, amount_field, date_field, balance_offset) + VALUES (p_name, p_label, p_fields, p_amount_field, p_date_field, p_balance_offset) + 
RETURNING *; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.delete_mapping(p_id integer) + RETURNS TABLE(id integer) + LANGUAGE sql +AS $function$ + DELETE FROM dataflow.mappings WHERE id = p_id RETURNING id; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.delete_pivot_layout(p_id integer) + RETURNS TABLE(id integer) + LANGUAGE sql +AS $function$ + DELETE FROM dataflow.pivot_layouts WHERE id = p_id RETURNING id; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.delete_record(p_id bigint) + RETURNS TABLE(id bigint) + LANGUAGE sql +AS $function$ + DELETE FROM dataflow.records WHERE id = p_id RETURNING id; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.delete_rule(p_id integer) + RETURNS TABLE(id integer, name text) + LANGUAGE sql +AS $function$ + DELETE FROM dataflow.rules WHERE id = p_id RETURNING id, name; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.delete_source(p_name text) + RETURNS text + LANGUAGE sql +AS $function$ + DELETE FROM dataflow.sources WHERE name = p_name RETURNING name; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.delete_source_records(p_source_name text) + RETURNS TABLE(deleted_count bigint) + LANGUAGE sql +AS $function$ + WITH deleted AS ( + DELETE FROM dataflow.records WHERE source_name = p_source_name RETURNING id + ) + SELECT count(*) AS deleted_count FROM deleted; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.delete_stack(p_name text) + RETURNS TABLE(name text) + LANGUAGE sql +AS $function$ + DELETE FROM dataflow.stacks WHERE name = p_name RETURNING name; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.generate_stack_view(p_stack_name text, p_dry_run boolean DEFAULT false) + RETURNS json + LANGUAGE plpgsql +AS $function$ +DECLARE + v_stack dataflow.stacks%ROWTYPE; + v_src dataflow.stack_sources%ROWTYPE; + v_field JSONB; + v_ctes TEXT[] := '{}'; + v_cte_names TEXT[] := '{}'; + v_select TEXT; + v_col TEXT; + v_src_field TEXT; + v_amt_src TEXT; + v_date_src TEXT; + v_view TEXT; + v_sql TEXT; + v_has_bal 
BOOLEAN; + v_canon_cols TEXT; + v_src_bal_cols TEXT; + v_total_offset NUMERIC := 0; + v_cascade_stale TEXT[]; +BEGIN + SELECT * INTO v_stack FROM dataflow.stacks WHERE name = p_stack_name; + IF NOT FOUND THEN + RETURN json_build_object('success', false, 'error', 'Stack not found'); + END IF; + + v_has_bal := v_stack.amount_field IS NOT NULL AND v_stack.date_field IS NOT NULL; + + -- Build one CTE per source querying dfv.{source} directly + FOR v_src IN + SELECT * FROM dataflow.stack_sources WHERE stack_name = p_stack_name ORDER BY seq, id + LOOP + v_select := format('SELECT %L AS _source, id AS _id', v_src.source_name); + + FOR v_field IN SELECT * FROM jsonb_array_elements(v_stack.fields) + LOOP + v_col := v_field->>'name'; + + IF v_has_bal AND v_col = v_stack.amount_field THEN + -- Use per-source amount_field with sign applied + IF v_src.amount_field IS NULL THEN + v_select := v_select || format(', NULL::%s AS %I', v_field->>'type', v_col); + ELSE + v_select := v_select || format(', %I * %s AS %I', v_src.amount_field, v_src.amount_sign, v_col); + END IF; + ELSIF v_has_bal AND v_col = v_stack.date_field THEN + -- Use per-source date_field + IF v_src.date_field IS NULL THEN + v_select := v_select || format(', NULL::date AS %I', v_col); + ELSE + v_select := v_select || format(', %I AS %I', v_src.date_field, v_col); + END IF; + ELSE + -- Other canonical fields: use field_map or same name, NULL if column doesn't exist + v_src_field := COALESCE(v_src.field_map->>v_col, v_col); + IF EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'dfv' + AND table_name = v_src.source_name + AND column_name = v_src_field + ) THEN + v_select := v_select || format(', %I AS %I', v_src_field, v_col); + ELSE + v_select := v_select || format(', NULL::text AS %I', v_col); + END IF; + END IF; + END LOOP; + + v_select := v_select || format(' FROM dfv.%I', v_src.source_name); + + v_ctes := v_ctes || format('%I AS (%s)', v_src.source_name, v_select); + v_cte_names := 
v_cte_names || quote_ident(v_src.source_name); + + -- Accumulate carried-forward source balance column and total offset + IF v_has_bal THEN + IF v_src_bal_cols IS NOT NULL THEN v_src_bal_cols := v_src_bal_cols || ', '; END IF; + v_src_bal_cols := COALESCE(v_src_bal_cols, '') || format( + 'SUM(CASE WHEN _source = %L THEN %I END) OVER (ORDER BY %I ASC, _id ASC) + %s AS %I', + v_src.source_name, v_stack.amount_field, v_stack.date_field, + v_src.balance_offset, v_src.source_name || '_balance' + ); + v_total_offset := v_total_offset + v_src.balance_offset; + END IF; + END LOOP; + + IF array_length(v_ctes, 1) IS NULL THEN + RETURN json_build_object('success', false, 'error', 'Stack has no sources'); + END IF; + + v_view := 'dfv.' || quote_ident(p_stack_name); + + v_canon_cols := ( + SELECT string_agg(quote_ident(f->>'name'), ', ') + FROM jsonb_array_elements(v_stack.fields) f + ); + + IF v_has_bal THEN + v_sql := format( + 'CREATE VIEW %s AS ' + 'WITH %s, _stacked AS (SELECT * FROM %s) ' + 'SELECT _source, _id, %s, ' + '%s, ' + 'SUM(%I) OVER (ORDER BY %I ASC, _id ASC) + %s AS net_balance ' + 'FROM _stacked ORDER BY %I DESC, _id DESC', + v_view, + array_to_string(v_ctes, ', '), + array_to_string(v_cte_names, ' UNION ALL SELECT * FROM '), + v_canon_cols, + v_src_bal_cols, + v_stack.amount_field, + v_stack.date_field, + v_total_offset, + v_stack.date_field + ); + ELSE + v_sql := format( + 'CREATE VIEW %s AS ' + 'WITH %s, _stacked AS (SELECT * FROM %s) ' + 'SELECT _source, _id, %s FROM _stacked', + v_view, + array_to_string(v_ctes, ', '), + array_to_string(v_cte_names, ' UNION ALL SELECT * FROM '), + v_canon_cols + ); + END IF; + + IF NOT p_dry_run THEN + CREATE SCHEMA IF NOT EXISTS dfv; + EXECUTE format('DROP VIEW IF EXISTS %s CASCADE', v_view); + EXECUTE v_sql; + + -- Detect stacks whose views were dropped by CASCADE and mark them stale + SELECT array_agg(s.name) INTO v_cascade_stale + FROM dataflow.stacks s + WHERE s.name != p_stack_name + AND s.view_generated_at IS NOT 
NULL + AND NOT EXISTS ( + SELECT 1 FROM pg_views v + WHERE v.schemaname = 'dfv' AND v.viewname = s.name + ); + + UPDATE dataflow.stacks SET view_generated_at = NULL + WHERE name = ANY(v_cascade_stale); + END IF; + + RETURN json_build_object( + 'success', true, + 'view', v_view, + 'sql', v_sql, + 'cascade_stale', COALESCE(to_json(v_cascade_stale), '[]'::json) + ); +END; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.get_global_output_values() + RETURNS TABLE(col text, val text) + LANGUAGE sql + STABLE +AS $function$ + SELECT DISTINCT e.key AS col, e.value AS val + FROM dataflow.mappings m + JOIN dataflow.sources s ON s.name = m.source_name + CROSS JOIN LATERAL jsonb_each_text(m.output) AS e(key, value) + WHERE s.global_picklist = true + AND e.value IS NOT NULL + AND e.value <> '' + ORDER BY e.key, e.value; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.get_mapping(p_id integer) + RETURNS dataflow.mappings + LANGUAGE sql + STABLE +AS $function$ + SELECT * FROM dataflow.mappings WHERE id = p_id; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.get_mapping_counts(p_source_name text, p_rule_name text DEFAULT NULL::text) + RETURNS TABLE(rule_name text, input_value jsonb, record_count bigint) + LANGUAGE sql + STABLE +AS $function$ + SELECT + m.rule_name, + m.input_value, + COUNT(rec.id) AS record_count + FROM dataflow.mappings m + JOIN dataflow.rules r ON r.source_name = m.source_name AND r.name = m.rule_name + LEFT JOIN dataflow.records rec ON + rec.source_name = m.source_name + AND rec.transformed ? 
r.output_field + AND rec.transformed -> r.output_field = m.input_value + WHERE m.source_name = p_source_name + AND (p_rule_name IS NULL OR m.rule_name = p_rule_name) + GROUP BY m.rule_name, m.input_value; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.get_mappings_by_output_field(p_col text, p_val text) + RETURNS TABLE(id integer, source_name text, rule_name text, input_value jsonb, output jsonb) + LANGUAGE sql + STABLE +AS $function$ + SELECT m.id, m.source_name, m.rule_name, m.input_value, m.output + FROM dataflow.mappings m + WHERE m.output->>(p_col) = p_val + ORDER BY m.source_name, m.rule_name, m.input_value::text; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.get_record(p_id bigint) + RETURNS dataflow.records + LANGUAGE sql + STABLE +AS $function$ + SELECT * FROM dataflow.records WHERE id = p_id; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.get_rule(p_id integer) + RETURNS dataflow.rules + LANGUAGE sql + STABLE +AS $function$ + SELECT * FROM dataflow.rules WHERE id = p_id; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.get_source(p_name text) + RETURNS dataflow.sources + LANGUAGE sql + STABLE +AS $function$ + SELECT * FROM dataflow.sources WHERE name = p_name; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.get_source_fields(p_source_name text) + RETURNS TABLE(key text, origins text[]) + LANGUAGE sql + STABLE +AS $function$ + SELECT key, array_agg(DISTINCT origin ORDER BY origin) AS origins + FROM ( + SELECT f->>'name' AS key, 'schema' AS origin + FROM dataflow.sources, jsonb_array_elements(config->'fields') f + WHERE name = p_source_name AND config ? 
'fields' + UNION ALL + SELECT jsonb_object_keys(data) AS key, 'raw' AS origin + FROM dataflow.records WHERE source_name = p_source_name + UNION ALL + SELECT output_field AS key, 'rule: ' || name AS origin + FROM dataflow.rules WHERE source_name = p_source_name + UNION ALL + SELECT jsonb_object_keys(output) AS key, 'mapping' AS origin + FROM dataflow.mappings WHERE source_name = p_source_name + ) keys + GROUP BY key + ORDER BY key; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.get_source_stats(p_source_name text) + RETURNS TABLE(total_records bigint, transformed_records bigint, pending_records bigint) + LANGUAGE sql + STABLE +AS $function$ + SELECT + COUNT(*) AS total_records, + COUNT(*) FILTER (WHERE transformed IS NOT NULL) AS transformed_records, + COUNT(*) FILTER (WHERE transformed IS NULL) AS pending_records + FROM dataflow.records + WHERE source_name = p_source_name; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.get_stack(p_name text) + RETURNS TABLE(name text, label text, fields jsonb, amount_field text, date_field text, balance_offset numeric, created_at timestamp with time zone, sources jsonb) + LANGUAGE sql + STABLE +AS $function$ + SELECT + s.name, s.label, s.fields, + s.amount_field, s.date_field, s.balance_offset, + s.created_at, + COALESCE(jsonb_agg( + jsonb_build_object( + 'id', ss.id, + 'source_name', ss.source_name, + 'field_map', ss.field_map, + 'amount_field', ss.amount_field, + 'amount_sign', ss.amount_sign, + 'date_field', ss.date_field, + 'balance_offset', ss.balance_offset, + 'seq', ss.seq + ) ORDER BY ss.seq, ss.id + ) FILTER (WHERE ss.id IS NOT NULL), '[]') + FROM dataflow.stacks s + LEFT JOIN dataflow.stack_sources ss ON ss.stack_name = s.name + WHERE s.name = p_name + GROUP BY s.name, s.label, s.fields, s.amount_field, s.date_field, s.balance_offset, s.created_at; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.get_stack_balance(p_stack_name text) + RETURNS json + LANGUAGE plpgsql + STABLE +AS $function$ +DECLARE + v_stack 
dataflow.stacks%ROWTYPE; + v_balance NUMERIC; + v_view TEXT; + v_sql TEXT; +BEGIN + SELECT * INTO v_stack FROM dataflow.stacks WHERE name = p_stack_name; + IF NOT FOUND THEN + RETURN json_build_object('success', false, 'error', 'Stack not found'); + END IF; + IF v_stack.amount_field IS NULL OR v_stack.date_field IS NULL THEN + RETURN json_build_object('success', false, 'error', 'amount_field and date_field must be set'); + END IF; + + v_view := 'dfv.' || quote_ident(p_stack_name); + + BEGIN + v_sql := format( + 'SELECT net_balance FROM %s ORDER BY %I DESC, _id DESC LIMIT 1', + v_view, v_stack.date_field + ); + EXECUTE v_sql INTO v_balance; + EXCEPTION WHEN undefined_table THEN + RETURN json_build_object('success', false, 'error', 'View not generated yet — click Generate first'); + END; + + RETURN json_build_object('success', true, 'balance', v_balance); +END; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.get_status() + RETURNS json + LANGUAGE plpgsql + STABLE +AS $function$ +DECLARE + v_sources JSON; + v_stacks JSON; +BEGIN + SELECT COALESCE(json_agg(json_build_object('name', name, 'view_generated_at', view_generated_at) ORDER BY name), '[]'::json) + INTO v_sources + FROM dataflow.sources + WHERE view_generated_at IS NULL; + + SELECT COALESCE(json_agg(json_build_object('name', name, 'view_generated_at', view_generated_at) ORDER BY name), '[]'::json) + INTO v_stacks + FROM dataflow.stacks + WHERE view_generated_at IS NULL; + + RETURN json_build_object('stale_sources', v_sources, 'stale_stacks', v_stacks); +END; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.get_view_data(p_source_name text, p_limit integer DEFAULT 100, p_offset integer DEFAULT 0, p_sort_col text DEFAULT NULL::text, p_sort_dir text DEFAULT 'asc'::text, p_filters jsonb DEFAULT NULL::jsonb) + RETURNS json + LANGUAGE plpgsql + STABLE +AS $function$ +DECLARE + v_exists BOOLEAN; + v_where TEXT := ''; + v_order TEXT := ''; + v_rows JSON; + v_filter JSONB; + v_col TEXT; + v_pattern TEXT; +BEGIN + 
SELECT EXISTS ( + SELECT 1 FROM information_schema.views + WHERE table_schema = 'dfv' AND table_name = p_source_name + ) INTO v_exists; + + IF NOT v_exists THEN + RETURN json_build_object('exists', FALSE, 'rows', '[]'::json); + END IF; + + -- Build WHERE from filters (validate each column exists in the view) + IF p_filters IS NOT NULL THEN + FOR v_filter IN SELECT value FROM jsonb_array_elements(p_filters) LOOP + v_col := v_filter->>'col'; + v_pattern := v_filter->>'pattern'; + IF v_pattern IS NOT NULL AND v_pattern <> '' AND EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'dfv' + AND table_name = p_source_name + AND column_name = v_col + ) THEN + v_where := v_where || + CASE WHEN v_where = '' THEN ' WHERE ' ELSE ' AND ' END || + quote_ident(v_col) || '::text ~* ' || quote_literal(v_pattern); + END IF; + END LOOP; + END IF; + + IF p_sort_col IS NOT NULL AND EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'dfv' + AND table_name = p_source_name + AND column_name = p_sort_col + ) THEN + v_order := ' ORDER BY ' || quote_ident(p_sort_col) + || CASE WHEN lower(p_sort_dir) = 'desc' THEN ' DESC' ELSE ' ASC' END + || ' NULLS LAST'; + END IF; + + EXECUTE format( + 'SELECT COALESCE(json_agg(row_to_json(t)), ''[]''::json) FROM (SELECT * FROM dfv.%I%s%s LIMIT %s OFFSET %s) t', + p_source_name, v_where, v_order, p_limit, p_offset + ) INTO v_rows; + + RETURN json_build_object('exists', TRUE, 'rows', v_rows); +END; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.list_mappings(p_source_name text, p_rule_name text DEFAULT NULL::text) + RETURNS SETOF dataflow.mappings + LANGUAGE sql + STABLE +AS $function$ + SELECT * FROM dataflow.mappings + WHERE source_name = p_source_name + AND (p_rule_name IS NULL OR rule_name = p_rule_name) + ORDER BY rule_name, input_value::text; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.list_pivot_layouts(p_source_name text) + RETURNS TABLE(id integer, source_name text, layout_name text, 
config jsonb, created_at timestamp with time zone) + LANGUAGE sql +AS $function$ + SELECT id, source_name, layout_name, config, created_at + FROM dataflow.pivot_layouts + WHERE source_name = p_source_name + ORDER BY layout_name; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.list_records(p_source_name text, p_limit integer DEFAULT 100, p_offset integer DEFAULT 0, p_transformed_only boolean DEFAULT false) + RETURNS SETOF dataflow.records + LANGUAGE sql + STABLE +AS $function$ + SELECT * FROM dataflow.records + WHERE source_name = p_source_name + AND (NOT p_transformed_only OR transformed IS NOT NULL) + ORDER BY id DESC + LIMIT p_limit OFFSET p_offset; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.list_rules(p_source_name text) + RETURNS SETOF dataflow.rules + LANGUAGE sql + STABLE +AS $function$ + SELECT * FROM dataflow.rules + WHERE source_name = p_source_name + ORDER BY sequence, name; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.list_sources() + RETURNS SETOF dataflow.sources + LANGUAGE sql + STABLE +AS $function$ + SELECT * FROM dataflow.sources ORDER BY name; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.list_stacks() + RETURNS TABLE(name text, label text, fields jsonb, amount_field text, date_field text, balance_offset numeric, source_count bigint, created_at timestamp with time zone) + LANGUAGE sql + STABLE +AS $function$ + SELECT + s.name, s.label, s.fields, + s.amount_field, s.date_field, s.balance_offset, + count(ss.id) AS source_count, + s.created_at + FROM dataflow.stacks s + LEFT JOIN dataflow.stack_sources ss ON ss.stack_name = s.name + GROUP BY s.name, s.label, s.fields, s.amount_field, s.date_field, s.balance_offset, s.created_at + ORDER BY s.name; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.preview_rule(p_source text, p_field text, p_pattern text, p_flags text DEFAULT ''::text, p_function_type text DEFAULT 'extract'::text, p_replace_value text DEFAULT ''::text, p_limit integer DEFAULT 20) + RETURNS TABLE(id integer, 
raw_value text, extracted_value jsonb) + LANGUAGE plpgsql + STABLE +AS $function$ +-- Field is resolved from data first, then transformed (supports chained rules whose +-- input field was produced by an earlier-sequence rule rather than the raw import). +BEGIN + IF p_function_type = 'replace' THEN + RETURN QUERY + SELECT + r.id, + COALESCE(r.data ->> p_field, r.transformed ->> p_field), + to_jsonb(regexp_replace( + COALESCE(r.data ->> p_field, r.transformed ->> p_field), + p_pattern, p_replace_value, p_flags + )) + FROM dataflow.records r + WHERE source_name = p_source + AND (data ? p_field OR transformed ? p_field) + ORDER BY r.id DESC LIMIT p_limit; + ELSE + RETURN QUERY + SELECT + r.id, + COALESCE(r.data ->> p_field, r.transformed ->> p_field), + CASE + WHEN agg.match_count = 0 THEN NULL + WHEN agg.match_count = 1 THEN agg.matches -> 0 + ELSE agg.matches + END + FROM dataflow.records r + CROSS JOIN LATERAL ( + SELECT + jsonb_agg( + CASE WHEN array_length(mt, 1) = 1 THEN to_jsonb(mt[1]) ELSE to_jsonb(mt) END + ORDER BY rn + ) AS matches, + count(*)::int AS match_count + FROM regexp_matches( + COALESCE(r.data ->> p_field, r.transformed ->> p_field), + p_pattern, p_flags + ) + WITH ORDINALITY AS m(mt, rn) + ) agg + WHERE r.source_name = p_source + AND (r.data ? p_field OR r.transformed ? 
p_field)
+    ORDER BY r.id DESC LIMIT p_limit;
+  END IF;
+END;
+$function$
+;
+CREATE OR REPLACE FUNCTION dataflow.remap_output_field(p_col text, p_from_val text, p_to_val text)  -- rewrite output[p_col] from p_from_val to p_to_val in every mapping; returns rows updated
+ RETURNS integer
+ LANGUAGE plpgsql
+AS $function$
+DECLARE
+  updated_count INTEGER;
+BEGIN
+  UPDATE dataflow.mappings
+  SET output = jsonb_set(output, ARRAY[p_col], to_jsonb(p_to_val))  -- NOTE(review): jsonb_set is STRICT, so a NULL p_to_val would null the whole output value; confirm callers never pass NULL
+  WHERE output->>(p_col) = p_from_val;
+  GET DIAGNOSTICS updated_count = ROW_COUNT;
+  RETURN updated_count;
+END;
+$function$
+;
+CREATE OR REPLACE FUNCTION dataflow.remove_stack_source(p_stack_name text, p_source_name text)  -- detach one source from a stack; returns the removed source_name (no row if nothing matched)
+ RETURNS TABLE(source_name text)
+ LANGUAGE sql
+AS $function$
+  DELETE FROM dataflow.stack_sources
+  WHERE stack_name = p_stack_name AND source_name = p_source_name
+  RETURNING source_name;
+$function$
+;
+CREATE OR REPLACE FUNCTION dataflow.reorder_stack_sources(p_stack_name text, p_source_names text[])  -- set seq of each named source in the stack to its 1-based position in p_source_names
+ RETURNS void
+ LANGUAGE plpgsql
+AS $function$
+DECLARE
+  i INTEGER;
+BEGIN
+  FOR i IN 1..COALESCE(array_length(p_source_names, 1), 0) LOOP  -- array_length() is NULL for an empty/NULL array and a NULL loop bound raises an error; treat it as zero iterations
+    UPDATE dataflow.stack_sources
+    SET seq = i
+    WHERE stack_name = p_stack_name AND source_name = p_source_names[i];  -- names not present in the stack simply update no rows
+  END LOOP;
+END;
+$function$
+;
+CREATE OR REPLACE FUNCTION dataflow.save_pivot_layout(p_source_name text, p_layout_name text, p_config jsonb)  -- upsert a pivot layout keyed by (source_name, layout_name); returns the stored row
+ RETURNS TABLE(id integer, source_name text, layout_name text, config jsonb, created_at timestamp with time zone)
+ LANGUAGE sql
+AS $function$
+  INSERT INTO dataflow.pivot_layouts (source_name, layout_name, config)
+  VALUES (p_source_name, p_layout_name, p_config)
+  ON CONFLICT (source_name, layout_name) DO UPDATE
+    SET config = EXCLUDED.config
+  RETURNING id, source_name, layout_name, config, created_at;
+$function$
+;
+CREATE OR REPLACE FUNCTION dataflow.search_mapping_outputs(p_search text)
+ RETURNS TABLE(col text, val text, mapping_count bigint)
+ LANGUAGE sql
+ STABLE
+AS $function$
+  SELECT e.key AS col, e.value AS val, COUNT(*) AS mapping_count
+  FROM dataflow.mappings m
+  CROSS JOIN LATERAL
jsonb_each_text(m.output) AS e(key, value)
+    WHERE e.value ILIKE '%' || p_search || '%'
+      AND e.value IS NOT NULL
+      AND e.value <> ''
+    GROUP BY e.key, e.value
+    ORDER BY e.key, e.value;
+$function$
+;
+CREATE OR REPLACE FUNCTION dataflow.search_records(p_source_name text, p_query jsonb, p_limit integer DEFAULT 100)
+ RETURNS SETOF dataflow.records
+ LANGUAGE sql
+ STABLE
+AS $function$
+    -- JSONB containment search within one source, newest id first.
+    -- A record matches when p_query is contained in data or in transformed.
+    -- NOTE(review): overrides is not searched here; confirm whether override
+    -- values are expected to match as well.
+    SELECT * FROM dataflow.records
+    WHERE source_name = p_source_name
+      AND (data @> p_query OR transformed @> p_query)
+    ORDER BY id DESC
+    LIMIT p_limit;
+$function$
+;
+CREATE OR REPLACE FUNCTION dataflow.source_config_changed()
+ RETURNS trigger
+ LANGUAGE plpgsql
+AS $function$
+-- Trigger function: clears view_generated_at on the row whenever config changes.
+BEGIN
+    IF NEW.config IS DISTINCT FROM OLD.config THEN
+        NEW.view_generated_at := NULL;
+    END IF;
+    RETURN NEW;
+END;
+$function$
+;
+CREATE OR REPLACE FUNCTION dataflow.stack_sources_changed()
+ RETURNS trigger
+ LANGUAGE plpgsql
+AS $function$
+-- Trigger function: clears view_generated_at on the owning stack. For UPDATE it
+-- does so only when one of the compared columns actually changed. Always
+-- returns NULL (presumably installed as an AFTER trigger; confirm in schema).
+BEGIN
+    IF TG_OP = 'UPDATE' THEN
+        IF NEW.field_map IS NOT DISTINCT FROM OLD.field_map AND
+           NEW.amount_sign IS NOT DISTINCT FROM OLD.amount_sign AND
+           NEW.balance_offset IS NOT DISTINCT FROM OLD.balance_offset AND
+           NEW.amount_field IS NOT DISTINCT FROM OLD.amount_field AND
+           NEW.date_field IS NOT DISTINCT FROM OLD.date_field AND
+           NEW.seq IS NOT DISTINCT FROM OLD.seq THEN
+            RETURN NULL;
+        END IF;
+    END IF;
+    UPDATE dataflow.stacks SET view_generated_at = NULL
+    WHERE name = COALESCE(NEW.stack_name, OLD.stack_name);
+    RETURN NULL;
+END;
+$function$
+;
+CREATE OR REPLACE FUNCTION dataflow.test_rule(p_rule_id integer, p_limit integer DEFAULT 20)
+ RETURNS TABLE(rule jsonb, results jsonb)
+ LANGUAGE plpgsql
+ STABLE
+AS $function$
+-- Dry-runs the regex of rule p_rule_id against the newest p_limit records of the
+-- rule's source that carry the rule's input field in data. Nothing is written.
+-- Returns one row: a summary of the rule and a JSON array of
+-- {id, raw_value, extracted_value}; returns no row when the rule does not exist.
+-- extracted_value is NULL for no match, the single match when there is exactly
+-- one, otherwise an array of matches. Each match is a scalar when the pattern
+-- yields one capture group and an array of groups otherwise.
+DECLARE
+    v_rule dataflow.rules%ROWTYPE;
+    v_results JSONB;
+BEGIN
+    SELECT * INTO v_rule FROM dataflow.rules WHERE id = p_rule_id;
+    IF NOT FOUND THEN RETURN; END IF;
+
+    SELECT jsonb_agg(row_to_json(t)) INTO v_results FROM (
+        SELECT
+            r.id,
+            r.data ->> v_rule.field AS raw_value,
+            CASE
+                WHEN agg.match_count = 0 THEN NULL
+                WHEN agg.match_count = 1 THEN agg.matches -> 0
+                ELSE agg.matches
+            END AS extracted_value
+        FROM dataflow.records r
+        CROSS JOIN LATERAL (
+            -- Matches are aggregated as jsonb, one element per match. Aggregating
+            -- the text[] rows with array_agg would build a 2-D array, and a
+            -- single subscript on a 2-D array yields a NULL text element, not a
+            -- row, so array_length()/subscripting on it cannot work.
+            SELECT
+                jsonb_agg(
+                    CASE WHEN array_length(mt, 1) = 1 THEN to_jsonb(mt[1]) ELSE to_jsonb(mt) END
+                    ORDER BY rn
+                ) AS matches,
+                count(*)::int AS match_count
+            FROM regexp_matches(r.data ->> v_rule.field, v_rule.pattern, COALESCE(v_rule.flags, ''))
+                 WITH ORDINALITY AS m(mt, rn)
+        ) agg
+        WHERE r.source_name = v_rule.source_name AND r.data ? v_rule.field
+        ORDER BY r.id DESC LIMIT p_limit
+    ) t;
+
+    RETURN QUERY SELECT
+        jsonb_build_object(
+            'id', v_rule.id, 'name', v_rule.name,
+            'field', v_rule.field, 'pattern', v_rule.pattern,
+            'output_field', v_rule.output_field
+        ),
+        COALESCE(v_results, '[]'::jsonb);
+END;
+$function$
+;
+CREATE OR REPLACE FUNCTION dataflow.update_mapping(p_id integer, p_input_value jsonb DEFAULT NULL::jsonb, p_output jsonb DEFAULT NULL::jsonb)
+ RETURNS dataflow.mappings
+ LANGUAGE sql
+AS $function$
+    -- Partial update: a NULL argument leaves the corresponding column unchanged.
+    UPDATE dataflow.mappings SET
+        input_value = COALESCE(p_input_value, input_value),
+        output = COALESCE(p_output, output)
+    WHERE id = p_id
+    RETURNING *;
+$function$
+;
+CREATE OR REPLACE FUNCTION dataflow.update_rule(p_id integer, p_name text DEFAULT NULL::text, p_field text DEFAULT NULL::text, p_pattern text DEFAULT NULL::text, p_output_field text DEFAULT NULL::text, p_function_type text DEFAULT NULL::text, p_flags text DEFAULT NULL::text, p_replace_value text DEFAULT NULL::text, p_enabled boolean DEFAULT NULL::boolean, p_retain boolean DEFAULT NULL::boolean, p_sequence integer DEFAULT NULL::integer)
+ RETURNS dataflow.rules
+ LANGUAGE sql
+AS $function$
+    -- Partial update: a NULL argument leaves the corresponding column unchanged.
+    UPDATE dataflow.rules SET
+        name = COALESCE(p_name, name),
+        field = COALESCE(p_field, field),
+        pattern = COALESCE(p_pattern, pattern),
+        output_field = COALESCE(p_output_field, 
output_field),
+        function_type = COALESCE(p_function_type, function_type),
+        flags = COALESCE(p_flags, flags),
+        replace_value = COALESCE(p_replace_value, replace_value),
+        enabled = COALESCE(p_enabled, enabled),
+        retain = COALESCE(p_retain, retain),
+        sequence = COALESCE(p_sequence, sequence)
+    WHERE id = p_id
+    RETURNING *;
+$function$
+;
+CREATE OR REPLACE FUNCTION dataflow.update_source(p_name text, p_constraint_fields text[] DEFAULT NULL::text[], p_config jsonb DEFAULT NULL::jsonb, p_global_picklist boolean DEFAULT NULL::boolean)
+ RETURNS dataflow.sources
+ LANGUAGE sql
+AS $function$
+    -- Partial update of the source named p_name: a NULL argument leaves the
+    -- corresponding column unchanged; updated_at is always refreshed.
+    -- Returns the updated row (NULL when no source has that name).
+    UPDATE dataflow.sources
+    SET constraint_fields = COALESCE(p_constraint_fields, constraint_fields),
+        config = COALESCE(p_config, config),
+        global_picklist = COALESCE(p_global_picklist, global_picklist),
+        updated_at = CURRENT_TIMESTAMP
+    WHERE name = p_name
+    RETURNING *;
+$function$
+;
+-- The older 3-argument overload update_source(text, text[], jsonb) is removed.
+-- With every trailing argument defaulted in both overloads, any call passing one
+-- to three arguments matched both and failed as ambiguous. The 4-argument
+-- version above handles those calls identically (global_picklist stays unchanged
+-- when p_global_picklist is NULL).
+DROP FUNCTION IF EXISTS dataflow.update_source(text, text[], jsonb);
+CREATE OR REPLACE FUNCTION dataflow.update_stack(p_name text, p_label text DEFAULT NULL::text, p_fields jsonb DEFAULT NULL::jsonb, p_amount_field text DEFAULT NULL::text, p_date_field text DEFAULT NULL::text, p_balance_offset numeric DEFAULT NULL::numeric)
+ RETURNS dataflow.stacks
+ LANGUAGE sql
+AS $function$
+    -- Partial update of the stack named p_name: a NULL argument leaves the
+    -- corresponding column unchanged. Returns the updated row.
+    UPDATE dataflow.stacks SET
+        label = COALESCE(p_label, label),
+        fields = COALESCE(p_fields, fields),
+        amount_field = COALESCE(p_amount_field, amount_field),
+        date_field = COALESCE(p_date_field, date_field),
+        balance_offset = COALESCE(p_balance_offset, balance_offset)
+    WHERE name = p_name
+    RETURNING *;
+$function$
+;
+CREATE OR REPLACE FUNCTION 
dataflow.upsert_mapping(p_source_name text, p_rule_name text, p_input_value jsonb, p_output jsonb) + RETURNS dataflow.mappings + LANGUAGE sql +AS $function$ + INSERT INTO dataflow.mappings (source_name, rule_name, input_value, output) + VALUES (p_source_name, p_rule_name, p_input_value, p_output) + ON CONFLICT (source_name, rule_name, input_value) + DO UPDATE SET output = EXCLUDED.output + RETURNING *; +$function$ +; +CREATE OR REPLACE FUNCTION dataflow.upsert_stack_source(p_stack_name text, p_source_name text, p_field_map jsonb DEFAULT '{}'::jsonb, p_amount_sign integer DEFAULT 1, p_balance_offset numeric DEFAULT 0, p_amount_field text DEFAULT NULL::text, p_date_field text DEFAULT NULL::text) + RETURNS dataflow.stack_sources + LANGUAGE sql +AS $function$ + INSERT INTO dataflow.stack_sources (stack_name, source_name, field_map, amount_sign, balance_offset, amount_field, date_field, seq) + VALUES ( + p_stack_name, p_source_name, p_field_map, p_amount_sign, p_balance_offset, p_amount_field, p_date_field, + (SELECT COALESCE(MAX(seq), 0) + 1 FROM dataflow.stack_sources WHERE stack_name = p_stack_name) + ) + ON CONFLICT (stack_name, source_name) DO UPDATE SET + field_map = EXCLUDED.field_map, + amount_sign = EXCLUDED.amount_sign, + balance_offset = EXCLUDED.balance_offset, + amount_field = EXCLUDED.amount_field, + date_field = EXCLUDED.date_field + RETURNING *; +$function$ +; + ------------------------------------------------------ -- Summary ------------------------------------------------------ --- Functions: 4 simple, focused functions --- 1. import_records - Import with deduplication --- 2. apply_transformations - Apply rules and mappings --- 3. get_unmapped_values - Find values needing mappings --- 4. reprocess_records - Re-transform all records --- --- Each function does ONE thing clearly --- No complex nested CTEs --- Easy to understand and debug +-- All dataflow functions are defined above. 
+-- Deploy with: psql -d dataflow -f database/functions.sql
 ------------------------------------------------------
diff --git a/database/migrate_overrides_column.sql b/database/migrate_overrides_column.sql
new file mode 100644
index 0000000..bd0c8f7
--- /dev/null
+++ b/database/migrate_overrides_column.sql
@@ -0,0 +1,40 @@
+--
+-- Migration: add overrides column to records
+--
+-- Separates the three data layers:
+--   data        — original import values, never mutated
+--   transformed — rule/mapping output fields only (delta)
+--   overrides   — manual user overrides (highest precedence)
+--
+-- Consumers merge as: data || COALESCE(transformed,'{}') || COALESCE(overrides,'{}')
+--
+-- Safe to run multiple times (IF NOT EXISTS guards).
+--
+
+SET search_path TO dataflow, public;
+
+-- 1. Add overrides column
+ALTER TABLE dataflow.records
+    ADD COLUMN IF NOT EXISTS overrides JSONB;
+
+-- 2. Add partial GIN index (only indexes rows that have overrides)
+CREATE INDEX IF NOT EXISTS idx_records_overrides
+    ON dataflow.records USING gin(overrides)
+    WHERE overrides IS NOT NULL;
+
+-- 3. Redeploy functions (CREATE OR REPLACE — non-destructive); \ir resolves the path relative to this script, not psql's working directory
+\ir functions.sql
+
+-- 4. 
Reprocess all sources to strip stale data keys from transformed +-- (apply_transformations now writes only rule additions, not data || additions) +DO $$ +DECLARE + src TEXT; + result JSON; +BEGIN + FOR src IN SELECT name FROM dataflow.sources ORDER BY name LOOP + SELECT dataflow.reprocess_records(src) INTO result; + RAISE NOTICE 'Reprocessed %: %', src, result; + END LOOP; +END; +$$; diff --git a/database/schema.sql b/database/schema.sql index 43ac633..196eb8f 100644 --- a/database/schema.sql +++ b/database/schema.sql @@ -37,26 +37,27 @@ CREATE TABLE records ( -- Data data JSONB NOT NULL, -- Original imported data constraint_key JSONB, -- Fields that uniquely identify this record (set on import) - transformed JSONB, -- Data after transformations applied + transformed JSONB, -- Rule/mapping output fields only (delta, not raw data) + overrides JSONB, -- Manual user overrides (highest precedence) -- Metadata - import_id INTEGER REFERENCES import_log(id) ON DELETE CASCADE, -- Which import batch this came from + import_id INTEGER REFERENCES import_log(id) ON DELETE CASCADE, imported_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, - transformed_at TIMESTAMPTZ, - - + transformed_at TIMESTAMPTZ ); COMMENT ON TABLE records IS 'Imported records with raw and transformed data'; -COMMENT ON COLUMN records.data IS 'Original data as imported'; +COMMENT ON COLUMN records.data IS 'Original data as imported — never mutated after import'; COMMENT ON COLUMN records.constraint_key IS 'JSONB object of constraint field values — uniquely identifies this record within its source'; -COMMENT ON COLUMN records.transformed IS 'Data after applying transformation rules'; +COMMENT ON COLUMN records.transformed IS 'Rule/mapping output fields only (delta); merge as data || transformed || overrides for final values'; +COMMENT ON COLUMN records.overrides IS 'Manual user overrides; highest precedence in data || transformed || overrides merge'; -- Indexes CREATE INDEX idx_records_source ON 
records(source_name); CREATE INDEX idx_records_constraint ON records USING gin(constraint_key); CREATE INDEX idx_records_data ON records USING gin(data); CREATE INDEX idx_records_transformed ON records USING gin(transformed); +CREATE INDEX idx_records_overrides ON records USING gin(overrides) WHERE overrides IS NOT NULL; ------------------------------------------------------ -- Table: rules diff --git a/docs/refactor-transformed-split.md b/docs/refactor-transformed-split.md new file mode 100644 index 0000000..561035e --- /dev/null +++ b/docs/refactor-transformed-split.md @@ -0,0 +1,62 @@ +# Refactor: Split `transformed` into three columns + +## Goal + +Separate `records` into three clean JSONB layers with clear semantics: + +| Column | Meaning | Wins over | +|---|---|---| +| `data` | Raw import values, never mutated | — | +| `transformed` | Rule/mapping-derived fields only | `data` | +| `overrides` | Manual user overrides | `data`, `transformed` | + +Consumers merge them at read time: + +```sql +data || COALESCE(transformed, '{}'::jsonb) || COALESCE(overrides, '{}'::jsonb) +``` + +## Why + +Currently `transformed` duplicates `data` keys because `apply_transformations` was originally +written as `data || rule_additions`. This makes it impossible to tell what the rules actually +changed vs. what was carried from the original import. + +## Current State (branch: `transformed-refactor`) + +### Already done in functions.sql + +- `apply_transformations` — already stores only rule additions (`COALESCE(ra.additions, '{}')`) +- `generate_source_view` — already uses the 3-way coalesce for `dfv.*` views +- `set_record_overrides`, `clear_record_overrides`, `bulk_set_record_overrides` — exist +- API routes — `PUT /api/records/:id/overrides`, `DELETE /:id/overrides`, `POST /bulk-overrides` exist + +### Still needed + +1. **`database/schema.sql`** — add `overrides JSONB` column to `records` table and a GIN index. + Also fix the syntax error: trailing comma before `)` on line 48. 
+ +2. **`ui/src/pages/Records.jsx`** — right panel currently iterates `selectedRecord.transformed` + for all fields. Split into three sections: + - **Original** (`data`) — read-only, muted style + - **Transformed** (`transformed`) — rule-derived delta only, highlighted + - **Overrides** (`overrides`) — editable, amber style (existing draft UI already works here) + +3. **Deploy + reprocess** (user-triggered, not automated): + - `psql -d dataflow -f database/schema.sql` (drop/recreate schema) + - `psql -d dataflow -f database/functions.sql` (redeploy functions) + - Regenerate all `dfv.*` views via the API for each source + - Run `reprocess_records` on every source to strip stale `data` keys from existing `transformed` rows + +## Rollback + +Branch `stacks` is the stable point. A pg_dump taken before deployment is the DB rollback. + +## File Checklist + +- [ ] `database/schema.sql` — add `overrides` column + index, fix syntax error +- [ ] `database/functions.sql` — no changes needed (already correct) +- [ ] `ui/src/pages/Records.jsx` — split inspector panel into 3 sections +- [ ] Build UI: `cd ui && npm run build` +- [ ] Deploy DB (user-triggered) +- [ ] Reprocess all sources (user-triggered) diff --git a/ui/src/pages/Records.jsx b/ui/src/pages/Records.jsx index 8b04f6d..a5eb472 100644 --- a/ui/src/pages/Records.jsx +++ b/ui/src/pages/Records.jsx @@ -271,11 +271,8 @@ export default function Records({ source }) { const displayCols = (rows.length > 0 ? 
Object.keys(rows[0]) : cols).filter(c => !HIDDEN_COLS.has(c)) const visCols = cols.filter(c => !HIDDEN_COLS.has(c)) - // For bulk bar: only established override keys (not all transformed cols) + // For bulk bar: only established override keys const allOverrideCols = [...new Set([...overrideCols, ...extraCols])] - // For the single-record panel: all transformed fields + any override keys + draft keys - const recordTransformedCols = Object.keys(selectedRecord?.transformed || {}).filter(c => !HIDDEN_COLS.has(c)) - const knownCols = [...new Set([...overrideCols, ...recordTransformedCols, ...Object.keys(overrideDraft)])] const savedOverrides = selectedRecord?.overrides || {} const isDirty = Object.values(overrideDraft).some(v => String(v).trim()) @@ -496,19 +493,52 @@ export default function Records({ source }) { )} -
- Fields - + {/* Raw fields — read only */} +
+
+ Raw +
+ {Object.entries(selectedRecord.data || {}).map(([field, val]) => ( +
+ {field} + {formatVal(val) ?? } +
+ ))}
-
+ {/* Transformed fields — read only delta */} +
+
+ Transformed +
+ {Object.entries(selectedRecord.transformed || {}).filter(([k]) => !HIDDEN_COLS.has(k)).length === 0 + ?
No rule output yet.
+ : Object.entries(selectedRecord.transformed || {}).filter(([k]) => !HIDDEN_COLS.has(k)).map(([field, val]) => ( +
+ {field} + {formatVal(val) ?? } +
+ )) + } +
+ + {/* Overrides — editable */} +
+
+ Overrides + +
- {knownCols.map(col => { - const val = overrideDraft[col] ?? '' + {[...new Set([ + ...Object.keys(selectedRecord.transformed || {}), + ...Object.keys(selectedRecord.overrides || {}), + ...overrideCols + ])].filter(k => !HIDDEN_COLS.has(k)).map(col => { + const override = overrideDraft[col] ?? '' const placeholder = formatVal(selectedRecord.transformed?.[col]) ?? '' const suggestions = [...(globalValues[col] || [])].sort() return ( @@ -519,9 +549,9 @@ export default function Records({ source }) {
setOverrideDraft(d => ({ ...d, [col]: v }))} onEnter={handleSaveOverrides} @@ -529,7 +559,7 @@ export default function Records({ source }) { /> - {val && ( + {override && (