/*
 * Changelog (from commit message):
 * - Add import_id column to records (links each record to its import batch)
 * - import_records() now stores readable dedup field values (not hashes) in
 *   info.inserted_keys / info.excluded_keys, and stamps import_id on insert
 * - delete_import() simplified to delete log row; ON DELETE CASCADE removes records
 * - Add get_import_log() and get_all_import_logs() DB functions
 * - Add DELETE /api/sources/:name/import-log/:id endpoint
 * - Add GET /api/sources/import-log global log endpoint
 * - Import route now auto-applies transformations to new records after import
 * - Import page: show ID column, expandable key detail, checkbox delete
 * - New Log page: global view of all imports across sources
 * - Update README API reference and workflow
 *
 * Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
 */

/**
 * Sources Routes
 * Manage data sources
 */
|
const express = require('express');
|
|
const multer = require('multer');
|
|
const { parse } = require('csv-parse/sync');
|
|
const { lit, arr } = require('../lib/sql');
|
|
|
|
const upload = multer({ storage: multer.memoryStorage() });
|
|
|
|
module.exports = (pool) => {
|
|
const router = express.Router();
|
|
|
|
// Global import log (all sources)
|
|
router.get('/import-log', async (req, res, next) => {
|
|
try {
|
|
const result = await pool.query(`SELECT * FROM get_all_import_logs()`);
|
|
res.json(result.rows);
|
|
} catch (err) {
|
|
next(err);
|
|
}
|
|
});
|
|
|
|
// List all sources
|
|
router.get('/', async (req, res, next) => {
|
|
try {
|
|
const result = await pool.query(`SELECT * FROM list_sources()`);
|
|
res.json(result.rows);
|
|
} catch (err) {
|
|
next(err);
|
|
}
|
|
});
|
|
|
|
// Get single source
|
|
router.get('/:name', async (req, res, next) => {
|
|
try {
|
|
const result = await pool.query(`SELECT * FROM get_source(${lit(req.params.name)})`);
|
|
if (result.rows.length === 0) return res.status(404).json({ error: 'Source not found' });
|
|
res.json(result.rows[0]);
|
|
} catch (err) {
|
|
next(err);
|
|
}
|
|
});
|
|
|
|
// Suggest source definition from CSV
|
|
router.post('/suggest', upload.single('file'), async (req, res, next) => {
|
|
try {
|
|
if (!req.file) return res.status(400).json({ error: 'No file uploaded' });
|
|
|
|
const records = parse(req.file.buffer, { columns: true, skip_empty_lines: true, trim: true });
|
|
if (records.length === 0) return res.status(400).json({ error: 'CSV file is empty' });
|
|
|
|
const sample = records[0];
|
|
const fields = Object.keys(sample).map(key => {
|
|
const val = sample[key];
|
|
let type = 'text';
|
|
if (!isNaN(parseFloat(val)) && isFinite(val) && val.charAt(0) !== '0') {
|
|
type = 'numeric';
|
|
} else if (Date.parse(val) > Date.parse('1950-01-01') && Date.parse(val) < Date.parse('2050-01-01')) {
|
|
type = 'date';
|
|
}
|
|
return { name: key, type };
|
|
});
|
|
|
|
res.json({ name: '', dedup_fields: [], fields });
|
|
} catch (err) {
|
|
next(err);
|
|
}
|
|
});
|
|
|
|
// Create source
|
|
router.post('/', async (req, res, next) => {
|
|
try {
|
|
const { name, dedup_fields, config } = req.body;
|
|
if (!name || !dedup_fields || !Array.isArray(dedup_fields)) {
|
|
return res.status(400).json({ error: 'Missing required fields: name, dedup_fields (array)' });
|
|
}
|
|
const result = await pool.query(
|
|
`SELECT * FROM create_source(${lit(name)}, ${arr(dedup_fields)}, ${lit(config || {})})`
|
|
);
|
|
res.status(201).json(result.rows[0]);
|
|
} catch (err) {
|
|
if (err.code === '23505') return res.status(409).json({ error: 'Source already exists' });
|
|
next(err);
|
|
}
|
|
});
|
|
|
|
// Update source
|
|
router.put('/:name', async (req, res, next) => {
|
|
try {
|
|
const { dedup_fields, config } = req.body;
|
|
const result = await pool.query(
|
|
`SELECT * FROM update_source(${lit(req.params.name)}, ${dedup_fields ? arr(dedup_fields) : 'NULL'}, ${config ? lit(config) : 'NULL'})`
|
|
);
|
|
if (result.rows.length === 0) return res.status(404).json({ error: 'Source not found' });
|
|
res.json(result.rows[0]);
|
|
} catch (err) {
|
|
next(err);
|
|
}
|
|
});
|
|
|
|
// Delete source
|
|
router.delete('/:name', async (req, res, next) => {
|
|
try {
|
|
const result = await pool.query(`SELECT * FROM delete_source(${lit(req.params.name)})`);
|
|
if (result.rows.length === 0) return res.status(404).json({ error: 'Source not found' });
|
|
res.json({ success: true, deleted: result.rows[0].delete_source });
|
|
} catch (err) {
|
|
next(err);
|
|
}
|
|
});
|
|
|
|
// Import CSV data and apply transformations to new records
|
|
router.post('/:name/import', upload.single('file'), async (req, res, next) => {
|
|
try {
|
|
if (!req.file) return res.status(400).json({ error: 'No file uploaded' });
|
|
const records = parse(req.file.buffer, { columns: true, skip_empty_lines: true, trim: true });
|
|
const importResult = await pool.query(
|
|
`SELECT import_records(${lit(req.params.name)}, ${lit(records)}) as result`
|
|
);
|
|
const importData = importResult.rows[0].result;
|
|
|
|
const transformResult = await pool.query(
|
|
`SELECT apply_transformations(${lit(req.params.name)}) as result`
|
|
);
|
|
const transformData = transformResult.rows[0].result;
|
|
|
|
res.json({ ...importData, transform: transformData });
|
|
} catch (err) {
|
|
next(err);
|
|
}
|
|
});
|
|
|
|
// Get import log
|
|
router.get('/:name/import-log', async (req, res, next) => {
|
|
try {
|
|
const result = await pool.query(`SELECT * FROM get_import_log(${lit(req.params.name)})`);
|
|
res.json(result.rows);
|
|
} catch (err) {
|
|
next(err);
|
|
}
|
|
});
|
|
|
|
// Delete an import (removes all records from that batch and the log entry)
|
|
router.delete('/:name/import-log/:id', async (req, res, next) => {
|
|
try {
|
|
const result = await pool.query(
|
|
`SELECT delete_import(${lit(parseInt(req.params.id))}) as result`
|
|
);
|
|
const data = result.rows[0].result;
|
|
if (!data.success) return res.status(404).json(data);
|
|
res.json(data);
|
|
} catch (err) {
|
|
next(err);
|
|
}
|
|
});
|
|
|
|
// Apply transformations
|
|
router.post('/:name/transform', async (req, res, next) => {
|
|
try {
|
|
const result = await pool.query(`SELECT apply_transformations(${lit(req.params.name)}) as result`);
|
|
res.json(result.rows[0].result);
|
|
} catch (err) {
|
|
next(err);
|
|
}
|
|
});
|
|
|
|
// Get all known field names for a source
|
|
router.get('/:name/fields', async (req, res, next) => {
|
|
try {
|
|
const result = await pool.query(`SELECT * FROM get_source_fields(${lit(req.params.name)})`);
|
|
res.json(result.rows);
|
|
} catch (err) {
|
|
next(err);
|
|
}
|
|
});
|
|
|
|
// Generate output view
|
|
router.post('/:name/view', async (req, res, next) => {
|
|
try {
|
|
const result = await pool.query(`SELECT generate_source_view(${lit(req.params.name)}) as result`);
|
|
res.json(result.rows[0].result);
|
|
} catch (err) {
|
|
next(err);
|
|
}
|
|
});
|
|
|
|
// Reprocess all records
|
|
router.post('/:name/reprocess', async (req, res, next) => {
|
|
try {
|
|
const result = await pool.query(`SELECT reprocess_records(${lit(req.params.name)}) as result`);
|
|
res.json(result.rows[0].result);
|
|
} catch (err) {
|
|
next(err);
|
|
}
|
|
});
|
|
|
|
// Get statistics
|
|
router.get('/:name/stats', async (req, res, next) => {
|
|
try {
|
|
const result = await pool.query(`SELECT * FROM get_source_stats(${lit(req.params.name)})`);
|
|
res.json(result.rows[0]);
|
|
} catch (err) {
|
|
next(err);
|
|
}
|
|
});
|
|
|
|
// Get view data (paginated, sortable)
|
|
router.get('/:name/view-data', async (req, res, next) => {
|
|
try {
|
|
const { limit = 100, offset = 0, sort_col, sort_dir } = req.query;
|
|
const result = await pool.query(
|
|
`SELECT get_view_data(${lit(req.params.name)}, ${lit(parseInt(limit))}, ${lit(parseInt(offset))}, ${lit(sort_col || null)}, ${lit(sort_dir || 'asc')}) as result`
|
|
);
|
|
res.json(result.rows[0].result);
|
|
} catch (err) {
|
|
next(err);
|
|
}
|
|
});
|
|
|
|
return router;
|
|
};
|