dataflow/api/routes/sources.js
Paul Trowbridge 928a54932d Add multi-capture regex, computed view fields, collapsible rules, and live preview
- Support multi-capture-group regex: mappings.input_value changed to JSONB,
  regexp_match() result stored as scalar or array JSONB in transformed column
- Computed expression fields in generated views: {fieldname} refs substituted
  with (transformed->>'fieldname')::numeric for arithmetic in view columns
- Fix generate_source_view to DROP VIEW before CREATE (avoids column drop error)
- Collapsible rule cards that open directly to inline edit form
- Debounced live regex preview (extract + replace) with popout modal for 50 rows
- Records page now shows dfv.<source> view output instead of raw records
- Unified field table in Sources: single table with In view, Seq, expression columns
- Fix "Rule already exists" error when editing by passing rule.id directly to submit
- Fix Sources page clearing on F5 by watching sourceObj?.name in useEffect dep

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-29 16:37:15 -04:00

314 lines
9.3 KiB
JavaScript

/**
* Sources Routes
* Manage data sources
*/
const express = require('express');
const multer = require('multer');
const { parse } = require('csv-parse/sync');
const upload = multer({ storage: multer.memoryStorage() });
module.exports = (pool) => {
  const router = express.Router();

  // List every source, ordered alphabetically by name.
  router.get('/', async (req, res, next) => {
    try {
      const { rows } = await pool.query('SELECT * FROM sources ORDER BY name');
      res.json(rows);
    } catch (err) {
      next(err);
    }
  });
// Fetch one source by its unique name; responds 404 when it is absent.
router.get('/:name', async (req, res, next) => {
  try {
    const { rows } = await pool.query('SELECT * FROM sources WHERE name = $1', [
      req.params.name
    ]);
    if (rows.length === 0) {
      return res.status(404).json({ error: 'Source not found' });
    }
    res.json(rows[0]);
  } catch (err) {
    next(err);
  }
});
// Suggest a source definition (field names + inferred types) from an
// uploaded CSV. Types are inferred from the first data row only.
router.post('/suggest', upload.single('file'), async (req, res, next) => {
  try {
    if (!req.file) {
      return res.status(400).json({ error: 'No file uploaded' });
    }
    const records = parse(req.file.buffer, {
      columns: true,
      skip_empty_lines: true,
      trim: true
    });
    if (records.length === 0) {
      return res.status(400).json({ error: 'CSV file is empty' });
    }
    // Infer a column type from a single sample value.
    // - numeric: whole string parses to a finite number, but strings with
    //   a leading zero followed by another digit/letter ("00420", "0x1F")
    //   are treated as text since they are likely IDs or zip codes.
    //   Fix vs. the old `charAt(0) !== '0'` check, which also wrongly
    //   rejected the legitimate numbers "0" and "0.5"; also uses
    //   Number.isFinite instead of the coercing global isFinite.
    // - date: Date.parse lands in a sane range. NOTE(review): Date.parse
    //   on non-ISO strings is engine-dependent — this is a heuristic only.
    const inferType = (val) => {
      const str = String(val ?? '');
      if (str !== '' && Number.isFinite(Number(str)) && !/^0[^.]/.test(str)) {
        return 'numeric';
      }
      const ts = Date.parse(str);
      if (ts > Date.parse('1950-01-01') && ts < Date.parse('2050-01-01')) {
        return 'date';
      }
      return 'text';
    };
    const sample = records[0];
    const fields = Object.keys(sample).map((key) => ({
      name: key,
      type: inferType(sample[key])
    }));
    res.json({ name: '', dedup_fields: [], fields });
  } catch (err) {
    next(err);
  }
});
// Create a new source. Requires name and dedup_fields (array); config is
// optional and defaults to an empty object. Responds 409 on a duplicate name.
router.post('/', async (req, res, next) => {
  try {
    const { name, dedup_fields, config } = req.body;
    // Arrays are always truthy, so Array.isArray alone covers the old
    // `!dedup_fields` check as well.
    if (!name || !Array.isArray(dedup_fields)) {
      return res.status(400).json({
        error: 'Missing required fields: name, dedup_fields (array)'
      });
    }
    const insertSql = `INSERT INTO sources (name, dedup_fields, config)
VALUES ($1, $2, $3)
RETURNING *`;
    const { rows } = await pool.query(insertSql, [name, dedup_fields, config || {}]);
    res.status(201).json(rows[0]);
  } catch (err) {
    // Postgres unique_violation → name collision.
    if (err.code === '23505') {
      return res.status(409).json({ error: 'Source already exists' });
    }
    next(err);
  }
});
// Partially update a source: body fields that are omitted keep their
// current values via COALESCE. Responds 404 when the source is missing.
router.put('/:name', async (req, res, next) => {
  try {
    // Default to null explicitly (pg would map undefined to null anyway).
    const { dedup_fields = null, config = null } = req.body;
    const updateSql = `UPDATE sources
SET dedup_fields = COALESCE($2, dedup_fields),
config = COALESCE($3, config),
updated_at = CURRENT_TIMESTAMP
WHERE name = $1
RETURNING *`;
    const { rows } = await pool.query(updateSql, [req.params.name, dedup_fields, config]);
    if (rows.length === 0) {
      return res.status(404).json({ error: 'Source not found' });
    }
    res.json(rows[0]);
  } catch (err) {
    next(err);
  }
});
// Delete a source by name; echoes the deleted name on success.
router.delete('/:name', async (req, res, next) => {
  try {
    const deleted = await pool.query(
      'DELETE FROM sources WHERE name = $1 RETURNING name',
      [req.params.name]
    );
    const row = deleted.rows[0];
    if (row === undefined) {
      res.status(404).json({ error: 'Source not found' });
    } else {
      res.json({ success: true, deleted: row.name });
    }
  } catch (err) {
    next(err);
  }
});
// Import CSV data into a source. The file is parsed in memory and the
// rows are handed to the DB-side import_records() function as JSON.
router.post('/:name/import', upload.single('file'), async (req, res, next) => {
  try {
    if (!req.file) {
      return res.status(400).json({ error: 'No file uploaded' });
    }
    // Parse CSV. A malformed file is a client error: previously the
    // CsvError fell through to next(err) and surfaced as a 500.
    let records;
    try {
      records = parse(req.file.buffer, {
        columns: true,
        skip_empty_lines: true,
        trim: true
      });
    } catch (parseErr) {
      return res.status(400).json({ error: `Invalid CSV: ${parseErr.message}` });
    }
    // Reject empty files, consistent with /suggest.
    if (records.length === 0) {
      return res.status(400).json({ error: 'CSV file is empty' });
    }
    // Import records
    const result = await pool.query(
      'SELECT import_records($1, $2) as result',
      [req.params.name, JSON.stringify(records)]
    );
    res.json(result.rows[0].result);
  } catch (err) {
    next(err);
  }
});
// Import history for a source, newest first.
router.get('/:name/import-log', async (req, res, next) => {
  try {
    const logSql = `SELECT * FROM import_log
WHERE source_name = $1
ORDER BY imported_at DESC`;
    const { rows } = await pool.query(logSql, [req.params.name]);
    res.json(rows);
  } catch (err) {
    next(err);
  }
});
// Run all transformation rules for a source via the DB-side function.
router.post('/:name/transform', async (req, res, next) => {
  try {
    const { rows } = await pool.query('SELECT apply_transformations($1) as result', [
      req.params.name
    ]);
    res.json(rows[0].result);
  } catch (err) {
    next(err);
  }
});
// Get all known field names for a source.
//
// Aggregates field keys from four origins and reports, per key, every
// origin that mentions it:
//   - 'schema'    : declared in sources.config->'fields'
//   - 'raw'       : appears as a top-level key in any imported record
//   - 'rule: <n>' : produced as output_field by a transformation rule
//   - 'mapping'   : produced as an output key by a mapping
// Response shape: [{ key, origins: [...] }, ...] ordered by key.
// NOTE: the 'raw' branch scans jsonb keys of every record for the
// source, so cost grows with record count.
router.get('/:name/fields', async (req, res, next) => {
try {
const result = await pool.query(`
SELECT key, array_agg(DISTINCT origin ORDER BY origin) AS origins
FROM (
SELECT f->>'name' AS key, 'schema' AS origin
FROM sources, jsonb_array_elements(config->'fields') f
WHERE name = $1 AND config ? 'fields'
UNION ALL
SELECT jsonb_object_keys(data) AS key, 'raw' AS origin
FROM records WHERE source_name = $1
UNION ALL
SELECT output_field AS key, 'rule: ' || name AS origin
FROM rules WHERE source_name = $1
UNION ALL
SELECT jsonb_object_keys(output) AS key, 'mapping' AS origin
FROM mappings WHERE source_name = $1
) keys
GROUP BY key
ORDER BY key
`, [req.params.name]);
res.json(result.rows);
} catch (err) {
next(err);
}
});
// (Re)generate the dfv.<source> output view via the DB-side function.
router.post('/:name/view', async (req, res, next) => {
  try {
    const { rows } = await pool.query('SELECT generate_source_view($1) as result', [
      req.params.name
    ]);
    res.json(rows[0].result);
  } catch (err) {
    next(err);
  }
});
// Re-run processing over every record of a source (DB-side function).
router.post('/:name/reprocess', async (req, res, next) => {
  try {
    const outcome = await pool.query('SELECT reprocess_records($1) as result', [
      req.params.name
    ]);
    res.json(outcome.rows[0].result);
  } catch (err) {
    next(err);
  }
});
// Record counts for a source: total, already transformed, and pending.
router.get('/:name/stats', async (req, res, next) => {
  try {
    const statsSql = `SELECT
COUNT(*) as total_records,
COUNT(*) FILTER (WHERE transformed IS NOT NULL) as transformed_records,
COUNT(*) FILTER (WHERE transformed IS NULL) as pending_records
FROM records
WHERE source_name = $1`;
    const { rows } = await pool.query(statsSql, [req.params.name]);
    res.json(rows[0]);
  } catch (err) {
    next(err);
  }
});
router.get('/:name/view-data', async (req, res, next) => {
try {
const { limit = 100, offset = 0 } = req.query;
const viewName = `dfv.${req.params.name}`;
// Check view exists
const check = await pool.query(
`SELECT 1 FROM information_schema.views
WHERE table_schema = 'dfv' AND table_name = $1`,
[req.params.name]
);
if (check.rows.length === 0) {
return res.json({ exists: false, rows: [] });
}
const result = await pool.query(
`SELECT * FROM ${viewName} LIMIT $1 OFFSET $2`,
[parseInt(limit), parseInt(offset)]
);
res.json({ exists: true, rows: result.rows });
} catch (err) {
next(err);
}
});
return router;
};