Clicking a column header reloads from page 1 with ORDER BY col ASC/DESC NULLS LAST passed to the view query. Sort column is validated against information_schema.columns to prevent injection. Pagination preserves the active sort across prev/next. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
329 lines
10 KiB
JavaScript
329 lines
10 KiB
JavaScript
/**
 * Sources Routes
 * Manage data sources
 */
|
|
|
|
const express = require('express');
|
|
const multer = require('multer');
|
|
const { parse } = require('csv-parse/sync');
|
|
|
|
// Multer instance configured with in-memory storage; the upload routes below
// read the uploaded CSV from req.file.buffer.
const upload = multer({ storage: multer.memoryStorage() });
|
|
|
|
module.exports = (pool) => {
|
|
const router = express.Router();
|
|
|
|
// GET / — respond with every row of the `sources` table, ordered by name.
// Any query failure is handed to next() for the error middleware.
router.get('/', async (req, res, next) => {
  const listSql = 'SELECT * FROM sources ORDER BY name';

  try {
    const { rows } = await pool.query(listSql);
    res.json(rows);
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// GET /:name — look up a single source by name.
// Responds with the matching row, or 404 { error: 'Source not found' } when
// the query returns no rows.
router.get('/:name', async (req, res, next) => {
  const findSql = 'SELECT * FROM sources WHERE name = $1';

  try {
    const { rows } = await pool.query(findSql, [req.params.name]);
    const found = rows.length !== 0;

    if (!found) {
      return res.status(404).json({ error: 'Source not found' });
    }

    res.json(rows[0]);
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// Classify one CSV cell value as 'numeric', 'date' or 'text'.
//
// - numeric: the value parses as a finite number and is not zero-padded.
//   Zero-padded values such as "007" or "0123" stay 'text' so codes and
//   identifiers keep their leading zeros, while a plain "0" or a decimal such
//   as "0.5" is still recognised as a number.
// - date: Date.parse() yields a timestamp strictly between 1950-01-01 and
//   2050-01-01. Parsing of non-ISO strings is engine-dependent.
// - text: everything else.
const inferFieldType = (val) => {
  const parsesAsNumber =
    !Number.isNaN(Number.parseFloat(val)) && Number.isFinite(Number(val));
  const isZeroPadded = /^0[^.]/.test(val);

  if (parsesAsNumber && !isZeroPadded) {
    return 'numeric';
  }

  // Parse once and compare against the accepted window.
  const timestamp = Date.parse(val);
  if (timestamp > Date.parse('1950-01-01') && timestamp < Date.parse('2050-01-01')) {
    return 'date';
  }

  return 'text';
};

// POST /suggest — propose a source definition from an uploaded CSV file.
//
// Expects a multipart upload in the `file` field. The CSV is parsed with the
// header row as column names; the first data row is used as the sample from
// which each column's type is inferred.
//
// Responds with { name: '', dedup_fields: [], fields: [{ name, type }] }.
// Responds 400 when no file was uploaded or the CSV has no data rows. Other
// failures (including CSV parse errors) are forwarded to next().
router.post('/suggest', upload.single('file'), async (req, res, next) => {
  try {
    if (!req.file) {
      return res.status(400).json({ error: 'No file uploaded' });
    }

    const records = parse(req.file.buffer, {
      columns: true,
      skip_empty_lines: true,
      trim: true
    });

    if (records.length === 0) {
      return res.status(400).json({ error: 'CSV file is empty' });
    }

    // Only the first record is inspected; later rows do not affect the guess.
    const sample = records[0];
    const fields = Object.entries(sample).map(([name, val]) => ({
      name,
      type: inferFieldType(val)
    }));

    res.json({
      name: '',
      dedup_fields: [],
      fields
    });
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// POST / — register a new source.
//
// Body: { name, dedup_fields, config }. `name` must be truthy and
// `dedup_fields` must be an array, otherwise the response is 400. `config`
// falls back to {} when falsy.
//
// Responds 201 with the inserted row. A unique violation (SQLSTATE 23505)
// becomes 409 { error: 'Source already exists' }; any other error is passed
// to next().
router.post('/', async (req, res, next) => {
  const insertSql = `INSERT INTO sources (name, dedup_fields, config)
    VALUES ($1, $2, $3)
    RETURNING *`;

  try {
    const { name, dedup_fields, config } = req.body;
    const inputIsValid = Boolean(name) && Array.isArray(dedup_fields);

    if (!inputIsValid) {
      return res.status(400).json({
        error: 'Missing required fields: name, dedup_fields (array)'
      });
    }

    const params = [name, dedup_fields, config || {}];
    const { rows } = await pool.query(insertSql, params);

    res.status(201).json(rows[0]);
  } catch (err) {
    // Anything other than a unique violation goes to the error middleware.
    if (err.code !== '23505') {
      return next(err);
    }
    return res.status(409).json({ error: 'Source already exists' });
  }
});
|
|
|
|
// PUT /:name — update an existing source's dedup_fields and/or config.
//
// The UPDATE wraps both columns in COALESCE, so a parameter that reaches the
// database as NULL leaves the stored value untouched; updated_at is always
// set to CURRENT_TIMESTAMP.
//
// Responds with the updated row, or 404 { error: 'Source not found' } when no
// row matched the name.
router.put('/:name', async (req, res, next) => {
  const updateSql = `UPDATE sources
    SET dedup_fields = COALESCE($2, dedup_fields),
        config = COALESCE($3, config),
        updated_at = CURRENT_TIMESTAMP
    WHERE name = $1
    RETURNING *`;

  try {
    const { dedup_fields, config } = req.body;
    const params = [req.params.name, dedup_fields, config];
    const { rows } = await pool.query(updateSql, params);

    if (rows.length === 0) {
      return res.status(404).json({ error: 'Source not found' });
    }

    const [updated] = rows;
    res.json(updated);
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// DELETE /:name — remove a source by name.
// Responds { success: true, deleted: <name> } using the name returned by the
// DELETE ... RETURNING clause, or 404 { error: 'Source not found' } when
// nothing was deleted.
router.delete('/:name', async (req, res, next) => {
  const deleteSql = 'DELETE FROM sources WHERE name = $1 RETURNING name';

  try {
    const { rows } = await pool.query(deleteSql, [req.params.name]);
    const nothingDeleted = rows.length === 0;

    if (nothingDeleted) {
      return res.status(404).json({ error: 'Source not found' });
    }

    const deleted = rows[0].name;
    res.json({ success: true, deleted });
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// POST /:name/import — import the rows of an uploaded CSV into a source.
//
// Expects a multipart upload in the `file` field (400 when absent). The CSV is
// parsed with the header row as column names, serialised to JSON and passed,
// together with the source name, to the database function import_records().
// The function's `result` value is returned as the JSON response.
router.post('/:name/import', upload.single('file'), async (req, res, next) => {
  const csvOptions = {
    columns: true,
    skip_empty_lines: true,
    trim: true
  };
  const importSql = 'SELECT import_records($1, $2) as result';

  try {
    const { file } = req;
    if (!file) {
      return res.status(400).json({ error: 'No file uploaded' });
    }

    const records = parse(file.buffer, csvOptions);
    const payload = JSON.stringify(records);

    const { rows } = await pool.query(importSql, [req.params.name, payload]);

    res.json(rows[0].result);
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// GET /:name/import-log — list the import_log rows recorded for a source,
// ordered by imported_at descending.
router.get('/:name/import-log', async (req, res, next) => {
  const logSql = `SELECT * FROM import_log
    WHERE source_name = $1
    ORDER BY imported_at DESC`;

  try {
    const { rows } = await pool.query(logSql, [req.params.name]);
    res.json(rows);
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// POST /:name/transform — call the database function apply_transformations()
// with the source name and return its `result` value as the JSON response.
router.post('/:name/transform', async (req, res, next) => {
  const transformSql = 'SELECT apply_transformations($1) as result';

  try {
    const { rows } = await pool.query(transformSql, [req.params.name]);
    const outcome = rows[0].result;
    res.json(outcome);
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// GET /:name/fields — list every field name known for a source together with
// where it was seen.
//
// The query unions four origins and groups them per key:
//   'schema'       – names listed in sources.config->'fields'
//   'raw'          – top-level keys of records.data
//   'rule: <name>' – output_field of each row in rules
//   'mapping'      – top-level keys of mappings.output
//
// Responds with rows of { key, origins }, where origins is the distinct,
// sorted list of origin labels; rows are ordered by key.
router.get('/:name/fields', async (req, res, next) => {
  const fieldsSql = `
    SELECT key, array_agg(DISTINCT origin ORDER BY origin) AS origins
    FROM (
      SELECT f->>'name' AS key, 'schema' AS origin
      FROM sources, jsonb_array_elements(config->'fields') f
      WHERE name = $1 AND config ? 'fields'
      UNION ALL
      SELECT jsonb_object_keys(data) AS key, 'raw' AS origin
      FROM records WHERE source_name = $1
      UNION ALL
      SELECT output_field AS key, 'rule: ' || name AS origin
      FROM rules WHERE source_name = $1
      UNION ALL
      SELECT jsonb_object_keys(output) AS key, 'mapping' AS origin
      FROM mappings WHERE source_name = $1
    ) keys
    GROUP BY key
    ORDER BY key
  `;

  try {
    const { rows } = await pool.query(fieldsSql, [req.params.name]);
    res.json(rows);
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// POST /:name/view — call the database function generate_source_view() with
// the source name and return its `result` value as the JSON response.
router.post('/:name/view', async (req, res, next) => {
  const generateSql = 'SELECT generate_source_view($1) as result';

  try {
    const { rows } = await pool.query(generateSql, [req.params.name]);
    const outcome = rows[0].result;
    res.json(outcome);
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// POST /:name/reprocess — call the database function reprocess_records() with
// the source name and return its `result` value as the JSON response.
router.post('/:name/reprocess', async (req, res, next) => {
  const reprocessSql = 'SELECT reprocess_records($1) as result';

  try {
    const { rows } = await pool.query(reprocessSql, [req.params.name]);
    const outcome = rows[0].result;
    res.json(outcome);
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// GET /:name/stats — record counts for a source.
//
// Responds with a single row:
//   total_records       – all rows in `records` for the source
//   transformed_records – rows whose `transformed` column is NOT NULL
//   pending_records     – rows whose `transformed` column is NULL
router.get('/:name/stats', async (req, res, next) => {
  const statsSql = `SELECT
      COUNT(*) as total_records,
      COUNT(*) FILTER (WHERE transformed IS NOT NULL) as transformed_records,
      COUNT(*) FILTER (WHERE transformed IS NULL) as pending_records
    FROM records
    WHERE source_name = $1`;

  try {
    const { rows } = await pool.query(statsSql, [req.params.name]);
    const [counts] = rows;
    res.json(counts);
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// GET /:name/view-data — page through the rows of the view dfv.<name>.
//
// Query parameters:
//   limit    – maximum rows to return (default 100; a non-numeric or negative
//              value falls back to 100)
//   offset   – rows to skip (default 0; a non-numeric or negative value falls
//              back to 0)
//   sort_col – optional column to order by; silently ignored unless it is one
//              of the view's columns according to information_schema.columns
//   sort_dir – 'desc' sorts descending, anything else ascending; NULLs always
//              sort last
//
// Responds { exists: false, rows: [] } when no view of that name exists in the
// dfv schema, otherwise { exists: true, rows }. Errors are passed to next().
router.get('/:name/view-data', async (req, res, next) => {
  // Identifiers cannot be bound as query parameters, so they are interpolated.
  // Wrapping in double quotes (and doubling any embedded quote) keeps the name
  // a single identifier and preserves its exact case.
  const quoteIdent = (ident) => `"${String(ident).replace(/"/g, '""')}"`;

  // Parse a paging value; anything that is not a non-negative integer yields
  // the fallback instead of reaching the database as NaN or a negative number.
  const toNonNegativeInt = (raw, fallback) => {
    const parsed = Number.parseInt(raw, 10);
    return Number.isInteger(parsed) && parsed >= 0 ? parsed : fallback;
  };

  try {
    const { limit = 100, offset = 0, sort_col, sort_dir } = req.query;
    const sourceName = req.params.name;
    const viewName = `dfv.${quoteIdent(sourceName)}`;

    // Check view exists
    const check = await pool.query(
      `SELECT 1 FROM information_schema.views
       WHERE table_schema = 'dfv' AND table_name = $1`,
      [sourceName]
    );

    if (check.rows.length === 0) {
      return res.json({ exists: false, rows: [] });
    }

    // Validate sort_col against actual view columns to prevent injection
    let orderClause = '';
    if (sort_col) {
      const cols = await pool.query(
        `SELECT column_name FROM information_schema.columns
         WHERE table_schema = 'dfv' AND table_name = $1`,
        [sourceName]
      );
      const validCols = cols.rows.map((r) => r.column_name);

      if (validCols.includes(sort_col)) {
        const dir = sort_dir === 'desc' ? 'DESC' : 'ASC';
        orderClause = ` ORDER BY ${quoteIdent(sort_col)} ${dir} NULLS LAST`;
      }
    }

    const result = await pool.query(
      `SELECT * FROM ${viewName}${orderClause} LIMIT $1 OFFSET $2`,
      [toNonNegativeInt(limit, 100), toNonNegativeInt(offset, 0)]
    );

    res.json({ exists: true, rows: result.rows });
  } catch (err) {
    next(err);
  }
});
|
|
|
|
return router;
|
|
};
|