--
-- Dataflow Functions
-- Simple, clear functions for import and transformation
--

SET search_path TO dataflow, public;

------------------------------------------------------
-- Function: import_records
-- Import data with automatic deduplication
--
-- Parameters:
--   p_source_name  name looked up in dataflow.sources
--   p_data         JSONB array; each element is stored as one record
--
-- Returns JSON:
--   {"success": false, "error": "Source not found: <name>"}
--       when no dedup fields are found for the source (no matching
--       row in dataflow.sources, or its dedup_fields is NULL)
--   {"success": true, "imported": n, "duplicates": n, "log_id": id}
--       otherwise; one row is written to dataflow.import_log per call
------------------------------------------------------
CREATE OR REPLACE FUNCTION import_records(
    p_source_name TEXT,
    p_data JSONB  -- Array of records
) RETURNS JSON AS $$
DECLARE
    v_dedup_fields TEXT[];
    v_record JSONB;
    v_dedup_key TEXT;
    v_inserted INTEGER := 0;
    v_duplicates INTEGER := 0;
    v_log_id INTEGER;
BEGIN
    -- Get dedup fields for this source
    SELECT s.dedup_fields
    INTO v_dedup_fields
    FROM dataflow.sources s
    WHERE s.name = p_source_name;

    -- SELECT INTO leaves the target NULL when no row matches, so this
    -- also fires for an existing source whose dedup_fields is NULL.
    IF v_dedup_fields IS NULL THEN
        RETURN json_build_object(
            'success', false,
            'error', 'Source not found: ' || p_source_name
        );
    END IF;

    -- Process each record
    FOR v_record IN
        SELECT t.elem
        FROM jsonb_array_elements(p_data) AS t(elem)
    LOOP
        -- Generate dedup key
        v_dedup_key := dataflow.generate_dedup_key(v_record, v_dedup_fields);

        -- ON CONFLICT DO NOTHING skips a row that would violate a unique
        -- constraint, so no per-row exception block is needed.
        -- NOTE(review): presumably dataflow.records has a unique
        -- constraint involving dedup_key; confirm against the schema.
        INSERT INTO dataflow.records (source_name, data, dedup_key)
        VALUES (p_source_name, v_record, v_dedup_key)
        ON CONFLICT DO NOTHING;

        -- FOUND is true only when the INSERT actually added a row.
        IF FOUND THEN
            v_inserted := v_inserted + 1;
        ELSE
            v_duplicates := v_duplicates + 1;
        END IF;
    END LOOP;

    -- Log the import
    INSERT INTO dataflow.import_log (source_name, records_imported, records_duplicate)
    VALUES (p_source_name, v_inserted, v_duplicates)
    RETURNING id INTO v_log_id;

    RETURN json_build_object(
        'success', true,
        'imported', v_inserted,
        'duplicates', v_duplicates,
        'log_id', v_log_id
    );
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION import_records IS 'Import records with automatic deduplication';

------------------------------------------------------
-- Function: apply_transformations
-- Apply all transformation rules to records
--
-- Parameters:
--   p_source_name  source whose records and rules are processed
--   p_record_ids   optional list of record ids; NULL means every
--                  record of the source whose transformed is NULL
--
-- Only records with transformed IS NULL are touched, even when
-- p_record_ids is given.
--
-- Returns JSON: {"success": true, "transformed": <records updated>}
------------------------------------------------------
CREATE OR REPLACE FUNCTION apply_transformations(
    p_source_name TEXT,
    p_record_ids INTEGER[] DEFAULT NULL  -- NULL = all untransformed
) RETURNS JSON AS $$
DECLARE
    v_record RECORD;
    v_rule RECORD;
    v_transformed JSONB;
    v_extracted TEXT;
    v_mapping JSONB;
    v_count INTEGER := 0;
BEGIN
    -- Loop through records to transform
    FOR v_record IN
        SELECT rec.id, rec.data
        FROM dataflow.records rec
        WHERE rec.source_name = p_source_name
          AND (p_record_ids IS NULL OR rec.id = ANY(p_record_ids))
          AND rec.transformed IS NULL
    LOOP
        -- Start with original data
        v_transformed := v_record.data;

        -- Apply each enabled rule in sequence order
        FOR v_rule IN
            SELECT r.name, r.field, r.pattern, r.output_field
            FROM dataflow.rules r
            WHERE r.source_name = p_source_name
              AND r.enabled = true
            ORDER BY r.sequence
        LOOP
            -- Extract value using regex. Extraction always reads the
            -- original data, not the partially transformed result.
            v_extracted := substring(v_record.data->>v_rule.field FROM v_rule.pattern);

            IF v_extracted IS NOT NULL THEN
                -- Check if there's a mapping for this value
                -- (v_mapping is reset to NULL when no row matches)
                SELECT m.output
                INTO v_mapping
                FROM dataflow.mappings m
                WHERE m.source_name = p_source_name
                  AND m.rule_name = v_rule.name
                  AND m.input_value = v_extracted;

                IF v_mapping IS NOT NULL THEN
                    -- Apply mapping (merge mapped fields into result)
                    v_transformed := v_transformed || v_mapping;
                ELSE
                    -- No mapping, just add extracted value
                    v_transformed := jsonb_set(
                        v_transformed,
                        ARRAY[v_rule.output_field],
                        to_jsonb(v_extracted)
                    );
                END IF;
            END IF;
        END LOOP;

        -- Update record with transformed data
        UPDATE dataflow.records
        SET transformed = v_transformed,
            transformed_at = CURRENT_TIMESTAMP
        WHERE id = v_record.id;

        v_count := v_count + 1;
    END LOOP;

    RETURN json_build_object(
        'success', true,
        'transformed', v_count
    );
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION apply_transformations IS 'Apply transformation rules and mappings to records';

------------------------------------------------------
-- Function: get_unmapped_values
-- Find extracted values that need mappings
--
-- Parameters:
--   p_source_name  source to inspect
--   p_rule_name    optional rule name filter; NULL means all rules
--
-- Returns one row per (rule, output field, value) found in
-- transformed records that has no row in dataflow.mappings, with the
-- number of records carrying it, most frequent first.
------------------------------------------------------
CREATE OR REPLACE FUNCTION get_unmapped_values(
    p_source_name TEXT,
    p_rule_name TEXT DEFAULT NULL
) RETURNS TABLE (
    rule_name TEXT,
    output_field TEXT,
    extracted_value TEXT,
    record_count BIGINT
) AS $$
BEGIN
    -- The RETURNS TABLE column names are also PL/pgSQL variables inside
    -- this body, so every column reference below is alias-qualified.
    RETURN QUERY
    WITH extracted AS (
        -- Pair each transformed record with the rules of its source and
        -- read the value stored under the rule's output field
        SELECT
            r.name AS rule_name,
            r.output_field,
            rec.transformed->>r.output_field AS extracted_value
        FROM dataflow.records rec
        INNER JOIN dataflow.rules r
            ON r.source_name = rec.source_name
        WHERE rec.source_name = p_source_name
          AND rec.transformed IS NOT NULL
          AND rec.transformed ? r.output_field
          AND (p_rule_name IS NULL OR r.name = p_rule_name)
    )
    SELECT
        e.rule_name,
        e.output_field,
        e.extracted_value,
        count(*) AS record_count
    FROM extracted e
    WHERE NOT EXISTS (
        -- Exclude values that already have mappings
        SELECT 1
        FROM dataflow.mappings m
        WHERE m.source_name = p_source_name
          AND m.rule_name = e.rule_name
          AND m.input_value = e.extracted_value
    )
    GROUP BY e.rule_name, e.output_field, e.extracted_value
    -- Tie-breakers keep the row order stable between calls
    ORDER BY count(*) DESC, e.rule_name, e.output_field, e.extracted_value;
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION get_unmapped_values IS 'Find extracted values that need mappings defined';

------------------------------------------------------
-- Function: reprocess_records
-- Clear and reapply transformations
--
-- Parameters:
--   p_source_name  source whose records are reset and re-transformed
--
-- Returns the JSON result of dataflow.apply_transformations.
------------------------------------------------------
CREATE OR REPLACE FUNCTION reprocess_records(p_source_name TEXT)
RETURNS JSON AS $$
BEGIN
    -- Clear existing transformations so every record of the source
    -- qualifies as untransformed again
    UPDATE dataflow.records
    SET transformed = NULL,
        transformed_at = NULL
    WHERE source_name = p_source_name;

    -- Reapply transformations
    RETURN dataflow.apply_transformations(p_source_name);
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION reprocess_records IS 'Clear and reapply all transformations for a source';

------------------------------------------------------
-- Summary
------------------------------------------------------
-- Functions: 4 simple, focused functions
-- 1. import_records        - Import with deduplication
-- 2. apply_transformations - Apply rules and mappings
-- 3. get_unmapped_values   - Find values needing mappings
-- 4. reprocess_records     - Re-transform all records
--
-- Each function does ONE thing clearly
-- No complex nested CTEs
-- Easy to understand and debug
------------------------------------------------------