Merge sample-refactor: TSV mappings export/import, retain flag, regex improvements
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
commit
f7f88bb5cf
@ -4,6 +4,12 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
const express = require('express');
|
const express = require('express');
|
||||||
|
const multer = require('multer');
|
||||||
|
const { parse } = require('csv-parse/sync');
|
||||||
|
|
||||||
|
// Multipart upload middleware. Files are kept in memory (multer.memoryStorage),
// so handlers read the upload from req.file.buffer. No `limits` option is passed
// here, so multer's defaults apply to upload size.
const upload = multer({ storage: multer.memoryStorage() });
|
||||||
|
|
||||||
|
// Column names reserved by the TSV export/import format. On import, every header
// that is NOT in this set is treated as an output key of the mapping.
const SYSTEM_COLS = new Set(['source_name', 'rule_name', 'input_value', 'record_count', 'sample']);
|
||||||
|
|
||||||
module.exports = (pool) => {
|
module.exports = (pool) => {
|
||||||
const router = express.Router();
|
const router = express.Router();
|
||||||
@ -46,6 +52,136 @@ module.exports = (pool) => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Export unmapped values + existing mappings as TSV
|
||||||
|
// Columns: source_name, rule_name, input_value, record_count, <output keys...>, sample
|
||||||
|
// sample is always last and is discarded on import
|
||||||
|
/**
 * GET /source/:source_name/export.tsv[?rule_name=...]
 *
 * Streams a TSV attachment containing, for the given source:
 *   1. every unmapped value returned by get_unmapped_values() (output columns blank), then
 *   2. every existing row of `mappings` (record_count and sample blank).
 *
 * Columns: source_name, rule_name, input_value, record_count, <output keys...>, sample.
 * Errors from either query are forwarded to next().
 */
router.get('/source/:source_name/export.tsv', async (req, res, next) => {
  try {
    const { rule_name } = req.query;
    const source_name = req.params.source_name;

    const [unmappedResult, mappedResult] = await Promise.all([
      pool.query('SELECT * FROM get_unmapped_values($1, $2)', [source_name, rule_name || null]),
      pool.query(
        'SELECT * FROM mappings WHERE source_name = $1' + (rule_name ? ' AND rule_name = $2' : '') + ' ORDER BY rule_name, input_value',
        rule_name ? [source_name, rule_name] : [source_name]
      )
    ]);

    // Collect output keys from existing mappings, in first-seen order.
    // Keys that collide with a system column are left out: they would overwrite
    // source_name / rule_name / input_value in the row object and duplicate a
    // header, and the import route ignores system-named columns anyway.
    const seenKeys = new Set();
    for (const row of mappedResult.rows) {
      for (const key of Object.keys(row.output || {})) {
        if (!SYSTEM_COLS.has(key)) seenKeys.add(key);
      }
    }
    const outputKeys = [...seenKeys];

    // Serialize one cell. Objects and arrays are written as JSON instead of
    // "[object Object]". Tabs AND line breaks are replaced with spaces, because
    // either one would break the column/row structure of the TSV.
    const toCell = (val) => {
      if (val === null || val === undefined) return '';
      const text = typeof val === 'object' ? JSON.stringify(val) : String(val);
      return text.replace(/[\t\r\n]/g, ' ');
    };

    // sample is always last
    const allCols = ['source_name', 'rule_name', 'input_value', 'record_count', ...outputKeys, 'sample'];

    const dataRows = [];

    for (const row of unmappedResult.rows) {
      const r = {};
      for (const key of outputKeys) r[key] = '';
      r.source_name = source_name;
      r.rule_name = row.rule_name;
      r.input_value = row.extracted_value;
      r.record_count = row.record_count;
      r.sample = Array.isArray(row.sample) ? row.sample.join(' | ') : row.sample;
      dataRows.push(r);
    }

    for (const row of mappedResult.rows) {
      const r = {};
      for (const key of outputKeys) r[key] = row.output?.[key] ?? '';
      r.source_name = row.source_name;
      r.rule_name = row.rule_name;
      r.input_value = row.input_value;
      r.record_count = '';
      r.sample = '';
      dataRows.push(r);
    }

    const tsv = [
      allCols.map(toCell).join('\t'),
      ...dataRows.map((r) => allCols.map((c) => toCell(r[c])).join('\t'))
    ].join('\n');

    // source_name comes from the URL; restrict it to a conservative character
    // set so quotes or control characters cannot break the header value.
    const safeName = String(source_name).replace(/[^\w.-]+/g, '_');

    res.setHeader('Content-Type', 'text/tab-separated-values');
    res.setHeader('Content-Disposition', `attachment; filename="mappings_${safeName}.tsv"`);
    res.send(tsv);
  } catch (err) {
    next(err);
  }
});
|
||||||
|
|
||||||
|
// Import mappings from uploaded TSV
|
||||||
|
// Any column that isn't a system field (source_name, rule_name, input_value, record_count, sample)
|
||||||
|
// is treated as an output key. sample is discarded wherever it appears.
|
||||||
|
/**
 * POST /source/:source_name/import-csv  (multipart, field "file", TSV content)
 *
 * Upserts one mapping per TSV row that has at least one non-empty output column.
 * Every column that is not in SYSTEM_COLS is treated as an output key.
 * All upserts run in a single transaction; on any failure the transaction is
 * rolled back and the error is forwarded to next().
 *
 * Responses:
 *   201 { count, mappings }  - rows upserted
 *   400 { error }            - no file, unparsable/empty file, missing required
 *                              columns, blank rule_name, or no row with outputs
 */
router.post('/source/:source_name/import-csv', upload.single('file'), async (req, res, next) => {
  // The client is acquired only after validation succeeds, and inside the try,
  // so a failed pool.connect() is routed to next() instead of being an
  // unhandled rejection.
  let client;
  let inTransaction = false;
  let releaseError;

  try {
    if (!req.file) {
      return res.status(400).json({ error: 'No file uploaded. Send TSV as multipart field named "file".' });
    }

    let records;
    try {
      records = parse(req.file.buffer, { columns: true, skip_empty_lines: true, trim: true, delimiter: '\t' });
    } catch (parseErr) {
      // A malformed upload is a client error, not a server error.
      return res.status(400).json({ error: `Could not parse TSV: ${parseErr.message}` });
    }

    if (records.length === 0) {
      return res.status(400).json({ error: 'File is empty.' });
    }

    const header = Object.keys(records[0]);
    const missingCols = ['rule_name', 'input_value'].filter((col) => !header.includes(col));
    if (missingCols.length > 0) {
      return res.status(400).json({ error: `Missing required column(s): ${missingCols.join(', ')}` });
    }

    const outputKeys = header.filter((k) => !SYSTEM_COLS.has(k));

    const mappings = [];
    for (const [index, row] of records.entries()) {
      const output = {};
      for (const key of outputKeys) {
        const value = row[key];
        if (value && value.trim() !== '') output[key] = value.trim();
      }
      // Rows without any output filled in are not mappings yet; skip them.
      if (Object.keys(output).length === 0) continue;

      if (!row.rule_name) {
        return res.status(400).json({ error: `Data row ${index + 1} has output values but no rule_name.` });
      }

      // A blank or absent source_name cell falls back to the source in the URL.
      // NOTE(review): a row naming a different source than the URL is still
      // honored, as before - confirm whether that should be rejected instead.
      const source_name = row.source_name || req.params.source_name;

      // input_value may hold JSON (e.g. an array); anything that does not parse
      // is kept as a plain string.
      let parsedInput;
      try { parsedInput = JSON.parse(row.input_value); } catch { parsedInput = row.input_value; }

      mappings.push({ source_name, rule_name: row.rule_name, input_value: parsedInput, output });
    }

    if (mappings.length === 0) {
      return res.status(400).json({ error: 'No rows with output values filled in.' });
    }

    client = await pool.connect();
    await client.query('BEGIN');
    inTransaction = true;

    const results = [];
    for (const { source_name, rule_name, input_value, output } of mappings) {
      const result = await client.query(
        `INSERT INTO mappings (source_name, rule_name, input_value, output)
         VALUES ($1, $2, $3, $4)
         ON CONFLICT (source_name, rule_name, input_value)
         DO UPDATE SET output = EXCLUDED.output
         RETURNING *`,
        [source_name, rule_name, JSON.stringify(input_value), JSON.stringify(output)]
      );
      results.push(result.rows[0]);
    }

    await client.query('COMMIT');
    inTransaction = false;

    res.status(201).json({ count: results.length, mappings: results });
  } catch (err) {
    // Roll back only if a transaction was actually opened, and never let a
    // failing ROLLBACK hide the original error from next().
    if (inTransaction) {
      try {
        await client.query('ROLLBACK');
      } catch (rollbackErr) {
        console.error('ROLLBACK failed during mappings import:', rollbackErr);
        // Make the pool discard this client rather than reuse a broken one.
        releaseError = rollbackErr;
      }
    }
    next(err);
  } finally {
    if (client) client.release(releaseError);
  }
});
|
||||||
|
|
||||||
// Get single mapping
|
// Get single mapping
|
||||||
router.get('/:id', async (req, res, next) => {
|
router.get('/:id', async (req, res, next) => {
|
||||||
try {
|
try {
|
||||||
|
|||||||
@ -145,7 +145,7 @@ module.exports = (pool) => {
|
|||||||
// Create rule
|
// Create rule
|
||||||
router.post('/', async (req, res, next) => {
|
router.post('/', async (req, res, next) => {
|
||||||
try {
|
try {
|
||||||
const { source_name, name, field, pattern, output_field, function_type, flags, replace_value, enabled, sequence } = req.body;
|
const { source_name, name, field, pattern, output_field, function_type, flags, replace_value, enabled, retain, sequence } = req.body;
|
||||||
|
|
||||||
if (!source_name || !name || !field || !pattern || !output_field) {
|
if (!source_name || !name || !field || !pattern || !output_field) {
|
||||||
return res.status(400).json({
|
return res.status(400).json({
|
||||||
@ -158,10 +158,10 @@ module.exports = (pool) => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const result = await pool.query(
|
const result = await pool.query(
|
||||||
`INSERT INTO rules (source_name, name, field, pattern, output_field, function_type, flags, replace_value, enabled, sequence)
|
`INSERT INTO rules (source_name, name, field, pattern, output_field, function_type, flags, replace_value, enabled, retain, sequence)
|
||||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
|
||||||
RETURNING *`,
|
RETURNING *`,
|
||||||
[source_name, name, field, pattern, output_field, function_type || 'extract', flags || '', replace_value || '', enabled !== false, sequence || 0]
|
[source_name, name, field, pattern, output_field, function_type || 'extract', flags || '', replace_value || '', enabled !== false, retain === true, sequence || 0]
|
||||||
);
|
);
|
||||||
|
|
||||||
res.status(201).json(result.rows[0]);
|
res.status(201).json(result.rows[0]);
|
||||||
@ -179,7 +179,7 @@ module.exports = (pool) => {
|
|||||||
// Update rule
|
// Update rule
|
||||||
router.put('/:id', async (req, res, next) => {
|
router.put('/:id', async (req, res, next) => {
|
||||||
try {
|
try {
|
||||||
const { name, field, pattern, output_field, function_type, flags, replace_value, enabled, sequence } = req.body;
|
const { name, field, pattern, output_field, function_type, flags, replace_value, enabled, retain, sequence } = req.body;
|
||||||
|
|
||||||
if (function_type && !['extract', 'replace'].includes(function_type)) {
|
if (function_type && !['extract', 'replace'].includes(function_type)) {
|
||||||
return res.status(400).json({ error: 'function_type must be "extract" or "replace"' });
|
return res.status(400).json({ error: 'function_type must be "extract" or "replace"' });
|
||||||
@ -195,10 +195,11 @@ module.exports = (pool) => {
|
|||||||
flags = COALESCE($7, flags),
|
flags = COALESCE($7, flags),
|
||||||
replace_value = COALESCE($8, replace_value),
|
replace_value = COALESCE($8, replace_value),
|
||||||
enabled = COALESCE($9, enabled),
|
enabled = COALESCE($9, enabled),
|
||||||
sequence = COALESCE($10, sequence)
|
retain = COALESCE($10, retain),
|
||||||
|
sequence = COALESCE($11, sequence)
|
||||||
WHERE id = $1
|
WHERE id = $1
|
||||||
RETURNING *`,
|
RETURNING *`,
|
||||||
[req.params.id, name, field, pattern, output_field, function_type, flags, replace_value, enabled, sequence]
|
[req.params.id, name, field, pattern, output_field, function_type, flags, replace_value, enabled, retain, sequence]
|
||||||
);
|
);
|
||||||
|
|
||||||
if (result.rows.length === 0) {
|
if (result.rows.length === 0) {
|
||||||
|
|||||||
@ -149,6 +149,10 @@ BEGIN
|
|||||||
IF v_mapping IS NOT NULL THEN
|
IF v_mapping IS NOT NULL THEN
|
||||||
-- Apply mapping (merge mapped fields into result)
|
-- Apply mapping (merge mapped fields into result)
|
||||||
v_transformed := v_transformed || v_mapping;
|
v_transformed := v_transformed || v_mapping;
|
||||||
|
-- If retain is set, also write the extracted value to output_field
|
||||||
|
IF v_rule.retain THEN
|
||||||
|
v_transformed := jsonb_set(v_transformed, ARRAY[v_rule.output_field], v_extracted);
|
||||||
|
END IF;
|
||||||
ELSE
|
ELSE
|
||||||
-- No mapping, store extracted value (scalar or array)
|
-- No mapping, store extracted value (scalar or array)
|
||||||
v_transformed := jsonb_set(
|
v_transformed := jsonb_set(
|
||||||
@ -190,9 +194,10 @@ CREATE FUNCTION get_unmapped_values(
|
|||||||
) RETURNS TABLE (
|
) RETURNS TABLE (
|
||||||
rule_name TEXT,
|
rule_name TEXT,
|
||||||
output_field TEXT,
|
output_field TEXT,
|
||||||
|
source_field TEXT,
|
||||||
extracted_value JSONB,
|
extracted_value JSONB,
|
||||||
record_count BIGINT,
|
record_count BIGINT,
|
||||||
sample_records JSONB
|
sample JSONB
|
||||||
) AS $$
|
) AS $$
|
||||||
BEGIN
|
BEGIN
|
||||||
RETURN QUERY
|
RETURN QUERY
|
||||||
@ -200,8 +205,9 @@ BEGIN
|
|||||||
SELECT
|
SELECT
|
||||||
r.name AS rule_name,
|
r.name AS rule_name,
|
||||||
r.output_field,
|
r.output_field,
|
||||||
|
r.field AS source_field,
|
||||||
rec.transformed->r.output_field AS extracted_value,
|
rec.transformed->r.output_field AS extracted_value,
|
||||||
rec.data AS raw_record
|
rec.data->>r.field AS source_value
|
||||||
FROM
|
FROM
|
||||||
dataflow.records rec
|
dataflow.records rec
|
||||||
CROSS JOIN dataflow.rules r
|
CROSS JOIN dataflow.rules r
|
||||||
@ -211,26 +217,23 @@ BEGIN
|
|||||||
AND rec.transformed IS NOT NULL
|
AND rec.transformed IS NOT NULL
|
||||||
AND rec.transformed ? r.output_field
|
AND rec.transformed ? r.output_field
|
||||||
AND (p_rule_name IS NULL OR r.name = p_rule_name)
|
AND (p_rule_name IS NULL OR r.name = p_rule_name)
|
||||||
|
AND rec.data ? r.field
|
||||||
)
|
)
|
||||||
SELECT
|
SELECT
|
||||||
e.rule_name,
|
e.rule_name,
|
||||||
e.output_field,
|
e.output_field,
|
||||||
|
e.source_field,
|
||||||
e.extracted_value,
|
e.extracted_value,
|
||||||
count(*) AS record_count,
|
count(*) AS record_count,
|
||||||
jsonb_agg(e.raw_record ORDER BY e.raw_record) FILTER (WHERE e.raw_record IS NOT NULL) AS sample_records
|
jsonb_agg(DISTINCT e.source_value) FILTER (WHERE e.source_value IS NOT NULL) AS sample
|
||||||
FROM (
|
FROM extracted e
|
||||||
SELECT e2.rule_name, e2.output_field, e2.extracted_value, e2.raw_record,
|
|
||||||
row_number() OVER (PARTITION BY e2.rule_name, e2.extracted_value ORDER BY (SELECT NULL)) AS rn
|
|
||||||
FROM extracted e2
|
|
||||||
) e
|
|
||||||
WHERE NOT EXISTS (
|
WHERE NOT EXISTS (
|
||||||
SELECT 1 FROM dataflow.mappings m
|
SELECT 1 FROM dataflow.mappings m
|
||||||
WHERE m.source_name = p_source_name
|
WHERE m.source_name = p_source_name
|
||||||
AND m.rule_name = e.rule_name
|
AND m.rule_name = e.rule_name
|
||||||
AND m.input_value = e.extracted_value
|
AND m.input_value = e.extracted_value
|
||||||
)
|
)
|
||||||
AND e.rn <= 3
|
GROUP BY e.rule_name, e.output_field, e.source_field, e.extracted_value
|
||||||
GROUP BY e.rule_name, e.output_field, e.extracted_value
|
|
||||||
ORDER BY count(*) DESC;
|
ORDER BY count(*) DESC;
|
||||||
END;
|
END;
|
||||||
$$ LANGUAGE plpgsql;
|
$$ LANGUAGE plpgsql;
|
||||||
|
|||||||
@ -51,7 +51,7 @@ FROM dataflow.sources ORDER BY name;
|
|||||||
\echo '=== 2. Rules ==='
|
\echo '=== 2. Rules ==='
|
||||||
|
|
||||||
INSERT INTO dataflow.rules
|
INSERT INTO dataflow.rules
|
||||||
(source_name, name, field, pattern, output_field, function_type, flags, replace_value, sequence, enabled)
|
(source_name, name, field, pattern, output_field, function_type, flags, replace_value, sequence, enabled, retain)
|
||||||
SELECT
|
SELECT
|
||||||
srce AS source_name,
|
srce AS source_name,
|
||||||
target AS name,
|
target AS name,
|
||||||
@ -63,7 +63,8 @@ SELECT
|
|||||||
COALESCE(regex->'regex'->'defn'->0->>'flag', '') AS flags,
|
COALESCE(regex->'regex'->'defn'->0->>'flag', '') AS flags,
|
||||||
'' AS replace_value,
|
'' AS replace_value,
|
||||||
seq AS sequence,
|
seq AS sequence,
|
||||||
true AS enabled
|
true AS enabled,
|
||||||
|
(regex->'regex'->'defn'->0->>'retain') = 'y' AS retain
|
||||||
FROM dblink(:'tps_conn',
|
FROM dblink(:'tps_conn',
|
||||||
'SELECT srce, target, seq, regex FROM tps.map_rm'
|
'SELECT srce, target, seq, regex FROM tps.map_rm'
|
||||||
) AS t(srce TEXT, target TEXT, seq INT, regex JSONB)
|
) AS t(srce TEXT, target TEXT, seq INT, regex JSONB)
|
||||||
|
|||||||
@ -76,6 +76,7 @@ CREATE TABLE rules (
|
|||||||
|
|
||||||
-- Options
|
-- Options
|
||||||
enabled BOOLEAN DEFAULT true,
|
enabled BOOLEAN DEFAULT true,
|
||||||
|
retain BOOLEAN DEFAULT false, -- Write output_field even when a mapping is applied
|
||||||
sequence INTEGER DEFAULT 0, -- Execution order
|
sequence INTEGER DEFAULT 0, -- Execution order
|
||||||
|
|
||||||
-- Metadata
|
-- Metadata
|
||||||
|
|||||||
@ -1,7 +1,7 @@
|
|||||||
import { useState, useEffect, useRef } from 'react'
|
import { useState, useEffect, useRef } from 'react'
|
||||||
import { api } from '../api'
|
import { api } from '../api'
|
||||||
|
|
||||||
const EMPTY_FORM = { name: '', field: '', pattern: '', output_field: '', function_type: 'extract', flags: '', replace_value: '', sequence: 0 }
|
const EMPTY_FORM = { name: '', field: '', pattern: '', output_field: '', function_type: 'extract', flags: '', replace_value: '', retain: false, sequence: 0 }
|
||||||
|
|
||||||
function PreviewModal({ rows, onClose }) {
|
function PreviewModal({ rows, onClose }) {
|
||||||
const matched = rows.filter(r => r.extracted_value != null).length
|
const matched = rows.filter(r => r.extracted_value != null).length
|
||||||
@ -144,6 +144,16 @@ function FormPanel({ form, setForm, editing, error, loading, fields, source, onS
|
|||||||
/>
|
/>
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
{form.function_type === 'extract' && (
|
||||||
|
<label className="flex items-center gap-2 text-xs text-gray-600 cursor-pointer select-none">
|
||||||
|
<input
|
||||||
|
type="checkbox"
|
||||||
|
checked={!!form.retain}
|
||||||
|
onChange={e => setForm(f => ({ ...f, retain: e.target.checked }))}
|
||||||
|
/>
|
||||||
|
Retain extracted value in output field even when a mapping is applied
|
||||||
|
</label>
|
||||||
|
)}
|
||||||
{form.function_type === 'replace' && (
|
{form.function_type === 'replace' && (
|
||||||
<div>
|
<div>
|
||||||
<label className="text-xs text-gray-500 block mb-1">Replacement string</label>
|
<label className="text-xs text-gray-500 block mb-1">Replacement string</label>
|
||||||
@ -237,6 +247,7 @@ export default function Rules({ source }) {
|
|||||||
function_type: rule.function_type || 'extract',
|
function_type: rule.function_type || 'extract',
|
||||||
flags: rule.flags || '',
|
flags: rule.flags || '',
|
||||||
replace_value: rule.replace_value || '',
|
replace_value: rule.replace_value || '',
|
||||||
|
retain: rule.retain || false,
|
||||||
sequence: rule.sequence,
|
sequence: rule.sequence,
|
||||||
})
|
})
|
||||||
setEditing(rule.id)
|
setEditing(rule.id)
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user