diff --git a/api/routes/mappings.js b/api/routes/mappings.js index 5831516..b01fc00 100644 --- a/api/routes/mappings.js +++ b/api/routes/mappings.js @@ -79,7 +79,7 @@ module.exports = (pool) => { `INSERT INTO mappings (source_name, rule_name, input_value, output) VALUES ($1, $2, $3, $4) RETURNING *`, - [source_name, rule_name, input_value, JSON.stringify(output)] + [source_name, rule_name, JSON.stringify(input_value), JSON.stringify(output)] ); res.status(201).json(result.rows[0]); @@ -116,7 +116,7 @@ module.exports = (pool) => { ON CONFLICT (source_name, rule_name, input_value) DO UPDATE SET output = EXCLUDED.output RETURNING *`, - [source_name, rule_name, input_value, JSON.stringify(output)] + [source_name, rule_name, JSON.stringify(input_value), JSON.stringify(output)] ); results.push(result.rows[0]); diff --git a/api/routes/rules.js b/api/routes/rules.js index 9d884a2..4dfbcdd 100644 --- a/api/routes/rules.js +++ b/api/routes/rules.js @@ -21,6 +21,49 @@ module.exports = (pool) => { } }); + // Preview an ad-hoc pattern against real records (no saved rule needed) + router.get('/preview', async (req, res, next) => { + try { + const { source, field, pattern, flags, function_type = 'extract', replace_value = '', limit = 20 } = req.query; + + if (!source || !field || !pattern) { + return res.status(400).json({ error: 'source, field, and pattern are required' }); + } + + const fullPattern = (flags ? `(?${flags})` : '') + pattern; + + const query = function_type === 'replace' + ? `SELECT + id, + data->>$1 AS raw_value, + to_jsonb(regexp_replace(data->>$1, $2, $3)) AS extracted_value + FROM records + WHERE source_name = $4 AND data ? $1 + ORDER BY id DESC LIMIT $5` + : `SELECT + r.id, + r.data->>$1 AS raw_value, + CASE + WHEN m.match IS NULL THEN NULL + WHEN cardinality(m.match) = 1 THEN to_jsonb(m.match[1]) + ELSE to_jsonb(m.match) + END AS extracted_value + FROM records r + CROSS JOIN LATERAL (SELECT regexp_match(r.data->>$1, $2) AS match) m + WHERE r.source_name = $3 AND r.data ? $1 + ORDER BY r.id DESC LIMIT $4`; + + const params = function_type === 'replace' + ? [field, fullPattern, replace_value, source, parseInt(limit)] + : [field, fullPattern, source, parseInt(limit)]; + + const result = await pool.query(query, params); + res.json(result.rows); + } catch (err) { + next(err); + } + }); + // Test a rule against real records router.get('/:id/test', async (req, res, next) => { try { @@ -40,13 +83,18 @@ module.exports = (pool) => { const pattern = (rule.flags ? `(?${rule.flags})` : '') + rule.pattern; const result = await pool.query( `SELECT - id, - data->>$1 AS raw_value, - substring(data->>$1 FROM $2) AS extracted_value - FROM records - WHERE source_name = $3 - AND data ? $1 - ORDER BY id DESC + r.id, + r.data->>$1 AS raw_value, + CASE + WHEN m.match IS NULL THEN NULL + WHEN cardinality(m.match) = 1 THEN to_jsonb(m.match[1]) + ELSE to_jsonb(m.match) + END AS extracted_value + FROM records r + CROSS JOIN LATERAL (SELECT regexp_match(r.data->>$1, $2) AS match) m + WHERE r.source_name = $3 + AND r.data ? $1 + ORDER BY r.id DESC LIMIT $4`, [rule.field, pattern, rule.source_name, parseInt(limit)] ); @@ -81,7 +129,7 @@ module.exports = (pool) => { // Create rule router.post('/', async (req, res, next) => { try { - const { source_name, name, field, pattern, output_field, function_type, flags, enabled, sequence } = req.body; + const { source_name, name, field, pattern, output_field, function_type, flags, replace_value, enabled, sequence } = req.body; if (!source_name || !name || !field || !pattern || !output_field) { return res.status(400).json({ @@ -94,10 +142,10 @@ module.exports = (pool) => { } const result = await pool.query( - `INSERT INTO rules (source_name, name, field, pattern, output_field, function_type, flags, enabled, sequence) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + `INSERT INTO rules (source_name, name, field, pattern, output_field, function_type, flags, replace_value, enabled, sequence) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING *`, - [source_name, name, field, pattern, output_field, function_type || 'extract', flags || '', enabled !== false, sequence || 0] + [source_name, name, field, pattern, output_field, function_type || 'extract', flags || '', replace_value || '', enabled !== false, sequence || 0] ); res.status(201).json(result.rows[0]); @@ -115,7 +163,7 @@ module.exports = (pool) => { // Update rule router.put('/:id', async (req, res, next) => { try { - const { name, field, pattern, output_field, function_type, flags, enabled, sequence } = req.body; + const { name, field, pattern, output_field, function_type, flags, replace_value, enabled, sequence } = req.body; if (function_type && !['extract', 'replace'].includes(function_type)) { return res.status(400).json({ error: 'function_type must be "extract" or "replace"' }); @@ -129,11 +177,12 @@ module.exports = (pool) => { output_field = COALESCE($5, output_field), function_type = COALESCE($6, function_type), flags = COALESCE($7, flags), - enabled = COALESCE($8, enabled), - sequence = COALESCE($9, sequence) + replace_value = COALESCE($8, replace_value), + enabled = COALESCE($9, enabled), + sequence = COALESCE($10, sequence) WHERE id = $1 RETURNING *`, - [req.params.id, name, field, pattern, output_field, function_type, flags, enabled, sequence] + [req.params.id, name, field, pattern, output_field, function_type, flags, replace_value, enabled, sequence] ); if (result.rows.length === 0) { diff --git a/api/routes/sources.js b/api/routes/sources.js index 1ef249d..e93ed18 100644 --- a/api/routes/sources.js +++ b/api/routes/sources.js @@ -282,5 +282,32 @@ module.exports = (pool) => { } }); + router.get('/:name/view-data', async (req, res, next) => { + try { + const { limit = 100, offset = 0 } = req.query; + const viewName = `dfv.${req.params.name}`; + + // Check view exists + const check = await pool.query( + `SELECT 1 FROM information_schema.views + WHERE table_schema = 'dfv' AND table_name = $1`, + [req.params.name] + ); + + if (check.rows.length === 0) { + return res.json({ exists: false, rows: [] }); + } + + const result = await pool.query( + `SELECT * FROM ${viewName} LIMIT $1 OFFSET $2`, + [parseInt(limit), parseInt(offset)] + ); + + res.json({ exists: true, rows: result.rows }); + } catch (err) { + next(err); + } + }); + return router; }; diff --git a/database/functions.sql b/database/functions.sql index 6f18fea..0b3053e 100644 --- a/database/functions.sql +++ b/database/functions.sql @@ -78,7 +78,8 @@ DECLARE v_record RECORD; v_rule RECORD; v_transformed JSONB; - v_extracted TEXT; + v_match TEXT[]; + v_extracted JSONB; v_mapping JSONB; v_count INTEGER := 0; BEGIN @@ -102,24 +103,30 @@ BEGIN LOOP -- Apply rule based on function type IF v_rule.function_type = 'replace' THEN - v_extracted := regexp_replace( - v_record.data->>v_rule.field, - CASE WHEN v_rule.flags != '' THEN '(?' || v_rule.flags || ')' ELSE '' END || v_rule.pattern, - v_rule.output_field - ); v_transformed := jsonb_set( v_transformed, - ARRAY[v_rule.field], - to_jsonb(v_extracted) + ARRAY[v_rule.output_field], + to_jsonb(regexp_replace( + v_record.data->>v_rule.field, + CASE WHEN v_rule.flags != '' THEN '(?' || v_rule.flags || ')' ELSE '' END || v_rule.pattern, + v_rule.replace_value + )) ); ELSE - -- extract (default) - v_extracted := substring( - v_record.data->>v_rule.field - FROM CASE WHEN v_rule.flags != '' THEN '(?' || v_rule.flags || ')' ELSE '' END || v_rule.pattern + -- extract (default): regexp_match returns all capture groups as text[] + v_match := regexp_match( + v_record.data->>v_rule.field, + CASE WHEN v_rule.flags != '' THEN '(?' || v_rule.flags || ')' ELSE '' END || v_rule.pattern ); - IF v_extracted IS NOT NULL THEN + IF v_match IS NOT NULL THEN + -- Single capture group → scalar string; multiple groups → JSON array + IF cardinality(v_match) = 1 THEN + v_extracted := to_jsonb(v_match[1]); + ELSE + v_extracted := to_jsonb(v_match); + END IF; + -- Check if there's a mapping for this value SELECT output INTO v_mapping FROM dataflow.mappings @@ -131,11 +138,11 @@ BEGIN -- Apply mapping (merge mapped fields into result) v_transformed := v_transformed || v_mapping; ELSE - -- No mapping, just add extracted value + -- No mapping, store extracted value (scalar or array) v_transformed := jsonb_set( v_transformed, ARRAY[v_rule.output_field], - to_jsonb(v_extracted) + v_extracted ); END IF; END IF; @@ -164,13 +171,14 @@ COMMENT ON FUNCTION apply_transformations IS 'Apply transformation rules and map -- Function: get_unmapped_values -- Find extracted values that need mappings ------------------------------------------------------ -CREATE OR REPLACE FUNCTION get_unmapped_values( +DROP FUNCTION IF EXISTS get_unmapped_values(TEXT, TEXT); +CREATE FUNCTION get_unmapped_values( p_source_name TEXT, p_rule_name TEXT DEFAULT NULL ) RETURNS TABLE ( rule_name TEXT, output_field TEXT, - extracted_value TEXT, + extracted_value JSONB, record_count BIGINT, sample_records JSONB ) AS $$ @@ -180,7 +188,7 @@ BEGIN SELECT r.name AS rule_name, r.output_field, - rec.transformed->>r.output_field AS extracted_value, + rec.transformed->r.output_field AS extracted_value, rec.data AS raw_record FROM dataflow.records rec @@ -265,24 +273,44 @@ BEGIN LOOP IF v_cols != '' THEN v_cols := v_cols || ', '; END IF; - CASE v_field->>'type' - WHEN 'date' THEN - v_cols := v_cols || format('(transformed->>%L)::date AS %I', - v_field->>'name', v_field->>'name'); - WHEN 'numeric' THEN - v_cols := v_cols || format('(transformed->>%L)::numeric AS %I', - v_field->>'name', v_field->>'name'); - ELSE - v_cols := v_cols || format('transformed->>%L AS %I', - v_field->>'name', v_field->>'name'); - END CASE; + IF v_field->>'expression' IS NOT NULL THEN + -- Computed expression: substitute {fieldname} refs with (transformed->>'fieldname')::type + -- e.g. "{Amount} * {sign}" → "(transformed->>'Amount')::numeric * (transformed->>'sign')::numeric" + DECLARE + v_expr TEXT := v_field->>'expression'; + v_ref TEXT; + v_cast TEXT := COALESCE(NULLIF(v_field->>'type', ''), 'numeric'); + BEGIN + WHILE v_expr ~ '\{[^}]+\}' LOOP + v_ref := substring(v_expr FROM '\{([^}]+)\}'); + v_expr := replace(v_expr, '{' || v_ref || '}', + format('(transformed->>%L)::numeric', v_ref)); + END LOOP; + v_cols := v_cols || format('%s AS %I', v_expr, v_field->>'name'); + END; + ELSE + CASE v_field->>'type' + WHEN 'date' THEN + v_cols := v_cols || format('(transformed->>%L)::date AS %I', + v_field->>'name', v_field->>'name'); + WHEN 'numeric' THEN + v_cols := v_cols || format('(transformed->>%L)::numeric AS %I', + v_field->>'name', v_field->>'name'); + ELSE + v_cols := v_cols || format('transformed->>%L AS %I', + v_field->>'name', v_field->>'name'); + END CASE; + END IF; END LOOP; CREATE SCHEMA IF NOT EXISTS dfv; v_view := 'dfv.' || quote_ident(p_source_name); - v_sql := format( - 'CREATE OR REPLACE VIEW %s AS SELECT %s FROM dataflow.records WHERE source_name = %L AND transformed IS NOT NULL', + + EXECUTE format('DROP VIEW IF EXISTS %s', v_view); + + v_sql := format( + 'CREATE VIEW %s AS SELECT %s FROM dataflow.records WHERE source_name = %L AND transformed IS NOT NULL', v_view, v_cols, p_source_name ); diff --git a/database/migrate_input_value_jsonb.sql b/database/migrate_input_value_jsonb.sql new file mode 100644 index 0000000..515770a --- /dev/null +++ b/database/migrate_input_value_jsonb.sql @@ -0,0 +1,22 @@ +-- +-- Migration: Change mappings.input_value from TEXT to JSONB +-- Allows multi-capture-group regex results to be used as mapping keys +-- + +SET search_path TO dataflow, public; + +-- Drop dependent constraint and index first +ALTER TABLE dataflow.mappings DROP CONSTRAINT mappings_source_name_rule_name_input_value_key; +DROP INDEX IF EXISTS dataflow.idx_mappings_input; + +-- Convert column: existing TEXT values become JSONB strings e.g. "MEIJER" +ALTER TABLE dataflow.mappings + ALTER COLUMN input_value TYPE JSONB + USING to_jsonb(input_value); + +-- Recreate constraint and index +ALTER TABLE dataflow.mappings + ADD CONSTRAINT mappings_source_name_rule_name_input_value_key + UNIQUE (source_name, rule_name, input_value); + +CREATE INDEX idx_mappings_input ON dataflow.mappings(source_name, rule_name, input_value); diff --git a/database/schema.sql b/database/schema.sql index 13634bb..68daff9 100644 --- a/database/schema.sql +++ b/database/schema.sql @@ -72,6 +72,7 @@ CREATE TABLE rules ( output_field TEXT NOT NULL, -- Name of extracted field (e.g., 'merchant') function_type TEXT NOT NULL DEFAULT 'extract', -- 'extract' or 'replace' flags TEXT NOT NULL DEFAULT '', -- Regex flags (e.g., 'i' for case-insensitive) + replace_value TEXT NOT NULL DEFAULT '', -- Replacement string (replace mode only) -- Options enabled BOOLEAN DEFAULT true, @@ -100,7 +101,7 @@ CREATE TABLE mappings ( rule_name TEXT NOT NULL, -- Mapping - input_value TEXT NOT NULL, -- Extracted value to match + input_value JSONB NOT NULL, -- Extracted value to match (string or array of capture groups) output JSONB NOT NULL, -- Standardized output -- Metadata diff --git a/ui/src/App.jsx b/ui/src/App.jsx index 4b3a11c..d0b7239 100644 --- a/ui/src/App.jsx +++ b/ui/src/App.jsx @@ -41,7 +41,14 @@ export default function App() { {/* Source selector */}
- +
+ + {/* nav handled by link */}} + >+ +
setForm(f => ({ ...f, replace_value: e.target.value }))} + placeholder="e.g. leave blank to delete the match" + /> +
+ )} + {/* Live preview */} + {(preview.length > 0 || previewing) && ( +
+
+

+ {previewing ? 'Testing…' : `${preview.filter(r => r.extracted_value != null).length}/${preview.length} matched`} +

+ {!previewing && preview.length > 0 && ( + + )} +
+ {!previewing && ( + + + {preview.slice(0, 5).map((r, i) => ( + + + + + ))} + +
{r.raw_value} + {r.extracted_value != null + ? (Array.isArray(r.extracted_value) ? r.extracted_value.join(' · ') : String(r.extracted_value)) + : '—'} +
+ )} +
+ )} + + {modalOpen && setModalOpen(false)} />} + {error &&

{error}

}
- - -
- - - {editing === rule.id && ( -
- setEditing(null)} +
+ {rules.map(rule => { + const isExpanded = expanded === rule.id + return ( +
+ {/* Header — always visible, click to expand/collapse */} +
{ + if (isExpanded) { setExpanded(null); setEditing(null) } + else { setExpanded(rule.id); startEdit(rule) } + }} + > +
- )} - {testResults[rule.id] && ( -
-

Test results (last 20 records)

- - - - - - - - - {testResults[rule.id].slice(0, 10).map((r, i) => ( - - - - - ))} - -
Raw valueExtracted
{r.raw_value} - {r.extracted_value ?? '—'} -
-
- )} -
- ))} + {/* Expanded content */} + {isExpanded && ( +
+
+ +
+
+ handleSubmit(e, rule.id)} + onCancel={() => { setEditing(null); setExpanded(null) }} + /> +
+
+ )} +
+ ) + })}
) diff --git a/ui/src/pages/Sources.jsx b/ui/src/pages/Sources.jsx index b2beea8..9433516 100644 --- a/ui/src/pages/Sources.jsx +++ b/ui/src/pages/Sources.jsx @@ -3,22 +3,37 @@ import { api } from '../api' const FIELD_TYPES = ['text', 'numeric', 'date'] -function SourceDetail({ source, onClose, onDeleted, setSources, setSource }) { - const [dedup, setDedup] = useState(source.dedup_fields?.join(', ') || '') - const [schemaFields, setSchemaFields] = useState(source.config?.fields || []) +export default function Sources({ source, sources, setSources, setSource }) { + const [dedup, setDedup] = useState('') + const [schemaFields, setSchemaFields] = useState([]) const [stats, setStats] = useState(null) const [saving, setSaving] = useState(false) const [reprocessing, setReprocessing] = useState(false) const [generating, setGenerating] = useState(false) const [result, setResult] = useState('') const [error, setError] = useState('') - const [viewName, setViewName] = useState(source.config?.fields?.length ? `dfv.${source.name}` : '') + const [viewName, setViewName] = useState('') const [availableFields, setAvailableFields] = useState([]) + const [creating, setCreating] = useState(false) + const [form, setForm] = useState({ name: '', dedup_fields: '', fields: [], schema: [] }) + const [createError, setCreateError] = useState('') + const [createLoading, setCreateLoading] = useState(false) + const fileRef = useRef() + + const sourceObj = sources.find(s => s.name === source) useEffect(() => { - api.getStats(source.name).then(setStats).catch(() => {}) - api.getFields(source.name).then(setAvailableFields).catch(() => {}) - }, [source.name]) + if (!sourceObj) return + setDedup(sourceObj.dedup_fields?.join(', ') || '') + setSchemaFields((sourceObj.config?.fields || []).map((f, i) => ({ seq: i + 1, ...f }))) + setViewName(sourceObj.config?.fields?.length ? `dfv.${sourceObj.name}` : '') + setResult('') + setError('') + setStats(null) + setAvailableFields([]) + api.getStats(sourceObj.name).then(setStats).catch(() => {}) + api.getFields(sourceObj.name).then(setAvailableFields).catch(() => {}) + }, [source, sourceObj?.name]) async function handleSave(e) { e.preventDefault() @@ -26,8 +41,9 @@ function SourceDetail({ source, onClose, onDeleted, setSources, setSource }) { setError('') try { const dedup_fields = dedup.split(',').map(s => s.trim()).filter(Boolean) - const config = { ...(source.config || {}), fields: schemaFields.filter(f => f.name) } - await api.updateSource(source.name, { dedup_fields, config }) + const fields = [...schemaFields.filter(f => f.name)].sort((a, b) => (a.seq ?? 0) - (b.seq ?? 0)) + const config = { ...(sourceObj.config || {}), fields } + await api.updateSource(sourceObj.name, { dedup_fields, config }) const updated = await api.getSources() setSources(updated) setResult('Saved.') @@ -43,11 +59,11 @@ function SourceDetail({ source, onClose, onDeleted, setSources, setSource }) { setResult('') setError('') try { - // Save schema first, then generate view from the saved config const dedup_fields = dedup.split(',').map(s => s.trim()).filter(Boolean) - const config = { ...(source.config || {}), fields: schemaFields.filter(f => f.name) } - await api.updateSource(source.name, { dedup_fields, config }) - const res = await api.generateView(source.name) + const fields = [...schemaFields.filter(f => f.name)].sort((a, b) => (a.seq ?? 0) - (b.seq ?? 0)) + const config = { ...(sourceObj.config || {}), fields } + await api.updateSource(sourceObj.name, { dedup_fields, config }) + const res = await api.generateView(sourceObj.name) if (res.success) { setViewName(res.view) setResult(`View created: ${res.view}`) @@ -62,14 +78,14 @@ function SourceDetail({ source, onClose, onDeleted, setSources, setSource }) { } async function handleReprocess() { - if (!confirm(`Reprocess all records for "${source.name}"? This will clear and reapply all transformations.`)) return + if (!confirm(`Reprocess all records for "${sourceObj.name}"? This will clear and reapply all transformations.`)) return setReprocessing(true) setResult('') setError('') try { - const res = await api.reprocess(source.name) + const res = await api.reprocess(sourceObj.name) setResult(`Reprocessed ${res.transformed} records.`) - api.getStats(source.name).then(setStats).catch(() => {}) + api.getStats(sourceObj.name).then(setStats).catch(() => {}) } catch (err) { setError(err.message) } finally { @@ -77,157 +93,18 @@ function SourceDetail({ source, onClose, onDeleted, setSources, setSource }) { } } - return ( -
- {/* Stats */} - {stats && ( -
- {stats.total_records} total - {stats.transformed_records} transformed - {stats.pending_records} pending -
- )} - - {/* Unified field table */} - {availableFields.length > 0 && ( -
- - - - - - - - - - - - {availableFields.map(f => { - const isRaw = f.origins.includes('raw') - const dedupChecked = dedup.split(',').map(s => s.trim()).includes(f.key) - const schemaEntry = schemaFields.find(sf => sf.name === f.key) - const inView = !!schemaEntry - return ( - - - - - - - - ) - })} - -
KeyOriginTypeDedupIn view
{f.key}{f.origins.join(', ')} - {inView && ( - - )} - - {isRaw && ( - { - const current = dedup.split(',').map(s => s.trim()).filter(Boolean) - const next = e.target.checked - ? [...current, f.key] - : current.filter(k => k !== f.key) - setDedup(next.join(', ')) - }} - /> - )} - - { - if (e.target.checked) { - setSchemaFields(sf => [...sf, { name: f.key, type: 'text' }]) - } else { - setSchemaFields(sf => sf.filter(s => s.name !== f.key)) - } - }} - /> -
- -
-
- -
- {schemaFields.length > 0 && ( - <> - - {viewName && ( - {viewName} - )} - - )} -
-
- )} - - {/* Save button when no fields loaded yet */} - {availableFields.length === 0 && ( -
- -
- )} - - {/* Reprocess */} -
- - Clears and reruns all transformation rules -
- - {result &&

{result}

} - {error &&

{error}

} - -
- - -
-
- ) -} - -export default function Sources({ sources, setSources, setSource }) { - const [creating, setCreating] = useState(false) - const [expanded, setExpanded] = useState(null) - const [form, setForm] = useState({ name: '', dedup_fields: '', fields: [], schema: [] }) - const [error, setError] = useState('') - const [loading, setLoading] = useState(false) - const fileRef = useRef() + async function handleDelete() { + if (!confirm(`Delete source "${sourceObj.name}" and all its data?`)) return + try { + await api.deleteSource(sourceObj.name) + const updated = await api.getSources() + setSources(updated) + if (updated.length > 0) setSource(updated[0].name) + else setSource('') + } catch (err) { + alert(err.message) + } + } async function handleSuggest(e) { const file = e.target.files[0] @@ -241,93 +118,222 @@ export default function Sources({ sources, setSources, setSource }) { schema: suggestion.fields.map(f => ({ name: f.name, type: f.type })) })) } catch (err) { - setError(err.message) + setCreateError(err.message) } } async function handleCreate(e) { e.preventDefault() - setError('') - const dedup = form.dedup_fields.split(',').map(s => s.trim()).filter(Boolean) - if (!form.name || dedup.length === 0) { - setError('Name and at least one dedup field required') + setCreateError('') + const dedupArr = form.dedup_fields.split(',').map(s => s.trim()).filter(Boolean) + if (!form.name || dedupArr.length === 0) { + setCreateError('Name and at least one dedup field required') return } - setLoading(true) + setCreateLoading(true) try { const config = form.schema.length > 0 ? { fields: form.schema } : {} - await api.createSource({ name: form.name, dedup_fields: dedup, config }) + await api.createSource({ name: form.name, dedup_fields: dedupArr, config }) const updated = await api.getSources() setSources(updated) setSource(form.name) setForm({ name: '', dedup_fields: '', fields: [], schema: [] }) setCreating(false) } catch (err) { - setError(err.message) + setCreateError(err.message) } finally { - setLoading(false) - } - } - - async function handleDeleted(name) { - if (!confirm(`Delete source "${name}" and all its data?`)) return - try { - await api.deleteSource(name) - const updated = await api.getSources() - setSources(updated) - setExpanded(null) - if (updated.length > 0) setSource(updated[0].name) - } catch (err) { - alert(err.message) + setCreateLoading(false) } } return (
-

Sources

+

+ {sourceObj ? sourceObj.name : 'Sources'} +

- {sources.length === 0 && !creating && ( -

No sources yet. Create one to get started.

+ {/* No source selected */} + {!sourceObj && !creating && ( +

No sources yet. Create one to get started.

)} -
- {sources.map(s => ( -
-
setExpanded(expanded === s.name ? null : s.name)} - > -
- {s.name} - dedup: {s.dedup_fields?.join(', ')} -
- {expanded === s.name ? '▲' : '▼'} + {/* Source detail */} + {sourceObj && !creating && ( +
+ {/* Stats */} + {stats && ( +
+ {stats.total_records} total + {stats.transformed_records} transformed + {stats.pending_records} pending
+ )} - {expanded === s.name && ( - setExpanded(null)} - onDeleted={handleDeleted} - setSources={setSources} - setSource={setSource} - /> - )} + {/* Unified field table */} + {availableFields.length > 0 && ( +
+ + + + + + + + + + + + + {availableFields.map(f => { + const isRaw = f.origins.includes('raw') + const dedupChecked = dedup.split(',').map(s => s.trim()).includes(f.key) + const schemaEntry = schemaFields.find(sf => sf.name === f.key) + const inView = !!schemaEntry + return ( + + + + + + + + + ) + })} + +
KeyOriginTypeDedupIn viewSeq
{f.key}{f.origins.join(', ')} + {inView && ( +
+ + setSchemaFields(sf => + sf.map(s => s.name === f.key ? { ...s, expression: e.target.value || undefined } : s) + )} + /> +
+ )} +
+ {isRaw && ( + { + const current = dedup.split(',').map(s => s.trim()).filter(Boolean) + const next = e.target.checked + ? [...current, f.key] + : current.filter(k => k !== f.key) + setDedup(next.join(', ')) + }} + /> + )} + + { + if (e.target.checked) { + const nextSeq = schemaFields.length > 0 + ? Math.max(...schemaFields.map(s => s.seq ?? 0)) + 1 + : 1 + setSchemaFields(sf => [...sf, { name: f.key, type: 'text', seq: nextSeq }]) + } else { + setSchemaFields(sf => sf.filter(s => s.name !== f.key)) + } + }} + /> + + {inView && ( + setSchemaFields(sf => + sf.map(s => s.name === f.key ? { ...s, seq: parseInt(e.target.value) || 0 } : s) + )} + /> + )} +
+ +
+
+ +
+ {schemaFields.length > 0 && ( + <> + + {viewName && ( + {viewName} + )} + + )} +
+
+ )} + + {/* Save button when no fields loaded yet */} + {availableFields.length === 0 && ( +
+ +
+ )} + + {/* Reprocess */} +
+ + Clears and reruns all transformation rules
- ))} -
+ + {result &&

{result}

} + {error &&

{error}

} + +
+ +
+
+ )} {/* Create form */} {creating && ( -
+

New source

@@ -394,15 +400,15 @@ export default function Sources({ sources, setSources, setSource }) {
)} - {error &&

{error}

} + {createError &&

{createError}

}
-