-- Changelog (from commit message):
--   - POST /api/sources/suggest: derive source definition from CSV upload
--   - GET /api/sources/:name/import-log: query import history
--   - GET /api/rules/:id/test: test rule pattern against real records
--   - rules: add function_type (extract/replace) and flags columns
--   - get_unmapped_values: include up to 3 sample records per value
--   - npm start now uses nodemon for auto-reload
-- Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
--
-- Dataflow Functions
-- Simple, clear functions for import and transformation
--
SET search_path TO dataflow, public;

------------------------------------------------------
-- Function: import_records
-- Import data with automatic deduplication
--
-- Parameters:
--   p_source_name - name of a row in dataflow.sources
--   p_data        - JSONB array; each element is one record to import
-- Returns JSON: {success, imported, duplicates, log_id} on success,
--               {success:false, error} on failure.
------------------------------------------------------
CREATE OR REPLACE FUNCTION import_records(
    p_source_name TEXT,
    p_data JSONB -- Array of records
) RETURNS JSON AS $$
DECLARE
    v_dedup_fields TEXT[];
    v_record JSONB;
    v_dedup_key TEXT;
    v_inserted INTEGER := 0;
    v_duplicates INTEGER := 0;
    v_log_id INTEGER;
BEGIN
    -- Get dedup fields for this source
    SELECT dedup_fields INTO v_dedup_fields
    FROM dataflow.sources
    WHERE name = p_source_name;

    -- Use FOUND to detect a missing source. The previous "IS NULL" check
    -- misreported an existing source whose dedup_fields column is NULL
    -- as "Source not found".
    IF NOT FOUND THEN
        RETURN json_build_object(
            'success', false,
            'error', 'Source not found: ' || p_source_name
        );
    END IF;

    IF v_dedup_fields IS NULL THEN
        RETURN json_build_object(
            'success', false,
            'error', 'Source has no dedup_fields configured: ' || p_source_name
        );
    END IF;

    -- Reject non-array payloads up front; jsonb_array_elements would
    -- otherwise raise an unhelpful error mid-import.
    IF jsonb_typeof(p_data) IS DISTINCT FROM 'array' THEN
        RETURN json_build_object(
            'success', false,
            'error', 'p_data must be a JSON array of records'
        );
    END IF;

    -- Process each record individually so duplicates can be counted
    FOR v_record IN SELECT * FROM jsonb_array_elements(p_data)
    LOOP
        -- Generate dedup key
        v_dedup_key := dataflow.generate_dedup_key(v_record, v_dedup_fields);

        -- Try to insert; the per-row sub-transaction lets the loop
        -- continue past duplicates rejected by the unique index.
        BEGIN
            INSERT INTO dataflow.records (source_name, data, dedup_key)
            VALUES (p_source_name, v_record, v_dedup_key);

            v_inserted := v_inserted + 1;
        EXCEPTION WHEN unique_violation THEN
            v_duplicates := v_duplicates + 1;
        END;
    END LOOP;

    -- Log the import (read back via GET /api/sources/:name/import-log)
    INSERT INTO dataflow.import_log (source_name, records_imported, records_duplicate)
    VALUES (p_source_name, v_inserted, v_duplicates)
    RETURNING id INTO v_log_id;

    RETURN json_build_object(
        'success', true,
        'imported', v_inserted,
        'duplicates', v_duplicates,
        'log_id', v_log_id
    );
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION import_records IS 'Import records with automatic deduplication';
------------------------------------------------------
-- Function: apply_transformations
-- Apply all transformation rules to records
--
-- Parameters:
--   p_source_name - source whose rules/records to process
--   p_record_ids  - optional subset of record ids; NULL = all untransformed
-- Returns JSON: {success:true, transformed: <count>}
------------------------------------------------------
CREATE OR REPLACE FUNCTION apply_transformations(
    p_source_name TEXT,
    p_record_ids INTEGER[] DEFAULT NULL -- NULL = all untransformed
) RETURNS JSON AS $$
DECLARE
    v_record RECORD;
    v_rule RECORD;
    v_transformed JSONB;
    v_pattern TEXT;
    v_extracted TEXT;
    v_mapping JSONB;
    v_count INTEGER := 0;
BEGIN
    -- Loop through records to transform
    FOR v_record IN
        SELECT id, data
        FROM dataflow.records
        WHERE source_name = p_source_name
          AND (p_record_ids IS NULL OR id = ANY(p_record_ids))
          AND transformed IS NULL
    LOOP
        -- Start with original data
        v_transformed := v_record.data;

        -- Apply each rule in sequence
        FOR v_rule IN
            SELECT * FROM dataflow.rules
            WHERE source_name = p_source_name
              AND enabled = true
            ORDER BY sequence
        LOOP
            -- Build the effective regex once per rule: optional inline
            -- flags prefix "(?flags)" followed by the pattern. A NULL
            -- flags value falls through CASE to the empty prefix.
            v_pattern := CASE WHEN v_rule.flags != ''
                              THEN '(?' || v_rule.flags || ')'
                              ELSE '' END || v_rule.pattern;

            IF v_rule.function_type = 'replace' THEN
                -- Read from v_transformed (not the raw record) so rules
                -- compose in sequence order: previously each rule read
                -- v_record.data, so a later rule on the same field
                -- silently discarded an earlier replacement.
                v_extracted := regexp_replace(
                    v_transformed->>v_rule.field,
                    v_pattern,
                    v_rule.output_field -- for replace rules this holds the replacement string
                );
                v_transformed := jsonb_set(
                    v_transformed,
                    ARRAY[v_rule.field],
                    to_jsonb(v_extracted)
                );
            ELSE
                -- extract (default): first match (or first capture group)
                v_extracted := substring(
                    v_transformed->>v_rule.field
                    FROM v_pattern
                );

                IF v_extracted IS NOT NULL THEN
                    -- Check if there's a mapping for this value
                    -- (SELECT INTO leaves v_mapping NULL when no row matches)
                    SELECT output INTO v_mapping
                    FROM dataflow.mappings
                    WHERE source_name = p_source_name
                      AND rule_name = v_rule.name
                      AND input_value = v_extracted;

                    IF v_mapping IS NOT NULL THEN
                        -- Apply mapping (merge mapped fields into result)
                        v_transformed := v_transformed || v_mapping;
                    ELSE
                        -- No mapping, just add extracted value
                        v_transformed := jsonb_set(
                            v_transformed,
                            ARRAY[v_rule.output_field],
                            to_jsonb(v_extracted)
                        );
                    END IF;
                END IF;
            END IF;
        END LOOP;

        -- Update record with transformed data
        UPDATE dataflow.records
        SET transformed = v_transformed,
            transformed_at = CURRENT_TIMESTAMP
        WHERE id = v_record.id;

        v_count := v_count + 1;
    END LOOP;

    RETURN json_build_object(
        'success', true,
        'transformed', v_count
    );
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION apply_transformations IS 'Apply transformation rules and mappings to records';
------------------------------------------------------
-- Function: get_unmapped_values
-- Find extracted values that need mappings
--
-- Parameters:
--   p_source_name - source to inspect
--   p_rule_name   - optional filter to a single rule; NULL = all rules
-- Returns one row per (rule, extracted value) with no mapping defined,
-- with the TOTAL record count and up to 3 sample raw records.
------------------------------------------------------
CREATE OR REPLACE FUNCTION get_unmapped_values(
    p_source_name TEXT,
    p_rule_name TEXT DEFAULT NULL
) RETURNS TABLE (
    rule_name TEXT,
    output_field TEXT,
    extracted_value TEXT,
    record_count BIGINT,
    sample_records JSONB
) AS $$
BEGIN
    RETURN QUERY
    -- One row per (record, rule) pair where the rule produced a value
    WITH extracted AS (
        SELECT
            r.name AS rule_name,
            r.output_field,
            rec.transformed->>r.output_field AS extracted_value,
            rec.data AS raw_record
        FROM
            dataflow.records rec
            CROSS JOIN dataflow.rules r
        WHERE
            rec.source_name = p_source_name
            AND r.source_name = p_source_name
            AND rec.transformed IS NOT NULL
            AND rec.transformed ? r.output_field
            AND (p_rule_name IS NULL OR r.name = p_rule_name)
    )
    SELECT
        e.rule_name,
        e.output_field,
        e.extracted_value,
        count(*) AS record_count,
        -- Cap samples with FILTER on rn, NOT in WHERE: the previous
        -- "WHERE e.rn <= 3" ran before GROUP BY, which capped
        -- record_count at 3 instead of reporting the true total.
        jsonb_agg(e.raw_record ORDER BY e.raw_record)
            FILTER (WHERE e.rn <= 3 AND e.raw_record IS NOT NULL) AS sample_records
    FROM (
        -- rn numbers the records within each (rule, value) group so the
        -- aggregate above can keep only the first 3 as samples.
        -- Columns are table-qualified to avoid ambiguity with the
        -- function's OUT parameters of the same names.
        SELECT ex.rule_name, ex.output_field, ex.extracted_value, ex.raw_record,
               row_number() OVER (PARTITION BY ex.rule_name, ex.extracted_value
                                  ORDER BY (SELECT NULL)) AS rn
        FROM extracted ex
    ) e
    WHERE NOT EXISTS (
        SELECT 1 FROM dataflow.mappings m
        WHERE m.source_name = p_source_name
          AND m.rule_name = e.rule_name
          AND m.input_value = e.extracted_value
    )
    GROUP BY e.rule_name, e.output_field, e.extracted_value
    ORDER BY count(*) DESC;
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION get_unmapped_values IS 'Find extracted values that need mappings defined';
------------------------------------------------------
-- Function: reprocess_records
-- Clear and reapply transformations
--
-- Parameter:
--   p_source_name - source whose records are wiped and re-transformed
-- Returns the summary JSON produced by apply_transformations.
------------------------------------------------------
CREATE OR REPLACE FUNCTION reprocess_records(p_source_name TEXT)
RETURNS JSON AS $$
DECLARE
    v_result JSON;
BEGIN
    -- Wipe any previous transformation output for this source
    UPDATE dataflow.records
       SET transformed    = NULL,
           transformed_at = NULL
     WHERE source_name = p_source_name;

    -- Run the rule pipeline again from scratch and pass its summary through
    v_result := dataflow.apply_transformations(p_source_name);
    RETURN v_result;
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION reprocess_records IS 'Clear and reapply all transformations for a source';
------------------------------------------------------
-- Summary
------------------------------------------------------
-- Functions: 4 simple, focused functions
-- 1. import_records        - Import with deduplication
-- 2. apply_transformations - Apply rules and mappings
-- 3. get_unmapped_values   - Find values needing mappings
-- 4. reprocess_records     - Re-transform all records
--
-- Each function does ONE thing clearly
-- No complex nested CTEs
-- Easy to understand and debug
------------------------------------------------------