diff --git a/README.md b/README.md
index cc65aab..9264684 100644
--- a/README.md
+++ b/README.md
@@ -106,9 +106,10 @@ All `/api` routes require HTTP Basic authentication.
 | PUT | `/api/sources/:name` | Update a source |
 | DELETE | `/api/sources/:name` | Delete a source |
 | POST | `/api/sources/suggest` | Suggest source definition from CSV upload |
-| POST | `/api/sources/:name/import` | Import CSV data |
-| GET | `/api/sources/:name/import-log` | View import history |
-| POST | `/api/sources/:name/transform` | Apply rules and mappings to records |
+| POST | `/api/sources/:name/import` | Import CSV data and auto-apply transformations to new records |
+| GET | `/api/sources/:name/import-log` | View import history (includes `inserted_keys` / `excluded_keys` in `info`) |
+| DELETE | `/api/sources/:name/import-log/:id` | Delete an import batch and all its records |
+| POST | `/api/sources/:name/transform` | Apply rules and mappings to any untransformed records |
 | POST | `/api/sources/:name/reprocess` | Re-transform all records |
 | GET | `/api/sources/:name/fields` | List all known field names |
 | GET | `/api/sources/:name/stats` | Get record and mapping counts |
@@ -157,14 +158,13 @@ All `/api` routes require HTTP Basic authentication.
 ```
 1. Create a source (POST /api/sources)
-2. Import CSV data (POST /api/sources/:name/import)
-3. Create transformation rules (POST /api/rules)
+2. Create transformation rules (POST /api/rules)
+3. Import CSV data (POST /api/sources/:name/import) -- transformations applied automatically to new records
 4. Preview rules against real data (GET /api/rules/preview)
-5. Apply transformations (POST /api/sources/:name/transform)
-6. Review unmapped values (GET /api/mappings/source/:name/unmapped)
-7. Add mappings (POST /api/mappings or bulk import via TSV)
-8. Reprocess to apply new mappings (POST /api/sources/:name/reprocess)
-9. Query results (GET /api/sources/:name/view-data)
+5. Review unmapped values (GET /api/mappings/source/:name/unmapped)
+6. Add mappings (POST /api/mappings or bulk import via TSV)
+7. Reprocess to apply new mappings (POST /api/sources/:name/reprocess)
+8. Query results (GET /api/sources/:name/view-data)
 ```
 
 See `examples/GETTING_STARTED.md` for a complete walkthrough with curl examples.
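For quick reference, a minimal sketch of the revised workflow driven from a Node 18+ script. The base URL, Basic credentials, the "orders" source, and the rule payload are all hypothetical placeholders; only the endpoints and the response fields come from the patch:

// Sketch of the revised workflow (steps 2-3 above). Node 18+ for global fetch/FormData/Blob.
// Base URL, credentials, source name "orders", and the rule body are hypothetical.
const base = 'http://localhost:3000/api'
const auth = { Authorization: 'Basic ' + Buffer.from('admin:secret').toString('base64') }

async function run() {
  // Step 2: create a rule first, so the import can auto-apply it
  await fetch(`${base}/rules`, {
    method: 'POST',
    headers: { ...auth, 'Content-Type': 'application/json' },
    body: JSON.stringify({ source_name: 'orders', field: 'status', rule: 'lowercase' }), // hypothetical rule shape
  })

  // Step 3: import CSV -- the response now carries a transform summary
  const fd = new FormData()
  fd.append('file', new Blob(['id,status\n1,OPEN\n'], { type: 'text/csv' }), 'orders.csv')
  const res = await fetch(`${base}/sources/orders/import`, { method: 'POST', headers: auth, body: fd })
  const { imported, duplicates, transform } = await res.json()
  console.log(`${imported} imported, ${duplicates} duplicates, ${transform.transformed} transformed`)
}

run().catch(console.error)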
diff --git a/api/routes/sources.js b/api/routes/sources.js
index 261934d..acc9082 100644
--- a/api/routes/sources.js
+++ b/api/routes/sources.js
@@ -13,6 +13,16 @@ const upload = multer({ storage: multer.memoryStorage() });
 module.exports = (pool) => {
   const router = express.Router();
 
+  // Global import log (all sources) -- registered before the /:name routes so
+  // Express does not match "import-log" as a source name
+  router.get('/import-log', async (req, res, next) => {
+    try {
+      const result = await pool.query(`SELECT * FROM get_all_import_logs()`);
+      res.json(result.rows);
+    } catch (err) {
+      next(err);
+    }
+  });
+
   // List all sources
   router.get('/', async (req, res, next) => {
     try {
@@ -102,15 +112,22 @@ module.exports = (pool) => {
     }
   });
 
-  // Import CSV data
+  // Import CSV data and apply transformations to new records
   router.post('/:name/import', upload.single('file'), async (req, res, next) => {
    try {
      if (!req.file) return res.status(400).json({ error: 'No file uploaded' });
      const records = parse(req.file.buffer, { columns: true, skip_empty_lines: true, trim: true });
-      const result = await pool.query(
+      const importResult = await pool.query(
         `SELECT import_records(${lit(req.params.name)}, ${lit(records)}) as result`
       );
-      res.json(result.rows[0].result);
+      const importData = importResult.rows[0].result;
+
+      const transformResult = await pool.query(
+        `SELECT apply_transformations(${lit(req.params.name)}) as result`
+      );
+      const transformData = transformResult.rows[0].result;
+
+      res.json({ ...importData, transform: transformData });
     } catch (err) {
       next(err);
     }
@@ -126,6 +143,20 @@ module.exports = (pool) => {
     }
   });
 
+  // Delete an import (removes all records from that batch and the log entry)
+  router.delete('/:name/import-log/:id', async (req, res, next) => {
+    try {
+      const result = await pool.query(
+        `SELECT delete_import(${lit(parseInt(req.params.id, 10))}) as result`
+      );
+      const data = result.rows[0].result;
+      if (!data.success) return res.status(404).json(data);
+      res.json(data);
+    } catch (err) {
+      next(err);
+    }
+  });
+
   // Apply transformations
   router.post('/:name/transform', async (req, res, next) => {
     try {
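The merged payload the import route now returns has this shape. The top-level fields come from `import_records()`; `transform` is whatever `apply_transformations()` returns, passed through unchanged. The sample values, and any `transform` fields beyond the `transformed` count the UI reads, are illustrative:

// Example response from POST /api/sources/:name/import (values illustrative):
({
  success: true,       // from import_records()
  imported: 42,        // new records inserted
  duplicates: 3,       // keys already present (listed in info.excluded_keys)
  log_id: 17,          // import_log entry for this batch
  transform: {         // pass-through result of apply_transformations()
    transformed: 42,   // the count the UI displays
  },
})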
diff --git a/database/functions.sql b/database/functions.sql
index 7ec4972..76a6d43 100644
--- a/database/functions.sql
+++ b/database/functions.sql
@@ -15,11 +15,9 @@ CREATE OR REPLACE FUNCTION import_records(
 ) RETURNS JSON AS $$
 DECLARE
   v_dedup_fields TEXT[];
-  v_record JSONB;
-  v_dedup_key TEXT;
-  v_inserted INTEGER := 0;
-  v_duplicates INTEGER := 0;
-  v_log_id INTEGER;
+  v_inserted INTEGER;
+  v_duplicates INTEGER;
+  v_log_id INTEGER;
 BEGIN
   -- Get dedup fields for this source
   SELECT dedup_fields INTO v_dedup_fields
@@ -33,39 +31,139 @@ BEGIN
     );
   END IF;
 
-  -- Process each record
-  FOR v_record IN SELECT * FROM jsonb_array_elements(p_data)
-  LOOP
-    -- Generate dedup key
-    v_dedup_key := dataflow.generate_dedup_key(v_record, v_dedup_fields);
-
-    -- Try to insert (will fail silently if duplicate)
-    BEGIN
-      INSERT INTO dataflow.records (source_name, data, dedup_key)
-      VALUES (p_source_name, v_record, v_dedup_key);
-
-      v_inserted := v_inserted + 1;
-    EXCEPTION WHEN unique_violation THEN
-      v_duplicates := v_duplicates + 1;
-    END;
-  END LOOP;
-
-  -- Log the import
-  INSERT INTO dataflow.import_log (source_name, records_imported, records_duplicate)
-  VALUES (p_source_name, v_inserted, v_duplicates)
-  RETURNING id INTO v_log_id;
+  WITH
+  -- All incoming records with their dedup keys and readable field values
+  pending AS (
+    SELECT
+      rec.value AS data,
+      rec.ordinality AS seq,
+      dataflow.generate_dedup_key(rec.value, v_dedup_fields) AS dedup_key,
+      (SELECT jsonb_object_agg(f, rec.value->>f)
+       FROM unnest(v_dedup_fields) AS f) AS dedup_values
+    FROM jsonb_array_elements(p_data) WITH ORDINALITY AS rec
+  ),
+  -- Keys already in the database (excluded) with their readable values
+  existing AS (
+    SELECT DISTINCT ON (r.dedup_key) r.dedup_key,
+      (SELECT jsonb_object_agg(f, r.data->>f)
+       FROM unnest(v_dedup_fields) AS f) AS dedup_values
+    FROM dataflow.records r
+    INNER JOIN pending p ON p.dedup_key = r.dedup_key
+    WHERE r.source_name = p_source_name
+  ),
+  -- Keys that are new; DISTINCT ON keeps only the first occurrence, so a key
+  -- repeated within the same batch cannot trip the unique index on insert
+  new_keys AS (
+    SELECT DISTINCT ON (p.dedup_key) p.dedup_key, p.data, p.seq, p.dedup_values
+    FROM pending p
+    WHERE NOT EXISTS (SELECT 1 FROM existing e WHERE e.dedup_key = p.dedup_key)
+    ORDER BY p.dedup_key, p.seq
+  ),
+  -- Write the log entry with readable field values instead of hashes
+  log_entry AS (
+    INSERT INTO dataflow.import_log (source_name, records_imported, records_duplicate, info)
+    VALUES (
+      p_source_name,
+      (SELECT count(*) FROM new_keys),
+      (SELECT count(*) FROM existing),
+      jsonb_build_object(
+        'total', jsonb_array_length(p_data),
+        'inserted_keys', (SELECT jsonb_agg(dedup_values) FROM new_keys),
+        'excluded_keys', (SELECT jsonb_agg(dedup_values) FROM existing)
+      )
+    )
+    RETURNING id, records_imported, records_duplicate
+  ),
+  -- Insert only new records, preserving their order in the uploaded file
+  inserted AS (
+    INSERT INTO dataflow.records (source_name, data, dedup_key, import_id)
+    SELECT p_source_name, nk.data, nk.dedup_key, (SELECT id FROM log_entry)
+    FROM new_keys nk
+    ORDER BY nk.seq
+    RETURNING id
+  )
+  SELECT le.id, le.records_imported, le.records_duplicate
+  INTO v_log_id, v_inserted, v_duplicates
+  FROM log_entry le;
 
   RETURN json_build_object(
-    'success', true,
-    'imported', v_inserted,
+    'success',    true,
+    'imported',   v_inserted,
     'duplicates', v_duplicates,
-    'log_id', v_log_id
+    'log_id',     v_log_id
   );
 END;
 $$ LANGUAGE plpgsql;
 
 COMMENT ON FUNCTION import_records IS 'Import records with automatic deduplication';
 
+------------------------------------------------------
+-- Function: get_import_log
+-- Return import history for a source
+------------------------------------------------------
+CREATE OR REPLACE FUNCTION get_import_log(p_source_name TEXT)
+RETURNS TABLE (
+  id INTEGER,
+  source_name TEXT,
+  records_imported INTEGER,
+  records_duplicate INTEGER,
+  imported_at TIMESTAMPTZ,
+  info JSONB
+) AS $$
+  SELECT id, source_name, records_imported, records_duplicate, imported_at, info
+  FROM dataflow.import_log
+  WHERE source_name = p_source_name
+  ORDER BY imported_at DESC;
+$$ LANGUAGE sql;
+
+COMMENT ON FUNCTION get_import_log IS 'Return import history for a source, newest first, including inserted/excluded key lists';
+
+------------------------------------------------------
+-- Function: get_all_import_logs
+-- Return import history across all sources
+------------------------------------------------------
+CREATE OR REPLACE FUNCTION get_all_import_logs()
+RETURNS TABLE (
+  id INTEGER,
+  source_name TEXT,
+  records_imported INTEGER,
+  records_duplicate INTEGER,
+  imported_at TIMESTAMPTZ,
+  info JSONB
+) AS $$
+  SELECT id, source_name, records_imported, records_duplicate, imported_at, info
+  FROM dataflow.import_log
+  ORDER BY imported_at DESC;
+$$ LANGUAGE sql;
+
+COMMENT ON FUNCTION get_all_import_logs IS 'Return import history across all sources, newest first';
+
+------------------------------------------------------
+-- Function: delete_import
+-- Delete all records from a specific import and remove the log entry
+------------------------------------------------------
+CREATE OR REPLACE FUNCTION delete_import(p_log_id INTEGER)
+RETURNS JSON AS $$
+DECLARE
+  v_deleted INTEGER;
+BEGIN
+  IF NOT EXISTS (SELECT 1 FROM dataflow.import_log WHERE id = p_log_id) THEN
+    RETURN json_build_object('success', false, 'error', 'Import log entry not found');
+  END IF;
+
+  SELECT count(*) INTO v_deleted FROM dataflow.records WHERE import_id = p_log_id;
+
+  -- Deleting the log entry cascades to its records via the FK's ON DELETE CASCADE
+  DELETE FROM dataflow.import_log WHERE id = p_log_id;
+
+  RETURN json_build_object(
+    'success', true,
+    'records_deleted', v_deleted,
+    'log_id', p_log_id
+  );
+END;
+$$ LANGUAGE plpgsql;
+
+COMMENT ON FUNCTION delete_import IS 'Delete all records belonging to an import batch and remove the log entry';
+
 ------------------------------------------------------
 -- Aggregate: jsonb_concat_obj
 -- Merge JSONB objects across rows (later rows win on key conflicts)
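To see the set-based dedup end to end, here is a small sketch driving `import_records` from Node with `pg`, the same way the routes call it. The connection defaults and the "customers" source with `dedup_fields = ['email']` are hypothetical:

// Sketch: calling import_records() twice with the same batch.
// Assumes a source "customers" deduplicating on ['email'] -- hypothetical.
const { Pool } = require('pg')
const pool = new Pool() // connects via the standard PG* environment variables

async function demo() {
  const batch = JSON.stringify([
    { email: 'a@example.com', name: 'Ann' },
    { email: 'b@example.com', name: 'Bob' },
  ])

  // First run: both keys are new, so both land in info.inserted_keys
  let r = await pool.query('SELECT import_records($1, $2::jsonb) AS result', ['customers', batch])
  console.log(r.rows[0].result) // { success: true, imported: 2, duplicates: 0, log_id: ... }

  // Second run: both keys already exist, so both land in info.excluded_keys
  r = await pool.query('SELECT import_records($1, $2::jsonb) AS result', ['customers', batch])
  console.log(r.rows[0].result) // { success: true, imported: 0, duplicates: 2, log_id: ... }
}

demo().finally(() => pool.end())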
diff --git a/database/schema.sql b/database/schema.sql
index adca799..7f35209 100644
--- a/database/schema.sql
+++ b/database/schema.sql
@@ -39,6 +39,7 @@ CREATE TABLE records (
   transformed JSONB,  -- Data after transformations applied
 
   -- Metadata
+  import_id INTEGER,  -- Which import batch this came from (FK added after import_log below)
   imported_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
   transformed_at TIMESTAMPTZ,
 
@@ -128,10 +129,19 @@ CREATE TABLE import_log (
   source_name TEXT NOT NULL REFERENCES sources(name) ON DELETE CASCADE,
   records_imported INTEGER DEFAULT 0,
   records_duplicate INTEGER DEFAULT 0,
-  imported_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
+  imported_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
+  info JSONB  -- Full detail: inserted_keys, excluded_keys
 );
 
 COMMENT ON TABLE import_log IS 'Audit log of data imports';
+COMMENT ON COLUMN import_log.info IS 'Import details: inserted_keys and excluded_keys arrays';
+
+-- records(import_id) is declared without its FK above because import_log
+-- does not exist yet at that point in this file; add the constraint here.
+ALTER TABLE records
+  ADD CONSTRAINT records_import_id_fkey
+  FOREIGN KEY (import_id) REFERENCES import_log(id) ON DELETE CASCADE;
+CREATE INDEX idx_records_import ON records(import_id);
 
 CREATE INDEX idx_import_log_source ON import_log(source_name);
 CREATE INDEX idx_import_log_timestamp ON import_log(imported_at);
diff --git a/ui/src/App.jsx b/ui/src/App.jsx
index 4fa31eb..6ca7bd4 100644
--- a/ui/src/App.jsx
+++ b/ui/src/App.jsx
@@ -7,6 +7,7 @@ import Import from './pages/Import'
 import Rules from './pages/Rules'
 import Mappings from './pages/Mappings'
 import Records from './pages/Records'
+import Log from './pages/Log'
 
 const NAV = [
   { to: '/sources', label: 'Sources' },
@@ -14,6 +15,7 @@ const NAV = [
   { to: '/rules', label: 'Rules' },
   { to: '/mappings', label: 'Mappings' },
   { to: '/records', label: 'Records' },
+  { to: '/log', label: 'Log' },
 ]
 
 export default function App() {
@@ -141,6 +143,7 @@ export default function App() {
           <Route path="/rules" element={<Rules />} />
           <Route path="/mappings" element={<Mappings />} />
           <Route path="/records" element={<Records />} />
+          <Route path="/log" element={<Log />} />
         </Routes>
       </main>
     </div>
diff --git a/ui/src/api.js b/ui/src/api.js
index d657c31..4839480 100644
--- a/ui/src/api.js
+++ b/ui/src/api.js
@@ -53,6 +53,8 @@ export const api = {
     return request('POST', '/sources/suggest', fd, true)
   },
   getImportLog: (name) => request('GET', `/sources/${name}/import-log`),
+  getAllImportLog: () => request('GET', '/sources/import-log'),
+  deleteImport: (name, id) => request('DELETE', `/sources/${name}/import-log/${id}`),
   getStats: (name) => request('GET', `/sources/${name}/stats`),
   importCSV: (name, file) => {
     const fd = new FormData()
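A short sketch of how the two new client helpers are consumed; the "orders" source name is hypothetical, and the result shapes follow the routes and the `delete_import` function above:

// Usage sketch for the new api.js helpers ("orders" is hypothetical).
const entries = await api.getAllImportLog()
// -> [{ id, source_name, records_imported, records_duplicate, imported_at, info }, ...]

const outcome = await api.deleteImport('orders', entries[0].id)
// -> { success: true, records_deleted: 42, log_id: entries[0].id }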
diff --git a/ui/src/pages/Import.jsx b/ui/src/pages/Import.jsx
index 2ffb820..33a38a4 100644
--- a/ui/src/pages/Import.jsx
+++ b/ui/src/pages/Import.jsx
@@ -1,6 +1,64 @@
 import { useState, useEffect, useRef } from 'react'
 import { api } from '../api'
 
+function KeyList({ keys, label, color }) {
+  if (!keys || keys.length === 0) return null
+  return (
+    <div className="key-list">
+      <div style={{ color }}>{label} ({keys.length})</div>
+      <div>
+        {keys.map((k, i) => (
+          <div key={i}>
+            {typeof k === 'object' && k !== null
+              ? Object.entries(k).map(([field, val]) => `${field}: ${val}`).join(' · ')
+              : k}
+          </div>
+        ))}
+      </div>
+    </div>
+  )
+}
+
+function LogRow({ entry, selected, onToggle }) {
+  const [expanded, setExpanded] = useState(false)
+  const info = entry.info || {}
+  const insertedKeys = info.inserted_keys || []
+  const excludedKeys = info.excluded_keys || []
+  const hasKeys = insertedKeys.length > 0 || excludedKeys.length > 0
+
+  return (
+    <>
+      <tr>
+        <td><input type="checkbox" checked={selected} onChange={onToggle} /></td>
+        <td>{entry.id}</td>
+        <td>{new Date(entry.imported_at).toLocaleString()}</td>
+        <td>{entry.records_imported}</td>
+        <td>{entry.records_duplicate}</td>
+        <td>
+          {hasKeys && (
+            <button onClick={() => setExpanded(!expanded)}>
+              {expanded ? 'Hide' : 'Details'}
+            </button>
+          )}
+        </td>
+      </tr>
+      {expanded && (
+        <tr>
+          <td colSpan={6}>
+            <KeyList keys={insertedKeys} label="Inserted" color="green" />
+            <KeyList keys={excludedKeys} label="Excluded (duplicates)" color="orange" />
+          </td>
+        </tr>
+      )}
+    </>
+  )
+}
+
 export default function Import({ source }) {
   const [stats, setStats] = useState(null)
   const [log, setLog] = useState([])
@@ -8,12 +66,14 @@ export default function Import({ source }) {
   const [loading, setLoading] = useState(false)
   const [error, setError] = useState('')
   const [dragOver, setDragOver] = useState(false)
+  const [selected, setSelected] = useState(new Set())
   const fileRef = useRef()
 
   useEffect(() => {
     if (!source) return
     api.getStats(source).then(setStats).catch(() => {})
     api.getImportLog(source).then(setLog).catch(() => {})
+    setSelected(new Set())
   }, [source])
 
   async function handleImport(file) {
@@ -47,6 +107,32 @@ export default function Import({ source }) {
     }
   }
 
+  function toggleSelect(id) {
+    setSelected(prev => {
+      const next = new Set(prev)
+      next.has(id) ? next.delete(id) : next.add(id)
+      return next
+    })
+  }
+
+  async function handleDeleteSelected() {
+    if (selected.size === 0) return
+    const plural = selected.size === 1 ? 'import' : 'imports'
+    if (!confirm(`Delete ${selected.size} ${plural}? This will permanently remove all records from those batches.`)) return
+    setLoading(true)
+    try {
+      await Promise.all([...selected].map(id => api.deleteImport(source, id)))
+      const [newLog, newStats] = await Promise.all([api.getImportLog(source), api.getStats(source)])
+      setLog(newLog)
+      setStats(newStats)
+      setSelected(new Set())
+    } catch (err) {
+      setError(err.message)
+    } finally {
+      setLoading(false)
+    }
+  }
+
   async function handleReprocess() {
     if (!confirm('Reprocess all records? This will clear and reapply all transformation rules.')) return
     setLoading(true)
@@ -111,11 +197,17 @@ export default function Import({ source }) {
       {result && (
         <div className="result">
-          {result.success !== undefined ? (
+          {result.imported !== undefined ? (
             <>
               <span>{result.imported} imported</span>
               <span> · </span>
               <span>{result.duplicates} duplicates skipped</span>
+              {result.transform && (
+                <>
+                  <span> · </span>
+                  <span>{result.transform.transformed} transformed</span>
+                </>
+              )}
             </>
           ) : (
             <span>{result.transformed} records transformed</span>
           )}
@@ -142,22 +234,37 @@ export default function Import({ source }) {
       {/* Import log */}
       {log.length > 0 && (
         <div className="panel">
-          <div className="panel-title">
-            Import history
-          </div>
+          <div className="panel-header">
+            <div className="panel-title">
+              Import history
+            </div>
+            {selected.size > 0 && (
+              <button onClick={handleDeleteSelected} disabled={loading}>
+                Delete selected ({selected.size})
+              </button>
+            )}
+          </div>
           <table>
             <thead>
               <tr>
+                <th></th>
+                <th>ID</th>
                 <th>Date</th>
                 <th>Imported</th>
                 <th>Duplicates</th>
+                <th></th>
               </tr>
             </thead>
             <tbody>
               {log.map(entry => (
-                <tr key={entry.id}>
-                  <td>{new Date(entry.imported_at).toLocaleString()}</td>
-                  <td>{entry.records_imported}</td>
-                  <td>{entry.records_duplicate}</td>
-                </tr>
+                <LogRow
+                  key={entry.id}
+                  entry={entry}
+                  selected={selected.has(entry.id)}
+                  onToggle={() => toggleSelect(entry.id)}
+                />
               ))}
             </tbody>
           </table>
diff --git a/ui/src/pages/Log.jsx b/ui/src/pages/Log.jsx
new file mode 100644
index 0000000..882eb6d
--- /dev/null
+++ b/ui/src/pages/Log.jsx
@@ -0,0 +1,100 @@
+import { useState, useEffect } from 'react'
+import { api } from '../api'
+
+function KeyList({ keys, label, color }) {
+  if (!keys || keys.length === 0) return null
+  return (
+    <div className="key-list">
+      <div style={{ color }}>{label} ({keys.length})</div>
+      <div>
+        {keys.map((k, i) => (
+          <div key={i}>
+            {typeof k === 'object' && k !== null
+              ? Object.entries(k).map(([field, val]) => `${field}: ${val}`).join(' · ')
+              : k}
+          </div>
+        ))}
+      </div>
+    </div>
+  )
+}
+
+function LogRow({ entry }) {
+  const [expanded, setExpanded] = useState(false)
+  const info = entry.info || {}
+  const insertedKeys = info.inserted_keys || []
+  const excludedKeys = info.excluded_keys || []
+  const hasKeys = insertedKeys.length > 0 || excludedKeys.length > 0
+
+  return (
+    <>
+      <tr>
+        <td>{entry.id}</td>
+        <td>{entry.source_name}</td>
+        <td>{new Date(entry.imported_at).toLocaleString()}</td>
+        <td>{entry.records_imported}</td>
+        <td>{entry.records_duplicate}</td>
+        <td>
+          {hasKeys && (
+            <button onClick={() => setExpanded(!expanded)}>
+              {expanded ? 'Hide' : 'Details'}
+            </button>
+          )}
+        </td>
+      </tr>
+      {expanded && (
+        <tr>
+          <td colSpan={6}>
+            <KeyList keys={insertedKeys} label="Inserted" color="green" />
+            <KeyList keys={excludedKeys} label="Excluded (duplicates)" color="orange" />
+          </td>
+        </tr>
+      )}
+    </>
+  )
+}
+
+export default function Log() {
+  const [log, setLog] = useState([])
+  const [loading, setLoading] = useState(true)
+
+  useEffect(() => {
+    api.getAllImportLog()
+      .then(setLog)
+      .catch(() => {})
+      .finally(() => setLoading(false))
+  }, [])
+
+  return (
+    <div className="page">
+      <div className="page-title">
+        Import Log
+      </div>
+
+      {loading && (
+        <div>
+          Loading…
+        </div>
+      )}
+
+      {!loading && log.length === 0 && (
+        <div>
+          No imports yet.
+        </div>
+      )}
+
+      {log.length > 0 && (
+        <table>
+          <thead>
+            <tr><th>ID</th><th>Source</th><th>Date</th><th>Imported</th><th>Duplicates</th><th></th></tr>
+          </thead>
+          <tbody>
+            {log.map(entry => <LogRow key={entry.id} entry={entry} />)}
+          </tbody>
+        </table>
+      )}
+    </div>
+  )
+}