diff --git a/api/routes/sources.js b/api/routes/sources.js index 690ffda..587c56e 100644 --- a/api/routes/sources.js +++ b/api/routes/sources.js @@ -52,19 +52,22 @@ module.exports = (pool) => { const records = parse(req.file.buffer, { columns: true, skip_empty_lines: true, trim: true }); if (records.length === 0) return res.status(400).json({ error: 'CSV file is empty' }); + const ISO_DATE_RE = /^\d{4}-\d{2}-\d{2}(T[\d:.Z+-]+)?$/; const sample = records[0]; + const sampleRows = records.slice(0, 50); + const fields = Object.keys(sample).map(key => { - const val = sample[key]; + const vals = sampleRows.map(r => r[key]).filter(v => v !== '' && v != null); let type = 'text'; - if (!isNaN(parseFloat(val)) && isFinite(val) && val.charAt(0) !== '0') { + if (vals.length > 0 && vals.every(v => !isNaN(parseFloat(v)) && isFinite(v) && String(v).charAt(0) !== '0')) { type = 'numeric'; - } else if (Date.parse(val) > Date.parse('1950-01-01') && Date.parse(val) < Date.parse('2050-01-01')) { + } else if (vals.length > 0 && vals.every(v => ISO_DATE_RE.test(String(v)))) { type = 'date'; } return { name: key, type }; }); - res.json({ name: '', constraint_fields: [], fields }); + res.json({ name: '', constraint_fields: [], fields, sampleRows }); } catch (err) { next(err); } diff --git a/database/queries/sources.sql b/database/queries/sources.sql index 9b6f354..f357a58 100644 --- a/database/queries/sources.sql +++ b/database/queries/sources.sql @@ -144,145 +144,6 @@ BEGIN END; $$ LANGUAGE plpgsql STABLE; --- ── Import (uniqueness constraint) ──────────────────────────────────────────── - -CREATE OR REPLACE FUNCTION import_records(p_source_name TEXT, p_data JSONB) -RETURNS JSON AS $$ -DECLARE - v_constraint_fields TEXT[]; - v_record JSONB; - v_constraint_key TEXT; - v_inserted INTEGER := 0; - v_duplicates INTEGER := 0; - v_log_id INTEGER; -BEGIN - SELECT constraint_fields INTO v_constraint_fields - FROM dataflow.sources WHERE name = p_source_name; - - IF v_constraint_fields IS NULL THEN - RETURN json_build_object('success', false, 'error', 'Source not found: ' || p_source_name); - END IF; - - FOR v_record IN SELECT * FROM jsonb_array_elements(p_data) LOOP - v_constraint_key := dataflow.generate_constraint_key(v_record, v_constraint_fields); - BEGIN - INSERT INTO dataflow.records (source_name, data, constraint_key) - VALUES (p_source_name, v_record, v_constraint_key); - v_inserted := v_inserted + 1; - EXCEPTION WHEN unique_violation THEN - v_duplicates := v_duplicates + 1; - END; - END LOOP; - - INSERT INTO dataflow.import_log (source_name, records_imported, records_duplicate) - VALUES (p_source_name, v_inserted, v_duplicates) - RETURNING id INTO v_log_id; - - RETURN json_build_object('success', true, 'imported', v_inserted, 'duplicates', v_duplicates, 'log_id', v_log_id); -END; -$$ LANGUAGE plpgsql; - --- ── Transformations ─────────────────────────────────────────────────────────── - -CREATE OR REPLACE FUNCTION dataflow.jsonb_merge(a JSONB, b JSONB) -RETURNS JSONB AS $$ - SELECT COALESCE(a, '{}') || COALESCE(b, '{}') -$$ LANGUAGE sql IMMUTABLE; - -DROP AGGREGATE IF EXISTS dataflow.jsonb_concat_obj(JSONB); -CREATE AGGREGATE dataflow.jsonb_concat_obj(JSONB) ( - sfunc = dataflow.jsonb_merge, - stype = JSONB, - initcond = '{}' -); - -DROP FUNCTION IF EXISTS apply_transformations(TEXT, INTEGER[]); -CREATE OR REPLACE FUNCTION apply_transformations( - p_source_name TEXT, - p_record_ids INTEGER[] DEFAULT NULL, - p_overwrite BOOLEAN DEFAULT FALSE -) RETURNS JSON AS $$ -WITH -qualifying AS ( - SELECT id, data FROM dataflow.records - WHERE source_name = p_source_name - AND (p_overwrite OR transformed IS NULL) - AND (p_record_ids IS NULL OR id = ANY(p_record_ids)) -), -rx AS ( - SELECT - q.id, - r.name AS rule_name, - r.sequence, - r.output_field, - r.retain, - r.function_type, - COALESCE(mt.rn, rp.rn, 1) AS result_number, - CASE WHEN array_length(mt.mt, 1) = 1 THEN to_jsonb(mt.mt[1]) ELSE to_jsonb(mt.mt) END AS match_val, - to_jsonb(rp.rp) AS replace_val - FROM dataflow.rules r - INNER JOIN qualifying q ON q.data ? r.field - LEFT JOIN LATERAL regexp_matches(q.data ->> r.field, r.pattern, r.flags) - WITH ORDINALITY AS mt(mt, rn) ON r.function_type = 'extract' - LEFT JOIN LATERAL regexp_replace(q.data ->> r.field, r.pattern, r.replace_value, r.flags) - WITH ORDINALITY AS rp(rp, rn) ON r.function_type = 'replace' - WHERE r.source_name = p_source_name AND r.enabled = true -), -agg_matches AS ( - SELECT - id, rule_name, sequence, output_field, retain, function_type, - CASE function_type - WHEN 'replace' THEN jsonb_agg(replace_val) -> 0 - ELSE - CASE WHEN max(result_number) = 1 - THEN jsonb_agg(match_val ORDER BY result_number) -> 0 - ELSE jsonb_agg(match_val ORDER BY result_number) - END - END AS extracted - FROM rx - GROUP BY id, rule_name, sequence, output_field, retain, function_type -), -linked AS ( - SELECT - a.id, a.sequence, a.output_field, a.retain, a.extracted, m.output AS mapped - FROM agg_matches a - LEFT JOIN dataflow.mappings m ON - m.source_name = p_source_name - AND m.rule_name = a.rule_name - AND m.input_value = a.extracted - WHERE a.extracted IS NOT NULL -), -rule_output AS ( - SELECT - id, sequence, - CASE - WHEN mapped IS NOT NULL THEN - mapped || CASE WHEN retain THEN jsonb_build_object(output_field, extracted) ELSE '{}'::jsonb END - ELSE jsonb_build_object(output_field, extracted) - END AS output - FROM linked -), -record_additions AS ( - SELECT id, dataflow.jsonb_concat_obj(output ORDER BY sequence) AS additions - FROM rule_output GROUP BY id -), -updated AS ( - UPDATE dataflow.records rec - SET transformed = rec.data || COALESCE(ra.additions, '{}'::jsonb), - transformed_at = CURRENT_TIMESTAMP - FROM qualifying q - LEFT JOIN record_additions ra ON ra.id = q.id - WHERE rec.id = q.id - RETURNING rec.id -) -SELECT json_build_object('success', true, 'transformed', count(*)) FROM updated -$$ LANGUAGE sql; - -CREATE OR REPLACE FUNCTION reprocess_records(p_source_name TEXT) -RETURNS JSON AS $$ - SELECT dataflow.apply_transformations(p_source_name, NULL, TRUE) -$$ LANGUAGE sql; - -- ── View generation ─────────────────────────────────────────────────────────── CREATE OR REPLACE FUNCTION generate_source_view(p_source_name TEXT) diff --git a/ui/src/App.jsx b/ui/src/App.jsx index 6ca7bd4..716572c 100644 --- a/ui/src/App.jsx +++ b/ui/src/App.jsx @@ -8,6 +8,7 @@ import Rules from './pages/Rules' import Mappings from './pages/Mappings' import Records from './pages/Records' import Log from './pages/Log' +import Pivot from './pages/Pivot' const NAV = [ { to: '/sources', label: 'Sources' }, @@ -15,6 +16,7 @@ const NAV = [ { to: '/rules', label: 'Rules' }, { to: '/mappings', label: 'Mappings' }, { to: '/records', label: 'Records' }, + { to: '/pivot', label: 'Pivot' }, { to: '/log', label: 'Log' }, ] @@ -77,7 +79,7 @@ export default function App() {