dataflow/SPEC.md
Paul Trowbridge 291c665ed1 Consolidate all SQL into database/queries/, switch to literal SQL in routes
- Add database/queries/{sources,rules,mappings,records}.sql — one file per
  route, all business logic in PostgreSQL functions
- Replace parameterized queries in all four route files with lit()/jsonLit()
  literal interpolation for debuggability
- Add api/lib/sql.js with lit(), jsonLit(), arr() helpers
- Fix get_view_data to use json_agg (preserves column order) with subquery
  (guarantees sort order is respected before aggregation)
- Fix jsonLit() for JSONB params so plain strings become valid JSON
- Update manage.py option 3 to deploy database/queries/ instead of functions.sql
- Add SPEC.md covering architecture, philosophy, and manage.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-05 22:36:53 -04:00

308 lines
16 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Dataflow — Application Specification
## Purpose
Dataflow is an internal tool for importing, cleaning, and standardizing data from multiple sources (e.g., bank transaction exports). It is intentionally simple — no queues, no schedulers, no plugins. Everything is triggered explicitly by the user through the UI or API.
---
## Philosophy and Design Choices
### Simple over clever
Every decision favors clarity. No abstractions for their own sake, no frameworks beyond what is necessary. If a piece of code is hard to follow, that is a sign it should be rewritten, not abstracted.
### All logic lives in the database
SQL functions are the single source of truth for business logic. The API layer is a thin HTTP wrapper — it validates input, calls a function, and returns the result. It does not construct business logic in JavaScript.
### SQL is written as full literal strings
Database calls in the route files use fully formed SQL strings with values interpolated directly (not parameterized). This makes every query copy-pasteable into psql for debugging. A small `lit()` helper in `api/lib/sql.js` handles quoting and escaping. This is an intentional trade-off: the tool is internal, and debuggability is worth more than the marginal injection protection parameterization provides over what `lit()` already does.
### One SQL file per route
SQL is organized in `database/queries/` with one file per route (`sources.sql`, `rules.sql`, `mappings.sql`, `records.sql`). This makes it easy to find the SQL behind any API endpoint — look at the route file to find the function name, then look at the matching query file for the implementation.
### Explicit over implicit
Nothing happens automatically. Transformations are triggered by the user. Views are generated on demand. There are no database triggers, no background workers, no scheduled jobs.
### JSONB for flexibility
Raw imported records and transformed records are stored as JSONB. This avoids schema migrations when source formats change and allows different sources to have different field sets in the same table.
---
## Architecture
```
manage.py — interactive CLI for setup, deployment, and management
database/
schema.sql — table definitions (run once or to reset)
queries/
sources.sql — all SQL for /api/sources
rules.sql — all SQL for /api/rules
mappings.sql — all SQL for /api/mappings
records.sql — all SQL for /api/records
api/
server.js — Express server, mounts routes, auth middleware
middleware/
auth.js — Basic Auth enforcement on all /api routes
lib/
sql.js — lit() and arr() helpers for SQL literal building
routes/
sources.js — HTTP handlers for source management
rules.js — HTTP handlers for rule management
mappings.js — HTTP handlers for mapping management
records.js — HTTP handlers for record queries
ui/
src/
api.js — fetch wrapper, credential management
App.jsx — root: login gate, sidebar, source selector, routing
pages/
Login.jsx — username/password form
Sources.jsx — source CRUD, field config, view generation
Import.jsx — CSV upload and import log
Rules.jsx — rule CRUD with live pattern preview
Mappings.jsx — mapping table with TSV import/export
Records.jsx — paginated, sortable view of transformed records
public/ — compiled UI (output of npm run build in ui/)
```
---
## Database Schema
Five tables in the `dataflow` schema:
### `sources`
Defines a data source. The `dedup_fields` array specifies which fields make a record unique. `config` (JSONB) holds the output schema (`fields` array) used to generate the typed view.
### `records`
Stores every imported record. `data` holds the raw import. `transformed` holds the enriched record after rules and mappings are applied. `dedup_key` is an MD5 hash of the dedup fields — a unique constraint on `(source_name, dedup_key)` prevents duplicate imports.
### `rules`
Regex transformation rules. Each rule reads from `field`, applies `pattern` with optional `flags`, and writes to `output_field`. `function_type` is either `extract` (regexp_matches) or `replace` (regexp_replace). `sequence` controls the order rules are applied. `retain` keeps the raw extracted value in `output_field` even when a mapping overrides it.
### `mappings`
Maps an extracted value to a standardized output object. `input_value` is JSONB (matches the extracted value exactly, including arrays from multi-capture-group patterns). `output` is a JSONB object that can contain multiple fields (e.g., `{"vendor": "Walmart", "category": "Groceries"}`).
### `import_log`
Audit trail. One row per import call, recording how many records were inserted versus skipped as duplicates.
---
## Data Flow
### Import
```
CSV file → parse in Node.js → import_records(source, data)
→ generate_dedup_key() per record → INSERT with unique constraint
→ count inserted vs duplicates → log to import_log
```
### Transform
```
apply_transformations(source) — pure SQL CTE
qualifying: records not yet transformed (or all, if overwrite=true)
rx: fan out one row per regex match (LATERAL join, rules × records)
agg_matches: collapse matches back to one value per (record, rule)
linked: LEFT JOIN mappings to find mapped output
rule_output: build per-rule output JSONB
record_additions: merge all rule outputs per record in sequence order
UPDATE records SET transformed = data || additions
```
The transform is fully set-based — no row-by-row loops. All records for a source are processed in a single query.
### View generation
`generate_source_view(source)` reads `config.fields` from the source and builds a `CREATE VIEW` statement in the `dfv` schema. Each field is cast to its declared type (`text`, `numeric`, `date`). Fields with an `expression` are computed from other transformed fields using `{field}` substitution syntax.
---
## SQL Functions
Each file in `database/queries/` maps 1-to-1 with a route file.
**sources.sql**
`list_sources`, `get_source`, `create_source`, `update_source`, `delete_source`, `get_import_log`, `get_source_stats`, `get_source_fields`, `get_view_data` (plpgsql — dynamic sort via EXECUTE + quote_ident), `import_records`, `jsonb_merge` + `jsonb_concat_obj` aggregate, `apply_transformations`, `reprocess_records`, `generate_source_view`
**rules.sql**
`list_rules`, `get_rule`, `create_rule`, `update_rule`, `delete_rule`, `preview_rule` (plpgsql — conditional query for extract vs replace), `test_rule` (returns `TABLE(rule JSONB, results JSONB)`)
**mappings.sql**
`list_mappings`, `get_mapping`, `create_mapping`, `upsert_mapping`, `update_mapping`, `delete_mapping`, `get_mapping_counts`, `get_all_values` (plpgsql — extracted values with mapping join), `get_unmapped_values` (plpgsql — extracted values with no mapping)
**records.sql**
`list_records`, `get_record`, `search_records` (JSONB containment on data and transformed), `delete_record`, `delete_source_records`
---
## API
All routes are under `/api`. Every route requires HTTP Basic Auth. The `GET /health` endpoint is exempt.
**Authentication:** `Authorization: Basic <base64(user:pass)>` on every request. Credentials are verified against `LOGIN_USER` (plaintext username) and `LOGIN_PASSWORD_HASH` (bcrypt hash) in `.env`. There are no sessions or tokens — credentials are sent with every request.
**Route summary:**
| Method | Path | Description |
|--------|------|-------------|
| GET | /api/sources | List all sources |
| POST | /api/sources | Create source |
| GET | /api/sources/:name | Get source |
| PUT | /api/sources/:name | Update source (dedup_fields, config) |
| DELETE | /api/sources/:name | Delete source and all data |
| POST | /api/sources/suggest | Suggest source config from CSV upload |
| POST | /api/sources/:name/import | Import CSV records |
| GET | /api/sources/:name/import-log | Import history |
| GET | /api/sources/:name/stats | Record counts |
| GET | /api/sources/:name/fields | All known field names and origins |
| GET | /api/sources/:name/view-data | Paginated, sortable view data |
| POST | /api/sources/:name/transform | Apply transformations (new records only) |
| POST | /api/sources/:name/reprocess | Reapply transformations to all records |
| POST | /api/sources/:name/view | Generate dfv view |
| GET | /api/rules/source/:name | List rules for a source |
| GET | /api/rules/preview | Preview pattern against live records |
| GET | /api/rules/:id/test | Test saved rule against live records |
| POST | /api/rules | Create rule |
| PUT | /api/rules/:id | Update rule |
| DELETE | /api/rules/:id | Delete rule |
| GET | /api/mappings/source/:name | List mappings |
| GET | /api/mappings/source/:name/all-values | All extracted values (mapped + unmapped) |
| GET | /api/mappings/source/:name/unmapped | Only unmapped extracted values |
| GET | /api/mappings/source/:name/counts | Record counts per mapping |
| GET | /api/mappings/source/:name/export.tsv | Export mappings as TSV |
| POST | /api/mappings/source/:name/import-csv | Import/update mappings from TSV |
| POST | /api/mappings | Create mapping |
| POST | /api/mappings/bulk | Upsert multiple mappings |
| PUT | /api/mappings/:id | Update mapping |
| DELETE | /api/mappings/:id | Delete mapping |
| GET | /api/records/source/:name | List raw records |
| GET | /api/records/:id | Get single record |
| POST | /api/records/search | Search by JSONB containment |
| DELETE | /api/records/:id | Delete record |
| DELETE | /api/records/source/:name/all | Delete all records for a source |
---
## `api/lib/sql.js` — SQL Literal Helpers
```js
lit(val) // JS value → SQL literal: 'text', TRUE, 42, NULL, '{"json":"val"}'
arr(val) // JS array → PostgreSQL array literal: ARRAY['a','b']
```
Single quotes within string values are escaped by doubling them (`'`→`''`). Objects and arrays are JSON-serialized. These helpers exist so that query strings in route files are fully formed and can be copied directly into psql.
---
## UI
Built with React + Vite + Tailwind CSS. Compiled output goes to `public/`. The server serves `public/` as static files and falls back to `public/index.html` for all non-API routes (SPA routing).
**Authentication flow:**
1. On load, check `sessionStorage` for saved credentials. If found, re-authenticate silently via `GET /api/sources`.
2. On login, credentials are stored in memory (`api.js` module-level) and in `sessionStorage` (survives page refresh, cleared on tab close).
3. On 401 response, credentials are cleared and the login screen is shown.
4. `localStorage` persists the selected source name across sessions.
**Pages:**
- **Sources** — View and edit source configuration. Shows all known field names and their origins (raw data, schema, rules, mappings). Checkboxes control which fields are dedup keys and which appear in the output view. Supports CSV upload to auto-detect fields.
- **Import** — Upload a CSV to import records into the selected source. Shows import log with inserted/duplicate counts per import.
- **Rules** — Create and manage regex rules. Live preview fires automatically (debounced 500ms) as pattern/field/flags are edited, showing match results against real records. Rules can be enabled/disabled by toggle.
- **Mappings** — Tabular mapping editor. Shows all extracted values from transformed records with record counts and sample raw data. Rows are yellow (unmapped), white (mapped), or blue (edited but unsaved). Supports TSV export and import. Columns can be added dynamically.
- **Records** — Paginated table showing the `dfv.{source}` view. Server-side sorting (column validated against `information_schema.columns`, interpolated with `quote_ident`). Dates are formatted `YYYY-MM-DD` for correct lexicographic sort.
---
## manage.py
Interactive CLI for setup and operations. Run with `python3 manage.py`. Requires no arguments.
Shows current status on every screen:
- Database connection (host, port, db, user) and whether it succeeds
- Whether the `dataflow` schema is deployed
- Whether SQL functions are deployed (detected by presence of `apply_transformations`)
- Login credentials (configured / not configured)
- UI build status and timestamp
- Systemd service status
- Nginx reverse proxy status
**Menu options:**
1. **Database configuration and deployment dialog** — Write or update `.env` with database connection settings. If connection fails, offers to create the database and user using admin credentials. Optionally deploys schema and SQL functions after `.env` is written.
2. **Redeploy schema** — Runs `database/schema.sql` against the configured database. Warns that this drops all data. Requires explicit confirmation.
3. **Redeploy SQL functions** — Runs all four files in `database/queries/` in order: `sources.sql`, `rules.sql`, `mappings.sql`, `records.sql`. Safe to run at any time without data loss.
4. **Build UI** — Runs `npm run build` in `ui/`, outputting to `public/`.
5. **Set up nginx reverse proxy** — Detects if a proxy already exists for the configured port. Writes an nginx site config to `/etc/nginx/sites-enabled/`. If an SSL certificate exists in `/etc/letsencrypt/live/`, configures HTTPS with HTTP redirect; otherwise configures HTTP only with an offer to run certbot. Requires sudo.
6. **Install systemd service** — Copies `dataflow.service` to `/etc/systemd/system/`, runs `daemon-reload`, enables on boot. Requires sudo.
7. **Start / restart service** — Runs `systemctl start` or `systemctl restart` depending on current state. Requires sudo.
8. **Stop service** — Runs `systemctl stop`. Requires sudo.
9. **Set login credentials** — Prompts for username and password, bcrypt-hashes the password via `node -e "require('bcrypt')..."`, and writes `LOGIN_USER` and `LOGIN_PASSWORD_HASH` to `.env`. Requires Node.js and bcrypt npm package to be installed.
**Key behaviors:**
- All commands that will be run are printed before the user is asked to confirm.
- Actions that require sudo prompt transparently — `sudo` is not run with `-n`, so it uses cached credentials or prompts as normal.
- Nginx config files are written via `sudo cp` from a temp file, then `sudo chmod 644` to make them world-readable for status detection.
- Certificate existence is checked with `sudo test -f` since `/etc/letsencrypt/live/` requires root.
- Option 1's result (updated config) is passed back to the menu loop so status reflects changes without requiring a restart.
---
## Environment Variables (`.env`)
```
DB_HOST PostgreSQL host
DB_PORT PostgreSQL port (default 5432)
DB_NAME Database name
DB_USER Database user
DB_PASSWORD Database password
API_PORT Port the Express server listens on (default 3020)
NODE_ENV development | production
LOGIN_USER Username for Basic Auth
LOGIN_PASSWORD_HASH bcrypt hash of the password
```
---
## Running the Application
```bash
# Install API dependencies
npm install
# Install UI dependencies and build
cd ui && npm install && npm run build && cd ..
# Start (development, auto-reload)
npm run dev
# Start (production)
npm start
```
The server binds to `0.0.0.0` on `API_PORT` and serves both the API and the compiled UI from `public/`.
---
## Deploying SQL Changes
Any time SQL functions are modified:
```bash
PGPASSWORD=<pass> psql -h <host> -U <user> -d <db> -f database/queries/sources.sql
PGPASSWORD=<pass> psql -h <host> -U <user> -d <db> -f database/queries/rules.sql
PGPASSWORD=<pass> psql -h <host> -U <user> -d <db> -f database/queries/mappings.sql
PGPASSWORD=<pass> psql -h <host> -U <user> -d <db> -f database/queries/records.sql
```
Then restart the server. Function deployment is safe to repeat — all functions use `CREATE OR REPLACE`.
Schema changes (`schema.sql`) drop and recreate the schema, deleting all data. In production, write migration scripts instead.