dataflow/database/functions.sql
Paul Trowbridge eb50704ca0 Add React UI and backend enhancements for dataflow
- Add full React + Vite UI (src/pages: Sources, Rules, Mappings, Records, Import)
- Sidebar layout with source selector persisted to localStorage
- Sources: unified field table with Dedup/In-view checkboxes, CSV suggest, generate dfv view
- Rules: extract/replace function types, regex flags, input field picklist, test results
- Mappings: unmapped values with sample records, inline key/value editor, edit existing mappings
- Records: expanded row shows per-rule extraction and mapping output breakdown
- Import: drag-drop CSV, transform/reprocess buttons, import history
- Backend: add flags/function_type to rules, get_unmapped_values with samples, generate_source_view, fields endpoint, reprocess endpoint
- database/functions.sql: apply_transformations supports replace mode and flags; generate_source_view builds typed dfv views
- Server bound to 0.0.0.0, SPA fallback for client-side routing

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-29 00:35:33 -04:00

310 lines
10 KiB
PL/PgSQL

--
-- Dataflow Functions
-- Simple, clear functions for import and transformation
--
-- The CREATE FUNCTION statements in this file use unqualified names, so the
-- search_path set here decides which schema they are created in; dataflow is
-- listed first (assumes that schema already exists -- TODO confirm).
SET search_path TO dataflow, public;
------------------------------------------------------
-- Function: import_records
-- Import data with automatic deduplication
------------------------------------------------------
-- Stores every element of the JSONB array p_data as one row of
-- dataflow.records for the given source, skipping rows that conflict with an
-- existing unique key, then writes one summary row to dataflow.import_log.
--
-- Parameters:
--   p_source_name  name of a row in dataflow.sources
--   p_data         JSONB array; each element becomes one record
--
-- Returns JSON:
--   {success: true, imported, duplicates, log_id}  when the import ran
--   {success: false, error}                        when no dedup_fields are
--       found for the source, or p_data is a JSONB value other than an array
CREATE OR REPLACE FUNCTION import_records(
    p_source_name TEXT,
    p_data JSONB -- Array of records
) RETURNS JSON AS $$
DECLARE
    v_dedup_fields TEXT[];
    v_record JSONB;
    v_dedup_key TEXT;
    v_inserted INTEGER := 0;
    v_duplicates INTEGER := 0;
    v_log_id INTEGER;
BEGIN
    -- Get dedup fields for this source
    SELECT dedup_fields INTO v_dedup_fields
    FROM dataflow.sources
    WHERE name = p_source_name;

    IF v_dedup_fields IS NULL THEN
        RETURN json_build_object(
            'success', false,
            'error', 'Source not found: ' || p_source_name
        );
    END IF;

    -- jsonb_array_elements raises on objects and scalars; report that the same
    -- way as the other validation failure instead of aborting the call.
    -- A SQL NULL p_data does not satisfy this test and is handled as before.
    IF jsonb_typeof(p_data) <> 'array' THEN
        RETURN json_build_object(
            'success', false,
            'error', 'p_data must be a JSON array of records'
        );
    END IF;

    -- Process each record in array order
    FOR v_record IN SELECT value FROM jsonb_array_elements(p_data)
    LOOP
        v_dedup_key := dataflow.generate_dedup_key(v_record, v_dedup_fields);

        -- ON CONFLICT DO NOTHING skips rows that would violate a unique
        -- constraint (presumably the one on dedup_key -- verify against the
        -- table definition) without opening a subtransaction per record, which
        -- a BEGIN ... EXCEPTION block would do.
        INSERT INTO dataflow.records (source_name, data, dedup_key)
        VALUES (p_source_name, v_record, v_dedup_key)
        ON CONFLICT DO NOTHING;

        -- FOUND is true only when the INSERT actually stored a row.
        IF FOUND THEN
            v_inserted := v_inserted + 1;
        ELSE
            v_duplicates := v_duplicates + 1;
        END IF;
    END LOOP;

    -- Log the import
    INSERT INTO dataflow.import_log (source_name, records_imported, records_duplicate)
    VALUES (p_source_name, v_inserted, v_duplicates)
    RETURNING id INTO v_log_id;

    RETURN json_build_object(
        'success', true,
        'imported', v_inserted,
        'duplicates', v_duplicates,
        'log_id', v_log_id
    );
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION import_records IS 'Import records with automatic deduplication';
------------------------------------------------------
-- Function: apply_transformations
-- Apply all transformation rules to records
------------------------------------------------------
-- Runs every enabled rule of a source, in sequence order, against each record
-- that has no transformed value yet, and stores the result in
-- dataflow.records.transformed together with transformed_at.
--
-- Parameters:
--   p_source_name  source whose records and rules are used
--   p_record_ids   optional list of record ids to restrict the run to;
--                  NULL means every untransformed record of the source
--
-- Rule handling (each rule reads its input from the ORIGINAL record data):
--   function_type = 'replace'  regexp_replace on the rule's field, with
--                              output_field used as the replacement text; the
--                              result overwrites that field in the output
--   anything else (extract)    substring(... FROM pattern); when a mapping
--                              exists for the extracted value its output is
--                              merged into the result, otherwise the value is
--                              stored under output_field
--
-- Returns JSON {success: true, transformed: <number of records updated>}.
CREATE OR REPLACE FUNCTION apply_transformations(
    p_source_name TEXT,
    p_record_ids INTEGER[] DEFAULT NULL -- NULL = all untransformed
) RETURNS JSON AS $$
DECLARE
    v_record RECORD;
    v_rule RECORD;
    v_transformed JSONB;
    v_input TEXT;
    v_pattern TEXT;
    v_extracted TEXT;
    v_mapping JSONB;
    v_count INTEGER := 0;
BEGIN
    -- Loop through records to transform
    FOR v_record IN
        SELECT id, data
        FROM dataflow.records
        WHERE source_name = p_source_name
          AND (p_record_ids IS NULL OR id = ANY(p_record_ids))
          AND transformed IS NULL
    LOOP
        -- Start with original data
        v_transformed := v_record.data;

        -- Apply each rule in sequence
        FOR v_rule IN
            SELECT name, field, pattern, flags, function_type, output_field
            FROM dataflow.rules
            WHERE source_name = p_source_name
              AND enabled = true
            ORDER BY sequence
        LOOP
            v_input := v_record.data->>v_rule.field;

            -- Non-empty flags are prepended as an embedded-options group, e.g.
            -- flags 'i' + pattern 'abc' => '(?i)abc'. NULL flags fall through
            -- to the ELSE arm because NULL != '' is not true.
            v_pattern := CASE
                             WHEN v_rule.flags != '' THEN '(?' || v_rule.flags || ')'
                             ELSE ''
                         END || v_rule.pattern;

            IF v_rule.function_type = 'replace' THEN
                v_extracted := regexp_replace(v_input, v_pattern, v_rule.output_field);

                -- regexp_replace returns NULL when the field is absent (or any
                -- argument is NULL). jsonb_set is strict, so writing that NULL
                -- would turn the whole v_transformed into NULL and the record
                -- would be saved as "untransformed". Leave the output as is.
                IF v_extracted IS NOT NULL THEN
                    v_transformed := jsonb_set(
                        v_transformed,
                        ARRAY[v_rule.field],
                        to_jsonb(v_extracted)
                    );
                END IF;
            ELSE
                -- extract (default)
                v_extracted := substring(v_input FROM v_pattern);

                IF v_extracted IS NOT NULL THEN
                    -- Check if there's a mapping for this value; SELECT INTO
                    -- resets v_mapping to NULL when no row matches.
                    SELECT output INTO v_mapping
                    FROM dataflow.mappings
                    WHERE source_name = p_source_name
                      AND rule_name = v_rule.name
                      AND input_value = v_extracted;

                    IF v_mapping IS NOT NULL THEN
                        -- Apply mapping (merge mapped fields into result)
                        v_transformed := v_transformed || v_mapping;
                    ELSE
                        -- No mapping, just add extracted value
                        v_transformed := jsonb_set(
                            v_transformed,
                            ARRAY[v_rule.output_field],
                            to_jsonb(v_extracted)
                        );
                    END IF;
                END IF;
            END IF;
        END LOOP;

        -- Update record with transformed data
        UPDATE dataflow.records
        SET transformed = v_transformed,
            transformed_at = CURRENT_TIMESTAMP
        WHERE id = v_record.id;

        v_count := v_count + 1;
    END LOOP;

    RETURN json_build_object(
        'success', true,
        'transformed', v_count
    );
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION apply_transformations IS 'Apply transformation rules and mappings to records';
------------------------------------------------------
-- Function: get_unmapped_values
-- Find extracted values that need mappings
------------------------------------------------------
-- Lists values found under a rule's output_field in transformed records that
-- have no row in dataflow.mappings for that rule, most frequent first.
--
-- Parameters:
--   p_source_name  source whose records, rules and mappings are inspected
--   p_rule_name    optional rule name filter; NULL means all rules
--
-- Returns one row per (rule_name, output_field, extracted_value):
--   record_count    number of transformed records carrying that value
--   sample_records  up to 3 original record payloads (records.data) for it
CREATE OR REPLACE FUNCTION get_unmapped_values(
    p_source_name TEXT,
    p_rule_name TEXT DEFAULT NULL
) RETURNS TABLE (
    rule_name TEXT,
    output_field TEXT,
    extracted_value TEXT,
    record_count BIGINT,
    sample_records JSONB
) AS $$
BEGIN
    -- Every column reference below is table-qualified on purpose: the
    -- RETURNS TABLE column names are also PL/pgSQL variables.
    RETURN QUERY
    -- One row per (record, rule) where the transformed record has the rule's
    -- output_field key.
    WITH extracted AS (
        SELECT
            r.name AS rule_name,
            r.output_field,
            rec.transformed->>r.output_field AS extracted_value,
            rec.data AS raw_record,
            rec.id AS record_id
        FROM dataflow.records AS rec
        INNER JOIN dataflow.rules AS r
            ON r.source_name = rec.source_name
        WHERE rec.source_name = p_source_name
          AND rec.transformed IS NOT NULL
          AND rec.transformed ? r.output_field
          AND (p_rule_name IS NULL OR r.name = p_rule_name)
    ),
    -- Keep only values without a mapping and number the rows of each value so
    -- the samples can be capped later; ordering by record id keeps the choice
    -- of samples stable between calls.
    unmapped AS (
        SELECT
            x.rule_name,
            x.output_field,
            x.extracted_value,
            x.raw_record,
            row_number() OVER (
                PARTITION BY x.rule_name, x.extracted_value
                ORDER BY x.record_id
            ) AS rn
        FROM extracted AS x
        WHERE NOT EXISTS (
            SELECT 1
            FROM dataflow.mappings AS m
            WHERE m.source_name = p_source_name
              AND m.rule_name = x.rule_name
              AND m.input_value = x.extracted_value
        )
    )
    SELECT
        u.rule_name,
        u.output_field,
        u.extracted_value,
        -- Counts every matching record. The 3-row cap applies to the samples
        -- only; filtering on rn before grouping would cap this count at 3.
        count(*) AS record_count,
        jsonb_agg(u.raw_record ORDER BY u.raw_record)
            FILTER (WHERE u.rn <= 3 AND u.raw_record IS NOT NULL) AS sample_records
    FROM unmapped AS u
    GROUP BY u.rule_name, u.output_field, u.extracted_value
    ORDER BY count(*) DESC, u.rule_name, u.extracted_value;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION get_unmapped_values IS 'Find extracted values that need mappings defined';
------------------------------------------------------
-- Function: reprocess_records
-- Clear and reapply transformations
------------------------------------------------------
-- Resets the transformation state of every record of a source and then runs
-- dataflow.apply_transformations for that source again.
--
-- Parameters:
--   p_source_name  source whose records are reset and re-transformed
--
-- Returns the JSON produced by dataflow.apply_transformations.
CREATE OR REPLACE FUNCTION reprocess_records(p_source_name TEXT)
RETURNS JSON AS $$
DECLARE
    v_outcome JSON;
BEGIN
    -- Step 1: wipe the stored result and its timestamp for the whole source
    UPDATE dataflow.records AS rec
    SET transformed_at = NULL,
        transformed = NULL
    WHERE rec.source_name = p_source_name;

    -- Step 2: run the rules again over the now-untransformed records
    v_outcome := dataflow.apply_transformations(p_source_name);

    RETURN v_outcome;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION reprocess_records IS 'Clear and reapply all transformations for a source';
------------------------------------------------------
-- Function: generate_source_view
-- Build a typed flat view in dfv schema
------------------------------------------------------
-- Creates (or regenerates) the view dfv.<source name> exposing one column per
-- entry of the source's config->'fields' array, read from
-- dataflow.records.transformed.
--
-- Parameters:
--   p_source_name  name of a row in dataflow.sources; also used, quoted, as
--                  the view name inside schema dfv
--
-- Each field entry is read as {"name": ..., "type": ...}:
--   type 'date'     column is (transformed->>name)::date
--   type 'numeric'  column is (transformed->>name)::numeric
--   anything else   column is the text value transformed->>name
--
-- Returns JSON:
--   {success: true, view, sql}   view name and the statement that was run
--   {success: false, error}      when the source has no non-empty fields array
CREATE OR REPLACE FUNCTION generate_source_view(p_source_name TEXT)
RETURNS JSON AS $$
DECLARE
    v_fields JSONB;
    v_has_fields BOOLEAN;
    v_cols TEXT;
    v_sql TEXT;
    v_view TEXT;
BEGIN
    -- NULL when the source is missing, config is NULL, or it has no 'fields' key
    SELECT config->'fields' INTO v_fields
    FROM dataflow.sources
    WHERE name = p_source_name;

    -- jsonb_array_length raises on non-arrays, so check the type first and
    -- only then the length (two steps: SQL does not promise OR short-circuits).
    v_has_fields := COALESCE(jsonb_typeof(v_fields) = 'array', false);
    IF v_has_fields THEN
        v_has_fields := jsonb_array_length(v_fields) > 0;
    END IF;

    IF NOT v_has_fields THEN
        RETURN json_build_object('success', false, 'error', 'No schema fields defined for this source');
    END IF;

    -- Build the select list in array order; %L / %I quote the field name as a
    -- literal and as an identifier respectively.
    SELECT string_agg(
               CASE f.spec->>'type'
                   WHEN 'date' THEN
                       format('(transformed->>%L)::date AS %I',
                              f.spec->>'name', f.spec->>'name')
                   WHEN 'numeric' THEN
                       format('(transformed->>%L)::numeric AS %I',
                              f.spec->>'name', f.spec->>'name')
                   ELSE
                       format('transformed->>%L AS %I',
                              f.spec->>'name', f.spec->>'name')
               END,
               ', ' ORDER BY f.ord
           )
    INTO v_cols
    FROM jsonb_array_elements(v_fields) WITH ORDINALITY AS f(spec, ord);

    CREATE SCHEMA IF NOT EXISTS dfv;

    v_view := 'dfv.' || quote_ident(p_source_name);
    v_sql := format(
        'CREATE OR REPLACE VIEW %s AS SELECT %s FROM dataflow.records WHERE source_name = %L AND transformed IS NOT NULL',
        v_view, v_cols, p_source_name
    );

    BEGIN
        EXECUTE v_sql;
    EXCEPTION WHEN invalid_table_definition THEN
        -- CREATE OR REPLACE VIEW can only append columns; removing, renaming,
        -- reordering or retyping a field is rejected. Drop the old view and
        -- create it afresh. The DROP is not CASCADE, so it still fails if
        -- other objects depend on the view, and privileges granted on the old
        -- view are not carried over.
        EXECUTE format('DROP VIEW IF EXISTS %s', v_view);
        EXECUTE v_sql;
    END;

    RETURN json_build_object('success', true, 'view', v_view, 'sql', v_sql);
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION generate_source_view IS 'Generate a typed flat view in dfv schema from source config.fields';
------------------------------------------------------
-- Summary
------------------------------------------------------
-- Functions: 5 simple, focused functions
-- 1. import_records - Import with deduplication
-- 2. apply_transformations - Apply rules and mappings
-- 3. get_unmapped_values - Find values needing mappings
-- 4. reprocess_records - Re-transform all records
-- 5. generate_source_view - Build a typed flat view in dfv schema
--
-- Each function does ONE thing clearly
-- No complex nested CTEs
-- Easy to understand and debug
------------------------------------------------------