From 291c665ed157e850c4a86c24c91d98a3d051675c Mon Sep 17 00:00:00 2001 From: Paul Trowbridge Date: Sun, 5 Apr 2026 22:36:53 -0400 Subject: [PATCH] Consolidate all SQL into database/queries/, switch to literal SQL in routes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add database/queries/{sources,rules,mappings,records}.sql — one file per route, all business logic in PostgreSQL functions - Replace parameterized queries in all four route files with lit()/jsonLit() literal interpolation for debuggability - Add api/lib/sql.js with lit(), jsonLit(), arr() helpers - Fix get_view_data to use json_agg (preserves column order) with subquery (guarantees sort order is respected before aggregation) - Fix jsonLit() for JSONB params so plain strings become valid JSON - Update manage.py option 3 to deploy database/queries/ instead of functions.sql - Add SPEC.md covering architecture, philosophy, and manage.py Co-Authored-By: Claude Sonnet 4.6 --- SPEC.md | 307 ++++++++++++++++++++++++++++++++ api/lib/sql.js | 29 +++ api/routes/mappings.js | 164 +++-------------- api/routes/records.js | 61 ++----- api/routes/rules.js | 158 ++--------------- api/routes/sources.js | 191 +++----------------- database/queries/mappings.sql | 208 ++++++++++++++++++++++ database/queries/records.sql | 55 ++++++ database/queries/rules.sql | 169 ++++++++++++++++++ database/queries/sources.sql | 322 ++++++++++++++++++++++++++++++++++ manage.py | 65 +++++-- 11 files changed, 1224 insertions(+), 505 deletions(-) create mode 100644 SPEC.md create mode 100644 api/lib/sql.js create mode 100644 database/queries/mappings.sql create mode 100644 database/queries/records.sql create mode 100644 database/queries/rules.sql create mode 100644 database/queries/sources.sql diff --git a/SPEC.md b/SPEC.md new file mode 100644 index 0000000..5e77ebb --- /dev/null +++ b/SPEC.md @@ -0,0 +1,307 @@ +# Dataflow — Application Specification + +## Purpose + +Dataflow is an internal tool for importing, cleaning, and standardizing data from multiple sources (e.g., bank transaction exports). It is intentionally simple — no queues, no schedulers, no plugins. Everything is triggered explicitly by the user through the UI or API. + +--- + +## Philosophy and Design Choices + +### Simple over clever +Every decision favors clarity. No abstractions for their own sake, no frameworks beyond what is necessary. If a piece of code is hard to follow, that is a sign it should be rewritten, not abstracted. + +### All logic lives in the database +SQL functions are the single source of truth for business logic. The API layer is a thin HTTP wrapper — it validates input, calls a function, and returns the result. It does not construct business logic in JavaScript. + +### SQL is written as full literal strings +Database calls in the route files use fully formed SQL strings with values interpolated directly (not parameterized). This makes every query copy-pasteable into psql for debugging. A small `lit()` helper in `api/lib/sql.js` handles quoting and escaping. This is an intentional trade-off: the tool is internal, and debuggability is worth more than the marginal injection protection parameterization provides over what `lit()` already does. + +### One SQL file per route +SQL is organized in `database/queries/` with one file per route (`sources.sql`, `rules.sql`, `mappings.sql`, `records.sql`). 
This makes it easy to find the SQL behind any API endpoint — look at the route file to find the function name, then look at the matching query file for the implementation. + +### Explicit over implicit +Nothing happens automatically. Transformations are triggered by the user. Views are generated on demand. There are no database triggers, no background workers, no scheduled jobs. + +### JSONB for flexibility +Raw imported records and transformed records are stored as JSONB. This avoids schema migrations when source formats change and allows different sources to have different field sets in the same table. + +--- + +## Architecture + +``` +manage.py — interactive CLI for setup, deployment, and management +database/ + schema.sql — table definitions (run once or to reset) + queries/ + sources.sql — all SQL for /api/sources + rules.sql — all SQL for /api/rules + mappings.sql — all SQL for /api/mappings + records.sql — all SQL for /api/records +api/ + server.js — Express server, mounts routes, auth middleware + middleware/ + auth.js — Basic Auth enforcement on all /api routes + lib/ + sql.js — lit() and arr() helpers for SQL literal building + routes/ + sources.js — HTTP handlers for source management + rules.js — HTTP handlers for rule management + mappings.js — HTTP handlers for mapping management + records.js — HTTP handlers for record queries +ui/ + src/ + api.js — fetch wrapper, credential management + App.jsx — root: login gate, sidebar, source selector, routing + pages/ + Login.jsx — username/password form + Sources.jsx — source CRUD, field config, view generation + Import.jsx — CSV upload and import log + Rules.jsx — rule CRUD with live pattern preview + Mappings.jsx — mapping table with TSV import/export + Records.jsx — paginated, sortable view of transformed records +public/ — compiled UI (output of npm run build in ui/) +``` + +--- + +## Database Schema + +Five tables in the `dataflow` schema: + +### `sources` +Defines a data source. The `dedup_fields` array specifies which fields make a record unique. `config` (JSONB) holds the output schema (`fields` array) used to generate the typed view. + +### `records` +Stores every imported record. `data` holds the raw import. `transformed` holds the enriched record after rules and mappings are applied. `dedup_key` is an MD5 hash of the dedup fields — a unique constraint on `(source_name, dedup_key)` prevents duplicate imports. + +### `rules` +Regex transformation rules. Each rule reads from `field`, applies `pattern` with optional `flags`, and writes to `output_field`. `function_type` is either `extract` (regexp_matches) or `replace` (regexp_replace). `sequence` controls the order rules are applied. `retain` keeps the raw extracted value in `output_field` even when a mapping overrides it. + +### `mappings` +Maps an extracted value to a standardized output object. `input_value` is JSONB (matches the extracted value exactly, including arrays from multi-capture-group patterns). `output` is a JSONB object that can contain multiple fields (e.g., `{"vendor": "Walmart", "category": "Groceries"}`). + +### `import_log` +Audit trail. One row per import call, recording how many records were inserted versus skipped as duplicates. 
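+
+As a concrete illustration of the shape described above, here is a minimal sketch of the `records` table. It is illustrative only — the authoritative DDL lives in `database/schema.sql`, and the column types and constraints inferred from these descriptions may differ from it:
+
+```sql
+-- Hypothetical sketch of dataflow.records; see database/schema.sql for the real definition.
+CREATE TABLE dataflow.records (
+    id          BIGSERIAL PRIMARY KEY,
+    source_name TEXT NOT NULL REFERENCES dataflow.sources(name),
+    data        JSONB NOT NULL,      -- raw imported record
+    transformed JSONB,               -- enriched record after rules and mappings
+    dedup_key   TEXT NOT NULL,       -- MD5 hash of the source's dedup_fields
+    UNIQUE (source_name, dedup_key)  -- prevents duplicate imports
+);
+```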
+ +--- + +## Data Flow + +### Import +``` +CSV file → parse in Node.js → import_records(source, data) + → generate_dedup_key() per record → INSERT with unique constraint + → count inserted vs duplicates → log to import_log +``` + +### Transform +``` +apply_transformations(source) — pure SQL CTE + qualifying: records not yet transformed (or all, if overwrite=true) + rx: fan out one row per regex match (LATERAL join, rules × records) + agg_matches: collapse matches back to one value per (record, rule) + linked: LEFT JOIN mappings to find mapped output + rule_output: build per-rule output JSONB + record_additions: merge all rule outputs per record in sequence order + UPDATE records SET transformed = data || additions +``` +The transform is fully set-based — no row-by-row loops. All records for a source are processed in a single query. + +### View generation +`generate_source_view(source)` reads `config.fields` from the source and builds a `CREATE VIEW` statement in the `dfv` schema. Each field is cast to its declared type (`text`, `numeric`, `date`). Fields with an `expression` are computed from other transformed fields using `{field}` substitution syntax. + +--- + +## SQL Functions + +Each file in `database/queries/` maps 1-to-1 with a route file. + +**sources.sql** +`list_sources`, `get_source`, `create_source`, `update_source`, `delete_source`, `get_import_log`, `get_source_stats`, `get_source_fields`, `get_view_data` (plpgsql — dynamic sort via EXECUTE + quote_ident), `import_records`, `jsonb_merge` + `jsonb_concat_obj` aggregate, `apply_transformations`, `reprocess_records`, `generate_source_view` + +**rules.sql** +`list_rules`, `get_rule`, `create_rule`, `update_rule`, `delete_rule`, `preview_rule` (plpgsql — conditional query for extract vs replace), `test_rule` (returns `TABLE(rule JSONB, results JSONB)`) + +**mappings.sql** +`list_mappings`, `get_mapping`, `create_mapping`, `upsert_mapping`, `update_mapping`, `delete_mapping`, `get_mapping_counts`, `get_all_values` (plpgsql — extracted values with mapping join), `get_unmapped_values` (plpgsql — extracted values with no mapping) + +**records.sql** +`list_records`, `get_record`, `search_records` (JSONB containment on data and transformed), `delete_record`, `delete_source_records` + +--- + +## API + +All routes are under `/api`. Every route requires HTTP Basic Auth. The `GET /health` endpoint is exempt. + +**Authentication:** `Authorization: Basic ` on every request. Credentials are verified against `LOGIN_USER` (plaintext username) and `LOGIN_PASSWORD_HASH` (bcrypt hash) in `.env`. There are no sessions or tokens — credentials are sent with every request. 
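+
+Because route handlers interpolate literal values (see `api/lib/sql.js` below), the exact query behind any endpoint in the table below can be pasted into psql unchanged. A hypothetical example for `GET /api/mappings/source/bank_checking` (the source name is illustrative) — this is the literal string the route sends:
+
+```sql
+-- Copy-pasteable as-is; NULL means "no rule_name filter".
+SELECT * FROM list_mappings('bank_checking', NULL);
+```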
+ +**Route summary:** + +| Method | Path | Description | +|--------|------|-------------| +| GET | /api/sources | List all sources | +| POST | /api/sources | Create source | +| GET | /api/sources/:name | Get source | +| PUT | /api/sources/:name | Update source (dedup_fields, config) | +| DELETE | /api/sources/:name | Delete source and all data | +| POST | /api/sources/suggest | Suggest source config from CSV upload | +| POST | /api/sources/:name/import | Import CSV records | +| GET | /api/sources/:name/import-log | Import history | +| GET | /api/sources/:name/stats | Record counts | +| GET | /api/sources/:name/fields | All known field names and origins | +| GET | /api/sources/:name/view-data | Paginated, sortable view data | +| POST | /api/sources/:name/transform | Apply transformations (new records only) | +| POST | /api/sources/:name/reprocess | Reapply transformations to all records | +| POST | /api/sources/:name/view | Generate dfv view | +| GET | /api/rules/source/:name | List rules for a source | +| GET | /api/rules/preview | Preview pattern against live records | +| GET | /api/rules/:id/test | Test saved rule against live records | +| POST | /api/rules | Create rule | +| PUT | /api/rules/:id | Update rule | +| DELETE | /api/rules/:id | Delete rule | +| GET | /api/mappings/source/:name | List mappings | +| GET | /api/mappings/source/:name/all-values | All extracted values (mapped + unmapped) | +| GET | /api/mappings/source/:name/unmapped | Only unmapped extracted values | +| GET | /api/mappings/source/:name/counts | Record counts per mapping | +| GET | /api/mappings/source/:name/export.tsv | Export mappings as TSV | +| POST | /api/mappings/source/:name/import-csv | Import/update mappings from TSV | +| POST | /api/mappings | Create mapping | +| POST | /api/mappings/bulk | Upsert multiple mappings | +| PUT | /api/mappings/:id | Update mapping | +| DELETE | /api/mappings/:id | Delete mapping | +| GET | /api/records/source/:name | List raw records | +| GET | /api/records/:id | Get single record | +| POST | /api/records/search | Search by JSONB containment | +| DELETE | /api/records/:id | Delete record | +| DELETE | /api/records/source/:name/all | Delete all records for a source | + +--- + +## `api/lib/sql.js` — SQL Literal Helpers + +```js +lit(val) // JS value → SQL literal: 'text', TRUE, 42, NULL, '{"json":"val"}' +arr(val) // JS array → PostgreSQL array literal: ARRAY['a','b'] +``` + +Single quotes within string values are escaped by doubling them (`'`→`''`). Objects and arrays are JSON-serialized. These helpers exist so that query strings in route files are fully formed and can be copied directly into psql. + +--- + +## UI + +Built with React + Vite + Tailwind CSS. Compiled output goes to `public/`. The server serves `public/` as static files and falls back to `public/index.html` for all non-API routes (SPA routing). + +**Authentication flow:** +1. On load, check `sessionStorage` for saved credentials. If found, re-authenticate silently via `GET /api/sources`. +2. On login, credentials are stored in memory (`api.js` module-level) and in `sessionStorage` (survives page refresh, cleared on tab close). +3. On 401 response, credentials are cleared and the login screen is shown. +4. `localStorage` persists the selected source name across sessions. + +**Pages:** + +- **Sources** — View and edit source configuration. Shows all known field names and their origins (raw data, schema, rules, mappings). Checkboxes control which fields are dedup keys and which appear in the output view. 
Supports CSV upload to auto-detect fields. + +- **Import** — Upload a CSV to import records into the selected source. Shows import log with inserted/duplicate counts per import. + +- **Rules** — Create and manage regex rules. Live preview fires automatically (debounced 500ms) as pattern/field/flags are edited, showing match results against real records. Rules can be enabled/disabled by toggle. + +- **Mappings** — Tabular mapping editor. Shows all extracted values from transformed records with record counts and sample raw data. Rows are yellow (unmapped), white (mapped), or blue (edited but unsaved). Supports TSV export and import. Columns can be added dynamically. + +- **Records** — Paginated table showing the `dfv.{source}` view. Server-side sorting (column validated against `information_schema.columns`, interpolated with `quote_ident`). Dates are formatted `YYYY-MM-DD` for correct lexicographic sort. + +--- + +## manage.py + +Interactive CLI for setup and operations. Run with `python3 manage.py`. Requires no arguments. + +Shows current status on every screen: +- Database connection (host, port, db, user) and whether it succeeds +- Whether the `dataflow` schema is deployed +- Whether SQL functions are deployed (detected by presence of `apply_transformations`) +- Login credentials (configured / not configured) +- UI build status and timestamp +- Systemd service status +- Nginx reverse proxy status + +**Menu options:** + +1. **Database configuration and deployment dialog** — Write or update `.env` with database connection settings. If connection fails, offers to create the database and user using admin credentials. Optionally deploys schema and SQL functions after `.env` is written. + +2. **Redeploy schema** — Runs `database/schema.sql` against the configured database. Warns that this drops all data. Requires explicit confirmation. + +3. **Redeploy SQL functions** — Runs all four files in `database/queries/` in order: `sources.sql`, `rules.sql`, `mappings.sql`, `records.sql`. Safe to run at any time without data loss. + +4. **Build UI** — Runs `npm run build` in `ui/`, outputting to `public/`. + +5. **Set up nginx reverse proxy** — Detects if a proxy already exists for the configured port. Writes an nginx site config to `/etc/nginx/sites-enabled/`. If an SSL certificate exists in `/etc/letsencrypt/live/`, configures HTTPS with HTTP redirect; otherwise configures HTTP only with an offer to run certbot. Requires sudo. + +6. **Install systemd service** — Copies `dataflow.service` to `/etc/systemd/system/`, runs `daemon-reload`, enables on boot. Requires sudo. + +7. **Start / restart service** — Runs `systemctl start` or `systemctl restart` depending on current state. Requires sudo. + +8. **Stop service** — Runs `systemctl stop`. Requires sudo. + +9. **Set login credentials** — Prompts for username and password, bcrypt-hashes the password via `node -e "require('bcrypt')..."`, and writes `LOGIN_USER` and `LOGIN_PASSWORD_HASH` to `.env`. Requires Node.js and bcrypt npm package to be installed. + +**Key behaviors:** +- All commands that will be run are printed before the user is asked to confirm. +- Actions that require sudo prompt transparently — `sudo` is not run with `-n`, so it uses cached credentials or prompts as normal. +- Nginx config files are written via `sudo cp` from a temp file, then `sudo chmod 644` to make them world-readable for status detection. +- Certificate existence is checked with `sudo test -f` since `/etc/letsencrypt/live/` requires root. 
+- Option 1's result (updated config) is passed back to the menu loop so status reflects changes without requiring a restart. + +--- + +## Environment Variables (`.env`) + +``` +DB_HOST PostgreSQL host +DB_PORT PostgreSQL port (default 5432) +DB_NAME Database name +DB_USER Database user +DB_PASSWORD Database password +API_PORT Port the Express server listens on (default 3020) +NODE_ENV development | production +LOGIN_USER Username for Basic Auth +LOGIN_PASSWORD_HASH bcrypt hash of the password +``` + +--- + +## Running the Application + +```bash +# Install API dependencies +npm install + +# Install UI dependencies and build +cd ui && npm install && npm run build && cd .. + +# Start (development, auto-reload) +npm run dev + +# Start (production) +npm start +``` + +The server binds to `0.0.0.0` on `API_PORT` and serves both the API and the compiled UI from `public/`. + +--- + +## Deploying SQL Changes + +Any time SQL functions are modified: +```bash +PGPASSWORD= psql -h -U -d -f database/queries/sources.sql +PGPASSWORD= psql -h -U -d -f database/queries/rules.sql +PGPASSWORD= psql -h -U -d -f database/queries/mappings.sql +PGPASSWORD= psql -h -U -d -f database/queries/records.sql +``` +Then restart the server. Function deployment is safe to repeat — all functions use `CREATE OR REPLACE`. + +Schema changes (`schema.sql`) drop and recreate the schema, deleting all data. In production, write migration scripts instead. diff --git a/api/lib/sql.js b/api/lib/sql.js new file mode 100644 index 0000000..095b1f5 --- /dev/null +++ b/api/lib/sql.js @@ -0,0 +1,29 @@ +/** + * SQL literal helpers + * Build literal SQL values for direct interpolation into query strings. + * Queries are fully formed strings — easy to copy and run in psql for debugging. + */ + +// Escape a single value for safe interpolation into SQL +function lit(val) { + if (val === null || val === undefined) return 'NULL'; + if (typeof val === 'boolean') return val ? 'TRUE' : 'FALSE'; + if (typeof val === 'number') return String(Math.trunc(val)); + if (typeof val === 'object') return `'${JSON.stringify(val).replace(/'/g, "''")}'`; + return `'${String(val).replace(/'/g, "''")}'`; +} + +// Format a value for a JSONB parameter — always JSON.stringifies first so that +// plain strings become valid JSON ("ROBLOX" not ROBLOX), arrays and objects work too. 
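+// e.g. jsonLit('ROBLOX') → '"ROBLOX"', jsonLit({vendor: 'Walmart'}) → '{"vendor":"Walmart"}'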
+function jsonLit(val) { + if (val === null || val === undefined) return 'NULL'; + return `'${JSON.stringify(val).replace(/'/g, "''")}'`; +} + +// Format a JS array as a PostgreSQL text array literal: ARRAY['a','b'] +function arr(val) { + if (!Array.isArray(val) || val.length === 0) return "ARRAY[]::text[]"; + return `ARRAY[${val.map(s => `'${String(s).replace(/'/g, "''")}'`).join(',')}]`; +} + +module.exports = { lit, jsonLit, arr }; diff --git a/api/routes/mappings.js b/api/routes/mappings.js index d459b8c..88b08bc 100644 --- a/api/routes/mappings.js +++ b/api/routes/mappings.js @@ -6,6 +6,7 @@ const express = require('express'); const multer = require('multer'); const { parse } = require('csv-parse/sync'); +const { lit, jsonLit } = require('../lib/sql'); const upload = multer({ storage: multer.memoryStorage() }); @@ -17,19 +18,9 @@ module.exports = (pool) => { // List all mappings for a source router.get('/source/:source_name', async (req, res, next) => { try { - const { rule_name } = req.query; - - let query = 'SELECT * FROM mappings WHERE source_name = $1'; - const params = [req.params.source_name]; - - if (rule_name) { - query += ' AND rule_name = $2'; - params.push(rule_name); - } - - query += ' ORDER BY rule_name, input_value'; - - const result = await pool.query(query, params); + const result = await pool.query( + `SELECT * FROM list_mappings(${lit(req.params.source_name)}, ${lit(req.query.rule_name || null)})` + ); res.json(result.rows); } catch (err) { next(err); @@ -39,29 +30,9 @@ module.exports = (pool) => { // Get record counts for existing mappings router.get('/source/:source_name/counts', async (req, res, next) => { try { - const { rule_name } = req.query; - const params = [req.params.source_name]; - let ruleFilter = ''; - if (rule_name) { - ruleFilter = 'AND m.rule_name = $2'; - params.push(rule_name); - } - - const result = await pool.query(` - SELECT - m.rule_name, - m.input_value, - COUNT(rec.id) AS record_count - FROM mappings m - JOIN rules r ON r.source_name = m.source_name AND r.name = m.rule_name - LEFT JOIN records rec ON - rec.source_name = m.source_name - AND rec.transformed ? 
r.output_field - AND rec.transformed -> r.output_field = m.input_value - WHERE m.source_name = $1 ${ruleFilter} - GROUP BY m.rule_name, m.input_value - `, params); - + const result = await pool.query( + `SELECT * FROM get_mapping_counts(${lit(req.params.source_name)}, ${lit(req.query.rule_name || null)})` + ); res.json(result.rows); } catch (err) { next(err); @@ -71,26 +42,20 @@ module.exports = (pool) => { // Get unmapped values router.get('/source/:source_name/unmapped', async (req, res, next) => { try { - const { rule_name } = req.query; - const result = await pool.query( - 'SELECT * FROM get_unmapped_values($1, $2)', - [req.params.source_name, rule_name || null] + `SELECT * FROM get_unmapped_values(${lit(req.params.source_name)}, ${lit(req.query.rule_name || null)})` ); - res.json(result.rows); } catch (err) { next(err); } }); - // Get all extracted values (mapped + unmapped) with counts — single SQL run + // Get all extracted values (mapped + unmapped) with counts router.get('/source/:source_name/all-values', async (req, res, next) => { try { - const { rule_name } = req.query; const result = await pool.query( - 'SELECT * FROM get_all_values($1, $2)', - [req.params.source_name, rule_name || null] + `SELECT * FROM get_all_values(${lit(req.params.source_name)}, ${lit(req.query.rule_name || null)})` ); res.json(result.rows); } catch (err) { @@ -98,20 +63,16 @@ module.exports = (pool) => { } }); - // Export all extracted values (mapped + unmapped) as TSV via get_all_values - // Columns: source_name, rule_name, input_value, record_count, , sample - // sample is always last and is discarded on import + // Export all extracted values as TSV router.get('/source/:source_name/export.tsv', async (req, res, next) => { try { const { rule_name } = req.query; const source_name = req.params.source_name; const result = await pool.query( - 'SELECT * FROM get_all_values($1, $2)', - [source_name, rule_name || null] + `SELECT * FROM get_all_values(${lit(source_name)}, ${lit(rule_name || null)})` ); - // Collect output keys from mapped rows const outputKeys = []; for (const row of result.rows) { for (const key of Object.keys(row.output || {})) { @@ -120,8 +81,6 @@ module.exports = (pool) => { } const escape = (val) => String(val ?? '').replace(/\t/g, ' '); - - // sample is always last const allCols = ['source_name', 'rule_name', 'input_value', 'record_count', ...outputKeys, 'sample']; const dataRows = result.rows.map(row => { @@ -150,8 +109,6 @@ module.exports = (pool) => { }); // Import mappings from uploaded TSV - // Any column that isn't a system field (source_name, rule_name, input_value, record_count, sample) - // is treated as an output key. sample is discarded wherever it appears. router.post('/source/:source_name/import-csv', upload.single('file'), async (req, res, next) => { const client = await pool.connect(); try { @@ -160,43 +117,29 @@ module.exports = (pool) => { } const records = parse(req.file.buffer, { columns: true, skip_empty_lines: true, trim: true, delimiter: '\t' }); - - if (records.length === 0) { - return res.status(400).json({ error: 'File is empty.' }); - } + if (records.length === 0) return res.status(400).json({ error: 'File is empty.' 
}); const outputKeys = Object.keys(records[0]).filter(k => !SYSTEM_COLS.has(k)); - const mappings = []; for (const row of records) { const { source_name, rule_name, input_value } = row; - const output = {}; for (const key of outputKeys) { if (row[key] && row[key].trim() !== '') output[key] = row[key].trim(); } if (Object.keys(output).length === 0) continue; - let parsedInput; try { parsedInput = JSON.parse(input_value); } catch { parsedInput = input_value; } - mappings.push({ source_name, rule_name, input_value: parsedInput, output }); } - if (mappings.length === 0) { - return res.status(400).json({ error: 'No rows with output values filled in.' }); - } + if (mappings.length === 0) return res.status(400).json({ error: 'No rows with output values filled in.' }); await client.query('BEGIN'); const results = []; for (const { source_name, rule_name, input_value, output } of mappings) { const result = await client.query( - `INSERT INTO mappings (source_name, rule_name, input_value, output) - VALUES ($1, $2, $3, $4) - ON CONFLICT (source_name, rule_name, input_value) - DO UPDATE SET output = EXCLUDED.output - RETURNING *`, - [source_name, rule_name, JSON.stringify(input_value), JSON.stringify(output)] + `SELECT * FROM upsert_mapping(${lit(source_name)}, ${lit(rule_name)}, ${jsonLit(input_value)}, ${jsonLit(output)})` ); results.push(result.rows[0]); } @@ -214,15 +157,8 @@ module.exports = (pool) => { // Get single mapping router.get('/:id', async (req, res, next) => { try { - const result = await pool.query( - 'SELECT * FROM mappings WHERE id = $1', - [req.params.id] - ); - - if (result.rows.length === 0) { - return res.status(404).json({ error: 'Mapping not found' }); - } - + const result = await pool.query(`SELECT * FROM get_mapping(${lit(parseInt(req.params.id))})`); + if (result.rows.length === 0) return res.status(404).json({ error: 'Mapping not found' }); res.json(result.rows[0]); } catch (err) { next(err); @@ -233,60 +169,35 @@ module.exports = (pool) => { router.post('/', async (req, res, next) => { try { const { source_name, rule_name, input_value, output } = req.body; - if (!source_name || !rule_name || !input_value || !output) { - return res.status(400).json({ - error: 'Missing required fields: source_name, rule_name, input_value, output' - }); + return res.status(400).json({ error: 'Missing required fields: source_name, rule_name, input_value, output' }); } - const result = await pool.query( - `INSERT INTO mappings (source_name, rule_name, input_value, output) - VALUES ($1, $2, $3, $4) - RETURNING *`, - [source_name, rule_name, JSON.stringify(input_value), JSON.stringify(output)] + `SELECT * FROM create_mapping(${lit(source_name)}, ${lit(rule_name)}, ${jsonLit(input_value)}, ${jsonLit(output)})` ); - res.status(201).json(result.rows[0]); } catch (err) { - if (err.code === '23505') { // Unique violation - return res.status(409).json({ error: 'Mapping already exists' }); - } - if (err.code === '23503') { // Foreign key violation - return res.status(404).json({ error: 'Source or rule not found' }); - } + if (err.code === '23505') return res.status(409).json({ error: 'Mapping already exists' }); + if (err.code === '23503') return res.status(404).json({ error: 'Source or rule not found' }); next(err); } }); - // Bulk create mappings + // Bulk create/update mappings router.post('/bulk', async (req, res, next) => { const client = await pool.connect(); try { const { mappings } = req.body; - - if (!Array.isArray(mappings)) { - return res.status(400).json({ error: 'Expected array of mappings' 
}); - } + if (!Array.isArray(mappings)) return res.status(400).json({ error: 'Expected array of mappings' }); await client.query('BEGIN'); - const results = []; - for (const mapping of mappings) { - const { source_name, rule_name, input_value, output } = mapping; - + for (const { source_name, rule_name, input_value, output } of mappings) { const result = await client.query( - `INSERT INTO mappings (source_name, rule_name, input_value, output) - VALUES ($1, $2, $3, $4) - ON CONFLICT (source_name, rule_name, input_value) - DO UPDATE SET output = EXCLUDED.output - RETURNING *`, - [source_name, rule_name, JSON.stringify(input_value), JSON.stringify(output)] + `SELECT * FROM upsert_mapping(${lit(source_name)}, ${lit(rule_name)}, ${jsonLit(input_value)}, ${jsonLit(output)})` ); - results.push(result.rows[0]); } - await client.query('COMMIT'); res.status(201).json({ count: results.length, mappings: results }); } catch (err) { @@ -301,20 +212,10 @@ module.exports = (pool) => { router.put('/:id', async (req, res, next) => { try { const { input_value, output } = req.body; - const result = await pool.query( - `UPDATE mappings - SET input_value = COALESCE($2, input_value), - output = COALESCE($3, output) - WHERE id = $1 - RETURNING *`, - [req.params.id, input_value, output ? JSON.stringify(output) : null] + `SELECT * FROM update_mapping(${lit(parseInt(req.params.id))}, ${input_value != null ? jsonLit(input_value) : 'NULL'}, ${output != null ? jsonLit(output) : 'NULL'})` ); - - if (result.rows.length === 0) { - return res.status(404).json({ error: 'Mapping not found' }); - } - + if (result.rows.length === 0) return res.status(404).json({ error: 'Mapping not found' }); res.json(result.rows[0]); } catch (err) { next(err); @@ -324,15 +225,8 @@ module.exports = (pool) => { // Delete mapping router.delete('/:id', async (req, res, next) => { try { - const result = await pool.query( - 'DELETE FROM mappings WHERE id = $1 RETURNING id', - [req.params.id] - ); - - if (result.rows.length === 0) { - return res.status(404).json({ error: 'Mapping not found' }); - } - + const result = await pool.query(`SELECT * FROM delete_mapping(${lit(parseInt(req.params.id))})`); + if (result.rows.length === 0) return res.status(404).json({ error: 'Mapping not found' }); res.json({ success: true, deleted: result.rows[0].id }); } catch (err) { next(err); diff --git a/api/routes/records.js b/api/routes/records.js index be3ebdb..63eb63e 100644 --- a/api/routes/records.js +++ b/api/routes/records.js @@ -4,6 +4,7 @@ */ const express = require('express'); +const { lit } = require('../lib/sql'); module.exports = (pool) => { const router = express.Router(); @@ -12,18 +13,9 @@ module.exports = (pool) => { router.get('/source/:source_name', async (req, res, next) => { try { const { limit = 100, offset = 0, transformed_only } = req.query; - - let query = 'SELECT * FROM records WHERE source_name = $1'; - const params = [req.params.source_name]; - - if (transformed_only === 'true') { - query += ' AND transformed IS NOT NULL'; - } - - query += ' ORDER BY id DESC LIMIT $2 OFFSET $3'; - params.push(parseInt(limit), parseInt(offset)); - - const result = await pool.query(query, params); + const result = await pool.query( + `SELECT * FROM list_records(${lit(req.params.source_name)}, ${lit(parseInt(limit))}, ${lit(parseInt(offset))}, ${lit(transformed_only === 'true')})` + ); res.json(result.rows); } catch (err) { next(err); @@ -33,15 +25,8 @@ module.exports = (pool) => { // Get single record router.get('/:id', async (req, res, next) => { try { - 
const result = await pool.query( - 'SELECT * FROM records WHERE id = $1', - [req.params.id] - ); - - if (result.rows.length === 0) { - return res.status(404).json({ error: 'Record not found' }); - } - + const result = await pool.query(`SELECT * FROM get_record(${lit(parseInt(req.params.id))})`); + if (result.rows.length === 0) return res.status(404).json({ error: 'Record not found' }); res.json(result.rows[0]); } catch (err) { next(err); @@ -52,23 +37,12 @@ module.exports = (pool) => { router.post('/search', async (req, res, next) => { try { const { source_name, query, limit = 100 } = req.body; - if (!source_name || !query) { - return res.status(400).json({ - error: 'Missing required fields: source_name, query' - }); + return res.status(400).json({ error: 'Missing required fields: source_name, query' }); } - - // Search in both data and transformed fields const result = await pool.query( - `SELECT * FROM records - WHERE source_name = $1 - AND (data @> $2 OR transformed @> $2) - ORDER BY id DESC - LIMIT $3`, - [source_name, JSON.stringify(query), parseInt(limit)] + `SELECT * FROM search_records(${lit(source_name)}, ${lit(query)}, ${lit(parseInt(limit))})` ); - res.json(result.rows); } catch (err) { next(err); @@ -78,15 +52,8 @@ module.exports = (pool) => { // Delete record router.delete('/:id', async (req, res, next) => { try { - const result = await pool.query( - 'DELETE FROM records WHERE id = $1 RETURNING id', - [req.params.id] - ); - - if (result.rows.length === 0) { - return res.status(404).json({ error: 'Record not found' }); - } - + const result = await pool.query(`SELECT * FROM delete_record(${lit(parseInt(req.params.id))})`); + if (result.rows.length === 0) return res.status(404).json({ error: 'Record not found' }); res.json({ success: true, deleted: result.rows[0].id }); } catch (err) { next(err); @@ -96,12 +63,8 @@ module.exports = (pool) => { // Delete all records for a source router.delete('/source/:source_name/all', async (req, res, next) => { try { - const result = await pool.query( - 'DELETE FROM records WHERE source_name = $1', - [req.params.source_name] - ); - - res.json({ success: true, deleted_count: result.rowCount }); + const result = await pool.query(`SELECT * FROM delete_source_records(${lit(req.params.source_name)})`); + res.json({ success: true, deleted_count: result.rows[0].deleted_count }); } catch (err) { next(err); } diff --git a/api/routes/rules.js b/api/routes/rules.js index 8dffd4f..957bfde 100644 --- a/api/routes/rules.js +++ b/api/routes/rules.js @@ -4,6 +4,7 @@ */ const express = require('express'); +const { lit } = require('../lib/sql'); module.exports = (pool) => { const router = express.Router(); @@ -11,10 +12,7 @@ module.exports = (pool) => { // List all rules for a source router.get('/source/:source_name', async (req, res, next) => { try { - const result = await pool.query( - 'SELECT * FROM rules WHERE source_name = $1 ORDER BY sequence, name', - [req.params.source_name] - ); + const result = await pool.query(`SELECT * FROM list_rules(${lit(req.params.source_name)})`); res.json(result.rows); } catch (err) { next(err); @@ -25,47 +23,12 @@ module.exports = (pool) => { router.get('/preview', async (req, res, next) => { try { const { source, field, pattern, flags, function_type = 'extract', replace_value = '', limit = 20 } = req.query; - if (!source || !field || !pattern) { return res.status(400).json({ error: 'source, field, and pattern are required' }); } - - const query = function_type === 'replace' - ? 
`SELECT - id, - data->>$1 AS raw_value, - to_jsonb(regexp_replace(data->>$1, $2, $3, $4)) AS extracted_value - FROM records - WHERE source_name = $5 AND data ? $1 - ORDER BY id DESC LIMIT $6` - : `SELECT - r.id, - r.data->>$1 AS raw_value, - CASE - WHEN agg.match_count = 0 THEN NULL - WHEN agg.match_count = 1 THEN agg.matches->0 - ELSE agg.matches - END AS extracted_value - FROM records r - CROSS JOIN LATERAL ( - SELECT - jsonb_agg( - CASE WHEN array_length(mt, 1) = 1 THEN to_jsonb(mt[1]) - ELSE to_jsonb(mt) - END - ORDER BY rn - ) AS matches, - count(*)::int AS match_count - FROM regexp_matches(r.data->>$1, $2, $3) WITH ORDINALITY AS m(mt, rn) - ) agg - WHERE r.source_name = $4 AND r.data ? $1 - ORDER BY r.id DESC LIMIT $5`; - - const params = function_type === 'replace' - ? [field, pattern, replace_value, flags || '', source, parseInt(limit)] - : [field, pattern, flags || '', source, parseInt(limit)]; - - const result = await pool.query(query, params); + const result = await pool.query( + `SELECT * FROM preview_rule(${lit(source)}, ${lit(field)}, ${lit(pattern)}, ${lit(flags || '')}, ${lit(function_type)}, ${lit(replace_value)}, ${lit(parseInt(limit))})` + ); res.json(result.rows); } catch (err) { next(err); @@ -76,49 +39,11 @@ module.exports = (pool) => { router.get('/:id/test', async (req, res, next) => { try { const { limit = 20 } = req.query; - - const ruleResult = await pool.query( - 'SELECT * FROM rules WHERE id = $1', - [req.params.id] - ); - - if (ruleResult.rows.length === 0) { - return res.status(404).json({ error: 'Rule not found' }); - } - - const rule = ruleResult.rows[0]; - const result = await pool.query( - `SELECT - r.id, - r.data->>$1 AS raw_value, - CASE - WHEN agg.match_count = 0 THEN NULL - WHEN agg.match_count = 1 AND array_length(agg.matches[1], 1) = 1 - THEN to_jsonb(agg.matches[1][1]) - WHEN agg.match_count = 1 - THEN to_jsonb(agg.matches[1]) - WHEN array_length(agg.matches[1], 1) = 1 - THEN (SELECT jsonb_agg(m[1] ORDER BY idx) - FROM unnest(agg.matches) WITH ORDINALITY u(m, idx)) - ELSE to_jsonb(agg.matches) - END AS extracted_value - FROM records r - CROSS JOIN LATERAL ( - SELECT array_agg(mt ORDER BY rn) AS matches, count(*)::int AS match_count - FROM regexp_matches(r.data->>$1, $2, $3) WITH ORDINALITY AS m(mt, rn) - ) agg - WHERE r.source_name = $4 - AND r.data ? 
$1 - ORDER BY r.id DESC - LIMIT $5`, - [rule.field, rule.pattern, rule.flags || '', rule.source_name, parseInt(limit)] + `SELECT * FROM test_rule(${lit(parseInt(req.params.id))}, ${lit(parseInt(limit))})` ); - - res.json({ - rule: { id: rule.id, name: rule.name, field: rule.field, pattern: rule.pattern, output_field: rule.output_field }, - results: result.rows - }); + if (result.rows.length === 0) return res.status(404).json({ error: 'Rule not found' }); + res.json(result.rows[0]); } catch (err) { next(err); } @@ -127,15 +52,8 @@ module.exports = (pool) => { // Get single rule router.get('/:id', async (req, res, next) => { try { - const result = await pool.query( - 'SELECT * FROM rules WHERE id = $1', - [req.params.id] - ); - - if (result.rows.length === 0) { - return res.status(404).json({ error: 'Rule not found' }); - } - + const result = await pool.query(`SELECT * FROM get_rule(${lit(parseInt(req.params.id))})`); + if (result.rows.length === 0) return res.status(404).json({ error: 'Rule not found' }); res.json(result.rows[0]); } catch (err) { next(err); @@ -146,32 +64,19 @@ module.exports = (pool) => { router.post('/', async (req, res, next) => { try { const { source_name, name, field, pattern, output_field, function_type, flags, replace_value, enabled, retain, sequence } = req.body; - if (!source_name || !name || !field || !pattern || !output_field) { - return res.status(400).json({ - error: 'Missing required fields: source_name, name, field, pattern, output_field' - }); + return res.status(400).json({ error: 'Missing required fields: source_name, name, field, pattern, output_field' }); } - if (function_type && !['extract', 'replace'].includes(function_type)) { return res.status(400).json({ error: 'function_type must be "extract" or "replace"' }); } - const result = await pool.query( - `INSERT INTO rules (source_name, name, field, pattern, output_field, function_type, flags, replace_value, enabled, retain, sequence) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) - RETURNING *`, - [source_name, name, field, pattern, output_field, function_type || 'extract', flags || '', replace_value || '', enabled !== false, retain === true, sequence || 0] + `SELECT * FROM create_rule(${lit(source_name)}, ${lit(name)}, ${lit(field)}, ${lit(pattern)}, ${lit(output_field)}, ${lit(function_type || 'extract')}, ${lit(flags || '')}, ${lit(replace_value || '')}, ${lit(enabled !== false)}, ${lit(retain === true)}, ${lit(sequence || 0)})` ); - res.status(201).json(result.rows[0]); } catch (err) { - if (err.code === '23505') { // Unique violation - return res.status(409).json({ error: 'Rule already exists for this source' }); - } - if (err.code === '23503') { // Foreign key violation - return res.status(404).json({ error: 'Source not found' }); - } + if (err.code === '23505') return res.status(409).json({ error: 'Rule already exists for this source' }); + if (err.code === '23503') return res.status(404).json({ error: 'Source not found' }); next(err); } }); @@ -180,32 +85,14 @@ module.exports = (pool) => { router.put('/:id', async (req, res, next) => { try { const { name, field, pattern, output_field, function_type, flags, replace_value, enabled, retain, sequence } = req.body; - if (function_type && !['extract', 'replace'].includes(function_type)) { return res.status(400).json({ error: 'function_type must be "extract" or "replace"' }); } - + const n = (v) => v !== undefined ? 
lit(v) : 'NULL'; const result = await pool.query( - `UPDATE rules - SET name = COALESCE($2, name), - field = COALESCE($3, field), - pattern = COALESCE($4, pattern), - output_field = COALESCE($5, output_field), - function_type = COALESCE($6, function_type), - flags = COALESCE($7, flags), - replace_value = COALESCE($8, replace_value), - enabled = COALESCE($9, enabled), - retain = COALESCE($10, retain), - sequence = COALESCE($11, sequence) - WHERE id = $1 - RETURNING *`, - [req.params.id, name, field, pattern, output_field, function_type, flags, replace_value, enabled, retain, sequence] + `SELECT * FROM update_rule(${lit(parseInt(req.params.id))}, ${n(name)}, ${n(field)}, ${n(pattern)}, ${n(output_field)}, ${n(function_type)}, ${n(flags)}, ${n(replace_value)}, ${n(enabled)}, ${n(retain)}, ${n(sequence)})` ); - - if (result.rows.length === 0) { - return res.status(404).json({ error: 'Rule not found' }); - } - + if (result.rows.length === 0) return res.status(404).json({ error: 'Rule not found' }); res.json(result.rows[0]); } catch (err) { next(err); @@ -215,15 +102,8 @@ module.exports = (pool) => { // Delete rule router.delete('/:id', async (req, res, next) => { try { - const result = await pool.query( - 'DELETE FROM rules WHERE id = $1 RETURNING id, name', - [req.params.id] - ); - - if (result.rows.length === 0) { - return res.status(404).json({ error: 'Rule not found' }); - } - + const result = await pool.query(`SELECT * FROM delete_rule(${lit(parseInt(req.params.id))})`); + if (result.rows.length === 0) return res.status(404).json({ error: 'Rule not found' }); res.json({ success: true, deleted: result.rows[0] }); } catch (err) { next(err); diff --git a/api/routes/sources.js b/api/routes/sources.js index 54aa430..261934d 100644 --- a/api/routes/sources.js +++ b/api/routes/sources.js @@ -6,6 +6,7 @@ const express = require('express'); const multer = require('multer'); const { parse } = require('csv-parse/sync'); +const { lit, arr } = require('../lib/sql'); const upload = multer({ storage: multer.memoryStorage() }); @@ -15,9 +16,7 @@ module.exports = (pool) => { // List all sources router.get('/', async (req, res, next) => { try { - const result = await pool.query( - 'SELECT * FROM sources ORDER BY name' - ); + const result = await pool.query(`SELECT * FROM list_sources()`); res.json(result.rows); } catch (err) { next(err); @@ -27,15 +26,8 @@ module.exports = (pool) => { // Get single source router.get('/:name', async (req, res, next) => { try { - const result = await pool.query( - 'SELECT * FROM sources WHERE name = $1', - [req.params.name] - ); - - if (result.rows.length === 0) { - return res.status(404).json({ error: 'Source not found' }); - } - + const result = await pool.query(`SELECT * FROM get_source(${lit(req.params.name)})`); + if (result.rows.length === 0) return res.status(404).json({ error: 'Source not found' }); res.json(result.rows[0]); } catch (err) { next(err); @@ -45,39 +37,24 @@ module.exports = (pool) => { // Suggest source definition from CSV router.post('/suggest', upload.single('file'), async (req, res, next) => { try { - if (!req.file) { - return res.status(400).json({ error: 'No file uploaded' }); - } + if (!req.file) return res.status(400).json({ error: 'No file uploaded' }); - const records = parse(req.file.buffer, { - columns: true, - skip_empty_lines: true, - trim: true - }); - - if (records.length === 0) { - return res.status(400).json({ error: 'CSV file is empty' }); - } + const records = parse(req.file.buffer, { columns: true, skip_empty_lines: true, trim: true 
}); + if (records.length === 0) return res.status(400).json({ error: 'CSV file is empty' }); const sample = records[0]; const fields = Object.keys(sample).map(key => { const val = sample[key]; let type = 'text'; - if (!isNaN(parseFloat(val)) && isFinite(val) && val.charAt(0) !== '0') { type = 'numeric'; } else if (Date.parse(val) > Date.parse('1950-01-01') && Date.parse(val) < Date.parse('2050-01-01')) { type = 'date'; } - return { name: key, type }; }); - res.json({ - name: '', - dedup_fields: [], - fields - }); + res.json({ name: '', dedup_fields: [], fields }); } catch (err) { next(err); } @@ -87,25 +64,15 @@ module.exports = (pool) => { router.post('/', async (req, res, next) => { try { const { name, dedup_fields, config } = req.body; - if (!name || !dedup_fields || !Array.isArray(dedup_fields)) { - return res.status(400).json({ - error: 'Missing required fields: name, dedup_fields (array)' - }); + return res.status(400).json({ error: 'Missing required fields: name, dedup_fields (array)' }); } - const result = await pool.query( - `INSERT INTO sources (name, dedup_fields, config) - VALUES ($1, $2, $3) - RETURNING *`, - [name, dedup_fields, config || {}] + `SELECT * FROM create_source(${lit(name)}, ${arr(dedup_fields)}, ${lit(config || {})})` ); - res.status(201).json(result.rows[0]); } catch (err) { - if (err.code === '23505') { // Unique violation - return res.status(409).json({ error: 'Source already exists' }); - } + if (err.code === '23505') return res.status(409).json({ error: 'Source already exists' }); next(err); } }); @@ -114,21 +81,10 @@ module.exports = (pool) => { router.put('/:name', async (req, res, next) => { try { const { dedup_fields, config } = req.body; - const result = await pool.query( - `UPDATE sources - SET dedup_fields = COALESCE($2, dedup_fields), - config = COALESCE($3, config), - updated_at = CURRENT_TIMESTAMP - WHERE name = $1 - RETURNING *`, - [req.params.name, dedup_fields, config] + `SELECT * FROM update_source(${lit(req.params.name)}, ${dedup_fields ? arr(dedup_fields) : 'NULL'}, ${config ? 
lit(config) : 'NULL'})` ); - - if (result.rows.length === 0) { - return res.status(404).json({ error: 'Source not found' }); - } - + if (result.rows.length === 0) return res.status(404).json({ error: 'Source not found' }); res.json(result.rows[0]); } catch (err) { next(err); @@ -138,16 +94,9 @@ module.exports = (pool) => { // Delete source router.delete('/:name', async (req, res, next) => { try { - const result = await pool.query( - 'DELETE FROM sources WHERE name = $1 RETURNING name', - [req.params.name] - ); - - if (result.rows.length === 0) { - return res.status(404).json({ error: 'Source not found' }); - } - - res.json({ success: true, deleted: result.rows[0].name }); + const result = await pool.query(`SELECT * FROM delete_source(${lit(req.params.name)})`); + if (result.rows.length === 0) return res.status(404).json({ error: 'Source not found' }); + res.json({ success: true, deleted: result.rows[0].delete_source }); } catch (err) { next(err); } @@ -156,23 +105,11 @@ module.exports = (pool) => { // Import CSV data router.post('/:name/import', upload.single('file'), async (req, res, next) => { try { - if (!req.file) { - return res.status(400).json({ error: 'No file uploaded' }); - } - - // Parse CSV - const records = parse(req.file.buffer, { - columns: true, - skip_empty_lines: true, - trim: true - }); - - // Import records + if (!req.file) return res.status(400).json({ error: 'No file uploaded' }); + const records = parse(req.file.buffer, { columns: true, skip_empty_lines: true, trim: true }); const result = await pool.query( - 'SELECT import_records($1, $2) as result', - [req.params.name, JSON.stringify(records)] + `SELECT import_records(${lit(req.params.name)}, ${lit(records)}) as result` ); - res.json(result.rows[0].result); } catch (err) { next(err); @@ -182,12 +119,7 @@ module.exports = (pool) => { // Get import log router.get('/:name/import-log', async (req, res, next) => { try { - const result = await pool.query( - `SELECT * FROM import_log - WHERE source_name = $1 - ORDER BY imported_at DESC`, - [req.params.name] - ); + const result = await pool.query(`SELECT * FROM get_import_log(${lit(req.params.name)})`); res.json(result.rows); } catch (err) { next(err); @@ -197,11 +129,7 @@ module.exports = (pool) => { // Apply transformations router.post('/:name/transform', async (req, res, next) => { try { - const result = await pool.query( - 'SELECT apply_transformations($1) as result', - [req.params.name] - ); - + const result = await pool.query(`SELECT apply_transformations(${lit(req.params.name)}) as result`); res.json(result.rows[0].result); } catch (err) { next(err); @@ -211,25 +139,7 @@ module.exports = (pool) => { // Get all known field names for a source router.get('/:name/fields', async (req, res, next) => { try { - const result = await pool.query(` - SELECT key, array_agg(DISTINCT origin ORDER BY origin) AS origins - FROM ( - SELECT f->>'name' AS key, 'schema' AS origin - FROM sources, jsonb_array_elements(config->'fields') f - WHERE name = $1 AND config ? 
'fields' - UNION ALL - SELECT jsonb_object_keys(data) AS key, 'raw' AS origin - FROM records WHERE source_name = $1 - UNION ALL - SELECT output_field AS key, 'rule: ' || name AS origin - FROM rules WHERE source_name = $1 - UNION ALL - SELECT jsonb_object_keys(output) AS key, 'mapping' AS origin - FROM mappings WHERE source_name = $1 - ) keys - GROUP BY key - ORDER BY key - `, [req.params.name]); + const result = await pool.query(`SELECT * FROM get_source_fields(${lit(req.params.name)})`); res.json(result.rows); } catch (err) { next(err); @@ -239,10 +149,7 @@ module.exports = (pool) => { // Generate output view router.post('/:name/view', async (req, res, next) => { try { - const result = await pool.query( - 'SELECT generate_source_view($1) as result', - [req.params.name] - ); + const result = await pool.query(`SELECT generate_source_view(${lit(req.params.name)}) as result`); res.json(result.rows[0].result); } catch (err) { next(err); @@ -252,11 +159,7 @@ module.exports = (pool) => { // Reprocess all records router.post('/:name/reprocess', async (req, res, next) => { try { - const result = await pool.query( - 'SELECT reprocess_records($1) as result', - [req.params.name] - ); - + const result = await pool.query(`SELECT reprocess_records(${lit(req.params.name)}) as result`); res.json(result.rows[0].result); } catch (err) { next(err); @@ -266,59 +169,21 @@ module.exports = (pool) => { // Get statistics router.get('/:name/stats', async (req, res, next) => { try { - const result = await pool.query( - `SELECT - COUNT(*) as total_records, - COUNT(*) FILTER (WHERE transformed IS NOT NULL) as transformed_records, - COUNT(*) FILTER (WHERE transformed IS NULL) as pending_records - FROM records - WHERE source_name = $1`, - [req.params.name] - ); - + const result = await pool.query(`SELECT * FROM get_source_stats(${lit(req.params.name)})`); res.json(result.rows[0]); } catch (err) { next(err); } }); + // Get view data (paginated, sortable) router.get('/:name/view-data', async (req, res, next) => { try { const { limit = 100, offset = 0, sort_col, sort_dir } = req.query; - const viewName = `dfv.${req.params.name}`; - - // Check view exists - const check = await pool.query( - `SELECT 1 FROM information_schema.views - WHERE table_schema = 'dfv' AND table_name = $1`, - [req.params.name] - ); - - if (check.rows.length === 0) { - return res.json({ exists: false, rows: [] }); - } - - // Validate sort_col against actual view columns to prevent injection - let orderClause = ''; - if (sort_col) { - const cols = await pool.query( - `SELECT column_name FROM information_schema.columns - WHERE table_schema = 'dfv' AND table_name = $1`, - [req.params.name] - ); - const validCols = cols.rows.map(r => r.column_name); - if (validCols.includes(sort_col)) { - const dir = sort_dir === 'desc' ? 
'DESC' : 'ASC'; - orderClause = ` ORDER BY "${sort_col}" ${dir} NULLS LAST`; - } - } - const result = await pool.query( - `SELECT * FROM ${viewName}${orderClause} LIMIT $1 OFFSET $2`, - [parseInt(limit), parseInt(offset)] + `SELECT get_view_data(${lit(req.params.name)}, ${lit(parseInt(limit))}, ${lit(parseInt(offset))}, ${lit(sort_col || null)}, ${lit(sort_dir || 'asc')}) as result` ); - - res.json({ exists: true, rows: result.rows }); + res.json(result.rows[0].result); } catch (err) { next(err); } diff --git a/database/queries/mappings.sql b/database/queries/mappings.sql new file mode 100644 index 0000000..e2e56e6 --- /dev/null +++ b/database/queries/mappings.sql @@ -0,0 +1,208 @@ +-- +-- Mappings queries +-- All SQL for api/routes/mappings.js +-- + +SET search_path TO dataflow, public; + +-- ── CRUD ───────────────────────────────────────────────────────────────────── + +CREATE OR REPLACE FUNCTION list_mappings(p_source_name TEXT, p_rule_name TEXT DEFAULT NULL) +RETURNS SETOF dataflow.mappings AS $$ + SELECT * FROM dataflow.mappings + WHERE source_name = p_source_name + AND (p_rule_name IS NULL OR rule_name = p_rule_name) + ORDER BY rule_name, input_value::text; +$$ LANGUAGE sql STABLE; + +CREATE OR REPLACE FUNCTION get_mapping(p_id INT) +RETURNS dataflow.mappings AS $$ + SELECT * FROM dataflow.mappings WHERE id = p_id; +$$ LANGUAGE sql STABLE; + +CREATE OR REPLACE FUNCTION create_mapping( + p_source_name TEXT, + p_rule_name TEXT, + p_input_value JSONB, + p_output JSONB +) +RETURNS dataflow.mappings AS $$ + INSERT INTO dataflow.mappings (source_name, rule_name, input_value, output) + VALUES (p_source_name, p_rule_name, p_input_value, p_output) + RETURNING *; +$$ LANGUAGE sql; + +CREATE OR REPLACE FUNCTION upsert_mapping( + p_source_name TEXT, + p_rule_name TEXT, + p_input_value JSONB, + p_output JSONB +) +RETURNS dataflow.mappings AS $$ + INSERT INTO dataflow.mappings (source_name, rule_name, input_value, output) + VALUES (p_source_name, p_rule_name, p_input_value, p_output) + ON CONFLICT (source_name, rule_name, input_value) + DO UPDATE SET output = EXCLUDED.output + RETURNING *; +$$ LANGUAGE sql; + +CREATE OR REPLACE FUNCTION update_mapping( + p_id INT, + p_input_value JSONB DEFAULT NULL, + p_output JSONB DEFAULT NULL +) +RETURNS dataflow.mappings AS $$ + UPDATE dataflow.mappings SET + input_value = COALESCE(p_input_value, input_value), + output = COALESCE(p_output, output) + WHERE id = p_id + RETURNING *; +$$ LANGUAGE sql; + +CREATE OR REPLACE FUNCTION delete_mapping(p_id INT) +RETURNS TABLE (id INT) AS $$ + DELETE FROM dataflow.mappings WHERE id = p_id RETURNING id; +$$ LANGUAGE sql; + +-- ── Counts ──────────────────────────────────────────────────────────────────── + +CREATE OR REPLACE FUNCTION get_mapping_counts(p_source_name TEXT, p_rule_name TEXT DEFAULT NULL) +RETURNS TABLE (rule_name TEXT, input_value JSONB, record_count BIGINT) AS $$ + SELECT + m.rule_name, + m.input_value, + COUNT(rec.id) AS record_count + FROM dataflow.mappings m + JOIN dataflow.rules r ON r.source_name = m.source_name AND r.name = m.rule_name + LEFT JOIN dataflow.records rec ON + rec.source_name = m.source_name + AND rec.transformed ? 
r.output_field + AND rec.transformed -> r.output_field = m.input_value + WHERE m.source_name = p_source_name + AND (p_rule_name IS NULL OR m.rule_name = p_rule_name) + GROUP BY m.rule_name, m.input_value; +$$ LANGUAGE sql STABLE; + +-- ── All values (mapped + unmapped) ─────────────────────────────────────────── + +DROP FUNCTION IF EXISTS get_all_values(TEXT, TEXT); +CREATE FUNCTION get_all_values( + p_source_name TEXT, + p_rule_name TEXT DEFAULT NULL +) RETURNS TABLE ( + rule_name TEXT, + output_field TEXT, + source_field TEXT, + extracted_value JSONB, + record_count BIGINT, + sample JSONB, + mapping_id INTEGER, + output JSONB, + is_mapped BOOLEAN +) AS $$ +BEGIN + RETURN QUERY + WITH extracted AS ( + SELECT + r.name AS rule_name, + r.output_field, + r.field AS source_field, + rec.transformed -> r.output_field AS extracted_value, + rec.data AS record_data, + row_number() OVER ( + PARTITION BY r.name, rec.transformed -> r.output_field + ORDER BY rec.id + ) AS rn + FROM dataflow.records rec + CROSS JOIN dataflow.rules r + WHERE rec.source_name = p_source_name + AND r.source_name = p_source_name + AND rec.transformed IS NOT NULL + AND rec.transformed ? r.output_field + AND (p_rule_name IS NULL OR r.name = p_rule_name) + AND rec.data ? r.field + ), + aggregated AS ( + SELECT + e.rule_name, + e.output_field, + e.source_field, + e.extracted_value, + count(*) AS record_count, + jsonb_agg(e.record_data ORDER BY e.rn) FILTER (WHERE e.rn <= 5) AS sample + FROM extracted e + GROUP BY e.rule_name, e.output_field, e.source_field, e.extracted_value + ) + SELECT + a.rule_name, + a.output_field, + a.source_field, + a.extracted_value, + a.record_count, + a.sample, + m.id AS mapping_id, + m.output, + (m.id IS NOT NULL) AS is_mapped + FROM aggregated a + LEFT JOIN dataflow.mappings m ON + m.source_name = p_source_name + AND m.rule_name = a.rule_name + AND m.input_value = a.extracted_value + ORDER BY a.record_count DESC; +END; +$$ LANGUAGE plpgsql; + +-- ── Unmapped values ─────────────────────────────────────────────────────────── + +DROP FUNCTION IF EXISTS get_unmapped_values(TEXT, TEXT); +CREATE FUNCTION get_unmapped_values( + p_source_name TEXT, + p_rule_name TEXT DEFAULT NULL +) RETURNS TABLE ( + rule_name TEXT, + output_field TEXT, + source_field TEXT, + extracted_value JSONB, + record_count BIGINT, + sample JSONB +) AS $$ +BEGIN + RETURN QUERY + WITH extracted AS ( + SELECT + r.name AS rule_name, + r.output_field, + r.field AS source_field, + rec.transformed -> r.output_field AS extracted_value, + rec.data AS record_data, + row_number() OVER ( + PARTITION BY r.name, rec.transformed -> r.output_field + ORDER BY rec.id + ) AS rn + FROM dataflow.records rec + CROSS JOIN dataflow.rules r + WHERE rec.source_name = p_source_name + AND r.source_name = p_source_name + AND rec.transformed IS NOT NULL + AND rec.transformed ? r.output_field + AND (p_rule_name IS NULL OR r.name = p_rule_name) + AND rec.data ? 
r.field + ) + SELECT + e.rule_name, + e.output_field, + e.source_field, + e.extracted_value, + count(*) AS record_count, + jsonb_agg(e.record_data ORDER BY e.rn) FILTER (WHERE e.rn <= 5) AS sample + FROM extracted e + WHERE NOT EXISTS ( + SELECT 1 FROM dataflow.mappings m + WHERE m.source_name = p_source_name + AND m.rule_name = e.rule_name + AND m.input_value = e.extracted_value + ) + GROUP BY e.rule_name, e.output_field, e.source_field, e.extracted_value + ORDER BY count(*) DESC; +END; +$$ LANGUAGE plpgsql; diff --git a/database/queries/records.sql b/database/queries/records.sql new file mode 100644 index 0000000..ea9b2af --- /dev/null +++ b/database/queries/records.sql @@ -0,0 +1,55 @@ +-- +-- Records queries +-- All SQL for api/routes/records.js +-- + +SET search_path TO dataflow, public; + +-- ── Read ────────────────────────────────────────────────────────────────────── + +CREATE OR REPLACE FUNCTION list_records( + p_source_name TEXT, + p_limit INT DEFAULT 100, + p_offset INT DEFAULT 0, + p_transformed_only BOOLEAN DEFAULT FALSE +) +RETURNS SETOF dataflow.records AS $$ + SELECT * FROM dataflow.records + WHERE source_name = p_source_name + AND (NOT p_transformed_only OR transformed IS NOT NULL) + ORDER BY id DESC + LIMIT p_limit OFFSET p_offset; +$$ LANGUAGE sql STABLE; + +CREATE OR REPLACE FUNCTION get_record(p_id BIGINT) +RETURNS dataflow.records AS $$ + SELECT * FROM dataflow.records WHERE id = p_id; +$$ LANGUAGE sql STABLE; + +CREATE OR REPLACE FUNCTION search_records( + p_source_name TEXT, + p_query JSONB, + p_limit INT DEFAULT 100 +) +RETURNS SETOF dataflow.records AS $$ + SELECT * FROM dataflow.records + WHERE source_name = p_source_name + AND (data @> p_query OR transformed @> p_query) + ORDER BY id DESC + LIMIT p_limit; +$$ LANGUAGE sql STABLE; + +-- ── Delete ──────────────────────────────────────────────────────────────────── + +CREATE OR REPLACE FUNCTION delete_record(p_id BIGINT) +RETURNS TABLE (id BIGINT) AS $$ + DELETE FROM dataflow.records WHERE id = p_id RETURNING id; +$$ LANGUAGE sql; + +CREATE OR REPLACE FUNCTION delete_source_records(p_source_name TEXT) +RETURNS TABLE (deleted_count BIGINT) AS $$ + WITH deleted AS ( + DELETE FROM dataflow.records WHERE source_name = p_source_name RETURNING id + ) + SELECT count(*) AS deleted_count FROM deleted; +$$ LANGUAGE sql; diff --git a/database/queries/rules.sql b/database/queries/rules.sql new file mode 100644 index 0000000..c6d8aea --- /dev/null +++ b/database/queries/rules.sql @@ -0,0 +1,169 @@ +-- +-- Rules queries +-- All SQL for api/routes/rules.js +-- + +SET search_path TO dataflow, public; + +-- ── CRUD ───────────────────────────────────────────────────────────────────── + +CREATE OR REPLACE FUNCTION list_rules(p_source_name TEXT) +RETURNS SETOF dataflow.rules AS $$ + SELECT * FROM dataflow.rules + WHERE source_name = p_source_name + ORDER BY sequence, name; +$$ LANGUAGE sql STABLE; + +CREATE OR REPLACE FUNCTION get_rule(p_id INT) +RETURNS dataflow.rules AS $$ + SELECT * FROM dataflow.rules WHERE id = p_id; +$$ LANGUAGE sql STABLE; + +CREATE OR REPLACE FUNCTION create_rule( + p_source_name TEXT, + p_name TEXT, + p_field TEXT, + p_pattern TEXT, + p_output_field TEXT, + p_function_type TEXT DEFAULT 'extract', + p_flags TEXT DEFAULT '', + p_replace_value TEXT DEFAULT '', + p_enabled BOOLEAN DEFAULT TRUE, + p_retain BOOLEAN DEFAULT FALSE, + p_sequence INT DEFAULT 0 +) +RETURNS dataflow.rules AS $$ + INSERT INTO dataflow.rules + (source_name, name, field, pattern, output_field, function_type, flags, replace_value, 
enabled, retain, sequence) + VALUES + (p_source_name, p_name, p_field, p_pattern, p_output_field, p_function_type, p_flags, p_replace_value, p_enabled, p_retain, p_sequence) + RETURNING *; +$$ LANGUAGE sql; + +CREATE OR REPLACE FUNCTION update_rule( + p_id INT, + p_name TEXT DEFAULT NULL, + p_field TEXT DEFAULT NULL, + p_pattern TEXT DEFAULT NULL, + p_output_field TEXT DEFAULT NULL, + p_function_type TEXT DEFAULT NULL, + p_flags TEXT DEFAULT NULL, + p_replace_value TEXT DEFAULT NULL, + p_enabled BOOLEAN DEFAULT NULL, + p_retain BOOLEAN DEFAULT NULL, + p_sequence INT DEFAULT NULL +) +RETURNS dataflow.rules AS $$ + UPDATE dataflow.rules SET + name = COALESCE(p_name, name), + field = COALESCE(p_field, field), + pattern = COALESCE(p_pattern, pattern), + output_field = COALESCE(p_output_field, output_field), + function_type = COALESCE(p_function_type, function_type), + flags = COALESCE(p_flags, flags), + replace_value = COALESCE(p_replace_value, replace_value), + enabled = COALESCE(p_enabled, enabled), + retain = COALESCE(p_retain, retain), + sequence = COALESCE(p_sequence, sequence) + WHERE id = p_id + RETURNING *; +$$ LANGUAGE sql; + +CREATE OR REPLACE FUNCTION delete_rule(p_id INT) +RETURNS TABLE (id INT, name TEXT) AS $$ + DELETE FROM dataflow.rules WHERE id = p_id RETURNING id, name; +$$ LANGUAGE sql; + +-- ── Preview (ad-hoc pattern, no saved rule) ─────────────────────────────────── + +CREATE OR REPLACE FUNCTION preview_rule( + p_source TEXT, + p_field TEXT, + p_pattern TEXT, + p_flags TEXT DEFAULT '', + p_function_type TEXT DEFAULT 'extract', + p_replace_value TEXT DEFAULT '', + p_limit INT DEFAULT 20 +) +RETURNS TABLE (id BIGINT, raw_value TEXT, extracted_value JSONB) AS $$ +BEGIN + IF p_function_type = 'replace' THEN + RETURN QUERY + SELECT + r.id, + r.data ->> p_field, + to_jsonb(regexp_replace(r.data ->> p_field, p_pattern, p_replace_value, p_flags)) + FROM dataflow.records r + WHERE source_name = p_source AND data ? p_field + ORDER BY r.id DESC LIMIT p_limit; + ELSE + RETURN QUERY + SELECT + r.id, + r.data ->> p_field, + CASE + WHEN agg.match_count = 0 THEN NULL + WHEN agg.match_count = 1 THEN agg.matches -> 0 + ELSE agg.matches + END + FROM dataflow.records r + CROSS JOIN LATERAL ( + SELECT + jsonb_agg( + CASE WHEN array_length(mt, 1) = 1 THEN to_jsonb(mt[1]) ELSE to_jsonb(mt) END + ORDER BY rn + ) AS matches, + count(*)::int AS match_count + FROM regexp_matches(r.data ->> p_field, p_pattern, p_flags) + WITH ORDINALITY AS m(mt, rn) + ) agg + WHERE r.source_name = p_source AND r.data ? 
p_field + ORDER BY r.id DESC LIMIT p_limit; + END IF; +END; +$$ LANGUAGE plpgsql STABLE; + +-- ── Test (saved rule against real records) ──────────────────────────────────── + +CREATE OR REPLACE FUNCTION test_rule(p_rule_id INT, p_limit INT DEFAULT 20) +RETURNS TABLE (rule JSONB, results JSONB) AS $$ +DECLARE + v_rule dataflow.rules%ROWTYPE; + v_results JSONB; +BEGIN + SELECT * INTO v_rule FROM dataflow.rules WHERE id = p_rule_id; + IF NOT FOUND THEN RETURN; END IF; + + SELECT jsonb_agg(row_to_json(t)) INTO v_results FROM ( + SELECT + r.id, + r.data ->> v_rule.field AS raw_value, + CASE + WHEN agg.match_count = 0 THEN NULL + WHEN agg.match_count = 1 AND array_length(agg.matches[1], 1) = 1 + THEN to_jsonb(agg.matches[1][1]) + WHEN agg.match_count = 1 + THEN to_jsonb(agg.matches[1]) + WHEN array_length(agg.matches[1], 1) = 1 + THEN (SELECT jsonb_agg(m[1] ORDER BY idx) FROM unnest(agg.matches) WITH ORDINALITY u(m, idx)) + ELSE to_jsonb(agg.matches) + END AS extracted_value + FROM dataflow.records r + CROSS JOIN LATERAL ( + SELECT array_agg(mt ORDER BY rn) AS matches, count(*)::int AS match_count + FROM regexp_matches(r.data ->> v_rule.field, v_rule.pattern, COALESCE(v_rule.flags, '')) + WITH ORDINALITY AS m(mt, rn) + ) agg + WHERE r.source_name = v_rule.source_name AND r.data ? v_rule.field + ORDER BY r.id DESC LIMIT p_limit + ) t; + + RETURN QUERY SELECT + jsonb_build_object( + 'id', v_rule.id, 'name', v_rule.name, + 'field', v_rule.field, 'pattern', v_rule.pattern, + 'output_field', v_rule.output_field + ), + COALESCE(v_results, '[]'::jsonb); +END; +$$ LANGUAGE plpgsql STABLE; diff --git a/database/queries/sources.sql b/database/queries/sources.sql new file mode 100644 index 0000000..ac78d04 --- /dev/null +++ b/database/queries/sources.sql @@ -0,0 +1,322 @@ +-- +-- Sources queries +-- All SQL for api/routes/sources.js +-- + +SET search_path TO dataflow, public; + +-- ── CRUD ───────────────────────────────────────────────────────────────────── + +CREATE OR REPLACE FUNCTION list_sources() +RETURNS SETOF dataflow.sources AS $$ + SELECT * FROM dataflow.sources ORDER BY name; +$$ LANGUAGE sql STABLE; + +CREATE OR REPLACE FUNCTION get_source(p_name TEXT) +RETURNS dataflow.sources AS $$ + SELECT * FROM dataflow.sources WHERE name = p_name; +$$ LANGUAGE sql STABLE; + +CREATE OR REPLACE FUNCTION create_source(p_name TEXT, p_dedup_fields TEXT[], p_config JSONB DEFAULT '{}') +RETURNS dataflow.sources AS $$ + INSERT INTO dataflow.sources (name, dedup_fields, config) + VALUES (p_name, p_dedup_fields, p_config) + RETURNING *; +$$ LANGUAGE sql; + +CREATE OR REPLACE FUNCTION update_source(p_name TEXT, p_dedup_fields TEXT[] DEFAULT NULL, p_config JSONB DEFAULT NULL) +RETURNS dataflow.sources AS $$ + UPDATE dataflow.sources + SET dedup_fields = COALESCE(p_dedup_fields, dedup_fields), + config = COALESCE(p_config, config), + updated_at = CURRENT_TIMESTAMP + WHERE name = p_name + RETURNING *; +$$ LANGUAGE sql; + +CREATE OR REPLACE FUNCTION delete_source(p_name TEXT) +RETURNS TEXT AS $$ + DELETE FROM dataflow.sources WHERE name = p_name RETURNING name; +$$ LANGUAGE sql; + +-- ── Import log ──────────────────────────────────────────────────────────────── + +CREATE OR REPLACE FUNCTION get_import_log(p_source_name TEXT) +RETURNS SETOF dataflow.import_log AS $$ + SELECT * FROM dataflow.import_log + WHERE source_name = p_source_name + ORDER BY imported_at DESC; +$$ LANGUAGE sql STABLE; + +-- ── Stats ───────────────────────────────────────────────────────────────────── + +CREATE OR REPLACE FUNCTION 
get_source_stats(p_source_name TEXT) +RETURNS TABLE (total_records BIGINT, transformed_records BIGINT, pending_records BIGINT) AS $$ + SELECT + COUNT(*) AS total_records, + COUNT(*) FILTER (WHERE transformed IS NOT NULL) AS transformed_records, + COUNT(*) FILTER (WHERE transformed IS NULL) AS pending_records + FROM dataflow.records + WHERE source_name = p_source_name; +$$ LANGUAGE sql STABLE; + +-- ── Fields ──────────────────────────────────────────────────────────────────── + +CREATE OR REPLACE FUNCTION get_source_fields(p_source_name TEXT) +RETURNS TABLE (key TEXT, origins TEXT[]) AS $$ + SELECT key, array_agg(DISTINCT origin ORDER BY origin) AS origins + FROM ( + SELECT f->>'name' AS key, 'schema' AS origin + FROM dataflow.sources, jsonb_array_elements(config->'fields') f + WHERE name = p_source_name AND config ? 'fields' + UNION ALL + SELECT jsonb_object_keys(data) AS key, 'raw' AS origin + FROM dataflow.records WHERE source_name = p_source_name + UNION ALL + SELECT output_field AS key, 'rule: ' || name AS origin + FROM dataflow.rules WHERE source_name = p_source_name + UNION ALL + SELECT jsonb_object_keys(output) AS key, 'mapping' AS origin + FROM dataflow.mappings WHERE source_name = p_source_name + ) keys + GROUP BY key + ORDER BY key; +$$ LANGUAGE sql STABLE; + +-- ── View data (dynamic sort via EXECUTE) ────────────────────────────────────── + +CREATE OR REPLACE FUNCTION get_view_data( + p_source_name TEXT, + p_limit INT DEFAULT 100, + p_offset INT DEFAULT 0, + p_sort_col TEXT DEFAULT NULL, + p_sort_dir TEXT DEFAULT 'asc' +) +RETURNS JSON AS $$ +DECLARE + v_exists BOOLEAN; + v_order TEXT := ''; + v_rows JSON; +BEGIN + SELECT EXISTS ( + SELECT 1 FROM information_schema.views + WHERE table_schema = 'dfv' AND table_name = p_source_name + ) INTO v_exists; + + IF NOT v_exists THEN + RETURN json_build_object('exists', FALSE, 'rows', '[]'::json); + END IF; + + IF p_sort_col IS NOT NULL AND EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'dfv' + AND table_name = p_source_name + AND column_name = p_sort_col + ) THEN + v_order := ' ORDER BY ' || quote_ident(p_sort_col) + || CASE WHEN lower(p_sort_dir) = 'desc' THEN ' DESC' ELSE ' ASC' END + || ' NULLS LAST'; + END IF; + + -- Subquery applies ORDER BY + LIMIT first, then json_agg collects in that order. + -- json_agg on the outer query preserves column order (json not jsonb). 
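+  -- Illustrative only (no such source is assumed to exist): for a hypothetical source
+  -- named "bank" sorted by a column named amount in descending order, with the default
+  -- limit and offset, the statement assembled below comes out roughly as:
+  --   SELECT COALESCE(json_agg(row_to_json(t)), '[]'::json)
+  --   FROM (SELECT * FROM dfv.bank ORDER BY amount DESC NULLS LAST LIMIT 100 OFFSET 0) t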
+ EXECUTE format( + 'SELECT COALESCE(json_agg(row_to_json(t)), ''[]''::json) FROM (SELECT * FROM dfv.%I%s LIMIT %s OFFSET %s) t', + p_source_name, v_order, p_limit, p_offset + ) INTO v_rows; + + RETURN json_build_object('exists', TRUE, 'rows', v_rows); +END; +$$ LANGUAGE plpgsql STABLE; + +-- ── Import (deduplication) ──────────────────────────────────────────────────── + +CREATE OR REPLACE FUNCTION import_records(p_source_name TEXT, p_data JSONB) +RETURNS JSON AS $$ +DECLARE + v_dedup_fields TEXT[]; + v_record JSONB; + v_dedup_key TEXT; + v_inserted INTEGER := 0; + v_duplicates INTEGER := 0; + v_log_id INTEGER; +BEGIN + SELECT dedup_fields INTO v_dedup_fields + FROM dataflow.sources WHERE name = p_source_name; + + IF v_dedup_fields IS NULL THEN + RETURN json_build_object('success', false, 'error', 'Source not found: ' || p_source_name); + END IF; + + FOR v_record IN SELECT * FROM jsonb_array_elements(p_data) LOOP + v_dedup_key := dataflow.generate_dedup_key(v_record, v_dedup_fields); + BEGIN + INSERT INTO dataflow.records (source_name, data, dedup_key) + VALUES (p_source_name, v_record, v_dedup_key); + v_inserted := v_inserted + 1; + EXCEPTION WHEN unique_violation THEN + v_duplicates := v_duplicates + 1; + END; + END LOOP; + + INSERT INTO dataflow.import_log (source_name, records_imported, records_duplicate) + VALUES (p_source_name, v_inserted, v_duplicates) + RETURNING id INTO v_log_id; + + RETURN json_build_object('success', true, 'imported', v_inserted, 'duplicates', v_duplicates, 'log_id', v_log_id); +END; +$$ LANGUAGE plpgsql; + +-- ── Transformations ─────────────────────────────────────────────────────────── + +CREATE OR REPLACE FUNCTION dataflow.jsonb_merge(a JSONB, b JSONB) +RETURNS JSONB AS $$ + SELECT COALESCE(a, '{}') || COALESCE(b, '{}') +$$ LANGUAGE sql IMMUTABLE; + +DROP AGGREGATE IF EXISTS dataflow.jsonb_concat_obj(JSONB); +CREATE AGGREGATE dataflow.jsonb_concat_obj(JSONB) ( + sfunc = dataflow.jsonb_merge, + stype = JSONB, + initcond = '{}' +); + +DROP FUNCTION IF EXISTS apply_transformations(TEXT, INTEGER[]); +CREATE OR REPLACE FUNCTION apply_transformations( + p_source_name TEXT, + p_record_ids INTEGER[] DEFAULT NULL, + p_overwrite BOOLEAN DEFAULT FALSE +) RETURNS JSON AS $$ +WITH +qualifying AS ( + SELECT id, data FROM dataflow.records + WHERE source_name = p_source_name + AND (p_overwrite OR transformed IS NULL) + AND (p_record_ids IS NULL OR id = ANY(p_record_ids)) +), +rx AS ( + SELECT + q.id, + r.name AS rule_name, + r.sequence, + r.output_field, + r.retain, + r.function_type, + COALESCE(mt.rn, rp.rn, 1) AS result_number, + CASE WHEN array_length(mt.mt, 1) = 1 THEN to_jsonb(mt.mt[1]) ELSE to_jsonb(mt.mt) END AS match_val, + to_jsonb(rp.rp) AS replace_val + FROM dataflow.rules r + INNER JOIN qualifying q ON q.data ? 
r.field + LEFT JOIN LATERAL regexp_matches(q.data ->> r.field, r.pattern, r.flags) + WITH ORDINALITY AS mt(mt, rn) ON r.function_type = 'extract' + LEFT JOIN LATERAL regexp_replace(q.data ->> r.field, r.pattern, r.replace_value, r.flags) + WITH ORDINALITY AS rp(rp, rn) ON r.function_type = 'replace' + WHERE r.source_name = p_source_name AND r.enabled = true +), +agg_matches AS ( + SELECT + id, rule_name, sequence, output_field, retain, function_type, + CASE function_type + WHEN 'replace' THEN jsonb_agg(replace_val) -> 0 + ELSE + CASE WHEN max(result_number) = 1 + THEN jsonb_agg(match_val ORDER BY result_number) -> 0 + ELSE jsonb_agg(match_val ORDER BY result_number) + END + END AS extracted + FROM rx + GROUP BY id, rule_name, sequence, output_field, retain, function_type +), +linked AS ( + SELECT + a.id, a.sequence, a.output_field, a.retain, a.extracted, m.output AS mapped + FROM agg_matches a + LEFT JOIN dataflow.mappings m ON + m.source_name = p_source_name + AND m.rule_name = a.rule_name + AND m.input_value = a.extracted + WHERE a.extracted IS NOT NULL +), +rule_output AS ( + SELECT + id, sequence, + CASE + WHEN mapped IS NOT NULL THEN + mapped || CASE WHEN retain THEN jsonb_build_object(output_field, extracted) ELSE '{}'::jsonb END + ELSE jsonb_build_object(output_field, extracted) + END AS output + FROM linked +), +record_additions AS ( + SELECT id, dataflow.jsonb_concat_obj(output ORDER BY sequence) AS additions + FROM rule_output GROUP BY id +), +updated AS ( + UPDATE dataflow.records rec + SET transformed = rec.data || COALESCE(ra.additions, '{}'::jsonb), + transformed_at = CURRENT_TIMESTAMP + FROM qualifying q + LEFT JOIN record_additions ra ON ra.id = q.id + WHERE rec.id = q.id + RETURNING rec.id +) +SELECT json_build_object('success', true, 'transformed', count(*)) FROM updated +$$ LANGUAGE sql; + +CREATE OR REPLACE FUNCTION reprocess_records(p_source_name TEXT) +RETURNS JSON AS $$ + SELECT dataflow.apply_transformations(p_source_name, NULL, TRUE) +$$ LANGUAGE sql; + +-- ── View generation ─────────────────────────────────────────────────────────── + +CREATE OR REPLACE FUNCTION generate_source_view(p_source_name TEXT) +RETURNS JSON AS $$ +DECLARE + v_config JSONB; + v_field JSONB; + v_cols TEXT := ''; + v_sql TEXT; + v_view TEXT; +BEGIN + SELECT config INTO v_config FROM dataflow.sources WHERE name = p_source_name; + + IF v_config IS NULL OR NOT (v_config ? 
'fields') OR jsonb_array_length(v_config->'fields') = 0 THEN + RETURN json_build_object('success', false, 'error', 'No schema fields defined for this source'); + END IF; + + FOR v_field IN SELECT * FROM jsonb_array_elements(v_config->'fields') LOOP + IF v_cols != '' THEN v_cols := v_cols || ', '; END IF; + + IF v_field->>'expression' IS NOT NULL THEN + DECLARE + v_expr TEXT := v_field->>'expression'; + v_ref TEXT; + BEGIN + WHILE v_expr ~ '\{[^}]+\}' LOOP + v_ref := substring(v_expr FROM '\{([^}]+)\}'); + v_expr := replace(v_expr, '{' || v_ref || '}', format('(transformed->>%L)::numeric', v_ref)); + END LOOP; + v_cols := v_cols || format('%s AS %I', v_expr, v_field->>'name'); + END; + ELSE + CASE v_field->>'type' + WHEN 'date' THEN v_cols := v_cols || format('(transformed->>%L)::date AS %I', v_field->>'name', v_field->>'name'); + WHEN 'numeric' THEN v_cols := v_cols || format('(transformed->>%L)::numeric AS %I', v_field->>'name', v_field->>'name'); + ELSE v_cols := v_cols || format('transformed->>%L AS %I', v_field->>'name', v_field->>'name'); + END CASE; + END IF; + END LOOP; + + CREATE SCHEMA IF NOT EXISTS dfv; + v_view := 'dfv.' || quote_ident(p_source_name); + EXECUTE format('DROP VIEW IF EXISTS %s', v_view); + v_sql := format( + 'CREATE VIEW %s AS SELECT %s FROM dataflow.records WHERE source_name = %L AND transformed IS NOT NULL', + v_view, v_cols, p_source_name + ); + EXECUTE v_sql; + + RETURN json_build_object('success', true, 'view', v_view, 'sql', v_sql); +END; +$$ LANGUAGE plpgsql; diff --git a/manage.py b/manage.py index c4cd77e..9eee6dc 100755 --- a/manage.py +++ b/manage.py @@ -331,8 +331,14 @@ def action_configure(cfg): ok(f'Settings written to {ENV_FILE}') db_location = f'database "{new_cfg["DB_NAME"]}" on {new_cfg["DB_HOST"]}:{new_cfg["DB_PORT"]}' - schema_file = ROOT / 'database' / 'schema.sql' - functions_file = ROOT / 'database' / 'functions.sql' + schema_file = ROOT / 'database' / 'schema.sql' + queries_dir = ROOT / 'database' / 'queries' + query_files = [ + queries_dir / 'sources.sql', + queries_dir / 'rules.sql', + queries_dir / 'mappings.sql', + queries_dir / 'records.sql', + ] # Offer schema deployment print() @@ -360,14 +366,21 @@ def action_configure(cfg): # Offer function deployment print() - show_commands([['psql', '-U', new_cfg['DB_USER'], '-h', new_cfg['DB_HOST'], '-p', new_cfg['DB_PORT'], '-d', new_cfg['DB_NAME'], '-f', str(functions_file)]]) + show_commands([ + ['psql', '-U', new_cfg['DB_USER'], '-h', new_cfg['DB_HOST'], '-p', new_cfg['DB_PORT'], '-d', new_cfg['DB_NAME'], '-f', str(f)] + for f in query_files + ]) if confirm(f'Deploy SQL functions into {db_location}?', default_yes=False): - print(f' Running {functions_file} against {db_location}...') - r = psql_file(new_cfg, functions_file) - if r.returncode == 0: - ok(f'SQL functions deployed into {db_location}') + for f in query_files: + print(f' Running {f.name} against {db_location}...') + r = psql_file(new_cfg, f) + if r.returncode == 0: + ok(f'{f.name} deployed') + else: + err(f'{f.name} failed:\n{r.stderr}') + break else: - err(f'Function deployment failed:\n{r.stderr}') + ok(f'All SQL functions deployed into {db_location}') return new_cfg @@ -414,15 +427,23 @@ def action_deploy_schema(cfg): def action_deploy_functions(cfg): - header('Deploy SQL functions (database/functions.sql)') + header('Deploy SQL functions (database/queries/)') if not cfg: err(f'{ENV_FILE} not found — run option 1 to configure the database connection first') return db_location = f'database "{cfg["DB_NAME"]}" on 
{cfg["DB_HOST"]}:{cfg["DB_PORT"]}' - functions_file = ROOT / 'database' / 'functions.sql' + queries_dir = ROOT / 'database' / 'queries' + query_files = [ + queries_dir / 'sources.sql', + queries_dir / 'rules.sql', + queries_dir / 'mappings.sql', + queries_dir / 'records.sql', + ] - print(f' Source file : {functions_file}') + print(f' Source files: {queries_dir}/') + for f in query_files: + print(f' {f.name}') print(f' Target : "dataflow" schema in {db_location}') print() @@ -436,17 +457,23 @@ def action_deploy_functions(cfg): info('Cancelled — no changes made') return - show_commands([['psql', '-U', cfg['DB_USER'], '-h', cfg['DB_HOST'], '-p', cfg['DB_PORT'], '-d', cfg['DB_NAME'], '-q', '-f', str(functions_file)]]) + show_commands([ + ['psql', '-U', cfg['DB_USER'], '-h', cfg['DB_HOST'], '-p', cfg['DB_PORT'], '-d', cfg['DB_NAME'], '-q', '-f', str(f)] + for f in query_files + ]) if not confirm(f'Deploy SQL functions into {db_location}?', default_yes=False): info('Cancelled — no changes made') return - print(f' Running {functions_file} against {db_location}...') - r = psql_file(cfg, functions_file) - if r.returncode == 0: - ok(f'SQL functions deployed into {db_location}') - else: - err(f'Function deployment failed:\n{r.stderr}') + for f in query_files: + print(f' Running {f.name} against {db_location}...') + r = psql_file(cfg, f) + if r.returncode == 0: + ok(f'{f.name} deployed') + else: + err(f'{f.name} failed:\n{r.stderr}') + return + ok(f'All SQL functions deployed into {db_location}') def action_build_ui(): @@ -758,7 +785,7 @@ def action_set_login_credentials(cfg): MENU = [ ('Database configuration and deployment dialog (.env)', action_configure), ('Redeploy "dataflow" schema only (database/schema.sql)', action_deploy_schema), - ('Redeploy SQL functions only (database/functions.sql)', action_deploy_functions), + ('Redeploy SQL functions only (database/queries/)', action_deploy_functions), ('Build UI (ui/ → public/)', action_build_ui), ('Set up nginx reverse proxy', action_setup_nginx), ('Install dataflow systemd service unit', action_install_service),