Import log, constraint key overhaul, and dedup improvements

- Rename dedup_key/dedup_fields → constraint_key/constraint_fields everywhere
  (schema, functions, routes, UI, migration script, docs)
- Change constraint_key from MD5 TEXT hash to readable JSONB object
- Drop unique constraint on (source_name, constraint_key); dedup is now
  enforced at import time via CTE, allowing intra-file duplicate rows
- Add import_id FK (ON DELETE CASCADE) so deleting a log entry removes its records
- Add info JSONB to import_log with inserted_keys and excluded_keys arrays
- Add get_import_log, get_all_import_logs, delete_import SQL functions
- Auto-apply transformations immediately after import
- Import UI: expandable key detail, checkbox selection, delete with confirm,
  import ID column, transform result display
- New Log page: global import log across all sources
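
For illustration (values invented), the same record's key before and after:

```
before: dedup_key      = '6f1ed002ab5595859014ebf0951522d9'   (md5 of 'date|amount|description|')
after:  constraint_key = {"date": "2026-04-01", "amount": "12.50", "description": "COFFEE"}
```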

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Paul Trowbridge 2026-04-13 23:44:30 -04:00
parent b2a5e3c92a
commit d63d70cd52
10 changed files with 129 additions and 131 deletions

View File

@@ -19,7 +19,7 @@ Dataflow is a simple data transformation tool for importing, cleaning, and stand
 ### Database Schema (`database/schema.sql`)
 **5 simple tables:**
-- `sources` - Source definitions with `dedup_fields` array
+- `sources` - Source definitions with `constraint_fields` array
 - `records` - Imported data with `data` (raw) and `transformed` (enriched) JSONB columns
 - `rules` - Regex extraction rules with `field`, `pattern`, `output_field`
 - `mappings` - Input/output value mappings
@@ -123,9 +123,11 @@ records.data → apply_transformations() →
 ```
 ### Deduplication
-- Hash is MD5 of concatenated values from `dedup_fields`
-- Unique constraint on `(source_name, dedup_key)` prevents duplicates
-- Import function catches unique violations and counts them
+- `constraint_key` is a JSONB object of the constraint field values (readable, no hashing)
+- Dedup is enforced at import time via CTE — no unique DB constraint
+- Intra-file duplicate rows are allowed (bank may send identical rows); they all insert
+- On re-import, all rows whose constraint_key already exists in the DB are skipped
+- Deleting an import log entry cascades to all records from that batch (import_id FK)
 ### Error Handling
 - API routes use `try/catch` and pass errors to `next(err)`
@@ -184,7 +186,7 @@ The simplification makes it easy to understand, modify, and maintain.
 - Check for SQL errors in logs
 **All records marked as duplicates:**
-- Verify `dedup_fields` match actual field names in data
+- Verify `constraint_fields` match actual field names in data
 - Check if data was already imported
 - Use different source name for testing
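
The Deduplication section above describes the CTE approach; condensed here to a standalone sketch (source name and constraint fields hard-coded for illustration — `import_records` derives them from `sources.constraint_fields`):

```
-- Sketch of import-time dedup: insert only rows whose constraint_key
-- is not already present for this source. No unique constraint involved.
WITH pending AS (
  SELECT rec.value AS data,
         jsonb_build_object('date',   rec.value->>'date',
                            'amount', rec.value->>'amount') AS constraint_key
  FROM jsonb_array_elements(
         '[{"date": "2026-04-01", "amount": "12.50"}]'::jsonb) AS rec(value)
)
INSERT INTO dataflow.records (source_name, data, constraint_key)
SELECT 'bank_transactions', p.data, p.constraint_key
FROM pending p
WHERE NOT EXISTS (
  SELECT 1 FROM dataflow.records r
  WHERE r.source_name = 'bank_transactions'
    AND r.constraint_key = p.constraint_key);
```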

SPEC.md
View File

@@ -71,10 +71,10 @@ public/ — compiled UI (output of npm run build in ui/)
 Five tables in the `dataflow` schema:
 ### `sources`
-Defines a data source. The `dedup_fields` array specifies which fields make a record unique. `config` (JSONB) holds the output schema (`fields` array) used to generate the typed view.
+Defines a data source. The `constraint_fields` array specifies which fields make a record unique. `config` (JSONB) holds the output schema (`fields` array) used to generate the typed view.
 ### `records`
-Stores every imported record. `data` holds the raw import. `transformed` holds the enriched record after rules and mappings are applied. `dedup_key` is an MD5 hash of the dedup fields — a unique constraint on `(source_name, dedup_key)` prevents duplicate imports.
+Stores every imported record. `data` holds the raw import. `transformed` holds the enriched record after rules and mappings are applied. `constraint_key` is a JSONB object of the constraint field values used to detect duplicates at import time. `import_id` references the `import_log` row; deleting a log entry cascades to its records.
 ### `rules`
 Regex transformation rules. Each rule reads from `field`, applies `pattern` with optional `flags`, and writes to `output_field`. `function_type` is either `extract` (regexp_matches) or `replace` (regexp_replace). `sequence` controls the order rules are applied. `retain` keeps the raw extracted value in `output_field` even when a mapping overrides it.
@ -83,7 +83,7 @@ Regex transformation rules. Each rule reads from `field`, applies `pattern` with
Maps an extracted value to a standardized output object. `input_value` is JSONB (matches the extracted value exactly, including arrays from multi-capture-group patterns). `output` is a JSONB object that can contain multiple fields (e.g., `{"vendor": "Walmart", "category": "Groceries"}`). Maps an extracted value to a standardized output object. `input_value` is JSONB (matches the extracted value exactly, including arrays from multi-capture-group patterns). `output` is a JSONB object that can contain multiple fields (e.g., `{"vendor": "Walmart", "category": "Groceries"}`).
### `import_log` ### `import_log`
Audit trail. One row per import call, recording how many records were inserted versus skipped as duplicates. Audit trail. One row per import call, recording how many records were inserted versus skipped as duplicates. `info` (JSONB) stores the full `inserted_keys` and `excluded_keys` arrays. Deleting a log row cascades to its records via the `import_id` FK.
--- ---
@@ -92,8 +92,11 @@ Audit trail. One row per import call, recording how many records were inserted v
 ### Import
 ```
 CSV file → parse in Node.js → import_records(source, data)
-  → generate_dedup_key() per record → INSERT with unique constraint
-  → count inserted vs duplicates → log to import_log
+  → build JSONB constraint_key per record
+  → compare against existing records (CTE — no unique constraint)
+  → INSERT new records, skip duplicates
+  → log to import_log (with inserted_keys / excluded_keys)
+  → apply_transformations() runs automatically on new records
 ```
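
As an illustration (values invented), the logged `info` object now looks something like `{"total": 2, "inserted_keys": [{"date": "2026-04-01", "amount": "12.50"}], "excluded_keys": [{"date": "2026-03-28", "amount": "40.00"}]}`, and can be inspected directly:

```
-- Pull the key detail for the most recent import (sketch).
SELECT info->'inserted_keys' AS inserted,
       info->'excluded_keys' AS excluded
FROM dataflow.import_log
ORDER BY id DESC
LIMIT 1;
```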
 ### Transform
@@ -145,7 +148,7 @@ All routes are under `/api`. Every route requires HTTP Basic Auth. The `GET /hea
 | GET | /api/sources | List all sources |
 | POST | /api/sources | Create source |
 | GET | /api/sources/:name | Get source |
-| PUT | /api/sources/:name | Update source (dedup_fields, config) |
+| PUT | /api/sources/:name | Update source (constraint_fields, config) |
 | DELETE | /api/sources/:name | Delete source and all data |
 | POST | /api/sources/suggest | Suggest source config from CSV upload |
 | POST | /api/sources/:name/import | Import CSV records |
@@ -203,9 +206,9 @@ Built with React + Vite + Tailwind CSS. Compiled output goes to `public/`. The s
 **Pages:**
-- **Sources** — View and edit source configuration. Shows all known field names and their origins (raw data, schema, rules, mappings). Checkboxes control which fields are dedup keys and which appear in the output view. Supports CSV upload to auto-detect fields.
+- **Sources** — View and edit source configuration. Shows all known field names and their origins (raw data, schema, rules, mappings). Checkboxes control which fields are constraint fields and which appear in the output view. Supports CSV upload to auto-detect fields.
-- **Import** — Upload a CSV to import records into the selected source. Shows import log with inserted/duplicate counts per import.
+- **Import** — Upload a CSV to import records into the selected source. Transformations run automatically on new records. Shows import log with inserted/duplicate counts, expandable key detail, checkbox selection, and delete with confirmation.
 - **Rules** — Create and manage regex rules. Live preview fires automatically (debounced 500ms) as pattern/field/flags are edited, showing match results against real records. Rules can be enabled/disabled by toggle.
@@ -213,6 +216,8 @@ Built with React + Vite + Tailwind CSS. Compiled output goes to `public/`. The s
 - **Records** — Paginated table showing the `dfv.{source}` view. Server-side sorting (column validated against `information_schema.columns`, interpolated with `quote_ident`). Dates are formatted `YYYY-MM-DD` for correct lexicographic sort.
+- **Log** — Global import log across all sources. Same expandable key detail and delete capability as the Import page, plus a source name column.
 ---
 ## manage.py

View File

@@ -64,7 +64,7 @@ module.exports = (pool) => {
         return { name: key, type };
       });
-      res.json({ name: '', dedup_fields: [], fields });
+      res.json({ name: '', constraint_fields: [], fields });
     } catch (err) {
       next(err);
     }
@@ -73,12 +73,12 @@ module.exports = (pool) => {
   // Create source
   router.post('/', async (req, res, next) => {
     try {
-      const { name, dedup_fields, config } = req.body;
-      if (!name || !dedup_fields || !Array.isArray(dedup_fields)) {
-        return res.status(400).json({ error: 'Missing required fields: name, dedup_fields (array)' });
+      const { name, constraint_fields, config } = req.body;
+      if (!name || !constraint_fields || !Array.isArray(constraint_fields)) {
+        return res.status(400).json({ error: 'Missing required fields: name, constraint_fields (array)' });
       }
       const result = await pool.query(
-        `SELECT * FROM create_source(${lit(name)}, ${arr(dedup_fields)}, ${lit(config || {})})`
+        `SELECT * FROM create_source(${lit(name)}, ${arr(constraint_fields)}, ${lit(config || {})})`
       );
       res.status(201).json(result.rows[0]);
     } catch (err) {
@@ -90,9 +90,9 @@ module.exports = (pool) => {
   // Update source
   router.put('/:name', async (req, res, next) => {
     try {
-      const { dedup_fields, config } = req.body;
+      const { constraint_fields, config } = req.body;
       const result = await pool.query(
-        `SELECT * FROM update_source(${lit(req.params.name)}, ${dedup_fields ? arr(dedup_fields) : 'NULL'}, ${config ? lit(config) : 'NULL'})`
+        `SELECT * FROM update_source(${lit(req.params.name)}, ${constraint_fields ? arr(constraint_fields) : 'NULL'}, ${config ? lit(config) : 'NULL'})`
       );
       if (result.rows.length === 0) return res.status(404).json({ error: 'Source not found' });
       res.json(result.rows[0]);
@@ -122,6 +122,8 @@ module.exports = (pool) => {
       );
       const importData = importResult.rows[0].result;
+      if (!importData.success) return res.json(importData);
       const transformResult = await pool.query(
         `SELECT apply_transformations(${lit(req.params.name)}) as result`
       );
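
With the early return above, the import route now runs, in effect, two SQL calls in sequence; sketched here with stand-in values (`'bank_transactions'` replaces `req.params.name`, and the empty array stands in for the parsed CSV rows):

```
SELECT import_records('bank_transactions', '[]'::jsonb);  -- dedup + log entry
SELECT apply_transformations('bank_transactions');        -- only reached when the import succeeded
```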

View File

@@ -14,17 +14,16 @@ CREATE OR REPLACE FUNCTION import_records(
   p_data JSONB  -- Array of records
 ) RETURNS JSON AS $$
 DECLARE
-  v_dedup_fields TEXT[];
+  v_constraint_fields TEXT[];
   v_inserted INTEGER;
   v_duplicates INTEGER;
   v_log_id INTEGER;
 BEGIN
-  -- Get dedup fields for this source
-  SELECT dedup_fields INTO v_dedup_fields
+  SELECT constraint_fields INTO v_constraint_fields
   FROM dataflow.sources
   WHERE name = p_source_name;
-  IF v_dedup_fields IS NULL THEN
+  IF v_constraint_fields IS NULL THEN
     RETURN json_build_object(
       'success', false,
       'error', 'Source not found: ' || p_source_name
@@ -32,52 +31,49 @@ BEGIN
   END IF;
   WITH
-  -- All incoming records with their dedup keys and readable field values
+  -- All incoming records with their constraint keys
   pending AS (
     SELECT
       rec.value AS data,
       rec.ordinality AS seq,
-      dataflow.generate_dedup_key(rec.value, v_dedup_fields) AS dedup_key,
       (SELECT jsonb_object_agg(f, rec.value->>f)
-       FROM unnest(v_dedup_fields) AS f) AS dedup_values
+       FROM unnest(v_constraint_fields) AS f) AS constraint_key
     FROM jsonb_array_elements(p_data) WITH ORDINALITY AS rec
   ),
-  -- Keys already in the database (excluded) with their readable values
+  -- Keys already in the database (excluded)
   existing AS (
-    SELECT DISTINCT ON (r.dedup_key) r.dedup_key,
-      (SELECT jsonb_object_agg(f, r.data->>f)
-       FROM unnest(v_dedup_fields) AS f) AS dedup_values
+    SELECT DISTINCT r.constraint_key
     FROM dataflow.records r
-    INNER JOIN pending p ON p.dedup_key = r.dedup_key
+    INNER JOIN pending p ON p.constraint_key = r.constraint_key
     WHERE r.source_name = p_source_name
   ),
-  -- Keys that are new
-  new_keys AS (
-    SELECT p.dedup_key, p.dedup_values FROM pending p
-    WHERE NOT EXISTS (SELECT 1 FROM existing e WHERE e.dedup_key = p.dedup_key)
+  -- Rows whose constraint key is not yet in the database
+  new_records AS (
+    SELECT p.data, p.constraint_key, p.seq
+    FROM pending p
+    WHERE NOT EXISTS (SELECT 1 FROM existing e WHERE e.constraint_key = p.constraint_key)
   ),
-  -- Write the log entry with readable field values instead of hashes
+  -- Write the log entry
   log_entry AS (
     INSERT INTO dataflow.import_log (source_name, records_imported, records_duplicate, info)
     VALUES (
       p_source_name,
-      (SELECT count(*) FROM new_keys),
-      (SELECT count(*) FROM existing),
+      (SELECT count(*) FROM new_records),
+      (SELECT count(*) FROM pending) - (SELECT count(*) FROM new_records),
       jsonb_build_object(
         'total', jsonb_array_length(p_data),
-        'inserted_keys', (SELECT jsonb_agg(dedup_values) FROM new_keys),
-        'excluded_keys', (SELECT jsonb_agg(dedup_values) FROM existing)
+        'inserted_keys', (SELECT jsonb_agg(constraint_key ORDER BY constraint_key) FROM new_records),
+        'excluded_keys', (SELECT jsonb_agg(constraint_key) FROM existing)
       )
     )
     RETURNING id, records_imported, records_duplicate
   ),
-  -- Insert only new records
+  -- Insert new records
   inserted AS (
-    INSERT INTO dataflow.records (source_name, data, dedup_key, import_id)
-    SELECT p_source_name, p.data, p.dedup_key, (SELECT id FROM log_entry)
-    FROM pending p
-    INNER JOIN new_keys nk ON nk.dedup_key = p.dedup_key
-    ORDER BY p.seq
+    INSERT INTO dataflow.records (source_name, data, constraint_key, import_id)
+    SELECT p_source_name, nr.data, nr.constraint_key, (SELECT id FROM log_entry)
+    FROM new_records nr
+    ORDER BY nr.seq
     RETURNING id
   )
   SELECT le.id, le.records_imported, le.records_duplicate
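
For reference, a call sketch with made-up rows (the result shape follows the `log_entry` CTE above):

```
-- Hypothetical invocation; field names and values are illustrative.
SELECT import_records(
  'bank_transactions',
  '[{"date": "2026-04-01", "amount": "12.50", "description": "COFFEE"},
    {"date": "2026-04-01", "amount": "12.50", "description": "COFFEE"}]'::jsonb
);
-- Both rows share one constraint_key. If that key is not yet in
-- dataflow.records, both insert (intra-file duplicates are allowed).
-- On a re-import, both rows are skipped (records_duplicate = 2) and the
-- key appears once in excluded_keys, since existing is DISTINCT.
```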

View File

@@ -19,14 +19,14 @@ CREATE EXTENSION IF NOT EXISTS dblink;
 \echo ''
 \echo '=== 1. Sources ==='
-INSERT INTO dataflow.sources (name, dedup_fields, config)
+INSERT INTO dataflow.sources (name, constraint_fields, config)
 SELECT
   srce AS name,
-  -- Strip {} wrappers from constraint paths → dedup field names
+  -- Strip {} wrappers from constraint paths → constraint field names
   ARRAY(
     SELECT regexp_replace(c, '^\{|\}$', '', 'g')
     FROM jsonb_array_elements_text(defn->'constraint') AS c
-  ) AS dedup_fields,
+  ) AS constraint_fields,
   -- Build config.fields from the first schema (index 0 = "mapped" for dcard, "default" for others)
   jsonb_build_object('fields',
     (SELECT jsonb_agg(
@@ -44,7 +44,7 @@ FROM dblink(:'tps_conn',
 ) AS t(srce TEXT, defn JSONB)
 ON CONFLICT (name) DO NOTHING;
-SELECT name, dedup_fields, jsonb_array_length(config->'fields') AS field_count
+SELECT name, constraint_fields, jsonb_array_length(config->'fields') AS field_count
 FROM dataflow.sources ORDER BY name;
 \echo ''
@@ -95,11 +95,11 @@ FROM dataflow.mappings GROUP BY source_name, rule_name ORDER BY source_name, rul
 \echo '=== 4. Records ==='
 \echo '    (13 000+ rows — may take a moment)'
-INSERT INTO dataflow.records (source_name, data, dedup_key, transformed, imported_at, transformed_at)
+INSERT INTO dataflow.records (source_name, data, constraint_key, transformed, imported_at, transformed_at)
 SELECT
   t.srce AS source_name,
   t.rec AS data,
-  dataflow.generate_dedup_key(t.rec, s.dedup_fields) AS dedup_key,
+  (SELECT jsonb_object_agg(f, t.rec->>f) FROM unnest(s.constraint_fields) AS f) AS constraint_key,
   t.allj AS transformed,
   CURRENT_TIMESTAMP AS imported_at,
   CASE WHEN t.allj IS NOT NULL THEN CURRENT_TIMESTAMP END AS transformed_at
@@ -107,7 +107,7 @@ FROM dblink(:'tps_conn',
   'SELECT srce, rec, allj FROM tps.trans'
 ) AS t(srce TEXT, rec JSONB, allj JSONB)
 JOIN dataflow.sources s ON s.name = t.srce
-ON CONFLICT (source_name, dedup_key) DO NOTHING;
+ON CONFLICT DO NOTHING;  -- no unique constraint remains on (source_name, constraint_key); a conflict target would error, the bare form is a safe no-op
 SELECT source_name, COUNT(*) AS records, COUNT(transformed) AS transformed
 FROM dataflow.records GROUP BY source_name ORDER BY source_name;

View File

@@ -17,17 +17,17 @@ RETURNS dataflow.sources AS $$
   SELECT * FROM dataflow.sources WHERE name = p_name;
 $$ LANGUAGE sql STABLE;
-CREATE OR REPLACE FUNCTION create_source(p_name TEXT, p_dedup_fields TEXT[], p_config JSONB DEFAULT '{}')
+CREATE OR REPLACE FUNCTION create_source(p_name TEXT, p_constraint_fields TEXT[], p_config JSONB DEFAULT '{}')
 RETURNS dataflow.sources AS $$
-  INSERT INTO dataflow.sources (name, dedup_fields, config)
-  VALUES (p_name, p_dedup_fields, p_config)
+  INSERT INTO dataflow.sources (name, constraint_fields, config)
+  VALUES (p_name, p_constraint_fields, p_config)
   RETURNING *;
 $$ LANGUAGE sql;
-CREATE OR REPLACE FUNCTION update_source(p_name TEXT, p_dedup_fields TEXT[] DEFAULT NULL, p_config JSONB DEFAULT NULL)
+CREATE OR REPLACE FUNCTION update_source(p_name TEXT, p_constraint_fields TEXT[] DEFAULT NULL, p_config JSONB DEFAULT NULL)
 RETURNS dataflow.sources AS $$
   UPDATE dataflow.sources
-  SET dedup_fields = COALESCE(p_dedup_fields, dedup_fields),
+  SET constraint_fields = COALESCE(p_constraint_fields, constraint_fields),
       config = COALESCE(p_config, config),
       updated_at = CURRENT_TIMESTAMP
   WHERE name = p_name
@@ -129,30 +129,30 @@ BEGIN
 END;
 $$ LANGUAGE plpgsql STABLE;
--- ── Import (deduplication) ────────────────────────────────────────────────────
+-- ── Import (uniqueness constraint) ────────────────────────────────────────────
 CREATE OR REPLACE FUNCTION import_records(p_source_name TEXT, p_data JSONB)
 RETURNS JSON AS $$
 DECLARE
-  v_dedup_fields TEXT[];
+  v_constraint_fields TEXT[];
   v_record JSONB;
-  v_dedup_key TEXT;
+  v_constraint_key TEXT;
   v_inserted INTEGER := 0;
   v_duplicates INTEGER := 0;
   v_log_id INTEGER;
 BEGIN
-  SELECT dedup_fields INTO v_dedup_fields
+  SELECT constraint_fields INTO v_constraint_fields
   FROM dataflow.sources WHERE name = p_source_name;
-  IF v_dedup_fields IS NULL THEN
+  IF v_constraint_fields IS NULL THEN
     RETURN json_build_object('success', false, 'error', 'Source not found: ' || p_source_name);
   END IF;
   FOR v_record IN SELECT * FROM jsonb_array_elements(p_data) LOOP
-    v_dedup_key := dataflow.generate_dedup_key(v_record, v_dedup_fields);
+    v_constraint_key := dataflow.generate_constraint_key(v_record, v_constraint_fields);
     BEGIN
-      INSERT INTO dataflow.records (source_name, data, dedup_key)
-      VALUES (p_source_name, v_record, v_dedup_key);
+      INSERT INTO dataflow.records (source_name, data, constraint_key)
+      VALUES (p_source_name, v_record, v_constraint_key);
       v_inserted := v_inserted + 1;
     EXCEPTION WHEN unique_violation THEN
       v_duplicates := v_duplicates + 1;

View File

@@ -15,14 +15,14 @@ SET search_path TO dataflow, public;
 ------------------------------------------------------
 CREATE TABLE sources (
     name TEXT PRIMARY KEY,
-    dedup_fields TEXT[] NOT NULL,  -- Fields used for deduplication (e.g., ['date', 'amount', 'description'])
+    constraint_fields TEXT[] NOT NULL,  -- Fields that uniquely identify a record (e.g., ['date', 'amount', 'description'])
     config JSONB DEFAULT '{}'::jsonb,
     created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
     updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
 );
 COMMENT ON TABLE sources IS 'Data source definitions';
-COMMENT ON COLUMN sources.dedup_fields IS 'Array of field names used to identify duplicate records';
+COMMENT ON COLUMN sources.constraint_fields IS 'Array of field names that uniquely identify a record';
 COMMENT ON COLUMN sources.config IS 'Additional source configuration (optional)';
 ------------------------------------------------------
@@ -35,7 +35,7 @@ CREATE TABLE records (
     -- Data
     data JSONB NOT NULL,          -- Original imported data
-    dedup_key TEXT NOT NULL,      -- Hash of dedup fields for fast lookup
+    constraint_key JSONB,         -- Fields that uniquely identify this record (set on import)
     transformed JSONB,            -- Data after transformations applied
     -- Metadata
@@ -43,18 +43,17 @@ CREATE TABLE records (
     imported_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
     transformed_at TIMESTAMPTZ,
-    -- Constraints
-    UNIQUE(source_name, dedup_key)  -- Prevent duplicates
 );
 COMMENT ON TABLE records IS 'Imported records with raw and transformed data';
 COMMENT ON COLUMN records.data IS 'Original data as imported';
-COMMENT ON COLUMN records.dedup_key IS 'Hash of deduplication fields for fast duplicate detection';
+COMMENT ON COLUMN records.constraint_key IS 'JSONB object of constraint field values — uniquely identifies this record within its source';
 COMMENT ON COLUMN records.transformed IS 'Data after applying transformation rules';
 -- Indexes
 CREATE INDEX idx_records_source ON records(source_name);
-CREATE INDEX idx_records_dedup ON records(source_name, dedup_key);
+CREATE INDEX idx_records_constraint ON records USING gin(constraint_key);
 CREATE INDEX idx_records_data ON records USING gin(data);
 CREATE INDEX idx_records_transformed ON records USING gin(transformed);
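
The btree index on `(source_name, dedup_key)` is replaced by a GIN index on the JSONB key. Worth noting: GIN serves containment (`@>`) lookups, not plain `=` equality, so a probe for a specific key is best phrased as containment — with a fixed field list, containment of the full object behaves like an exact match. A sketch with invented values:

```
-- Find records matching a given constraint_key (illustrative values).
SELECT id, data
FROM dataflow.records
WHERE source_name = 'bank_transactions'
  AND constraint_key @> '{"date": "2026-04-01", "amount": "12.50", "description": "COFFEE"}'::jsonb;
```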
@@ -139,28 +138,6 @@ COMMENT ON COLUMN import_log.info IS 'Import details: inserted_keys and excluded
 CREATE INDEX idx_import_log_source ON import_log(source_name);
 CREATE INDEX idx_import_log_timestamp ON import_log(imported_at);
-------------------------------------------------------
--- Helper function: Generate dedup key
-------------------------------------------------------
-CREATE OR REPLACE FUNCTION generate_dedup_key(
-    data JSONB,
-    dedup_fields TEXT[]
-) RETURNS TEXT AS $$
-DECLARE
-    field TEXT;
-    values TEXT := '';
-BEGIN
-    -- Concatenate values from dedup fields
-    FOREACH field IN ARRAY dedup_fields LOOP
-        values := values || COALESCE(data->>field, '') || '|';
-    END LOOP;
-    -- Return MD5 hash of concatenated values
-    RETURN md5(values);
-END;
-$$ LANGUAGE plpgsql IMMUTABLE;
-COMMENT ON FUNCTION generate_dedup_key IS 'Generate hash key from specified fields for deduplication';
 ------------------------------------------------------
 -- Summary
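
The new `delete_import` function (added by this commit but not shown in the hunks above) can stay small because the FK does the work; a minimal sketch, assuming the signature:

```
-- Sketch only: the actual body is not shown in this diff. Deleting the
-- import_log row is enough — records.import_id is ON DELETE CASCADE,
-- so the batch's records go with it.
CREATE OR REPLACE FUNCTION delete_import(p_import_id INTEGER)
RETURNS JSON AS $$
DECLARE
  v_deleted INTEGER;
BEGIN
  DELETE FROM dataflow.import_log WHERE id = p_import_id;
  GET DIAGNOSTICS v_deleted = ROW_COUNT;
  RETURN json_build_object('success', v_deleted > 0, 'import_id', p_import_id);
END;
$$ LANGUAGE plpgsql;
```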

View File

@@ -42,7 +42,7 @@ curl -X POST http://localhost:3000/api/sources \
   -H "Content-Type: application/json" \
   -d '{
     "name": "bank_transactions",
-    "dedup_fields": ["date", "description", "amount"]
+    "constraint_fields": ["date", "description", "amount"]
   }'
 ```
@@ -303,7 +303,7 @@ curl -X POST http://localhost:3000/api/records/search \
 **Import fails:**
 - Verify source exists: `curl http://localhost:3000/api/sources`
 - Check CSV format matches expectations
-- Ensure dedup_fields match CSV column names
+- Ensure constraint_fields match CSV column names
 **Transformations not working:**
 - Check rules exist: `curl http://localhost:3000/api/rules/source/bank_transactions`

View File

@@ -196,8 +196,24 @@ export default function Import({ source }) {
       {error && <p className="text-sm text-red-500 mb-3">{error}</p>}
       {result && (
-        <div className="bg-white border border-gray-200 rounded p-4 mb-4 text-sm">
-          {result.imported !== undefined ? (
+        <div className={`border rounded p-4 mb-4 text-sm ${result.success === false ? 'bg-red-50 border-red-200' : 'bg-white border-gray-200'}`}>
+          {result.success === false ? (
+            <>
+              <p className="text-red-600 font-medium mb-2">{result.error}</p>
+              {result.duplicate_rows && (
+                <div>
+                  <p className="text-xs text-red-500 mb-1">Offending rows:</p>
+                  <div className="max-h-48 overflow-y-auto bg-white rounded border border-red-100 p-2 font-mono text-xs text-red-700 space-y-0.5">
+                    {result.duplicate_rows.map((row, i) => (
+                      <div key={i}>
+                        {Object.entries(row).map(([f, v]) => `${f}: ${v}`).join(' · ')}
+                      </div>
+                    ))}
+                  </div>
+                </div>
+              )}
+            </>
+          ) : result.imported !== undefined ? (
             <>
               <span className="text-green-600 font-medium">{result.imported} imported</span>
               <span className="text-gray-400 mx-2">·</span>

View File

@@ -4,7 +4,7 @@ import { api } from '../api'
 const FIELD_TYPES = ['text', 'numeric', 'date']
 export default function Sources({ source, sources, setSources, setSource }) {
-  const [dedup, setDedup] = useState('')
+  const [constraintFields, setConstraintFields] = useState('')
   const [schemaFields, setSchemaFields] = useState([])
   const [stats, setStats] = useState(null)
   const [saving, setSaving] = useState(false)
@@ -15,7 +15,7 @@ export default function Sources({ source, sources, setSources, setSource }) {
   const [viewName, setViewName] = useState('')
   const [availableFields, setAvailableFields] = useState([])
   const [creating, setCreating] = useState(false)
-  const [form, setForm] = useState({ name: '', dedup_fields: '', fields: [], schema: [] })
+  const [form, setForm] = useState({ name: '', constraint_fields: '', fields: [], schema: [] })
   const [createError, setCreateError] = useState('')
   const [createLoading, setCreateLoading] = useState(false)
   const fileRef = useRef()
@@ -24,7 +24,7 @@ export default function Sources({ source, sources, setSources, setSource }) {
   useEffect(() => {
     if (!sourceObj) return
-    setDedup(sourceObj.dedup_fields?.join(', ') || '')
+    setConstraintFields(sourceObj.constraint_fields?.join(', ') || '')
     setSchemaFields((sourceObj.config?.fields || []).map((f, i) => ({ seq: i + 1, ...f })))
     setViewName(sourceObj.config?.fields?.length ? `dfv.${sourceObj.name}` : '')
     setResult('')
@@ -40,10 +40,10 @@ export default function Sources({ source, sources, setSources, setSource }) {
     setSaving(true)
     setError('')
     try {
-      const dedup_fields = dedup.split(',').map(s => s.trim()).filter(Boolean)
+      const constraint_fields = constraintFields.split(',').map(s => s.trim()).filter(Boolean)
       const fields = [...schemaFields.filter(f => f.name)].sort((a, b) => (a.seq ?? 0) - (b.seq ?? 0))
       const config = { ...(sourceObj.config || {}), fields }
-      await api.updateSource(sourceObj.name, { dedup_fields, config })
+      await api.updateSource(sourceObj.name, { constraint_fields, config })
       const updated = await api.getSources()
       setSources(updated)
       setResult('Saved.')
@@ -59,10 +59,10 @@ export default function Sources({ source, sources, setSources, setSource }) {
     setResult('')
     setError('')
     try {
-      const dedup_fields = dedup.split(',').map(s => s.trim()).filter(Boolean)
+      const constraint_fields = constraintFields.split(',').map(s => s.trim()).filter(Boolean)
       const fields = [...schemaFields.filter(f => f.name)].sort((a, b) => (a.seq ?? 0) - (b.seq ?? 0))
       const config = { ...(sourceObj.config || {}), fields }
-      await api.updateSource(sourceObj.name, { dedup_fields, config })
+      await api.updateSource(sourceObj.name, { constraint_fields, config })
       const res = await api.generateView(sourceObj.name)
       if (res.success) {
         setViewName(res.view)
@@ -114,7 +114,7 @@ export default function Sources({ source, sources, setSources, setSource }) {
       setForm(f => ({
         ...f,
         fields: suggestion.fields,
-        dedup_fields: '',
+        constraint_fields: '',
         schema: suggestion.fields.map(f => ({ name: f.name, type: f.type }))
       }))
     } catch (err) {
@@ -125,19 +125,19 @@ export default function Sources({ source, sources, setSources, setSource }) {
   async function handleCreate(e) {
     e.preventDefault()
     setCreateError('')
-    const dedupArr = form.dedup_fields.split(',').map(s => s.trim()).filter(Boolean)
-    if (!form.name || dedupArr.length === 0) {
-      setCreateError('Name and at least one dedup field required')
+    const constraintArr = form.constraint_fields.split(',').map(s => s.trim()).filter(Boolean)
+    if (!form.name || constraintArr.length === 0) {
+      setCreateError('Name and at least one constraint field required')
       return
     }
     setCreateLoading(true)
     try {
       const config = form.schema.length > 0 ? { fields: form.schema } : {}
-      await api.createSource({ name: form.name, dedup_fields: dedupArr, config })
+      await api.createSource({ name: form.name, constraint_fields: constraintArr, config })
       const updated = await api.getSources()
       setSources(updated)
       setSource(form.name)
-      setForm({ name: '', dedup_fields: '', fields: [], schema: [] })
+      setForm({ name: '', constraint_fields: '', fields: [], schema: [] })
       setCreating(false)
     } catch (err) {
       setCreateError(err.message)
@@ -186,7 +186,7 @@ export default function Sources({ source, sources, setSources, setSource }) {
               <th className="pb-1 font-medium">Key</th>
               <th className="pb-1 font-medium">Origin</th>
               <th className="pb-1 font-medium">Type</th>
-              <th className="pb-1 font-medium text-center">Dedup</th>
+              <th className="pb-1 font-medium text-center">Constraint</th>
               <th className="pb-1 font-medium text-center">In view</th>
               <th className="pb-1 font-medium text-center">Seq</th>
             </tr>
@@ -194,7 +194,7 @@ export default function Sources({ source, sources, setSources, setSource }) {
           <tbody>
             {availableFields.map(f => {
               const isRaw = f.origins.includes('raw')
-              const dedupChecked = dedup.split(',').map(s => s.trim()).includes(f.key)
+              const constraintChecked = constraintFields.split(',').map(s => s.trim()).includes(f.key)
               const schemaEntry = schemaFields.find(sf => sf.name === f.key)
               const inView = !!schemaEntry
               return (
@@ -228,13 +228,13 @@ export default function Sources({ source, sources, setSources, setSource }) {
                   {isRaw && (
                     <input
                       type="checkbox"
-                      checked={dedupChecked}
+                      checked={constraintChecked}
                       onChange={e => {
-                        const current = dedup.split(',').map(s => s.trim()).filter(Boolean)
+                        const current = constraintFields.split(',').map(s => s.trim()).filter(Boolean)
                         const next = e.target.checked
                           ? [...current, f.key]
                           : current.filter(k => k !== f.key)
-                        setDedup(next.join(', '))
+                        setConstraintFields(next.join(', '))
                       }}
                     />
                   )}
@@ -354,13 +354,13 @@ export default function Sources({ source, sources, setSources, setSource }) {
           {form.fields.length > 0 && (
             <div>
-              <label className="text-xs text-gray-500 block mb-1">Detected fields — check to use as dedup keys</label>
+              <label className="text-xs text-gray-500 block mb-1">Detected fields — check to use as constraint fields</label>
               <table className="w-full text-xs">
                 <thead>
                   <tr className="text-left text-gray-400 border-b border-gray-100">
                     <th className="pb-1 font-medium">Field</th>
                     <th className="pb-1 font-medium">Type</th>
-                    <th className="pb-1 font-medium text-center">Dedup</th>
+                    <th className="pb-1 font-medium text-center">Constraint</th>
                   </tr>
                 </thead>
                 <tbody>
@@ -371,13 +371,13 @@ export default function Sources({ source, sources, setSources, setSource }) {
                   <td className="py-1 text-center">
                     <input
                       type="checkbox"
-                      checked={form.dedup_fields.split(',').map(s => s.trim()).includes(f.name)}
+                      checked={form.constraint_fields.split(',').map(s => s.trim()).includes(f.name)}
                       onChange={e => {
-                        const current = form.dedup_fields.split(',').map(s => s.trim()).filter(Boolean)
+                        const current = form.constraint_fields.split(',').map(s => s.trim()).filter(Boolean)
                         const next = e.target.checked
                           ? [...current, f.name]
                           : current.filter(n => n !== f.name)
-                        setForm(ff => ({ ...ff, dedup_fields: next.join(', ') }))
+                        setForm(ff => ({ ...ff, constraint_fields: next.join(', ') }))
                       }}
                     />
                   </td>
@@ -390,11 +390,11 @@ export default function Sources({ source, sources, setSources, setSource }) {
           {form.fields.length === 0 && (
             <div>
-              <label className="text-xs text-gray-500 block mb-1">Dedup fields (comma-separated)</label>
+              <label className="text-xs text-gray-500 block mb-1">Constraint fields (comma-separated)</label>
               <input
                 className="w-full border border-gray-200 rounded px-3 py-1.5 text-sm focus:outline-none focus:border-blue-400"
-                value={form.dedup_fields}
-                onChange={e => setForm(f => ({ ...f, dedup_fields: e.target.value }))}
+                value={form.constraint_fields}
+                onChange={e => setForm(f => ({ ...f, constraint_fields: e.target.value }))}
                 placeholder="e.g. date, amount, description"
               />
             </div>
@@ -408,7 +408,7 @@ export default function Sources({ source, sources, setSources, setSource }) {
               {createLoading ? 'Creating…' : 'Create'}
             </button>
             <button type="button"
-              onClick={() => { setCreating(false); setCreateError(''); setForm({ name: '', dedup_fields: '', fields: [], schema: [] }) }}
+              onClick={() => { setCreating(false); setCreateError(''); setForm({ name: '', constraint_fields: '', fields: [], schema: [] }) }}
               className="text-sm text-gray-500 px-3 py-1.5 rounded hover:bg-gray-100">
               Cancel
             </button>