diff --git a/api/routes/mappings.js b/api/routes/mappings.js index b01fc00..032b87d 100644 --- a/api/routes/mappings.js +++ b/api/routes/mappings.js @@ -4,6 +4,12 @@ */ const express = require('express'); +const multer = require('multer'); +const { parse } = require('csv-parse/sync'); + +const upload = multer({ storage: multer.memoryStorage() }); + +const SYSTEM_COLS = new Set(['source_name', 'rule_name', 'input_value', 'record_count', 'sample']); module.exports = (pool) => { const router = express.Router(); @@ -46,6 +52,136 @@ module.exports = (pool) => { } }); + // Export unmapped values + existing mappings as TSV + // Columns: source_name, rule_name, input_value, record_count, , sample + // sample is always last and is discarded on import + router.get('/source/:source_name/export.tsv', async (req, res, next) => { + try { + const { rule_name } = req.query; + const source_name = req.params.source_name; + + const [unmappedResult, mappedResult] = await Promise.all([ + pool.query('SELECT * FROM get_unmapped_values($1, $2)', [source_name, rule_name || null]), + pool.query( + 'SELECT * FROM mappings WHERE source_name = $1' + (rule_name ? ' AND rule_name = $2' : '') + ' ORDER BY rule_name, input_value', + rule_name ? [source_name, rule_name] : [source_name] + ) + ]); + + // Collect output keys from existing mappings + const outputKeys = []; + for (const row of mappedResult.rows) { + for (const key of Object.keys(row.output || {})) { + if (!outputKeys.includes(key)) outputKeys.push(key); + } + } + + const escape = (val) => String(val ?? '').replace(/\t/g, ' '); + + // sample is always last + const allCols = ['source_name', 'rule_name', 'input_value', 'record_count', ...outputKeys, 'sample']; + + const dataRows = []; + + for (const row of unmappedResult.rows) { + const r = { + source_name, + rule_name: row.rule_name, + input_value: Array.isArray(row.extracted_value) ? JSON.stringify(row.extracted_value) : String(row.extracted_value ?? ''), + record_count: row.record_count, + sample: Array.isArray(row.sample) ? row.sample.join(' | ') : String(row.sample ?? '') + }; + for (const key of outputKeys) r[key] = ''; + dataRows.push(r); + } + + for (const row of mappedResult.rows) { + const r = { + source_name: row.source_name, + rule_name: row.rule_name, + input_value: Array.isArray(row.input_value) ? JSON.stringify(row.input_value) : String(row.input_value ?? ''), + record_count: '', + sample: '' + }; + for (const key of outputKeys) r[key] = row.output?.[key] ?? ''; + dataRows.push(r); + } + + const tsv = [ + allCols.map(escape).join('\t'), + ...dataRows.map(r => allCols.map(c => escape(r[c])).join('\t')) + ].join('\n'); + + res.setHeader('Content-Type', 'text/tab-separated-values'); + res.setHeader('Content-Disposition', `attachment; filename="mappings_${source_name}.tsv"`); + res.send(tsv); + } catch (err) { + next(err); + } + }); + + // Import mappings from uploaded TSV + // Any column that isn't a system field (source_name, rule_name, input_value, record_count, sample) + // is treated as an output key. sample is discarded wherever it appears. + router.post('/source/:source_name/import-csv', upload.single('file'), async (req, res, next) => { + const client = await pool.connect(); + try { + if (!req.file) { + return res.status(400).json({ error: 'No file uploaded. Send TSV as multipart field named "file".' }); + } + + const records = parse(req.file.buffer, { columns: true, skip_empty_lines: true, trim: true, delimiter: '\t' }); + + if (records.length === 0) { + return res.status(400).json({ error: 'File is empty.' }); + } + + const outputKeys = Object.keys(records[0]).filter(k => !SYSTEM_COLS.has(k)); + + const mappings = []; + for (const row of records) { + const { source_name, rule_name, input_value } = row; + + const output = {}; + for (const key of outputKeys) { + if (row[key] && row[key].trim() !== '') output[key] = row[key].trim(); + } + if (Object.keys(output).length === 0) continue; + + let parsedInput; + try { parsedInput = JSON.parse(input_value); } catch { parsedInput = input_value; } + + mappings.push({ source_name, rule_name, input_value: parsedInput, output }); + } + + if (mappings.length === 0) { + return res.status(400).json({ error: 'No rows with output values filled in.' }); + } + + await client.query('BEGIN'); + const results = []; + for (const { source_name, rule_name, input_value, output } of mappings) { + const result = await client.query( + `INSERT INTO mappings (source_name, rule_name, input_value, output) + VALUES ($1, $2, $3, $4) + ON CONFLICT (source_name, rule_name, input_value) + DO UPDATE SET output = EXCLUDED.output + RETURNING *`, + [source_name, rule_name, JSON.stringify(input_value), JSON.stringify(output)] + ); + results.push(result.rows[0]); + } + await client.query('COMMIT'); + + res.status(201).json({ count: results.length, mappings: results }); + } catch (err) { + await client.query('ROLLBACK'); + next(err); + } finally { + client.release(); + } + }); + // Get single mapping router.get('/:id', async (req, res, next) => { try { diff --git a/api/routes/rules.js b/api/routes/rules.js index 3268ae2..8dffd4f 100644 --- a/api/routes/rules.js +++ b/api/routes/rules.js @@ -145,7 +145,7 @@ module.exports = (pool) => { // Create rule router.post('/', async (req, res, next) => { try { - const { source_name, name, field, pattern, output_field, function_type, flags, replace_value, enabled, sequence } = req.body; + const { source_name, name, field, pattern, output_field, function_type, flags, replace_value, enabled, retain, sequence } = req.body; if (!source_name || !name || !field || !pattern || !output_field) { return res.status(400).json({ @@ -158,10 +158,10 @@ module.exports = (pool) => { } const result = await pool.query( - `INSERT INTO rules (source_name, name, field, pattern, output_field, function_type, flags, replace_value, enabled, sequence) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + `INSERT INTO rules (source_name, name, field, pattern, output_field, function_type, flags, replace_value, enabled, retain, sequence) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING *`, - [source_name, name, field, pattern, output_field, function_type || 'extract', flags || '', replace_value || '', enabled !== false, sequence || 0] + [source_name, name, field, pattern, output_field, function_type || 'extract', flags || '', replace_value || '', enabled !== false, retain === true, sequence || 0] ); res.status(201).json(result.rows[0]); @@ -179,7 +179,7 @@ module.exports = (pool) => { // Update rule router.put('/:id', async (req, res, next) => { try { - const { name, field, pattern, output_field, function_type, flags, replace_value, enabled, sequence } = req.body; + const { name, field, pattern, output_field, function_type, flags, replace_value, enabled, retain, sequence } = req.body; if (function_type && !['extract', 'replace'].includes(function_type)) { return res.status(400).json({ error: 'function_type must be "extract" or "replace"' }); @@ -195,10 +195,11 @@ module.exports = (pool) => { flags = COALESCE($7, flags), replace_value = COALESCE($8, replace_value), enabled = COALESCE($9, enabled), - sequence = COALESCE($10, sequence) + retain = COALESCE($10, retain), + sequence = COALESCE($11, sequence) WHERE id = $1 RETURNING *`, - [req.params.id, name, field, pattern, output_field, function_type, flags, replace_value, enabled, sequence] + [req.params.id, name, field, pattern, output_field, function_type, flags, replace_value, enabled, retain, sequence] ); if (result.rows.length === 0) { diff --git a/database/functions.sql b/database/functions.sql index 027adc8..3f3379d 100644 --- a/database/functions.sql +++ b/database/functions.sql @@ -149,6 +149,10 @@ BEGIN IF v_mapping IS NOT NULL THEN -- Apply mapping (merge mapped fields into result) v_transformed := v_transformed || v_mapping; + -- If retain is set, also write the extracted value to output_field + IF v_rule.retain THEN + v_transformed := jsonb_set(v_transformed, ARRAY[v_rule.output_field], v_extracted); + END IF; ELSE -- No mapping, store extracted value (scalar or array) v_transformed := jsonb_set( @@ -190,9 +194,10 @@ CREATE FUNCTION get_unmapped_values( ) RETURNS TABLE ( rule_name TEXT, output_field TEXT, + source_field TEXT, extracted_value JSONB, record_count BIGINT, - sample_records JSONB + sample JSONB ) AS $$ BEGIN RETURN QUERY @@ -200,8 +205,9 @@ BEGIN SELECT r.name AS rule_name, r.output_field, + r.field AS source_field, rec.transformed->r.output_field AS extracted_value, - rec.data AS raw_record + rec.data->>r.field AS source_value FROM dataflow.records rec CROSS JOIN dataflow.rules r @@ -211,26 +217,23 @@ BEGIN AND rec.transformed IS NOT NULL AND rec.transformed ? r.output_field AND (p_rule_name IS NULL OR r.name = p_rule_name) + AND rec.data ? r.field ) SELECT e.rule_name, e.output_field, + e.source_field, e.extracted_value, count(*) AS record_count, - jsonb_agg(e.raw_record ORDER BY e.raw_record) FILTER (WHERE e.raw_record IS NOT NULL) AS sample_records - FROM ( - SELECT e2.rule_name, e2.output_field, e2.extracted_value, e2.raw_record, - row_number() OVER (PARTITION BY e2.rule_name, e2.extracted_value ORDER BY (SELECT NULL)) AS rn - FROM extracted e2 - ) e + jsonb_agg(DISTINCT e.source_value) FILTER (WHERE e.source_value IS NOT NULL) AS sample + FROM extracted e WHERE NOT EXISTS ( SELECT 1 FROM dataflow.mappings m WHERE m.source_name = p_source_name AND m.rule_name = e.rule_name AND m.input_value = e.extracted_value ) - AND e.rn <= 3 - GROUP BY e.rule_name, e.output_field, e.extracted_value + GROUP BY e.rule_name, e.output_field, e.source_field, e.extracted_value ORDER BY count(*) DESC; END; $$ LANGUAGE plpgsql; diff --git a/database/migrate_tps.sql b/database/migrate_tps.sql index aa65e0f..2108e95 100644 --- a/database/migrate_tps.sql +++ b/database/migrate_tps.sql @@ -51,7 +51,7 @@ FROM dataflow.sources ORDER BY name; \echo '=== 2. Rules ===' INSERT INTO dataflow.rules - (source_name, name, field, pattern, output_field, function_type, flags, replace_value, sequence, enabled) + (source_name, name, field, pattern, output_field, function_type, flags, replace_value, sequence, enabled, retain) SELECT srce AS source_name, target AS name, @@ -63,7 +63,8 @@ SELECT COALESCE(regex->'regex'->'defn'->0->>'flag', '') AS flags, '' AS replace_value, seq AS sequence, - true AS enabled + true AS enabled, + (regex->'regex'->'defn'->0->>'retain') = 'y' AS retain FROM dblink(:'tps_conn', 'SELECT srce, target, seq, regex FROM tps.map_rm' ) AS t(srce TEXT, target TEXT, seq INT, regex JSONB) diff --git a/database/schema.sql b/database/schema.sql index 68daff9..adca799 100644 --- a/database/schema.sql +++ b/database/schema.sql @@ -76,6 +76,7 @@ CREATE TABLE rules ( -- Options enabled BOOLEAN DEFAULT true, + retain BOOLEAN DEFAULT false, -- Write output_field even when a mapping is applied sequence INTEGER DEFAULT 0, -- Execution order -- Metadata diff --git a/ui/src/pages/Rules.jsx b/ui/src/pages/Rules.jsx index a6012d1..b9359a2 100644 --- a/ui/src/pages/Rules.jsx +++ b/ui/src/pages/Rules.jsx @@ -1,7 +1,7 @@ import { useState, useEffect, useRef } from 'react' import { api } from '../api' -const EMPTY_FORM = { name: '', field: '', pattern: '', output_field: '', function_type: 'extract', flags: '', replace_value: '', sequence: 0 } +const EMPTY_FORM = { name: '', field: '', pattern: '', output_field: '', function_type: 'extract', flags: '', replace_value: '', retain: false, sequence: 0 } function PreviewModal({ rows, onClose }) { const matched = rows.filter(r => r.extracted_value != null).length @@ -144,6 +144,16 @@ function FormPanel({ form, setForm, editing, error, loading, fields, source, onS /> + {form.function_type === 'extract' && ( + + )} {form.function_type === 'replace' && (
@@ -237,6 +247,7 @@ export default function Rules({ source }) { function_type: rule.function_type || 'extract', flags: rule.flags || '', replace_value: rule.replace_value || '', + retain: rule.retain || false, sequence: rule.sequence, }) setEditing(rule.id)