# Dataflow — Application Specification

## Purpose

Dataflow is an internal tool for importing, cleaning, and standardizing data from multiple sources (e.g., bank transaction exports). It is intentionally simple — no queues, no schedulers, no plugins. Everything is triggered explicitly by the user through the UI or API.

---

## Philosophy and Design Choices

### Simple over clever

Every decision favors clarity. No abstractions for their own sake, no frameworks beyond what is necessary. If a piece of code is hard to follow, that is a sign it should be rewritten, not abstracted.

### All logic lives in the database

SQL functions are the single source of truth for business logic. The API layer is a thin HTTP wrapper — it validates input, calls a function, and returns the result. It does not construct business logic in JavaScript.

### SQL is written as full literal strings

Database calls in the route files use fully formed SQL strings with values interpolated directly (not parameterized). This makes every query copy-pasteable into psql for debugging. A small `lit()` helper in `api/lib/sql.js` handles quoting and escaping. This is an intentional trade-off: the tool is internal, and debuggability is worth more than the marginal injection protection parameterization provides over what `lit()` already does.

### One SQL file per route

SQL is organized in `database/queries/` with one file per route (`sources.sql`, `rules.sql`, `mappings.sql`, `records.sql`). This makes it easy to find the SQL behind any API endpoint — look at the route file to find the function name, then look at the matching query file for the implementation.

### Explicit over implicit

Nothing happens automatically. Transformations are triggered by the user. Views are generated on demand. There are no database triggers, no background workers, no scheduled jobs.

### JSONB for flexibility

Raw imported records and transformed records are stored as JSONB. This avoids schema migrations when source formats change and allows different sources to have different field sets in the same table.

---

## Architecture

```
manage.py                — interactive CLI for setup, deployment, and management
database/
  schema.sql             — table definitions (run once or to reset)
  queries/
    sources.sql          — all SQL for /api/sources
    rules.sql            — all SQL for /api/rules
    mappings.sql         — all SQL for /api/mappings
    records.sql          — all SQL for /api/records
api/
  server.js              — Express server, mounts routes, auth middleware
  middleware/
    auth.js              — Basic Auth enforcement on all /api routes
  lib/
    sql.js               — lit() and arr() helpers for SQL literal building
  routes/
    sources.js           — HTTP handlers for source management
    rules.js             — HTTP handlers for rule management
    mappings.js          — HTTP handlers for mapping management
    records.js           — HTTP handlers for record queries
ui/
  src/
    api.js               — fetch wrapper, credential management
    App.jsx              — root: login gate, sidebar, source selector, routing
    pages/
      Login.jsx          — username/password form
      Sources.jsx        — source CRUD, field config, view generation
      Import.jsx         — CSV upload and import log
      Rules.jsx          — rule CRUD with live pattern preview
      Mappings.jsx       — mapping table with TSV import/export
      Records.jsx        — paginated, sortable view of transformed records
      Pivot.jsx          — interactive pivot table with cell inspector
      Log.jsx            — global import log across all sources
public/                  — compiled UI (output of npm run build in ui/)
```
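To make the conventions above concrete, here is a rough sketch of a route handler written in the literal-SQL, thin-wrapper style. It is illustrative, not code from the repository: the `pg` pool setup, the route shape, and the assumption that `get_source()` returns a single JSONB value are all assumptions.

```js
// Illustrative route handler; names and return shapes are assumptions, not repository code.
const express = require('express');
const { Pool } = require('pg');
const { lit } = require('../lib/sql');

// Connection settings taken from the documented .env variables.
const pool = new Pool({
  host: process.env.DB_HOST,
  port: Number(process.env.DB_PORT),
  database: process.env.DB_NAME,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
});

const router = express.Router();

// GET /api/sources/:name delegates to a SQL function defined in database/queries/sources.sql.
router.get('/:name', async (req, res) => {
  // Fully formed literal SQL: the string can be copied straight into psql for debugging.
  const sql = `SELECT get_source(${lit(req.params.name)}) AS source;`;
  const { rows } = await pool.query(sql);
  res.json(rows[0].source); // assumes get_source() returns the source as one JSONB value
});

module.exports = router;
```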
---

## Database Schema

Five tables in the `dataflow` schema:

### `sources`

Defines a data source. The `constraint_fields` array specifies which fields make a record unique. `config` (JSONB) holds the output schema (`fields` array) used to generate the typed view.

### `records`

Stores every imported record. `data` holds the raw import. `transformed` holds the enriched record after rules and mappings are applied. `constraint_key` is a JSONB object of the constraint field values used to detect duplicates at import time. `import_id` references the `import_log` row; deleting a log entry cascades to its records.

### `rules`

Regex transformation rules. Each rule reads from `field`, applies `pattern` with optional `flags`, and writes to `output_field`. `function_type` is either `extract` (regexp_matches) or `replace` (regexp_replace). `sequence` controls the order rules are applied. `retain` keeps the raw extracted value in `output_field` even when a mapping overrides it.

### `mappings`

Maps an extracted value to a standardized output object. `input_value` is JSONB (matches the extracted value exactly, including arrays from multi-capture-group patterns). `output` is a JSONB object that can contain multiple fields (e.g., `{"vendor": "Walmart", "category": "Groceries"}`).

### `import_log`

Audit trail. One row per import call, recording how many records were inserted versus skipped as duplicates. `info` (JSONB) stores the full `inserted_keys` and `excluded_keys` arrays. Deleting a log row cascades to its records via the `import_id` FK.

---

## Data Flow

### Import

```
CSV file
  → parse in Node.js
  → import_records(source, data)
  → build JSONB constraint_key per record
  → compare against existing records (CTE — no unique constraint)
  → INSERT new records, skip duplicates
  → log to import_log (with inserted_keys / excluded_keys)
  → apply_transformations() runs automatically on new records
```

### Transform

```
apply_transformations(source) — pure SQL CTE
  qualifying:       records not yet transformed (or all, if overwrite=true)
  rx:               fan out one row per regex match (LATERAL join, rules × records)
  agg_matches:      collapse matches back to one value per (record, rule)
  linked:           LEFT JOIN mappings to find mapped output
  rule_output:      build per-rule output JSONB
  record_additions: merge all rule outputs per record in sequence order
  UPDATE records SET transformed = data || additions
```

The transform is fully set-based — no row-by-row loops. All records for a source are processed in a single query.

### View generation

`generate_source_view(source)` reads `config.fields` from the source and builds a `CREATE VIEW` statement in the `dfv` schema. Each field is cast to its declared type (`text`, `numeric`, `date`). Fields with an `expression` are computed from other transformed fields using `{field}` substitution syntax.
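As a worked illustration of the import and transform flow, consider a hypothetical bank-transaction record. The record, the rule, and the field names below are invented; only the `vendor`/`category` mapping output echoes the example given in the schema section.

```js
// Hypothetical record walking through import → transform. Everything here is illustrative.

// records.data: the raw imported row.
const data = { date: '2024-03-02', description: 'POS PURCHASE WALMART #1234', amount: '-54.10' };

// An `extract` rule on field "description" with pattern "(WALMART|TARGET)" writes the
// matched value to its output_field; a mapping for that extracted value supplies
//   output = { "vendor": "Walmart", "category": "Groceries" }

// records.transformed: after apply_transformations(), i.e. data || additions.
const transformed = {
  date: '2024-03-02',
  description: 'POS PURCHASE WALMART #1234',
  amount: '-54.10',
  vendor: 'Walmart',
  category: 'Groceries',
};
```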
---

## SQL Functions

Each file in `database/queries/` maps 1-to-1 with a route file.

**sources.sql**
`list_sources`, `get_source`, `create_source`, `update_source`, `delete_source`, `get_import_log`, `get_source_stats`, `get_source_fields`, `get_view_data` (plpgsql — dynamic sort via EXECUTE + quote_ident), `import_records`, `jsonb_merge` + `jsonb_concat_obj` aggregate, `apply_transformations`, `reprocess_records`, `generate_source_view`

**rules.sql**
`list_rules`, `get_rule`, `create_rule`, `update_rule`, `delete_rule`, `preview_rule` (plpgsql — conditional query for extract vs replace), `test_rule` (returns `TABLE(rule JSONB, results JSONB)`)

**mappings.sql**
`list_mappings`, `get_mapping`, `create_mapping`, `upsert_mapping`, `update_mapping`, `delete_mapping`, `get_mapping_counts`, `get_all_values` (plpgsql — extracted values with mapping join), `get_unmapped_values` (plpgsql — extracted values with no mapping)

**records.sql**
`list_records`, `get_record`, `search_records` (JSONB containment on data and transformed), `delete_record`, `delete_source_records`

---

## API

All routes are under `/api`. Every route requires HTTP Basic Auth. The `GET /health` endpoint is exempt.

**Authentication:** `Authorization: Basic <base64 of user:password>` on every request. Credentials are verified against `LOGIN_USER` (plaintext username) and `LOGIN_PASSWORD_HASH` (bcrypt hash) in `.env`. There are no sessions or tokens — credentials are sent with every request.

**Route summary:**

| Method | Path | Description |
|--------|------|-------------|
| GET | /api/sources | List all sources |
| POST | /api/sources | Create source |
| GET | /api/sources/:name | Get source |
| PUT | /api/sources/:name | Update source (constraint_fields, config) |
| DELETE | /api/sources/:name | Delete source and all data |
| POST | /api/sources/suggest | Suggest source config from CSV upload |
| POST | /api/sources/:name/import | Import CSV records |
| GET | /api/sources/:name/import-log | Import history |
| GET | /api/sources/:name/stats | Record counts |
| GET | /api/sources/:name/fields | All known field names and origins |
| GET | /api/sources/:name/view-data | Paginated, sortable view data |
| POST | /api/sources/:name/transform | Apply transformations (new records only) |
| POST | /api/sources/:name/reprocess | Reapply transformations to all records |
| POST | /api/sources/:name/view | Generate dfv view |
| GET | /api/rules/source/:name | List rules for a source |
| GET | /api/rules/preview | Preview pattern against live records |
| GET | /api/rules/:id/test | Test saved rule against live records |
| POST | /api/rules | Create rule |
| PUT | /api/rules/:id | Update rule |
| DELETE | /api/rules/:id | Delete rule |
| GET | /api/mappings/source/:name | List mappings |
| GET | /api/mappings/source/:name/all-values | All extracted values (mapped + unmapped) |
| GET | /api/mappings/source/:name/unmapped | Only unmapped extracted values |
| GET | /api/mappings/source/:name/counts | Record counts per mapping |
| GET | /api/mappings/source/:name/export.tsv | Export mappings as TSV |
| POST | /api/mappings/source/:name/import-csv | Import/update mappings from TSV |
| POST | /api/mappings | Create mapping |
| POST | /api/mappings/bulk | Upsert multiple mappings |
| PUT | /api/mappings/:id | Update mapping |
| DELETE | /api/mappings/:id | Delete mapping |
| GET | /api/records/source/:name | List raw records |
| GET | /api/records/:id | Get single record |
| POST | /api/records/search | Search by JSONB containment |
| DELETE | /api/records/:id | Delete record |
| DELETE | /api/records/source/:name/all | Delete all records for a source |
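For reference, a request from any HTTP client looks like the following sketch. The credentials and host are placeholders, 3020 is simply the documented default `API_PORT`, and Node 18+ is assumed for the global `fetch`.

```js
// Illustrative request; username and password are placeholders for LOGIN_USER and its password.
const auth = Buffer.from('admin:secret').toString('base64');

const res = await fetch('http://localhost:3020/api/sources', {
  headers: { Authorization: `Basic ${auth}` },
});
if (!res.ok) throw new Error(`Request failed: ${res.status}`); // 401 means bad or missing credentials
const sources = await res.json();
console.log(sources);
```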
---

## `api/lib/sql.js` — SQL Literal Helpers

```js
lit(val)   // JS value → SQL literal: 'text', TRUE, 42, NULL, '{"json":"val"}'
arr(val)   // JS array → PostgreSQL array literal: ARRAY['a','b']
```

Single quotes within string values are escaped by doubling them (`'` → `''`). Objects and arrays are JSON-serialized. These helpers exist so that query strings in route files are fully formed and can be copied directly into psql.

---

## UI

Built with React + Vite + Tailwind CSS. Compiled output goes to `public/`. The server serves `public/` as static files and falls back to `public/index.html` for all non-API routes (SPA routing).

**Authentication flow:**

1. On load, check `sessionStorage` for saved credentials. If found, re-authenticate silently via `GET /api/sources`.
2. On login, credentials are stored in memory (`api.js` module-level) and in `sessionStorage` (survives page refresh, cleared on tab close).
3. On 401 response, credentials are cleared and the login screen is shown.
4. `localStorage` persists the selected source name across sessions.

**Pages:**

- **Sources** — View and edit source configuration. Shows all known field names and their origins (raw data, schema, rules, mappings). Checkboxes control which fields are constraint fields and which appear in the output view. Supports CSV upload to auto-detect fields.
- **Import** — Upload a CSV to import records into the selected source. Transformations run automatically on new records. Shows import log with inserted/duplicate counts, expandable key detail, checkbox selection, and delete with confirmation.
- **Rules** — Create and manage regex rules. Live preview fires automatically (debounced 500ms) as pattern/field/flags are edited, showing match results against real records. Rules can be enabled/disabled by toggle.
- **Mappings** — Tabular mapping editor. Shows all extracted values from transformed records with record counts and sample raw data. Rows are yellow (unmapped), white (mapped), or blue (edited but unsaved). Supports TSV export and import. Columns can be added dynamically.
- **Records** — Paginated table showing the `dfv.{source}` view. Server-side sorting (column validated against `information_schema.columns`, interpolated with `quote_ident`). Dates are formatted `YYYY-MM-DD` for correct lexicographic sort. Regex filters can be added per column. If the view cast fails (e.g. a field typed as `date` contains text), the error is shown inline rather than a blank page.
- **Pivot** — Interactive pivot/crosstab powered by [Perspective](https://perspective.finos.org/) (`@perspective-dev` v4.4.0, loaded from CDN at runtime). Loads all rows from the source view into an in-browser Perspective worker and renders a `<perspective-viewer>` web component. Supports grouping, splitting, filtering, sorting, and charting interactively.

  **Toolbar (above the viewer):**

  - Named layouts — saved per source in the `pivot_layouts` DB table. Each chip recalls the full viewer state including group_by, split_by, filters, expressions, selection mode, and expand depth. A blue **Save** button overwrites the active layout in place; **+ Save as…** saves to a new name. The × on each chip deletes it.
  - **depth: 0 1 2 3** — collapses or expands all grouped rows to the specified hierarchy level. Implemented via `view.set_depth(d)` + `plugin.draw(view)` (the only working mechanism found in v4.4.0 — `plugin_config.expand_depth` and `viewer.flush()` alone have no effect). See the sketch after this page list.
  - The Perspective built-in **selection mode button** (Read-Only / Select Row / Select Column / Select Region) defaults to **Select Region** on fresh load, set directly via `plugin.restore({ edit_mode: 'SELECT_REGION' })` after the viewer loads.

  **Cell inspector (right panel):**

  - Opens when a cell is clicked and a `group_by` hierarchy is active. If there is no `group_by`, the click is ignored — without coordinate filters the query would return the full dataset.
  - Row filtering uses a temporary Perspective view (`table.view({ filter: eventFilters, expressions: config.expressions })`) so that computed/expression columns in `split_by` are evaluated correctly. Falls back to JS-side filtering if the view query fails.
  - Shows cell coordinates (group_by › split_by values), the clicked metric with value, any user-set filters, and a table of matching raw rows.
  - Number formatting rounds to 2 decimal places by default; a −/+ control in the inspector header adjusts precision (0–8).

  **Layout persistence:**

  - `localStorage` key `psp_layout_{source}` saves the last viewer state on each named layout save.
  - Named layouts store `{ ...viewer.save(), plugin_config: plugin.save(), expand_depth }` as JSONB in `pivot_layouts`. On recall, viewer config, plugin config (edit mode), and expand depth are all restored independently.

  See `docs/perspective-pivot.md` for the full technical reference on controlling Perspective programmatically.

- **Log** — Global import log across all sources. Same expandable key detail and delete capability as the Import page, plus a source name column.
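Returning to the Pivot page's depth control: a minimal sketch of that mechanism, using only the two calls named in the depth bullet. How the `view` and `plugin` handles are obtained is not shown here, and the wrapper function name is invented.

```js
// Sketch only: assumes `view` and `plugin` are the Perspective view and active plugin
// already held by the Pivot page. Uses only the two calls named in the spec above.
async function applyExpandDepth(view, plugin, depth) {
  await view.set_depth(depth); // collapse/expand all grouped rows to this hierarchy level
  await plugin.draw(view);     // force the plugin to redraw from the updated view
}

// e.g. wired to the "depth: 0 1 2 3" toolbar buttons:
// await applyExpandDepth(view, plugin, 2);
```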
---

## manage.py

Interactive CLI for setup and operations. Run with `python3 manage.py`. Requires no arguments.

Shows current status on every screen:

- Database connection (host, port, db, user) and whether it succeeds
- Whether the `dataflow` schema is deployed
- Whether SQL functions are deployed (detected by presence of `apply_transformations`)
- Login credentials (configured / not configured)
- UI build status and timestamp
- Systemd service status
- Nginx reverse proxy status

**Menu options:**

1. **Database configuration and deployment dialog** — Write or update `.env` with database connection settings. If connection fails, offers to create the database and user using admin credentials. Optionally deploys schema and SQL functions after `.env` is written.
2. **Redeploy schema** — Runs `database/schema.sql` against the configured database. Warns that this drops all data. Requires explicit confirmation.
3. **Redeploy SQL functions** — Runs all four files in `database/queries/` in order: `sources.sql`, `rules.sql`, `mappings.sql`, `records.sql`. Safe to run at any time without data loss.
4. **Build UI** — Runs `npm run build` in `ui/`, outputting to `public/`.
5. **Set up nginx reverse proxy** — Detects if a proxy already exists for the configured port. Writes an nginx site config to `/etc/nginx/sites-enabled/`. If an SSL certificate exists in `/etc/letsencrypt/live/`, configures HTTPS with HTTP redirect; otherwise configures HTTP only with an offer to run certbot. Requires sudo.
6. **Install systemd service** — Copies `dataflow.service` to `/etc/systemd/system/`, runs `daemon-reload`, enables on boot. Requires sudo.
7. **Start / restart service** — Runs `systemctl start` or `systemctl restart` depending on current state. Requires sudo.
8. **Stop service** — Runs `systemctl stop`. Requires sudo.
9. **Set login credentials** — Prompts for username and password, bcrypt-hashes the password via `node -e "require('bcrypt')..."`, and writes `LOGIN_USER` and `LOGIN_PASSWORD_HASH` to `.env`. Requires Node.js and the bcrypt npm package to be installed. (A sketch of the equivalent hashing step follows this list.)
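The hashing step behind option 9 is equivalent to the following sketch; the exact `node -e` one-liner that manage.py runs may differ, and the salt-round count here is an assumption.

```js
// Sketch of the hashing step; the plaintext password is a placeholder and is never stored.
const bcrypt = require('bcrypt');

const hash = bcrypt.hashSync('correct horse battery staple', 10); // 10 salt rounds assumed
console.log(hash); // value written to .env as LOGIN_PASSWORD_HASH
```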
**Key behaviors:**

- All commands that will be run are printed before the user is asked to confirm.
- Actions that require sudo prompt transparently — `sudo` is not run with `-n`, so it uses cached credentials or prompts as normal.
- Nginx config files are written via `sudo cp` from a temp file, then `sudo chmod 644` to make them world-readable for status detection.
- Certificate existence is checked with `sudo test -f` since `/etc/letsencrypt/live/` requires root.
- Option 1's result (updated config) is passed back to the menu loop so status reflects changes without requiring a restart.

---

## Environment Variables (`.env`)

```
DB_HOST               PostgreSQL host
DB_PORT               PostgreSQL port (default 5432)
DB_NAME               Database name
DB_USER               Database user
DB_PASSWORD           Database password
API_PORT              Port the Express server listens on (default 3020)
NODE_ENV              development | production
LOGIN_USER            Username for Basic Auth
LOGIN_PASSWORD_HASH   bcrypt hash of the password
```

---

## Running the Application

```bash
# Install API dependencies
npm install

# Install UI dependencies and build
cd ui && npm install && npm run build && cd ..

# Start (development, auto-reload)
npm run dev

# Start (production)
npm start
```

The server binds to `0.0.0.0` on `API_PORT` and serves both the API and the compiled UI from `public/`.

---

## Deploying SQL Changes

Any time SQL functions are modified:

```bash
PGPASSWORD=<password> psql -h <host> -U <user> -d <database> -f database/queries/sources.sql
PGPASSWORD=<password> psql -h <host> -U <user> -d <database> -f database/queries/rules.sql
PGPASSWORD=<password> psql -h <host> -U <user> -d <database> -f database/queries/mappings.sql
PGPASSWORD=<password> psql -h <host> -U <user> -d <database> -f database/queries/records.sql
```

Then restart the server. Function deployment is safe to repeat — all functions use `CREATE OR REPLACE`.

Schema changes (`schema.sql`) drop and recreate the schema, deleting all data. In production, write migration scripts instead.