Add TSV export/import backend and update unmapped sample column
- Restore the `export.tsv` and `import-csv` endpoints in the mappings routes.
- The `sample` column is always last in the export and is discarded on import.
- `get_unmapped_values` now returns the distinct source-field values as `sample` instead of full raw records.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
6f2992b315
commit
3be5ccc435
@ -4,6 +4,12 @@
|
||||
*/
|
||||
|
||||
const express = require('express');
|
||||
const multer = require('multer');
|
||||
const { parse } = require('csv-parse/sync');
|
||||
|
||||
const upload = multer({ storage: multer.memoryStorage() });
|
||||
|
||||
const SYSTEM_COLS = new Set(['source_name', 'rule_name', 'input_value', 'record_count', 'sample']);
|
||||
|
||||
module.exports = (pool) => {
|
||||
const router = express.Router();
|
||||
@ -46,6 +52,136 @@ module.exports = (pool) => {
|
||||
}
|
||||
});
|
||||
|
||||
// Export unmapped values + existing mappings as TSV.
// Columns: source_name, rule_name, input_value, record_count, <output keys...>, sample
// `sample` is always the last column and is discarded on import.
router.get('/source/:source_name/export.tsv', async (req, res, next) => {
  try {
    const { rule_name } = req.query;
    const source_name = req.params.source_name;

    // Fetch unmapped values and existing mappings in parallel.
    const [unmappedResult, mappedResult] = await Promise.all([
      pool.query('SELECT * FROM get_unmapped_values($1, $2)', [source_name, rule_name || null]),
      pool.query(
        'SELECT * FROM mappings WHERE source_name = $1' + (rule_name ? ' AND rule_name = $2' : '') + ' ORDER BY rule_name, input_value',
        rule_name ? [source_name, rule_name] : [source_name]
      )
    ]);

    // Union of output keys across existing mappings, in first-seen order.
    const outputKeys = [];
    for (const row of mappedResult.rows) {
      for (const key of Object.keys(row.output || {})) {
        if (!outputKeys.includes(key)) outputKeys.push(key);
      }
    }

    // TSV has no quoting convention: any tab, newline, or carriage return
    // inside a value would break the row/column structure, so flatten all
    // three to spaces (the original only handled tabs).
    const escape = (val) => String(val ?? '').replace(/[\t\r\n]/g, ' ');

    // `sample` is always last so importers can ignore the trailing column.
    const allCols = ['source_name', 'rule_name', 'input_value', 'record_count', ...outputKeys, 'sample'];

    const dataRows = [];

    // Unmapped rows first: output columns left blank for the user to fill in.
    for (const row of unmappedResult.rows) {
      const r = {
        source_name,
        rule_name: row.rule_name,
        // Arrays are JSON-encoded so import can round-trip them via JSON.parse.
        input_value: Array.isArray(row.extracted_value) ? JSON.stringify(row.extracted_value) : String(row.extracted_value ?? ''),
        record_count: row.record_count,
        sample: Array.isArray(row.sample) ? row.sample.join(' | ') : String(row.sample ?? '')
      };
      for (const key of outputKeys) r[key] = '';
      dataRows.push(r);
    }

    // Existing mappings follow, with their current output values pre-filled.
    for (const row of mappedResult.rows) {
      const r = {
        source_name: row.source_name,
        rule_name: row.rule_name,
        input_value: Array.isArray(row.input_value) ? JSON.stringify(row.input_value) : String(row.input_value ?? ''),
        record_count: '',
        sample: ''
      };
      for (const key of outputKeys) r[key] = row.output?.[key] ?? '';
      dataRows.push(r);
    }

    const tsv = [
      allCols.map(escape).join('\t'),
      ...dataRows.map(r => allCols.map(c => escape(r[c])).join('\t'))
    ].join('\n');

    res.setHeader('Content-Type', 'text/tab-separated-values');
    res.setHeader('Content-Disposition', `attachment; filename="mappings_${source_name}.tsv"`);
    res.send(tsv);
  } catch (err) {
    next(err);
  }
});
|
||||
|
||||
// Import mappings from an uploaded TSV.
// Any column that isn't a system field (source_name, rule_name, input_value,
// record_count, sample) is treated as an output key; `sample` is discarded
// wherever it appears. Rows with no output values filled in are skipped.
router.post('/source/:source_name/import-csv', upload.single('file'), async (req, res, next) => {
  const client = await pool.connect();
  try {
    if (!req.file) {
      return res.status(400).json({ error: 'No file uploaded. Send TSV as multipart field named "file".' });
    }

    // A malformed TSV is a client error, not a server fault: report 400
    // instead of letting the parse exception fall through to the 500 handler.
    let records;
    try {
      records = parse(req.file.buffer, { columns: true, skip_empty_lines: true, trim: true, delimiter: '\t' });
    } catch (parseErr) {
      return res.status(400).json({ error: `Could not parse TSV: ${parseErr.message}` });
    }

    if (records.length === 0) {
      return res.status(400).json({ error: 'File is empty.' });
    }

    // Every non-system header becomes an output key.
    const outputKeys = Object.keys(records[0]).filter((k) => !SYSTEM_COLS.has(k));

    const mappings = [];
    for (const row of records) {
      const { source_name, rule_name, input_value } = row;

      // Skip rows missing identity fields — upserting them would violate
      // NOT NULL / conflict-key constraints and abort the whole transaction.
      // NOTE(review): rows are keyed by the file's source_name, not
      // req.params.source_name — confirm cross-source imports are intended.
      if (!source_name || !rule_name || input_value === undefined || input_value === '') continue;

      const output = {};
      for (const key of outputKeys) {
        if (row[key] && row[key].trim() !== '') output[key] = row[key].trim();
      }
      // Rows the user left blank are simply still-unmapped; ignore them.
      if (Object.keys(output).length === 0) continue;

      // input_value may be a JSON array (multi-value rules); fall back to the
      // raw string when it isn't valid JSON.
      // NOTE(review): a purely numeric value like "5" parses to a JSON number,
      // which may not match a JSONB string stored by a prior mapping — verify
      // round-trip behavior for numeric-looking values.
      let parsedInput;
      try { parsedInput = JSON.parse(input_value); } catch { parsedInput = input_value; }

      mappings.push({ source_name, rule_name, input_value: parsedInput, output });
    }

    if (mappings.length === 0) {
      return res.status(400).json({ error: 'No rows with output values filled in.' });
    }

    // Upsert all rows atomically: either every mapping lands or none do.
    await client.query('BEGIN');
    const results = [];
    for (const { source_name, rule_name, input_value, output } of mappings) {
      const result = await client.query(
        `INSERT INTO mappings (source_name, rule_name, input_value, output)
         VALUES ($1, $2, $3, $4)
         ON CONFLICT (source_name, rule_name, input_value)
         DO UPDATE SET output = EXCLUDED.output
         RETURNING *`,
        [source_name, rule_name, JSON.stringify(input_value), JSON.stringify(output)]
      );
      results.push(result.rows[0]);
    }
    await client.query('COMMIT');

    res.status(201).json({ count: results.length, mappings: results });
  } catch (err) {
    // ROLLBACK outside an open transaction is a harmless no-op warning in Postgres.
    await client.query('ROLLBACK');
    next(err);
  } finally {
    client.release();
  }
});
|
||||
|
||||
// Get single mapping
|
||||
router.get('/:id', async (req, res, next) => {
|
||||
try {
|
||||
|
||||
@ -190,9 +190,10 @@ CREATE FUNCTION get_unmapped_values(
|
||||
) RETURNS TABLE (
|
||||
rule_name TEXT,
|
||||
output_field TEXT,
|
||||
source_field TEXT,
|
||||
extracted_value JSONB,
|
||||
record_count BIGINT,
|
||||
sample_records JSONB
|
||||
sample JSONB
|
||||
) AS $$
|
||||
BEGIN
|
||||
RETURN QUERY
|
||||
@ -200,8 +201,9 @@ BEGIN
|
||||
SELECT
|
||||
r.name AS rule_name,
|
||||
r.output_field,
|
||||
r.field AS source_field,
|
||||
rec.transformed->r.output_field AS extracted_value,
|
||||
rec.data AS raw_record
|
||||
rec.data->>r.field AS source_value
|
||||
FROM
|
||||
dataflow.records rec
|
||||
CROSS JOIN dataflow.rules r
|
||||
@ -211,26 +213,23 @@ BEGIN
|
||||
AND rec.transformed IS NOT NULL
|
||||
AND rec.transformed ? r.output_field
|
||||
AND (p_rule_name IS NULL OR r.name = p_rule_name)
|
||||
AND rec.data ? r.field
|
||||
)
|
||||
SELECT
|
||||
e.rule_name,
|
||||
e.output_field,
|
||||
e.source_field,
|
||||
e.extracted_value,
|
||||
count(*) AS record_count,
|
||||
jsonb_agg(e.raw_record ORDER BY e.raw_record) FILTER (WHERE e.raw_record IS NOT NULL) AS sample_records
|
||||
FROM (
|
||||
SELECT e2.rule_name, e2.output_field, e2.extracted_value, e2.raw_record,
|
||||
row_number() OVER (PARTITION BY e2.rule_name, e2.extracted_value ORDER BY (SELECT NULL)) AS rn
|
||||
FROM extracted e2
|
||||
) e
|
||||
jsonb_agg(DISTINCT e.source_value) FILTER (WHERE e.source_value IS NOT NULL) AS sample
|
||||
FROM extracted e
|
||||
WHERE NOT EXISTS (
|
||||
SELECT 1 FROM dataflow.mappings m
|
||||
WHERE m.source_name = p_source_name
|
||||
AND m.rule_name = e.rule_name
|
||||
AND m.input_value = e.extracted_value
|
||||
)
|
||||
AND e.rn <= 3
|
||||
GROUP BY e.rule_name, e.output_field, e.extracted_value
|
||||
GROUP BY e.rule_name, e.output_field, e.source_field, e.extracted_value
|
||||
ORDER BY count(*) DESC;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
Loading…
Reference in New Issue
Block a user