diff --git a/CLAUDE.md b/CLAUDE.md
index 0bad192..26b7ded 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -19,7 +19,7 @@ Dataflow is a simple data transformation tool for importing, cleaning, and stand
 ### Database Schema (`database/schema.sql`)
 
 **5 simple tables:**
-- `sources` - Source definitions with `dedup_fields` array
+- `sources` - Source definitions with `constraint_fields` array
 - `records` - Imported data with `data` (raw) and `transformed` (enriched) JSONB columns
 - `rules` - Regex extraction rules with `field`, `pattern`, `output_field`
 - `mappings` - Input/output value mappings
@@ -123,9 +123,11 @@ records.data → apply_transformations() →
 ```
 
 ### Deduplication
-- Hash is MD5 of concatenated values from `dedup_fields`
-- Unique constraint on `(source_name, dedup_key)` prevents duplicates
-- Import function catches unique violations and counts them
+- `constraint_key` is a JSONB object of the constraint field values (readable, no hashing)
+- Dedup is enforced at import time via CTE — no unique DB constraint
+- Intra-file duplicate rows are allowed (a bank may legitimately send identical rows); they all insert
+- On re-import, every row whose `constraint_key` already exists in the DB is skipped
+- Deleting an import log entry cascades to all records from that batch (`import_id` FK)
 
 ### Error Handling
 - API routes use `try/catch` and pass errors to `next(err)`
@@ -184,7 +186,7 @@ The simplification makes it easy to understand, modify, and maintain.
 - Check for SQL errors in logs
 
 **All records marked as duplicates:**
-- Verify `dedup_fields` match actual field names in data
+- Verify `constraint_fields` match actual field names in data
 - Check if data was already imported
 - Use different source name for testing
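Worth pinning down what the new key looks like. A minimal sketch of building a `constraint_key` by hand, exactly as the import CTE does (the sample row and the three constraint fields are illustrative, not from the repo):

```sql
-- One key/value pair per constraint field, no hashing.
SELECT jsonb_object_agg(f, r.value->>f) AS constraint_key
FROM jsonb_array_elements(
       '[{"date":"2024-01-05","amount":"-42.10","description":"COFFEE CO","memo":"card 1234"}]'::jsonb
     ) AS r(value),
     unnest(ARRAY['date','amount','description']) AS f;
-- → {"date": "2024-01-05", "amount": "-42.10", "description": "COFFEE CO"}
```

Because JSONB equality ignores key order, two rows with the same values for those fields compare equal no matter how the CSV columns were ordered.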
diff --git a/SPEC.md b/SPEC.md
index 5e77ebb..f4e17d3 100644
--- a/SPEC.md
+++ b/SPEC.md
@@ -71,10 +71,10 @@ public/ — compiled UI (output of npm run build in ui/)
 Five tables in the `dataflow` schema:
 
 ### `sources`
-Defines a data source. The `dedup_fields` array specifies which fields make a record unique. `config` (JSONB) holds the output schema (`fields` array) used to generate the typed view.
+Defines a data source. The `constraint_fields` array specifies which fields make a record unique. `config` (JSONB) holds the output schema (`fields` array) used to generate the typed view.
 
 ### `records`
-Stores every imported record. `data` holds the raw import. `transformed` holds the enriched record after rules and mappings are applied. `dedup_key` is an MD5 hash of the dedup fields — a unique constraint on `(source_name, dedup_key)` prevents duplicate imports.
+Stores every imported record. `data` holds the raw import. `transformed` holds the enriched record after rules and mappings are applied. `constraint_key` is a JSONB object of the constraint field values used to detect duplicates at import time. `import_id` references the `import_log` row; deleting a log entry cascades to its records.
 
 ### `rules`
 Regex transformation rules. Each rule reads from `field`, applies `pattern` with optional `flags`, and writes to `output_field`. `function_type` is either `extract` (regexp_matches) or `replace` (regexp_replace). `sequence` controls the order rules are applied. `retain` keeps the raw extracted value in `output_field` even when a mapping overrides it.
@@ -83,7 +83,7 @@ Regex transformation rules. Each rule reads from `field`, applies `pattern` with
 ### `mappings`
 Maps an extracted value to a standardized output object. `input_value` is JSONB (matches the extracted value exactly, including arrays from multi-capture-group patterns). `output` is a JSONB object that can contain multiple fields (e.g., `{"vendor": "Walmart", "category": "Groceries"}`).
 
 ### `import_log`
-Audit trail. One row per import call, recording how many records were inserted versus skipped as duplicates.
+Audit trail. One row per import call, recording how many records were inserted versus skipped as duplicates. `info` (JSONB) stores the full `inserted_keys` and `excluded_keys` arrays. Deleting a log row cascades to its records via the `import_id` FK.
 
 ---
 
@@ -92,8 +92,11 @@ Audit trail. One row per import call, recording how many records were inserted v
 ### Import
 ```
 CSV file → parse in Node.js → import_records(source, data)
-  → generate_dedup_key() per record → INSERT with unique constraint
-  → count inserted vs duplicates → log to import_log
+  → build JSONB constraint_key per record
+  → compare against existing records (CTE — no unique constraint)
+  → INSERT new records, skip duplicates
+  → log to import_log (with inserted_keys / excluded_keys)
+  → apply_transformations() runs automatically on new records
 ```
 
 ### Transform
@@ -145,7 +148,7 @@ All routes are under `/api`. Every route requires HTTP Basic Auth. The `GET /hea
 | GET | /api/sources | List all sources |
 | POST | /api/sources | Create source |
 | GET | /api/sources/:name | Get source |
-| PUT | /api/sources/:name | Update source (dedup_fields, config) |
+| PUT | /api/sources/:name | Update source (constraint_fields, config) |
 | DELETE | /api/sources/:name | Delete source and all data |
 | POST | /api/sources/suggest | Suggest source config from CSV upload |
 | POST | /api/sources/:name/import | Import CSV records |
@@ -203,9 +206,9 @@ Built with React + Vite + Tailwind CSS. Compiled output goes to `public/`. The s
 **Pages:**
 
-- **Sources** — View and edit source configuration. Shows all known field names and their origins (raw data, schema, rules, mappings). Checkboxes control which fields are dedup keys and which appear in the output view. Supports CSV upload to auto-detect fields.
+- **Sources** — View and edit source configuration. Shows all known field names and their origins (raw data, schema, rules, mappings). Checkboxes control which fields are constraint fields and which appear in the output view. Supports CSV upload to auto-detect fields.
 
-- **Import** — Upload a CSV to import records into the selected source. Shows import log with inserted/duplicate counts per import.
+- **Import** — Upload a CSV to import records into the selected source. Transformations run automatically on new records. Shows import log with inserted/duplicate counts, expandable key detail, checkbox selection, and delete with confirmation.
 
 - **Rules** — Create and manage regex rules. Live preview fires automatically (debounced 500ms) as pattern/field/flags are edited, showing match results against real records. Rules can be enabled/disabled by toggle.
@@ -213,6 +216,8 @@ Built with React + Vite + Tailwind CSS. Compiled output goes to `public/`. The s
 - **Records** — Paginated table showing the `dfv.{source}` view. Server-side sorting (column validated against `information_schema.columns`, interpolated with `quote_ident`). Dates are formatted `YYYY-MM-DD` for correct lexicographic sort.
 
+- **Log** — Global import log across all sources. Same expandable key detail and delete capability as the Import page, plus a source name column.
+
 ---
 
 ## manage.py
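The `info` payload makes the audit trail self-describing. A sketch of reading the skip detail back out of `import_log` (the source name is illustrative):

```sql
-- Latest import for one source: counts plus the readable keys that were skipped
SELECT id,
       records_imported,
       records_duplicate,
       info->'excluded_keys' AS excluded_keys
FROM dataflow.import_log
WHERE source_name = 'bank_transactions'
ORDER BY imported_at DESC
LIMIT 1;
```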
diff --git a/api/routes/sources.js b/api/routes/sources.js
index acc9082..49b0fc1 100644
--- a/api/routes/sources.js
+++ b/api/routes/sources.js
@@ -64,7 +64,7 @@ module.exports = (pool) => {
       return { name: key, type };
     });
 
-    res.json({ name: '', dedup_fields: [], fields });
+    res.json({ name: '', constraint_fields: [], fields });
   } catch (err) {
     next(err);
   }
@@ -73,12 +73,12 @@ module.exports = (pool) => {
   // Create source
   router.post('/', async (req, res, next) => {
     try {
-      const { name, dedup_fields, config } = req.body;
-      if (!name || !dedup_fields || !Array.isArray(dedup_fields)) {
-        return res.status(400).json({ error: 'Missing required fields: name, dedup_fields (array)' });
+      const { name, constraint_fields, config } = req.body;
+      if (!name || !constraint_fields || !Array.isArray(constraint_fields)) {
+        return res.status(400).json({ error: 'Missing required fields: name, constraint_fields (array)' });
       }
       const result = await pool.query(
-        `SELECT * FROM create_source(${lit(name)}, ${arr(dedup_fields)}, ${lit(config || {})})`
+        `SELECT * FROM create_source(${lit(name)}, ${arr(constraint_fields)}, ${lit(config || {})})`
       );
       res.status(201).json(result.rows[0]);
     } catch (err) {
@@ -90,9 +90,9 @@ module.exports = (pool) => {
   // Update source
   router.put('/:name', async (req, res, next) => {
     try {
-      const { dedup_fields, config } = req.body;
+      const { constraint_fields, config } = req.body;
       const result = await pool.query(
-        `SELECT * FROM update_source(${lit(req.params.name)}, ${dedup_fields ? arr(dedup_fields) : 'NULL'}, ${config ? lit(config) : 'NULL'})`
+        `SELECT * FROM update_source(${lit(req.params.name)}, ${constraint_fields ? arr(constraint_fields) : 'NULL'}, ${config ? lit(config) : 'NULL'})`
      );
       if (result.rows.length === 0) return res.status(404).json({ error: 'Source not found' });
       res.json(result.rows[0]);
@@ -122,6 +122,8 @@ module.exports = (pool) => {
     );
     const importData = importResult.rows[0].result;
 
+    if (!importData.success) return res.json(importData);
+
     const transformResult = await pool.query(
       `SELECT apply_transformations(${lit(req.params.name)}) as result`
     );
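The import route above is a thin wrapper over two SQL calls, with the new early return guarding the second. A sketch of the same sequence in psql (payload illustrative; assumes `search_path` includes `dataflow`, which `schema.sql` sets):

```sql
-- 1. Import; returns JSON with a success flag and the inserted/duplicate counts
SELECT import_records(
  'bank_transactions',
  '[{"date":"2024-01-05","amount":"-42.10","description":"COFFEE CO"}]'::jsonb
);

-- 2. Only reached when step 1 succeeded (the route checks importData.success)
SELECT apply_transformations('bank_transactions');
```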
diff --git a/database/functions.sql b/database/functions.sql
index 76a6d43..3eace82 100644
--- a/database/functions.sql
+++ b/database/functions.sql
@@ -14,17 +14,16 @@ CREATE OR REPLACE FUNCTION import_records(
     p_data JSONB  -- Array of records
 ) RETURNS JSON AS $$
 DECLARE
-    v_dedup_fields TEXT[];
-    v_inserted INTEGER;
-    v_duplicates INTEGER;
-    v_log_id INTEGER;
+    v_constraint_fields TEXT[];
+    v_inserted INTEGER;
+    v_duplicates INTEGER;
+    v_log_id INTEGER;
 BEGIN
-    -- Get dedup fields for this source
-    SELECT dedup_fields INTO v_dedup_fields
+    SELECT constraint_fields INTO v_constraint_fields
     FROM dataflow.sources
     WHERE name = p_source_name;
 
-    IF v_dedup_fields IS NULL THEN
+    IF v_constraint_fields IS NULL THEN
         RETURN json_build_object(
             'success', false,
             'error', 'Source not found: ' || p_source_name
@@ -32,52 +31,49 @@ BEGIN
     END IF;
 
     WITH
-    -- All incoming records with their dedup keys and readable field values
+    -- All incoming records with their constraint keys
    pending AS (
        SELECT
-            rec.value AS data,
+            rec.value AS data,
            rec.ordinality AS seq,
-            dataflow.generate_dedup_key(rec.value, v_dedup_fields) AS dedup_key,
            (SELECT jsonb_object_agg(f, rec.value->>f)
-             FROM unnest(v_dedup_fields) AS f) AS dedup_values
+             FROM unnest(v_constraint_fields) AS f) AS constraint_key
        FROM jsonb_array_elements(p_data) WITH ORDINALITY AS rec
    ),
-    -- Keys already in the database (excluded) with their readable values
+    -- Keys already in the database (excluded)
    existing AS (
-        SELECT DISTINCT ON (r.dedup_key) r.dedup_key,
-            (SELECT jsonb_object_agg(f, r.data->>f)
-             FROM unnest(v_dedup_fields) AS f) AS dedup_values
+        SELECT DISTINCT r.constraint_key
        FROM dataflow.records r
-        INNER JOIN pending p ON p.dedup_key = r.dedup_key
+        INNER JOIN pending p ON p.constraint_key = r.constraint_key
        WHERE r.source_name = p_source_name
    ),
-    -- Keys that are new
-    new_keys AS (
-        SELECT p.dedup_key, p.dedup_values FROM pending p
-        WHERE NOT EXISTS (SELECT 1 FROM existing e WHERE e.dedup_key = p.dedup_key)
+    -- Rows whose constraint key is not yet in the database
+    new_records AS (
+        SELECT p.data, p.constraint_key, p.seq
+        FROM pending p
+        WHERE NOT EXISTS (SELECT 1 FROM existing e WHERE e.constraint_key = p.constraint_key)
    ),
-    -- Write the log entry with readable field values instead of hashes
+    -- Write the log entry
    log_entry AS (
        INSERT INTO dataflow.import_log (source_name, records_imported, records_duplicate, info)
        VALUES (
            p_source_name,
-            (SELECT count(*) FROM new_keys),
-            (SELECT count(*) FROM existing),
+            (SELECT count(*) FROM new_records),
+            (SELECT count(*) FROM pending) - (SELECT count(*) FROM new_records),
            jsonb_build_object(
                'total', jsonb_array_length(p_data),
-                'inserted_keys', (SELECT jsonb_agg(dedup_values) FROM new_keys),
-                'excluded_keys', (SELECT jsonb_agg(dedup_values) FROM existing)
+                'inserted_keys', (SELECT jsonb_agg(constraint_key ORDER BY constraint_key) FROM new_records),
+                'excluded_keys', (SELECT jsonb_agg(constraint_key) FROM existing)
            )
        )
        RETURNING id, records_imported, records_duplicate
    ),
-    -- Insert only new records
+    -- Insert new records
    inserted AS (
-        INSERT INTO dataflow.records (source_name, data, dedup_key, import_id)
-        SELECT p_source_name, p.data, p.dedup_key, (SELECT id FROM log_entry)
-        FROM pending p
-        INNER JOIN new_keys nk ON nk.dedup_key = p.dedup_key
-        ORDER BY p.seq
+        INSERT INTO dataflow.records (source_name, data, constraint_key, import_id)
+        SELECT p_source_name, nr.data, nr.constraint_key, (SELECT id FROM log_entry)
+        FROM new_records nr
+        ORDER BY nr.seq
        RETURNING id
    )
    SELECT le.id, le.records_imported, le.records_duplicate
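A worked example of the semantics those CTEs implement, assuming a `demo` source whose `constraint_fields` are `['date','amount']` (all values illustrative):

```sql
-- First file: both rows are new. Identical rows within one file would
-- also both insert, since `existing` only looks at rows already in the table.
SELECT import_records('demo',
  '[{"date":"2024-01-05","amount":"-42.10"},
    {"date":"2024-01-06","amount":"-7.50"}]'::jsonb);
-- → records_imported = 2, records_duplicate = 0

-- Re-import with one old row and one new row: the old key is skipped and logged.
SELECT import_records('demo',
  '[{"date":"2024-01-06","amount":"-7.50"},
    {"date":"2024-01-07","amount":"-3.00"}]'::jsonb);
-- → records_imported = 1, records_duplicate = 1
--   info.excluded_keys = [{"date": "2024-01-06", "amount": "-7.50"}]
```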
diff --git a/database/migrate_tps.sql b/database/migrate_tps.sql
index 2108e95..a4adf71 100644
--- a/database/migrate_tps.sql
+++ b/database/migrate_tps.sql
@@ -19,14 +19,14 @@ CREATE EXTENSION IF NOT EXISTS dblink;
 \echo ''
 \echo '=== 1. Sources ==='
 
-INSERT INTO dataflow.sources (name, dedup_fields, config)
+INSERT INTO dataflow.sources (name, constraint_fields, config)
 SELECT
     srce AS name,
-    -- Strip {} wrappers from constraint paths → dedup field names
+    -- Strip {} wrappers from constraint paths → constraint field names
     ARRAY(
         SELECT regexp_replace(c, '^\{|\}$', '', 'g')
         FROM jsonb_array_elements_text(defn->'constraint') AS c
-    ) AS dedup_fields,
+    ) AS constraint_fields,
     -- Build config.fields from the first schema (index 0 = "mapped" for dcard, "default" for others)
     jsonb_build_object('fields',
         (SELECT jsonb_agg(
@@ -44,7 +44,7 @@ FROM dblink(:'tps_conn',
 ) AS t(srce TEXT, defn JSONB)
 ON CONFLICT (name) DO NOTHING;
 
-SELECT name, dedup_fields, jsonb_array_length(config->'fields') AS field_count
+SELECT name, constraint_fields, jsonb_array_length(config->'fields') AS field_count
 FROM dataflow.sources ORDER BY name;
 
 \echo ''
@@ -95,11 +95,11 @@ FROM dataflow.mappings GROUP BY source_name, rule_name ORDER BY source_name, rul
 \echo '=== 4. Records ==='
 \echo '    (13 000+ rows — may take a moment)'
 
-INSERT INTO dataflow.records (source_name, data, dedup_key, transformed, imported_at, transformed_at)
+INSERT INTO dataflow.records (source_name, data, constraint_key, transformed, imported_at, transformed_at)
 SELECT
     t.srce AS source_name,
     t.rec AS data,
-    dataflow.generate_dedup_key(t.rec, s.dedup_fields) AS dedup_key,
+    (SELECT jsonb_object_agg(f, t.rec->>f) FROM unnest(s.constraint_fields) AS f) AS constraint_key,
     t.allj AS transformed,
     CURRENT_TIMESTAMP AS imported_at,
     CASE WHEN t.allj IS NOT NULL THEN CURRENT_TIMESTAMP END AS transformed_at
@@ -107,7 +107,10 @@ FROM dblink(:'tps_conn',
     'SELECT srce, rec, allj FROM tps.trans'
 ) AS t(srce TEXT, rec JSONB, allj JSONB)
 JOIN dataflow.sources s ON s.name = t.srce
-ON CONFLICT (source_name, dedup_key) DO NOTHING;
+-- No unique constraint remains on records, so ON CONFLICT can no longer be used;
+-- skip rows whose constraint_key is already present instead.
+WHERE NOT EXISTS (
+    SELECT 1 FROM dataflow.records r
+    WHERE r.source_name = t.srce
+      AND r.constraint_key = (SELECT jsonb_object_agg(f, t.rec->>f) FROM unnest(s.constraint_fields) AS f)
+);
 
 SELECT source_name, COUNT(*) AS records, COUNT(transformed) AS transformed
 FROM dataflow.records GROUP BY source_name ORDER BY source_name;
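For clarity, the `{}`-stripping in isolation; the TPS constraint paths are brace-wrapped field names like `{date}`, and the regex drops only the outer braces:

```sql
SELECT regexp_replace('{date}', '^\{|\}$', '', 'g');  -- → date
```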
diff --git a/database/queries/sources.sql b/database/queries/sources.sql
index ac78d04..6129932 100644
--- a/database/queries/sources.sql
+++ b/database/queries/sources.sql
@@ -17,17 +17,17 @@ RETURNS dataflow.sources AS $$
     SELECT * FROM dataflow.sources WHERE name = p_name;
 $$ LANGUAGE sql STABLE;
 
-CREATE OR REPLACE FUNCTION create_source(p_name TEXT, p_dedup_fields TEXT[], p_config JSONB DEFAULT '{}')
+CREATE OR REPLACE FUNCTION create_source(p_name TEXT, p_constraint_fields TEXT[], p_config JSONB DEFAULT '{}')
 RETURNS dataflow.sources AS $$
-    INSERT INTO dataflow.sources (name, dedup_fields, config)
-    VALUES (p_name, p_dedup_fields, p_config)
+    INSERT INTO dataflow.sources (name, constraint_fields, config)
+    VALUES (p_name, p_constraint_fields, p_config)
     RETURNING *;
 $$ LANGUAGE sql;
 
-CREATE OR REPLACE FUNCTION update_source(p_name TEXT, p_dedup_fields TEXT[] DEFAULT NULL, p_config JSONB DEFAULT NULL)
+CREATE OR REPLACE FUNCTION update_source(p_name TEXT, p_constraint_fields TEXT[] DEFAULT NULL, p_config JSONB DEFAULT NULL)
 RETURNS dataflow.sources AS $$
     UPDATE dataflow.sources
-    SET dedup_fields = COALESCE(p_dedup_fields, dedup_fields),
+    SET constraint_fields = COALESCE(p_constraint_fields, constraint_fields),
         config = COALESCE(p_config, config),
         updated_at = CURRENT_TIMESTAMP
     WHERE name = p_name
@@ -129,30 +129,34 @@ BEGIN
 END;
 $$ LANGUAGE plpgsql STABLE;
 
--- ── Import (deduplication) ────────────────────────────────────────────────────
+-- ── Import (uniqueness constraint) ───────────────────────────────────────────
 
 CREATE OR REPLACE FUNCTION import_records(p_source_name TEXT, p_data JSONB)
 RETURNS JSON AS $$
 DECLARE
-    v_dedup_fields TEXT[];
+    v_constraint_fields TEXT[];
     v_record JSONB;
-    v_dedup_key TEXT;
+    v_constraint_key JSONB;
     v_inserted INTEGER := 0;
     v_duplicates INTEGER := 0;
     v_log_id INTEGER;
 BEGIN
-    SELECT dedup_fields INTO v_dedup_fields
+    SELECT constraint_fields INTO v_constraint_fields
     FROM dataflow.sources
     WHERE name = p_source_name;
 
-    IF v_dedup_fields IS NULL THEN
+    IF v_constraint_fields IS NULL THEN
         RETURN json_build_object('success', false, 'error', 'Source not found: ' || p_source_name);
     END IF;
 
     FOR v_record IN SELECT * FROM jsonb_array_elements(p_data) LOOP
-        v_dedup_key := dataflow.generate_dedup_key(v_record, v_dedup_fields);
-        BEGIN
-            INSERT INTO dataflow.records (source_name, data, dedup_key)
-            VALUES (p_source_name, v_record, v_dedup_key);
-            v_inserted := v_inserted + 1;
-        EXCEPTION WHEN unique_violation THEN
-            v_duplicates := v_duplicates + 1;
+        -- Build the JSONB key inline; generate_dedup_key (and the unique
+        -- constraint that raised unique_violation) no longer exist.
+        SELECT jsonb_object_agg(f, v_record->>f) INTO v_constraint_key
+        FROM unnest(v_constraint_fields) AS f;
+        IF EXISTS (SELECT 1 FROM dataflow.records r
+                   WHERE r.source_name = p_source_name
+                     AND r.constraint_key = v_constraint_key) THEN
+            v_duplicates := v_duplicates + 1;
+        ELSE
+            INSERT INTO dataflow.records (source_name, data, constraint_key)
+            VALUES (p_source_name, v_record, v_constraint_key);
+            v_inserted := v_inserted + 1;
+        END IF;
diff --git a/database/schema.sql b/database/schema.sql
index 7f35209..3b71239 100644
--- a/database/schema.sql
+++ b/database/schema.sql
@@ -15,14 +15,14 @@ SET search_path TO dataflow, public;
 ------------------------------------------------------
 CREATE TABLE sources (
     name TEXT PRIMARY KEY,
-    dedup_fields TEXT[] NOT NULL,  -- Fields used for deduplication (e.g., ['date', 'amount', 'description'])
+    constraint_fields TEXT[] NOT NULL,  -- Fields that uniquely identify a record (e.g., ['date', 'amount', 'description'])
     config JSONB DEFAULT '{}'::jsonb,
     created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
     updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
 );
 
 COMMENT ON TABLE sources IS 'Data source definitions';
-COMMENT ON COLUMN sources.dedup_fields IS 'Array of field names used to identify duplicate records';
+COMMENT ON COLUMN sources.constraint_fields IS 'Array of field names that uniquely identify a record';
 COMMENT ON COLUMN sources.config IS 'Additional source configuration (optional)';
 
 ------------------------------------------------------
@@ -35,7 +35,7 @@ CREATE TABLE records (
 
     -- Data
     data JSONB NOT NULL,              -- Original imported data
-    dedup_key TEXT NOT NULL,          -- Hash of dedup fields for fast lookup
+    constraint_key JSONB,             -- Fields that uniquely identify this record (set on import)
     transformed JSONB,                -- Data after transformations applied
 
     -- Metadata
@@ -43,18 +43,17 @@ CREATE TABLE records (
     imported_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
-    transformed_at TIMESTAMPTZ,
-
-    -- Constraints
-    UNIQUE(source_name, dedup_key)  -- Prevent duplicates
+    transformed_at TIMESTAMPTZ
 );
 
 COMMENT ON TABLE records IS 'Imported records with raw and transformed data';
 COMMENT ON COLUMN records.data IS 'Original data as imported';
-COMMENT ON COLUMN records.dedup_key IS 'Hash of deduplication fields for fast duplicate detection';
+COMMENT ON COLUMN records.constraint_key IS 'JSONB object of constraint field values — uniquely identifies this record within its source';
 COMMENT ON COLUMN records.transformed IS 'Data after applying transformation rules';
 
 -- Indexes
 CREATE INDEX idx_records_source ON records(source_name);
-CREATE INDEX idx_records_dedup ON records(source_name, dedup_key);
+CREATE INDEX idx_records_constraint ON records USING gin(constraint_key);
 CREATE INDEX idx_records_data ON records USING gin(data);
 CREATE INDEX idx_records_transformed ON records USING gin(transformed);
 
@@ -139,28 +138,6 @@ COMMENT ON COLUMN import_log.info IS 'Import details: inserted_keys and excluded
 CREATE INDEX idx_import_log_source ON import_log(source_name);
 CREATE INDEX idx_import_log_timestamp ON import_log(imported_at);
 
-------------------------------------------------------
--- Helper function: Generate dedup key
-------------------------------------------------------
-CREATE OR REPLACE FUNCTION generate_dedup_key(
-    data JSONB,
-    dedup_fields TEXT[]
-) RETURNS TEXT AS $$
-DECLARE
-    field TEXT;
-    values TEXT := '';
-BEGIN
-    -- Concatenate values from dedup fields
-    FOREACH field IN ARRAY dedup_fields LOOP
-        values := values || COALESCE(data->>field, '') || '|';
-    END LOOP;
-
-    -- Return MD5 hash of concatenated values
-    RETURN md5(values);
-END;
-$$ LANGUAGE plpgsql IMMUTABLE;
-
-COMMENT ON FUNCTION generate_dedup_key IS 'Generate hash key from specified fields for deduplication';
-
 ------------------------------------------------------
 -- Summary
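One index note: GIN with the default `jsonb_ops` accelerates containment (`@>`) and key-existence operators, not plain `=`, so the import-time equality join will not use `idx_records_constraint`. A sketch of a lookup the GIN index does serve, plus an optional btree index for the equality path (the btree index is a suggestion, not part of this patch):

```sql
-- Served by the GIN index: find records carrying a given constraint value
SELECT id, data
FROM dataflow.records
WHERE source_name = 'bank_transactions'
  AND constraint_key @> '{"date": "2024-01-05"}'::jsonb;

-- Optional, if the equality join in import_records gets slow on large tables:
-- CREATE INDEX idx_records_constraint_eq ON dataflow.records (source_name, constraint_key);
```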
diff --git a/examples/GETTING_STARTED.md b/examples/GETTING_STARTED.md
index 66ba022..bf764d6 100644
--- a/examples/GETTING_STARTED.md
+++ b/examples/GETTING_STARTED.md
@@ -42,7 +42,7 @@ curl -X POST http://localhost:3000/api/sources \
   -H "Content-Type: application/json" \
   -d '{
     "name": "bank_transactions",
-    "dedup_fields": ["date", "description", "amount"]
+    "constraint_fields": ["date", "description", "amount"]
   }'
 ```
 
@@ -303,7 +303,7 @@ curl -X POST http://localhost:3000/api/records/search \
 **Import fails:**
 - Verify source exists: `curl http://localhost:3000/api/sources`
 - Check CSV format matches expectations
-- Ensure dedup_fields match CSV column names
+- Ensure constraint_fields match CSV column names
 
 **Transformations not working:**
 - Check rules exist: `curl http://localhost:3000/api/rules/source/bank_transactions`
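The same source can be created straight from SQL with the helper defined in `database/queries/sources.sql`:

```sql
SELECT * FROM create_source(
  'bank_transactions',
  ARRAY['date', 'description', 'amount']
);
```

`config` is omitted and defaults to `'{}'`, matching the curl call, which sends no config either.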
diff --git a/ui/src/pages/Import.jsx b/ui/src/pages/Import.jsx
index 33a38a4..1416fff 100644
--- a/ui/src/pages/Import.jsx
+++ b/ui/src/pages/Import.jsx
@@ -196,8 +196,24 @@ export default function Import({ source }) {
 [JSX markup lost in extraction. Recoverable detail: the hunk keeps the {error} banner, still renders {result.error} on a failed import, and adds a {result.duplicate_rows && ...} block labeled "Offending rows:".]
diff --git a/ui/src/pages/Sources.jsx b/ui/src/pages/Sources.jsx
--- a/ui/src/pages/Sources.jsx
+++ b/ui/src/pages/Sources.jsx
[Index hashes and first hunk header lost in extraction. Recoverable detail: in the fields table (Field | Type | ... columns), the "Dedup" column header becomes "Constraint", and the checkbox wiring switches from form.dedup_fields to form.constraint_fields:]
-                        checked={form.dedup_fields.split(',').map(s => s.trim()).includes(f.name)}
+                        checked={form.constraint_fields.split(',').map(s => s.trim()).includes(f.name)}
                         onChange={e => {
-                          const current = form.dedup_fields.split(',').map(s => s.trim()).filter(Boolean)
+                          const current = form.constraint_fields.split(',').map(s => s.trim()).filter(Boolean)
                           const next = e.target.checked ? [...current, f.name] : current.filter(n => n !== f.name)
-                          setForm(ff => ({ ...ff, dedup_fields: next.join(', ') }))
+                          setForm(ff => ({ ...ff, constraint_fields: next.join(', ') }))
                         }}
                       />
@@ -390,11 +390,11 @@ export default function Sources({ source, sources, setSources, setSource }) {
         {form.fields.length === 0 && (
 [Remainder of this hunk lost in extraction.]