Initial commit: dataflow data transformation tool

Commit: 3e2d56991c

.env.example (new file, 10 lines)

# Database Configuration
DB_HOST=localhost
DB_PORT=5432
DB_NAME=dataflow
DB_USER=postgres
DB_PASSWORD=your_password_here

# API Configuration
API_PORT=3000
NODE_ENV=development

.gitignore (vendored, new file, 24 lines)

# Environment
.env

# Dependencies
node_modules/
package-lock.json

# Logs
*.log
logs/

# OS files
.DS_Store
Thumbs.db

# IDE
.vscode/
.idea/
*.swp
*.swo

# Uploads
uploads/*
!uploads/.gitkeep

CLAUDE.md (new file, 206 lines)

# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Overview

Dataflow is a simple data transformation tool for importing, cleaning, and standardizing data from various sources. Built with PostgreSQL and Node.js/Express, it emphasizes clarity and simplicity over complexity.

## Core Concepts

1. **Sources** - Define data sources and deduplication rules (which fields make a record unique)
2. **Import** - Load CSV data, automatically deduplicating based on source rules
3. **Rules** - Extract information using regex patterns (e.g., extract the merchant from a transaction description)
4. **Mappings** - Map extracted values to standardized output (e.g., "WALMART" → {"vendor": "Walmart", "category": "Groceries"})
5. **Transform** - Apply rules and mappings to create clean, enriched data

## Architecture

### Database Schema (`database/schema.sql`)

**5 simple tables:**
- `sources` - Source definitions with `dedup_fields` array
- `records` - Imported data with `data` (raw) and `transformed` (enriched) JSONB columns
- `rules` - Regex extraction rules with `field`, `pattern`, `output_field`
- `mappings` - Input/output value mappings
- `import_log` - Audit trail

**Key design:**
- JSONB for flexible data storage
- Deduplication via MD5 hash of specified fields
- Simple, flat structure (no complex relationships)

### Database Functions (`database/functions.sql`)

**4 focused functions:**
- `import_records(source_name, data)` - Import with deduplication
- `apply_transformations(source_name, record_ids)` - Apply rules and mappings
- `get_unmapped_values(source_name, rule_name)` - Find values needing mappings
- `reprocess_records(source_name)` - Re-transform all records

**Design principle:** Each function does ONE thing. No nested CTEs, no duplication.

### API Server (`api/server.js` + `api/routes/`)

**RESTful endpoints:**
- `/api/sources` - CRUD sources, import CSV, trigger transformations
- `/api/rules` - CRUD transformation rules
- `/api/mappings` - CRUD value mappings, view unmapped values
- `/api/records` - Query and search transformed data

**Route files:**
- `routes/sources.js` - Source management and CSV import
- `routes/rules.js` - Rule management
- `routes/mappings.js` - Mapping management + unmapped values
- `routes/records.js` - Record queries and search

## Common Development Tasks

### Running the Application

```bash
# Setup (first time only)
./setup.sh

# Start development server with auto-reload
npm run dev

# Start production server
npm start

# Test API
curl http://localhost:3000/health
```

### Database Changes

When modifying the schema:
1. Edit `database/schema.sql`
2. Drop and recreate the schema: `psql -d dataflow -f database/schema.sql`
3. Redeploy functions: `psql -d dataflow -f database/functions.sql`

For production, write migration scripts instead of dropping the schema.

### Adding a New API Endpoint

1. Add the route to the appropriate file in `api/routes/`
2. Follow existing patterns (async/await, error handling via `next()`)
3. Use parameterized queries to prevent SQL injection
4. Return a consistent JSON format
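
The steps above can be sketched as a handler factory. This is a minimal illustration of the pattern, not code from the repo: the `countRoute` endpoint is hypothetical, and `pool` stands for the injected `pg` pool the route modules receive.

```javascript
// Hypothetical endpoint following the project's conventions:
// async/await, a parameterized query, and errors forwarded to next().
function countRoute(pool) {
  return async (req, res, next) => {
    try {
      const result = await pool.query(
        'SELECT COUNT(*) AS count FROM records WHERE source_name = $1',
        [req.params.name] // parameterized: no SQL injection
      );
      // Consistent JSON shape for the response
      res.json({ source: req.params.name, count: Number(result.rows[0].count) });
    } catch (err) {
      next(err); // the global error handler in server.js responds
    }
  };
}

module.exports = { countRoute };
```

The factory shape (`(pool) => handler`) mirrors how `api/server.js` injects the pool into each route module.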

### Testing

Manual testing workflow:
1. Create a source: `POST /api/sources`
2. Create rules: `POST /api/rules`
3. Import data: `POST /api/sources/:name/import`
4. Apply transformations: `POST /api/sources/:name/transform`
5. View results: `GET /api/records/source/:name`

See `examples/GETTING_STARTED.md` for complete curl examples.

## Design Principles

1. **Simple over clever** - Straightforward code beats optimization
2. **Explicit over implicit** - No magic, no hidden triggers
3. **Clear naming** - `data` not `rec`, `transformed` not `allj`
4. **One function, one job** - No 250-line functions
5. **JSONB for flexibility** - Handle varying schemas without migrations

## Common Patterns

### Import Flow
```
CSV file → parse → import_records() → records table (data column)
```

### Transformation Flow
```
records.data → apply_transformations() →
  - Apply each rule (regex extraction)
  - Look up mappings
  - Merge into records.transformed
```
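
The same flow, illustrated in plain JavaScript (the real implementation is SQL in `database/functions.sql`; field names here follow the `rules` and `mappings` columns described above, and the exact merge semantics are an assumption):

```javascript
// Illustration of the transformation flow: apply each regex rule,
// look up a mapping for the extracted value, merge into `transformed`.
function transformRecord(data, rules, mappings) {
  const transformed = {};
  for (const rule of rules) {
    // Regex extraction from the rule's source field
    const match = new RegExp(rule.pattern).exec(data[rule.field] ?? '');
    if (!match) continue;
    const extracted = (match[1] ?? match[0]).trim();
    transformed[rule.output_field] = extracted;
    // Mapping lookup: extracted value -> standardized output object
    const mapping = mappings.find(
      (m) => m.rule_name === rule.name && m.input_value === extracted
    );
    if (mapping) Object.assign(transformed, mapping.output);
  }
  return transformed; // stored in records.transformed
}
```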

### Deduplication
- Hash is MD5 of concatenated values from `dedup_fields`
- Unique constraint on `(source_name, dedup_key)` prevents duplicates
- Import function catches unique violations and counts them

### Error Handling
- API routes use `try/catch` and pass errors to `next(err)`
- `server.js` has a global error handler
- Database functions return JSON with a `success` boolean

## File Structure

```
dataflow/
├── database/
│   ├── schema.sql          # Table definitions
│   └── functions.sql       # Import/transform functions
├── api/
│   ├── server.js           # Express server
│   └── routes/             # API endpoints
│       ├── sources.js
│       ├── rules.js
│       ├── mappings.js
│       └── records.js
├── examples/
│   ├── GETTING_STARTED.md  # Tutorial
│   └── bank_transactions.csv
├── .env.example            # Config template
├── package.json
└── README.md
```

## Comparison to Legacy TPS System

This project replaces an older system (in `/opt/tps`) that had:
- 2,150 lines of complex SQL with heavy duplication
- 5 nearly-identical 200+ line functions
- Confusing names and deeply nested CTEs
- Complex trigger-based processing

Dataflow achieves the same functionality with:
- ~400 lines of simple SQL
- 4 focused functions
- Clear names and linear logic
- Explicit API-triggered processing

This simplification makes the system easy to understand, modify, and maintain.

## Troubleshooting

**Database connection fails:**
- Check that the `.env` file exists and has correct credentials
- Verify PostgreSQL is running: `psql -U postgres -l`
- Check that the search path is set: it should default to the `dataflow` schema

**Import succeeds but transformation fails:**
- Check that rules exist: `SELECT * FROM dataflow.rules WHERE source_name = 'xxx'`
- Verify field names match the CSV columns
- Test the regex pattern manually
- Check the logs for SQL errors

**All records marked as duplicates:**
- Verify `dedup_fields` match the actual field names in the data
- Check whether the data was already imported
- Use a different source name for testing

## Adding New Features

When adding features, follow these principles:
- Add ONE function that does ONE thing
- Keep functions under 100 lines if possible
- Write clear SQL, not clever SQL
- Add an API endpoint that calls the function
- Document the feature in README.md and update the examples

## Notes for Claude

- This is a **simple** system by design - don't over-engineer it
- Keep functions focused and linear
- Use JSONB for flexibility, not as a crutch for bad design
- When confused, read the `examples/GETTING_STARTED.md` walkthrough
- The old TPS system is in `/opt/tps` - this is a clean rewrite, not a refactor

README.md (new file, 120 lines)

# Dataflow

A simple, understandable data transformation tool for ingesting, mapping, and transforming data from various sources.

## What It Does

Dataflow helps you:
1. **Import** data from CSV files (or other formats)
2. **Transform** data using regex rules to extract meaningful information
3. **Map** extracted values to standardized output
4. **Query** the transformed data

Perfect for cleaning up messy data like bank transactions, product lists, or any repetitive data that needs normalization.

## Core Concepts

### 1. Sources
Define where data comes from and how to deduplicate it.

**Example:** Bank transactions deduplicated by date + amount + description

### 2. Rules
Extract information using regex patterns.

**Example:** Extract the merchant name from the transaction description

### 3. Mappings
Map extracted values to clean, standardized output.

**Example:** "DISCOUNT DRUG MART 32" → {"vendor": "Discount Drug Mart", "category": "Healthcare"}

## Architecture

- **Database:** PostgreSQL with JSONB for flexibility
- **API:** Node.js/Express for REST endpoints
- **Storage:** Raw data is preserved; transformations are computed and stored alongside it

## Design Principles

- **Simple & Clear** - Easy to understand what's happening
- **Explicit** - No hidden magic or complex triggers
- **Testable** - Every function can be tested independently
- **Flexible** - Handle varying data formats without schema changes

## Getting Started

### Prerequisites
- PostgreSQL 12+
- Node.js 16+

### Installation

1. Install dependencies:
```bash
npm install
```

2. Configure the database (copy `.env.example` to `.env` and edit):
```bash
cp .env.example .env
```

3. Deploy the database schema:
```bash
psql -U postgres -d dataflow -f database/schema.sql
psql -U postgres -d dataflow -f database/functions.sql
```

4. Start the API server:
```bash
npm start
```

## Quick Example

```
// 1. Define a source
POST /api/sources
{
  "name": "bank_transactions",
  "dedup_fields": ["date", "amount", "description"]
}

// 2. Create a transformation rule
POST /api/rules
{
  "source_name": "bank_transactions",
  "name": "extract_merchant",
  "field": "description",
  "pattern": "^([A-Z][A-Z ]+)",
  "output_field": "merchant"
}

// 3. Import data
POST /api/sources/bank_transactions/import
[CSV file upload]

// 4. Query transformed data
GET /api/records/source/bank_transactions
```

## Project Structure

```
dataflow/
├── database/          # PostgreSQL schema and functions
│   ├── schema.sql     # Table definitions
│   └── functions.sql  # Import and transformation functions
├── api/               # Express REST API
│   ├── server.js      # Main server
│   └── routes/        # API route handlers
├── examples/          # Sample data and use cases
└── docs/              # Additional documentation
```

## Status

**Current Phase:** Initial development - building core functionality

## License

MIT

api/routes/mappings.js (new file, 178 lines)

/**
 * Mappings Routes
 * Manage value mappings
 */

const express = require('express');

module.exports = (pool) => {
  const router = express.Router();

  // List all mappings for a source
  router.get('/source/:source_name', async (req, res, next) => {
    try {
      const { rule_name } = req.query;

      let query = 'SELECT * FROM mappings WHERE source_name = $1';
      const params = [req.params.source_name];

      if (rule_name) {
        query += ' AND rule_name = $2';
        params.push(rule_name);
      }

      query += ' ORDER BY rule_name, input_value';

      const result = await pool.query(query, params);
      res.json(result.rows);
    } catch (err) {
      next(err);
    }
  });

  // Get unmapped values
  router.get('/source/:source_name/unmapped', async (req, res, next) => {
    try {
      const { rule_name } = req.query;

      const result = await pool.query(
        'SELECT * FROM get_unmapped_values($1, $2)',
        [req.params.source_name, rule_name || null]
      );

      res.json(result.rows);
    } catch (err) {
      next(err);
    }
  });

  // Get single mapping
  router.get('/:id', async (req, res, next) => {
    try {
      const result = await pool.query(
        'SELECT * FROM mappings WHERE id = $1',
        [req.params.id]
      );

      if (result.rows.length === 0) {
        return res.status(404).json({ error: 'Mapping not found' });
      }

      res.json(result.rows[0]);
    } catch (err) {
      next(err);
    }
  });

  // Create mapping
  router.post('/', async (req, res, next) => {
    try {
      const { source_name, rule_name, input_value, output } = req.body;

      if (!source_name || !rule_name || !input_value || !output) {
        return res.status(400).json({
          error: 'Missing required fields: source_name, rule_name, input_value, output'
        });
      }

      const result = await pool.query(
        `INSERT INTO mappings (source_name, rule_name, input_value, output)
         VALUES ($1, $2, $3, $4)
         RETURNING *`,
        [source_name, rule_name, input_value, JSON.stringify(output)]
      );

      res.status(201).json(result.rows[0]);
    } catch (err) {
      if (err.code === '23505') { // Unique violation
        return res.status(409).json({ error: 'Mapping already exists' });
      }
      if (err.code === '23503') { // Foreign key violation
        return res.status(404).json({ error: 'Source or rule not found' });
      }
      next(err);
    }
  });

  // Bulk create mappings
  router.post('/bulk', async (req, res, next) => {
    const client = await pool.connect();
    try {
      const { mappings } = req.body;

      if (!Array.isArray(mappings)) {
        return res.status(400).json({ error: 'Expected array of mappings' });
      }

      await client.query('BEGIN');

      const results = [];
      for (const mapping of mappings) {
        const { source_name, rule_name, input_value, output } = mapping;

        const result = await client.query(
          `INSERT INTO mappings (source_name, rule_name, input_value, output)
           VALUES ($1, $2, $3, $4)
           ON CONFLICT (source_name, rule_name, input_value)
           DO UPDATE SET output = EXCLUDED.output
           RETURNING *`,
          [source_name, rule_name, input_value, JSON.stringify(output)]
        );

        results.push(result.rows[0]);
      }

      await client.query('COMMIT');
      res.status(201).json({ count: results.length, mappings: results });
    } catch (err) {
      await client.query('ROLLBACK');
      next(err);
    } finally {
      client.release();
    }
  });

  // Update mapping
  router.put('/:id', async (req, res, next) => {
    try {
      const { input_value, output } = req.body;

      const result = await pool.query(
        `UPDATE mappings
         SET input_value = COALESCE($2, input_value),
             output = COALESCE($3, output)
         WHERE id = $1
         RETURNING *`,
        [req.params.id, input_value, output ? JSON.stringify(output) : null]
      );

      if (result.rows.length === 0) {
        return res.status(404).json({ error: 'Mapping not found' });
      }

      res.json(result.rows[0]);
    } catch (err) {
      next(err);
    }
  });

  // Delete mapping
  router.delete('/:id', async (req, res, next) => {
    try {
      const result = await pool.query(
        'DELETE FROM mappings WHERE id = $1 RETURNING id',
        [req.params.id]
      );

      if (result.rows.length === 0) {
        return res.status(404).json({ error: 'Mapping not found' });
      }

      res.json({ success: true, deleted: result.rows[0].id });
    } catch (err) {
      next(err);
    }
  });

  return router;
};

api/routes/records.js (new file, 111 lines)

/**
 * Records Routes
 * Query and manage imported records
 */

const express = require('express');

module.exports = (pool) => {
  const router = express.Router();

  // List records for a source
  router.get('/source/:source_name', async (req, res, next) => {
    try {
      const { limit = 100, offset = 0, transformed_only } = req.query;

      let query = 'SELECT * FROM records WHERE source_name = $1';
      const params = [req.params.source_name];

      if (transformed_only === 'true') {
        query += ' AND transformed IS NOT NULL';
      }

      query += ' ORDER BY id DESC LIMIT $2 OFFSET $3';
      params.push(parseInt(limit), parseInt(offset));

      const result = await pool.query(query, params);
      res.json(result.rows);
    } catch (err) {
      next(err);
    }
  });

  // Get single record
  router.get('/:id', async (req, res, next) => {
    try {
      const result = await pool.query(
        'SELECT * FROM records WHERE id = $1',
        [req.params.id]
      );

      if (result.rows.length === 0) {
        return res.status(404).json({ error: 'Record not found' });
      }

      res.json(result.rows[0]);
    } catch (err) {
      next(err);
    }
  });

  // Search records
  router.post('/search', async (req, res, next) => {
    try {
      const { source_name, query, limit = 100 } = req.body;

      if (!source_name || !query) {
        return res.status(400).json({
          error: 'Missing required fields: source_name, query'
        });
      }

      // Search in both data and transformed fields
      const result = await pool.query(
        `SELECT * FROM records
         WHERE source_name = $1
           AND (data @> $2 OR transformed @> $2)
         ORDER BY id DESC
         LIMIT $3`,
        [source_name, JSON.stringify(query), parseInt(limit)]
      );

      res.json(result.rows);
    } catch (err) {
      next(err);
    }
  });

  // Delete record
  router.delete('/:id', async (req, res, next) => {
    try {
      const result = await pool.query(
        'DELETE FROM records WHERE id = $1 RETURNING id',
        [req.params.id]
      );

      if (result.rows.length === 0) {
        return res.status(404).json({ error: 'Record not found' });
      }

      res.json({ success: true, deleted: result.rows[0].id });
    } catch (err) {
      next(err);
    }
  });

  // Delete all records for a source
  router.delete('/source/:source_name/all', async (req, res, next) => {
    try {
      const result = await pool.query(
        'DELETE FROM records WHERE source_name = $1',
        [req.params.source_name]
      );

      res.json({ success: true, deleted_count: result.rowCount });
    } catch (err) {
      next(err);
    }
  });

  return router;
};

api/routes/rules.js (new file, 119 lines)

/**
 * Rules Routes
 * Manage transformation rules
 */

const express = require('express');

module.exports = (pool) => {
  const router = express.Router();

  // List all rules for a source
  router.get('/source/:source_name', async (req, res, next) => {
    try {
      const result = await pool.query(
        'SELECT * FROM rules WHERE source_name = $1 ORDER BY sequence, name',
        [req.params.source_name]
      );
      res.json(result.rows);
    } catch (err) {
      next(err);
    }
  });

  // Get single rule
  router.get('/:id', async (req, res, next) => {
    try {
      const result = await pool.query(
        'SELECT * FROM rules WHERE id = $1',
        [req.params.id]
      );

      if (result.rows.length === 0) {
        return res.status(404).json({ error: 'Rule not found' });
      }

      res.json(result.rows[0]);
    } catch (err) {
      next(err);
    }
  });

  // Create rule
  router.post('/', async (req, res, next) => {
    try {
      const { source_name, name, field, pattern, output_field, enabled, sequence } = req.body;

      if (!source_name || !name || !field || !pattern || !output_field) {
        return res.status(400).json({
          error: 'Missing required fields: source_name, name, field, pattern, output_field'
        });
      }

      const result = await pool.query(
        `INSERT INTO rules (source_name, name, field, pattern, output_field, enabled, sequence)
         VALUES ($1, $2, $3, $4, $5, $6, $7)
         RETURNING *`,
        [source_name, name, field, pattern, output_field, enabled !== false, sequence || 0]
      );

      res.status(201).json(result.rows[0]);
    } catch (err) {
      if (err.code === '23505') { // Unique violation
        return res.status(409).json({ error: 'Rule already exists for this source' });
      }
      if (err.code === '23503') { // Foreign key violation
        return res.status(404).json({ error: 'Source not found' });
      }
      next(err);
    }
  });

  // Update rule
  router.put('/:id', async (req, res, next) => {
    try {
      const { name, field, pattern, output_field, enabled, sequence } = req.body;

      const result = await pool.query(
        `UPDATE rules
         SET name = COALESCE($2, name),
             field = COALESCE($3, field),
             pattern = COALESCE($4, pattern),
             output_field = COALESCE($5, output_field),
             enabled = COALESCE($6, enabled),
             sequence = COALESCE($7, sequence)
         WHERE id = $1
         RETURNING *`,
        [req.params.id, name, field, pattern, output_field, enabled, sequence]
      );

      if (result.rows.length === 0) {
        return res.status(404).json({ error: 'Rule not found' });
      }

      res.json(result.rows[0]);
    } catch (err) {
      next(err);
    }
  });

  // Delete rule
  router.delete('/:id', async (req, res, next) => {
    try {
      const result = await pool.query(
        'DELETE FROM rules WHERE id = $1 RETURNING id, name',
        [req.params.id]
      );

      if (result.rows.length === 0) {
        return res.status(404).json({ error: 'Rule not found' });
      }

      res.json({ success: true, deleted: result.rows[0] });
    } catch (err) {
      next(err);
    }
  });

  return router;
};

api/routes/sources.js (new file, 189 lines)

/**
 * Sources Routes
 * Manage data sources
 */

const express = require('express');
const multer = require('multer');
const { parse } = require('csv-parse/sync');

const upload = multer({ storage: multer.memoryStorage() });

module.exports = (pool) => {
  const router = express.Router();

  // List all sources
  router.get('/', async (req, res, next) => {
    try {
      const result = await pool.query(
        'SELECT * FROM sources ORDER BY name'
      );
      res.json(result.rows);
    } catch (err) {
      next(err);
    }
  });

  // Get single source
  router.get('/:name', async (req, res, next) => {
    try {
      const result = await pool.query(
        'SELECT * FROM sources WHERE name = $1',
        [req.params.name]
      );

      if (result.rows.length === 0) {
        return res.status(404).json({ error: 'Source not found' });
      }

      res.json(result.rows[0]);
    } catch (err) {
      next(err);
    }
  });

  // Create source
  router.post('/', async (req, res, next) => {
    try {
      const { name, dedup_fields, config } = req.body;

      if (!name || !dedup_fields || !Array.isArray(dedup_fields)) {
        return res.status(400).json({
          error: 'Missing required fields: name, dedup_fields (array)'
        });
      }

      const result = await pool.query(
        `INSERT INTO sources (name, dedup_fields, config)
         VALUES ($1, $2, $3)
         RETURNING *`,
        [name, dedup_fields, config || {}]
      );

      res.status(201).json(result.rows[0]);
    } catch (err) {
      if (err.code === '23505') { // Unique violation
        return res.status(409).json({ error: 'Source already exists' });
      }
      next(err);
    }
  });

  // Update source
  router.put('/:name', async (req, res, next) => {
    try {
      const { dedup_fields, config } = req.body;

      const result = await pool.query(
        `UPDATE sources
         SET dedup_fields = COALESCE($2, dedup_fields),
             config = COALESCE($3, config),
             updated_at = CURRENT_TIMESTAMP
         WHERE name = $1
         RETURNING *`,
        [req.params.name, dedup_fields, config]
      );

      if (result.rows.length === 0) {
        return res.status(404).json({ error: 'Source not found' });
      }

      res.json(result.rows[0]);
    } catch (err) {
      next(err);
    }
  });

  // Delete source
  router.delete('/:name', async (req, res, next) => {
    try {
      const result = await pool.query(
        'DELETE FROM sources WHERE name = $1 RETURNING name',
        [req.params.name]
      );

      if (result.rows.length === 0) {
        return res.status(404).json({ error: 'Source not found' });
      }

      res.json({ success: true, deleted: result.rows[0].name });
    } catch (err) {
      next(err);
    }
  });

  // Import CSV data
  router.post('/:name/import', upload.single('file'), async (req, res, next) => {
    try {
      if (!req.file) {
        return res.status(400).json({ error: 'No file uploaded' });
      }

      // Parse CSV
      const records = parse(req.file.buffer, {
        columns: true,
        skip_empty_lines: true,
        trim: true
      });

      // Import records
      const result = await pool.query(
        'SELECT import_records($1, $2) as result',
        [req.params.name, JSON.stringify(records)]
      );

      res.json(result.rows[0].result);
    } catch (err) {
      next(err);
    }
  });

  // Apply transformations
  router.post('/:name/transform', async (req, res, next) => {
    try {
      const result = await pool.query(
        'SELECT apply_transformations($1) as result',
        [req.params.name]
      );

      res.json(result.rows[0].result);
    } catch (err) {
      next(err);
    }
  });

  // Reprocess all records
  router.post('/:name/reprocess', async (req, res, next) => {
    try {
      const result = await pool.query(
        'SELECT reprocess_records($1) as result',
        [req.params.name]
      );

      res.json(result.rows[0].result);
    } catch (err) {
      next(err);
    }
  });

  // Get statistics
  router.get('/:name/stats', async (req, res, next) => {
    try {
      const result = await pool.query(
        `SELECT
           COUNT(*) as total_records,
           COUNT(*) FILTER (WHERE transformed IS NOT NULL) as transformed_records,
           COUNT(*) FILTER (WHERE transformed IS NULL) as pending_records
         FROM records
         WHERE source_name = $1`,
        [req.params.name]
      );

      res.json(result.rows[0]);
    } catch (err) {
      next(err);
    }
  });

  return router;
};

api/server.js (new file, 97 lines)

/**
 * Dataflow API Server
 * Simple REST API for data transformation
 */

require('dotenv').config();
const express = require('express');
const { Pool } = require('pg');

const app = express();
const PORT = process.env.API_PORT || 3000;

// Database connection
const pool = new Pool({
  host: process.env.DB_HOST,
  port: process.env.DB_PORT,
  database: process.env.DB_NAME,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD
});

// Middleware
app.use(express.json());
app.use(express.urlencoded({ extended: true }));

// Set search path for all queries
pool.on('connect', (client) => {
  client.query('SET search_path TO dataflow, public');
});

// Test database connection
pool.query('SELECT NOW()', (err, res) => {
  if (err) {
    console.error('Database connection error:', err);
    process.exit(1);
  }
  console.log('✓ Database connected');
});

//------------------------------------------------------
// Routes
//------------------------------------------------------

// Import route modules
const sourcesRoutes = require('./routes/sources');
const rulesRoutes = require('./routes/rules');
const mappingsRoutes = require('./routes/mappings');
const recordsRoutes = require('./routes/records');

// Mount routes
app.use('/api/sources', sourcesRoutes(pool));
app.use('/api/rules', rulesRoutes(pool));
app.use('/api/mappings', mappingsRoutes(pool));
app.use('/api/records', recordsRoutes(pool));

// Health check
app.get('/health', (req, res) => {
  res.json({ status: 'ok', timestamp: new Date().toISOString() });
});

// Root endpoint
app.get('/', (req, res) => {
  res.json({
    name: 'Dataflow API',
    version: '0.1.0',
    endpoints: {
      sources: '/api/sources',
      rules: '/api/rules',
      mappings: '/api/mappings',
      records: '/api/records',
      health: '/health'
    }
  });
});

// 404 handler (before the error handler, so unmatched routes reach it)
app.use((req, res) => {
  res.status(404).json({ error: 'Endpoint not found' });
});

// Error handler (registered last)
app.use((err, req, res, next) => {
  console.error('Error:', err);
  res.status(err.status || 500).json({
    error: err.message || 'Internal server error',
    ...(process.env.NODE_ENV === 'development' && { stack: err.stack })
  });
});

// Start server
app.listen(PORT, () => {
  console.log(`✓ Dataflow API listening on port ${PORT}`);
  console.log(`  Health: http://localhost:${PORT}/health`);
  console.log(`  API: http://localhost:${PORT}/api/sources`);
});

module.exports = app;

231
database/functions.sql
Normal file
@ -0,0 +1,231 @@
--
-- Dataflow Functions
-- Simple, clear functions for import and transformation
--

SET search_path TO dataflow, public;

------------------------------------------------------
-- Function: import_records
-- Import data with automatic deduplication
------------------------------------------------------
CREATE OR REPLACE FUNCTION import_records(
    p_source_name TEXT,
    p_data JSONB  -- Array of records
) RETURNS JSON AS $$
DECLARE
    v_dedup_fields TEXT[];
    v_record JSONB;
    v_dedup_key TEXT;
    v_inserted INTEGER := 0;
    v_duplicates INTEGER := 0;
    v_log_id INTEGER;
BEGIN
    -- Get dedup fields for this source
    SELECT dedup_fields INTO v_dedup_fields
    FROM dataflow.sources
    WHERE name = p_source_name;

    IF v_dedup_fields IS NULL THEN
        RETURN json_build_object(
            'success', false,
            'error', 'Source not found: ' || p_source_name
        );
    END IF;

    -- Process each record
    FOR v_record IN SELECT * FROM jsonb_array_elements(p_data)
    LOOP
        -- Generate dedup key
        v_dedup_key := dataflow.generate_dedup_key(v_record, v_dedup_fields);

        -- Try to insert (will fail silently if duplicate)
        BEGIN
            INSERT INTO dataflow.records (source_name, data, dedup_key)
            VALUES (p_source_name, v_record, v_dedup_key);

            v_inserted := v_inserted + 1;
        EXCEPTION WHEN unique_violation THEN
            v_duplicates := v_duplicates + 1;
        END;
    END LOOP;

    -- Log the import
    INSERT INTO dataflow.import_log (source_name, records_imported, records_duplicate)
    VALUES (p_source_name, v_inserted, v_duplicates)
    RETURNING id INTO v_log_id;

    RETURN json_build_object(
        'success', true,
        'imported', v_inserted,
        'duplicates', v_duplicates,
        'log_id', v_log_id
    );
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION import_records IS 'Import records with automatic deduplication';

------------------------------------------------------
-- Function: apply_transformations
-- Apply all transformation rules to records
------------------------------------------------------
CREATE OR REPLACE FUNCTION apply_transformations(
    p_source_name TEXT,
    p_record_ids INTEGER[] DEFAULT NULL  -- NULL = all untransformed
) RETURNS JSON AS $$
DECLARE
    v_record RECORD;
    v_rule RECORD;
    v_transformed JSONB;
    v_extracted TEXT;
    v_mapping JSONB;
    v_count INTEGER := 0;
BEGIN
    -- Loop through records to transform
    FOR v_record IN
        SELECT id, data
        FROM dataflow.records
        WHERE source_name = p_source_name
          AND (p_record_ids IS NULL OR id = ANY(p_record_ids))
          AND transformed IS NULL
    LOOP
        -- Start with original data
        v_transformed := v_record.data;

        -- Apply each rule in sequence
        FOR v_rule IN
            SELECT * FROM dataflow.rules
            WHERE source_name = p_source_name
              AND enabled = true
            ORDER BY sequence
        LOOP
            -- Extract value using regex
            v_extracted := (
                SELECT substring(v_record.data->>v_rule.field FROM v_rule.pattern)
            );

            IF v_extracted IS NOT NULL THEN
                -- Check if there's a mapping for this value
                SELECT output INTO v_mapping
                FROM dataflow.mappings
                WHERE source_name = p_source_name
                  AND rule_name = v_rule.name
                  AND input_value = v_extracted;

                IF v_mapping IS NOT NULL THEN
                    -- Apply mapping (merge mapped fields into result)
                    v_transformed := v_transformed || v_mapping;
                ELSE
                    -- No mapping, just add extracted value
                    v_transformed := jsonb_set(
                        v_transformed,
                        ARRAY[v_rule.output_field],
                        to_jsonb(v_extracted)
                    );
                END IF;
            END IF;
        END LOOP;

        -- Update record with transformed data
        UPDATE dataflow.records
        SET transformed = v_transformed,
            transformed_at = CURRENT_TIMESTAMP
        WHERE id = v_record.id;

        v_count := v_count + 1;
    END LOOP;

    RETURN json_build_object(
        'success', true,
        'transformed', v_count
    );
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION apply_transformations IS 'Apply transformation rules and mappings to records';

------------------------------------------------------
-- Function: get_unmapped_values
-- Find extracted values that need mappings
------------------------------------------------------
CREATE OR REPLACE FUNCTION get_unmapped_values(
    p_source_name TEXT,
    p_rule_name TEXT DEFAULT NULL
) RETURNS TABLE (
    rule_name TEXT,
    output_field TEXT,
    extracted_value TEXT,
    record_count BIGINT
) AS $$
BEGIN
    RETURN QUERY
    WITH extracted AS (
        -- Get all transformed records and extract rule output fields
        SELECT
            r.name AS rule_name,
            r.output_field,
            rec.transformed->>r.output_field AS extracted_value
        FROM
            dataflow.records rec
            CROSS JOIN dataflow.rules r
        WHERE
            rec.source_name = p_source_name
            AND r.source_name = p_source_name
            AND rec.transformed IS NOT NULL
            AND rec.transformed ? r.output_field
            AND (p_rule_name IS NULL OR r.name = p_rule_name)
    )
    SELECT
        e.rule_name,
        e.output_field,
        e.extracted_value,
        count(*) AS record_count
    FROM extracted e
    WHERE NOT EXISTS (
        -- Exclude values that already have mappings
        SELECT 1 FROM dataflow.mappings m
        WHERE m.source_name = p_source_name
          AND m.rule_name = e.rule_name
          AND m.input_value = e.extracted_value
    )
    GROUP BY e.rule_name, e.output_field, e.extracted_value
    -- Order by the aggregate itself: an unqualified "record_count" here is
    -- ambiguous against the OUT parameter of the same name in PL/pgSQL
    ORDER BY count(*) DESC;
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION get_unmapped_values IS 'Find extracted values that need mappings defined';

------------------------------------------------------
-- Function: reprocess_records
-- Clear and reapply transformations
------------------------------------------------------
CREATE OR REPLACE FUNCTION reprocess_records(p_source_name TEXT)
RETURNS JSON AS $$
BEGIN
    -- Clear existing transformations
    UPDATE dataflow.records
    SET transformed = NULL,
        transformed_at = NULL
    WHERE source_name = p_source_name;

    -- Reapply transformations
    RETURN dataflow.apply_transformations(p_source_name);
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION reprocess_records IS 'Clear and reapply all transformations for a source';

------------------------------------------------------
-- Summary
------------------------------------------------------
-- Functions: 4 simple, focused functions
--   1. import_records        - Import with deduplication
--   2. apply_transformations - Apply rules and mappings
--   3. get_unmapped_values   - Find values needing mappings
--   4. reprocess_records     - Re-transform all records
--
-- Each function does ONE thing clearly
-- No complex nested CTEs
-- Easy to understand and debug
------------------------------------------------------
166
database/schema.sql
Normal file
@ -0,0 +1,166 @@
--
-- Dataflow Database Schema
-- Simple, clear structure for data transformation
--

-- Create schema
CREATE SCHEMA IF NOT EXISTS dataflow;

-- Set search path
SET search_path TO dataflow, public;

------------------------------------------------------
-- Table: sources
-- Defines data sources and how to deduplicate them
------------------------------------------------------
CREATE TABLE sources (
    name TEXT PRIMARY KEY,
    dedup_fields TEXT[] NOT NULL,  -- Fields used for deduplication (e.g., ['date', 'amount', 'description'])
    config JSONB DEFAULT '{}'::jsonb,
    created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);

COMMENT ON TABLE sources IS 'Data source definitions';
COMMENT ON COLUMN sources.dedup_fields IS 'Array of field names used to identify duplicate records';
COMMENT ON COLUMN sources.config IS 'Additional source configuration (optional)';

------------------------------------------------------
-- Table: records
-- Stores imported data (raw and transformed)
------------------------------------------------------
CREATE TABLE records (
    id SERIAL PRIMARY KEY,
    source_name TEXT NOT NULL REFERENCES sources(name) ON DELETE CASCADE,

    -- Data
    data JSONB NOT NULL,      -- Original imported data
    dedup_key TEXT NOT NULL,  -- Hash of dedup fields for fast lookup
    transformed JSONB,        -- Data after transformations applied

    -- Metadata
    imported_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
    transformed_at TIMESTAMPTZ,

    -- Constraints
    UNIQUE(source_name, dedup_key)  -- Prevent duplicates
);

COMMENT ON TABLE records IS 'Imported records with raw and transformed data';
COMMENT ON COLUMN records.data IS 'Original data as imported';
COMMENT ON COLUMN records.dedup_key IS 'Hash of deduplication fields for fast duplicate detection';
COMMENT ON COLUMN records.transformed IS 'Data after applying transformation rules';

-- Indexes
CREATE INDEX idx_records_source ON records(source_name);
CREATE INDEX idx_records_dedup ON records(source_name, dedup_key);
CREATE INDEX idx_records_data ON records USING gin(data);
CREATE INDEX idx_records_transformed ON records USING gin(transformed);

------------------------------------------------------
-- Table: rules
-- Transformation rules (regex extraction)
------------------------------------------------------
CREATE TABLE rules (
    id SERIAL PRIMARY KEY,
    source_name TEXT NOT NULL REFERENCES sources(name) ON DELETE CASCADE,
    name TEXT NOT NULL,

    -- Rule definition
    field TEXT NOT NULL,         -- Field to extract from (e.g., 'description')
    pattern TEXT NOT NULL,       -- Regex pattern
    output_field TEXT NOT NULL,  -- Name of extracted field (e.g., 'merchant')

    -- Options
    enabled BOOLEAN DEFAULT true,
    sequence INTEGER DEFAULT 0,  -- Execution order

    -- Metadata
    created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,

    UNIQUE(source_name, name)
);

COMMENT ON TABLE rules IS 'Transformation rules for extracting data';
COMMENT ON COLUMN rules.field IS 'Source field to apply regex to';
COMMENT ON COLUMN rules.pattern IS 'Regular expression pattern';
COMMENT ON COLUMN rules.output_field IS 'Name of field to store extracted value';

CREATE INDEX idx_rules_source ON rules(source_name);

------------------------------------------------------
-- Table: mappings
-- Value mappings (extracted value → standardized output)
------------------------------------------------------
CREATE TABLE mappings (
    id SERIAL PRIMARY KEY,
    source_name TEXT NOT NULL REFERENCES sources(name) ON DELETE CASCADE,
    rule_name TEXT NOT NULL,

    -- Mapping
    input_value TEXT NOT NULL,  -- Extracted value to match
    output JSONB NOT NULL,      -- Standardized output

    -- Metadata
    created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,

    UNIQUE(source_name, rule_name, input_value),
    FOREIGN KEY (source_name, rule_name) REFERENCES rules(source_name, name) ON DELETE CASCADE
);

COMMENT ON TABLE mappings IS 'Maps extracted values to standardized output';
COMMENT ON COLUMN mappings.input_value IS 'Value extracted by rule';
COMMENT ON COLUMN mappings.output IS 'Standardized output (can contain multiple fields)';

CREATE INDEX idx_mappings_source_rule ON mappings(source_name, rule_name);
CREATE INDEX idx_mappings_input ON mappings(source_name, rule_name, input_value);

------------------------------------------------------
-- Table: import_log
-- Audit trail of imports
------------------------------------------------------
CREATE TABLE import_log (
    id SERIAL PRIMARY KEY,
    source_name TEXT NOT NULL REFERENCES sources(name) ON DELETE CASCADE,
    records_imported INTEGER DEFAULT 0,
    records_duplicate INTEGER DEFAULT 0,
    imported_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);

COMMENT ON TABLE import_log IS 'Audit log of data imports';

CREATE INDEX idx_import_log_source ON import_log(source_name);
CREATE INDEX idx_import_log_timestamp ON import_log(imported_at);

------------------------------------------------------
-- Helper function: Generate dedup key
------------------------------------------------------
CREATE OR REPLACE FUNCTION generate_dedup_key(
    data JSONB,
    dedup_fields TEXT[]
) RETURNS TEXT AS $$
DECLARE
    field TEXT;
    v_values TEXT := '';  -- "values" is a reserved word, so use a prefixed name
BEGIN
    -- Concatenate values from dedup fields
    FOREACH field IN ARRAY dedup_fields LOOP
        v_values := v_values || COALESCE(data->>field, '') || '|';
    END LOOP;

    -- Return MD5 hash of concatenated values
    RETURN md5(v_values);
END;
$$ LANGUAGE plpgsql IMMUTABLE;

COMMENT ON FUNCTION generate_dedup_key IS 'Generate hash key from specified fields for deduplication';

------------------------------------------------------
-- Summary
------------------------------------------------------
-- Tables: 5 (sources, records, rules, mappings, import_log)
-- Simple, clear structure
-- JSONB for flexibility
-- Deduplication via hash key
-- All transformations traceable
------------------------------------------------------
311
examples/GETTING_STARTED.md
Normal file
@ -0,0 +1,311 @@
# Getting Started with Dataflow

This guide walks through a complete example using bank transaction data.

## Prerequisites

1. PostgreSQL database running
2. Database created: `CREATE DATABASE dataflow;`
3. `.env` file configured (copy from `.env.example`)

## Step 1: Deploy Database Schema

```bash
cd /opt/dataflow
psql -U postgres -d dataflow -f database/schema.sql
psql -U postgres -d dataflow -f database/functions.sql
```

You should see tables created without errors.

## Step 2: Start the API Server

```bash
npm install
npm start
```

The server should start on port 3000 (or your configured port).

Test it:
```bash
curl http://localhost:3000/health
# Should return: {"status":"ok","timestamp":"..."}
```

## Step 3: Create a Data Source

A source defines where data comes from and how to deduplicate it.

```bash
curl -X POST http://localhost:3000/api/sources \
  -H "Content-Type: application/json" \
  -d '{
    "name": "bank_transactions",
    "dedup_fields": ["date", "description", "amount"]
  }'
```

**What this does:** Records with the same date + description + amount will be considered duplicates.

## Step 4: Create Transformation Rules

Rules extract meaningful data using regex patterns.

### Rule 1: Extract merchant name (first part of description)

```bash
curl -X POST http://localhost:3000/api/rules \
  -H "Content-Type: application/json" \
  -d '{
    "source_name": "bank_transactions",
    "name": "extract_merchant",
    "field": "description",
    "pattern": "^([A-Z][A-Z ]+)",
    "output_field": "merchant",
    "sequence": 1
  }'
```

### Rule 2: Extract location (city + state pattern)

```bash
curl -X POST http://localhost:3000/api/rules \
  -H "Content-Type: application/json" \
  -d '{
    "source_name": "bank_transactions",
    "name": "extract_location",
    "field": "description",
    "pattern": "([A-Z]+) OH",
    "output_field": "location",
    "sequence": 2
  }'
```

## Step 5: Import Data

Import the example CSV file:

```bash
curl -X POST http://localhost:3000/api/sources/bank_transactions/import \
  -F "file=@examples/bank_transactions.csv"
```

Response:
```json
{
  "success": true,
  "imported": 14,
  "duplicates": 0,
  "log_id": 1
}
```
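
Under the hood, each CSV row becomes a JSON object keyed by the header row before `import_records` sees it. A naive sketch of that shape (the server uses `csv-parse`, which also handles quoted fields; this `csvToRecords` helper is illustrative and does not):

```javascript
// Minimal CSV → array-of-objects conversion (no support for quoted commas).
function csvToRecords(text) {
  const [header, ...rows] = text.trim().split('\n');
  const fields = header.split(',');
  return rows.map((row) => {
    const cells = row.split(',');
    return Object.fromEntries(fields.map((f, i) => [f, cells[i]]));
  });
}

const records = csvToRecords('date,description,amount\n2024-01-02,GOOGLE *YOUTUBE VIDEOS,4.26');
// records[0] → { date: '2024-01-02', description: 'GOOGLE *YOUTUBE VIDEOS', amount: '4.26' }
```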

## Step 6: View Imported Records

```bash
curl http://localhost:3000/api/records/source/bank_transactions?limit=5
```

You'll see the raw imported data. Note that `transformed` is `null` - we haven't applied transformations yet!

## Step 7: Apply Transformations

```bash
curl -X POST http://localhost:3000/api/sources/bank_transactions/transform
```

Response:
```json
{
  "success": true,
  "transformed": 14
}
```

Now check the records again:
```bash
curl http://localhost:3000/api/records/source/bank_transactions?limit=2
```

You'll see the `transformed` field now contains the original data plus extracted fields like `merchant` and `location`.

## Step 8: View Extracted Values That Need Mapping

```bash
curl http://localhost:3000/api/mappings/source/bank_transactions/unmapped
```

Response shows extracted merchant names that aren't mapped yet:
```json
[
  {"rule_name": "extract_merchant", "extracted_value": "GOOGLE", "record_count": 2},
  {"rule_name": "extract_merchant", "extracted_value": "TARGET", "record_count": 2},
  {"rule_name": "extract_merchant", "extracted_value": "WALMART", "record_count": 1},
  ...
]
```
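
This list comes from `get_unmapped_values` in `database/functions.sql`: count the extracted values that have no mapping, most frequent first. The same logic in miniature, as a hypothetical JS sketch (the object shapes are assumptions for illustration):

```javascript
// Count extracted values for one rule that are not yet mapped,
// sorted by frequency, mirroring get_unmapped_values.
function unmappedValues(records, rule, mappings) {
  const mapped = new Set(
    mappings.filter((m) => m.rule_name === rule.name).map((m) => m.input_value)
  );
  const counts = new Map();
  for (const rec of records) {
    const value = rec.transformed ? rec.transformed[rule.output_field] : undefined;
    if (value === undefined || mapped.has(value)) continue;
    counts.set(value, (counts.get(value) || 0) + 1);
  }
  return [...counts.entries()]
    .map(([extracted_value, record_count]) => ({ extracted_value, record_count }))
    .sort((a, b) => b.record_count - a.record_count);
}
```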

## Step 9: Create Value Mappings

Map extracted values to clean, standardized output:

```bash
curl -X POST http://localhost:3000/api/mappings \
  -H "Content-Type: application/json" \
  -d '{
    "source_name": "bank_transactions",
    "rule_name": "extract_merchant",
    "input_value": "GOOGLE",
    "output": {
      "vendor": "Google",
      "category": "Technology"
    }
  }'

curl -X POST http://localhost:3000/api/mappings \
  -H "Content-Type: application/json" \
  -d '{
    "source_name": "bank_transactions",
    "rule_name": "extract_merchant",
    "input_value": "TARGET",
    "output": {
      "vendor": "Target",
      "category": "Retail"
    }
  }'

curl -X POST http://localhost:3000/api/mappings \
  -H "Content-Type: application/json" \
  -d '{
    "source_name": "bank_transactions",
    "rule_name": "extract_merchant",
    "input_value": "WALMART",
    "output": {
      "vendor": "Walmart",
      "category": "Groceries"
    }
  }'
```

## Step 10: Reprocess With Mappings

Clear and reapply transformations to pick up the new mappings:

```bash
curl -X POST http://localhost:3000/api/sources/bank_transactions/reprocess
```

## Step 11: View Final Results

```bash
curl http://localhost:3000/api/records/source/bank_transactions?limit=5
```

Now the `transformed` field contains:
- Original fields (date, description, amount, category)
- Extracted fields (merchant, location)
- Mapped fields (vendor, category from mappings)

Example result (note that the mapping's `category` has replaced the original value):
```json
{
  "id": 1,
  "data": {
    "date": "2024-01-02",
    "description": "GOOGLE *YOUTUBE VIDEOS",
    "amount": "4.26",
    "category": "Services"
  },
  "transformed": {
    "date": "2024-01-02",
    "description": "GOOGLE *YOUTUBE VIDEOS",
    "amount": "4.26",
    "merchant": "GOOGLE",
    "vendor": "Google",
    "category": "Technology"
  }
}
```
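
The merge that produced `"category": "Technology"` follows JSONB's `||` operator: later keys win, so a mapping's fields overwrite the originals. A plain-JavaScript sketch of the per-record transform (the real work happens in `apply_transformations` in SQL; `transformRecord` and its argument shapes are illustrative only):

```javascript
// Illustrative mirror of the per-record transform:
// rules: [{ name, field, pattern, output_field }], applied in order;
// mappings: [{ rule_name, input_value, output }].
function transformRecord(record, rules, mappings) {
  let transformed = { ...record }; // start with the original data
  for (const rule of rules) {
    // Like substring(text FROM pattern): first capture group if present
    const match = (record[rule.field] || '').match(new RegExp(rule.pattern));
    const extracted = match ? (match[1] ?? match[0]) : null;
    if (extracted === null) continue;

    const mapping = mappings.find(
      (m) => m.rule_name === rule.name && m.input_value === extracted
    );
    transformed = mapping
      ? { ...transformed, ...mapping.output } // merge, like JSONB ||
      : { ...transformed, [rule.output_field]: extracted };
  }
  return transformed;
}
```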

## Step 12: Test Deduplication

Try importing the same file again:

```bash
curl -X POST http://localhost:3000/api/sources/bank_transactions/import \
  -F "file=@examples/bank_transactions.csv"
```

Response:
```json
{
  "success": true,
  "imported": 0,
  "duplicates": 14,
  "log_id": 2
}
```

All records were rejected as duplicates! ✓

## Summary

You've now:
- ✅ Created a data source with deduplication rules
- ✅ Defined transformation rules to extract data
- ✅ Imported CSV data
- ✅ Applied transformations
- ✅ Created value mappings for clean output
- ✅ Reprocessed data with mappings
- ✅ Tested deduplication

## Next Steps

- Add more rules for other extraction patterns
- Create more value mappings as needed
- Query the `transformed` data for reporting
- Import additional CSV files

## Useful Commands

```bash
# View all sources
curl http://localhost:3000/api/sources

# View source statistics
curl http://localhost:3000/api/sources/bank_transactions/stats

# View all rules for a source
curl http://localhost:3000/api/rules/source/bank_transactions

# View all mappings for a source
curl http://localhost:3000/api/mappings/source/bank_transactions

# Search for specific records
curl -X POST http://localhost:3000/api/records/search \
  -H "Content-Type: application/json" \
  -d '{
    "source_name": "bank_transactions",
    "query": {"vendor": "Google"},
    "limit": 10
  }'
```

## Troubleshooting

**API won't start:**
- Check `.env` file exists with correct database credentials
- Verify PostgreSQL is running: `psql -U postgres -l`
- Check logs for error messages

**Import fails:**
- Verify source exists: `curl http://localhost:3000/api/sources`
- Check CSV format matches expectations
- Ensure dedup_fields match CSV column names

**Transformations not working:**
- Check rules exist: `curl http://localhost:3000/api/rules/source/bank_transactions`
- Test regex pattern manually
- Check records have the specified field
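
One quick way to test a pattern outside the database: PostgreSQL's `substring(text FROM pattern)` returns the first capture group when one exists, which `String.prototype.match` approximates in Node. The two regex dialects differ on advanced syntax, so treat this `extract` helper as a sanity check only:

```javascript
// Emulates substring(value FROM pattern): group 1 if present, else the whole match.
function extract(value, pattern) {
  const match = value.match(new RegExp(pattern));
  return match ? (match[1] ?? match[0]) : null;
}

console.log(extract('TARGET STOW OH', '([A-Z]+) OH'));             // "STOW"
console.log(extract('GOOGLE *YOUTUBE VIDEOS', '^([A-Z][A-Z ]+)')); // "GOOGLE " — note the trailing space
```

The trailing space in the second result shows why patterns sometimes need tightening (or mappings need trimmed input values).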
15
examples/bank_transactions.csv
Normal file
@ -0,0 +1,15 @@
date,description,amount,category
2024-01-02,GOOGLE *YOUTUBE VIDEOS,4.26,Services
2024-01-03,CLEVELAND CLINIC,200.00,Medical
2024-01-04,AT&T *PAYMENT,57.14,Services
2024-01-05,SUBWAY 00044289255 STOW OH,10.25,Restaurants
2024-01-06,TARGET STOW OH,197.90,Merchandise
2024-01-09,CIRCLE K 05416 STOW OH,52.99,Gasoline
2024-01-13,DISCOUNT DRUG MART 32 STOW OH,26.68,Merchandise
2024-01-16,BUFFALO WILD WINGS KENT,63.22,Restaurants
2024-01-19,WALMART GROCERY,17.16,Supermarkets
2024-01-20,GOOGLE *GOOGLE PLAY,2.12,Services
2024-01-20,LOWES OF STOW OH,256.48,Home Improvement
2024-01-27,GIANT-EAGLE #4096 STOW OH,67.81,Supermarkets
2024-01-29,NETFLIX.COM,14.93,Services
2024-01-30,TARGET STOW OH,49.37,Merchandise
29
package.json
Normal file
@ -0,0 +1,29 @@
{
  "name": "dataflow",
  "version": "0.1.0",
  "description": "Simple data transformation tool for ingesting, mapping, and transforming data",
  "main": "api/server.js",
  "scripts": {
    "start": "node api/server.js",
    "dev": "nodemon api/server.js",
    "test": "echo \"Tests coming soon\" && exit 0"
  },
  "keywords": [
    "etl",
    "data-transformation",
    "data-pipeline",
    "csv-import"
  ],
  "author": "",
  "license": "MIT",
  "dependencies": {
    "express": "^4.18.2",
    "pg": "^8.11.3",
    "dotenv": "^16.3.1",
    "multer": "^1.4.5-lts.1",
    "csv-parse": "^5.5.2"
  },
  "devDependencies": {
    "nodemon": "^3.0.1"
  }
}
69
setup.sh
Executable file
@ -0,0 +1,69 @@
#!/bin/bash
#
# Dataflow Setup Script
# Quick setup for development
#

set -e

echo "🚀 Dataflow Setup"
echo "================="
echo ""

# Check if .env exists
if [ ! -f .env ]; then
    echo "📝 Creating .env from template..."
    cp .env.example .env
    echo "⚠️  Please edit .env with your database credentials"
    echo ""
    read -p "Press enter to continue after editing .env..."
fi

# Load .env (set -a exports every assignment, and handles values containing spaces)
set -a
source .env
set +a

# Let psql authenticate without prompting
export PGPASSWORD="$DB_PASSWORD"

# Install dependencies
echo "📦 Installing Node.js dependencies..."
npm install
echo ""

# Test database connection
echo "🔍 Testing database connection..."
if psql -U "$DB_USER" -h "$DB_HOST" -p "$DB_PORT" -d postgres -c '\q' 2>/dev/null; then
    echo "✓ PostgreSQL connection successful"
else
    echo "✗ Cannot connect to PostgreSQL"
    echo "  Please check your database credentials in .env"
    exit 1
fi

# Create database if it doesn't exist
echo ""
echo "🗄️  Checking database..."
if psql -U "$DB_USER" -h "$DB_HOST" -p "$DB_PORT" -lqt | cut -d \| -f 1 | grep -qw "$DB_NAME"; then
    echo "✓ Database '$DB_NAME' exists"
else
    echo "Creating database '$DB_NAME'..."
    psql -U "$DB_USER" -h "$DB_HOST" -p "$DB_PORT" -d postgres -c "CREATE DATABASE $DB_NAME;"
    echo "✓ Database created"
fi

# Deploy schema
echo ""
echo "📋 Deploying database schema..."
psql -U "$DB_USER" -h "$DB_HOST" -p "$DB_PORT" -d "$DB_NAME" -f database/schema.sql
echo "✓ Schema deployed"

echo ""
echo "⚙️  Deploying database functions..."
psql -U "$DB_USER" -h "$DB_HOST" -p "$DB_PORT" -d "$DB_NAME" -f database/functions.sql
echo "✓ Functions deployed"

echo ""
echo "✅ Setup complete!"
echo ""
echo "Next steps:"
echo "  1. Start the server: npm start"
echo "  2. Test the API: curl http://localhost:$API_PORT/health"
echo "  3. Follow the guide: examples/GETTING_STARTED.md"
echo ""