diff --git a/api/routes/mappings.js b/api/routes/mappings.js
index b01fc00..032b87d 100644
--- a/api/routes/mappings.js
+++ b/api/routes/mappings.js
@@ -4,6 +4,12 @@
  */
 
 const express = require('express');
+const multer = require('multer');
+const { parse } = require('csv-parse/sync');
+
+const upload = multer({ storage: multer.memoryStorage() });
+
+const SYSTEM_COLS = new Set(['source_name', 'rule_name', 'input_value', 'record_count', 'sample']);
 
 module.exports = (pool) => {
   const router = express.Router();
@@ -46,6 +52,136 @@ module.exports = (pool) => {
     }
   });
 
+  // Export unmapped values + existing mappings as TSV
+  // Columns: source_name, rule_name, input_value, record_count, <output keys>, sample
+  // sample is always last and is discarded on import
+  router.get('/source/:source_name/export.tsv', async (req, res, next) => {
+    try {
+      const { rule_name } = req.query;
+      const source_name = req.params.source_name;
+
+      const [unmappedResult, mappedResult] = await Promise.all([
+        pool.query('SELECT * FROM get_unmapped_values($1, $2)', [source_name, rule_name || null]),
+        pool.query(
+          'SELECT * FROM mappings WHERE source_name = $1' + (rule_name ? ' AND rule_name = $2' : '') + ' ORDER BY rule_name, input_value',
+          rule_name ? [source_name, rule_name] : [source_name]
+        )
+      ]);
+
+      // Collect output keys from existing mappings
+      const outputKeys = [];
+      for (const row of mappedResult.rows) {
+        for (const key of Object.keys(row.output || {})) {
+          if (!outputKeys.includes(key)) outputKeys.push(key);
+        }
+      }
+
+      // Tabs and newlines inside a value would corrupt the TSV, so flatten both to spaces
+      const escape = (val) => String(val ?? '').replace(/[\t\r\n]+/g, ' ');
+
+      // sample is always last
+      const allCols = ['source_name', 'rule_name', 'input_value', 'record_count', ...outputKeys, 'sample'];
+
+      const dataRows = [];
+
+      for (const row of unmappedResult.rows) {
+        const r = {
+          source_name,
+          rule_name: row.rule_name,
+          input_value: Array.isArray(row.extracted_value) ? JSON.stringify(row.extracted_value) : String(row.extracted_value ?? ''),
+          record_count: row.record_count,
+          sample: Array.isArray(row.sample) ? row.sample.join(' | ') : String(row.sample ?? '')
+        };
+        for (const key of outputKeys) r[key] = '';
+        dataRows.push(r);
+      }
+
+      for (const row of mappedResult.rows) {
+        const r = {
+          source_name: row.source_name,
+          rule_name: row.rule_name,
+          input_value: Array.isArray(row.input_value) ? JSON.stringify(row.input_value) : String(row.input_value ?? ''),
+          record_count: '',
+          sample: ''
+        };
+        for (const key of outputKeys) r[key] = row.output?.[key] ?? '';
+        dataRows.push(r);
+      }
+
+      const tsv = [
+        allCols.map(escape).join('\t'),
+        ...dataRows.map(r => allCols.map(c => escape(r[c])).join('\t'))
+      ].join('\n');
+
+      res.setHeader('Content-Type', 'text/tab-separated-values');
+      res.setHeader('Content-Disposition', `attachment; filename="mappings_${source_name}.tsv"`);
+      res.send(tsv);
+    } catch (err) {
+      next(err);
+    }
+  });
+
+  // Import mappings from uploaded TSV
+  // Any column that isn't a system field (source_name, rule_name, input_value, record_count, sample)
+  // is treated as an output key. sample is discarded wherever it appears.
+  router.post('/source/:source_name/import-csv', upload.single('file'), async (req, res, next) => {
+    const client = await pool.connect();
+    try {
+      if (!req.file) {
+        return res.status(400).json({ error: 'No file uploaded. Send TSV as multipart field named "file".' });
+      }
+
+      const records = parse(req.file.buffer, { columns: true, skip_empty_lines: true, trim: true, delimiter: '\t' });
+
+      if (records.length === 0) {
+        return res.status(400).json({ error: 'File is empty.' });
+      }
+
+      const outputKeys = Object.keys(records[0]).filter(k => !SYSTEM_COLS.has(k));
+
+      const mappings = [];
+      for (const row of records) {
+        const { source_name, rule_name, input_value } = row;
+
+        const output = {};
+        for (const key of outputKeys) {
+          if (row[key] && row[key].trim() !== '') output[key] = row[key].trim();
+        }
+        if (Object.keys(output).length === 0) continue;
+
+        let parsedInput;
+        try { parsedInput = JSON.parse(input_value); } catch { parsedInput = input_value; }
+
+        mappings.push({ source_name, rule_name, input_value: parsedInput, output });
+      }
+
+      if (mappings.length === 0) {
+        return res.status(400).json({ error: 'No rows with output values filled in.' });
+      }
+
+      await client.query('BEGIN');
+      const results = [];
+      for (const { source_name, rule_name, input_value, output } of mappings) {
+        const result = await client.query(
+          `INSERT INTO mappings (source_name, rule_name, input_value, output)
+           VALUES ($1, $2, $3, $4)
+           ON CONFLICT (source_name, rule_name, input_value)
+           DO UPDATE SET output = EXCLUDED.output
+           RETURNING *`,
+          [source_name, rule_name, JSON.stringify(input_value), JSON.stringify(output)]
+        );
+        results.push(result.rows[0]);
+      }
+      await client.query('COMMIT');
+
+      res.status(201).json({ count: results.length, mappings: results });
+    } catch (err) {
+      await client.query('ROLLBACK');
+      next(err);
+    } finally {
+      client.release();
+    }
+  });
+
   // Get single mapping
   router.get('/:id', async (req, res, next) => {
     try {
diff --git a/database/functions.sql b/database/functions.sql
index 027adc8..58e35f5 100644
--- a/database/functions.sql
+++ b/database/functions.sql
@@ -190,9 +190,10 @@ CREATE FUNCTION get_unmapped_values(
 ) RETURNS TABLE (
     rule_name TEXT,
     output_field TEXT,
+    source_field TEXT,
     extracted_value JSONB,
     record_count BIGINT,
-    sample_records JSONB
+    sample JSONB
 ) AS $$
 BEGIN
   RETURN QUERY
@@ -200,8 +201,9 @@ BEGIN
     SELECT
       r.name AS rule_name,
       r.output_field,
+      r.field AS source_field,
       rec.transformed->r.output_field AS extracted_value,
-      rec.data AS raw_record
+      rec.data->>r.field AS source_value
     FROM dataflow.records rec
     CROSS JOIN dataflow.rules r
 
@@ -211,26 +213,23 @@ BEGIN
       AND rec.transformed IS NOT NULL
       AND rec.transformed ? r.output_field
       AND (p_rule_name IS NULL OR r.name = p_rule_name)
+      AND rec.data ? r.field
   )
   SELECT
     e.rule_name,
     e.output_field,
+    e.source_field,
     e.extracted_value,
     count(*) AS record_count,
-    jsonb_agg(e.raw_record ORDER BY e.raw_record) FILTER (WHERE e.raw_record IS NOT NULL) AS sample_records
-  FROM (
-    SELECT e2.rule_name, e2.output_field, e2.extracted_value, e2.raw_record,
-           row_number() OVER (PARTITION BY e2.rule_name, e2.extracted_value ORDER BY (SELECT NULL)) AS rn
-    FROM extracted e2
-  ) e
+    jsonb_agg(DISTINCT e.source_value) FILTER (WHERE e.source_value IS NOT NULL) AS sample
+  FROM extracted e
   WHERE NOT EXISTS (
     SELECT 1 FROM dataflow.mappings m
     WHERE m.source_name = p_source_name
       AND m.rule_name = e.rule_name
      AND m.input_value = e.extracted_value
   )
-    AND e.rn <= 3
-  GROUP BY e.rule_name, e.output_field, e.extracted_value
+  GROUP BY e.rule_name, e.output_field, e.source_field, e.extracted_value
   ORDER BY count(*) DESC;
 END;
 $$ LANGUAGE plpgsql;
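
The TSV round trip is easiest to see with a concrete file. Below is a hypothetical export for a source "acme" whose rule "color" already has one mapping carrying the output key "canonical_color" (names made up for illustration; columns are tab-separated in the real file, aligned with spaces here for readability):

    source_name  rule_name  input_value  record_count  canonical_color  sample
    acme         color      drk blue     42                             Drk Blue | DRK BLUE
    acme         color      navy                        Navy Blue

The first data row is an unmapped value: record_count and sample are filled in, and the output column is left blank for an analyst to complete. The second row is an existing mapping: output filled in, record_count and sample blank. On import, sample is discarded and rows whose output columns are all blank are skipped, so re-uploading an untouched export upserts nothing (the endpoint answers 400 when no row has any output value).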
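End to end, the two endpoints can be exercised with the sketch below. The /api/mappings mount point, port, source name "acme", and output key "canonical_color" are assumptions for illustration, not part of this diff; Node 18+ is assumed for the global fetch, FormData, and Blob.

    const BASE = 'http://localhost:3000/api/mappings'; // assumed mount point, not part of this diff

    async function demo() {
      // Export: one row per unmapped value (outputs blank) and per existing mapping.
      const exported = await fetch(`${BASE}/source/acme/export.tsv?rule_name=color`);
      console.log(await exported.text());

      // Import: a minimal filled-in file. Only rows with at least one non-blank
      // output column are upserted; record_count and sample are ignored.
      const tsv = [
        'source_name\trule_name\tinput_value\trecord_count\tcanonical_color\tsample',
        'acme\tcolor\tdrk blue\t42\tDark Blue\t'
      ].join('\n');

      const form = new FormData();
      form.append('file', new Blob([tsv], { type: 'text/tab-separated-values' }), 'mappings_acme.tsv');

      const res = await fetch(`${BASE}/source/acme/import-csv`, { method: 'POST', body: form });
      console.log(res.status, await res.json()); // expect 201 { count: 1, mappings: [...] }
    }

    demo().catch(console.error);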