dataflow/manage.py
Paul Trowbridge a26a7643e4 Clarify manage.py configure step wording
Menu item: 'Configure database' → 'Configure database connection'
Intro text: disambiguates that the database is what gets created, not a connection.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-05 16:26:31 -04:00

548 lines
18 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Dataflow Manager
Interactive tool for configuring, deploying, and managing the dataflow service.
"""
import os
import sys
import subprocess
import getpass
import shutil
from pathlib import Path
from datetime import datetime
# Directory that contains this script; project-relative paths hang off it.
ROOT = Path(__file__).parent.resolve()
# Environment file read by load_env() and written by write_env().
ENV_FILE = ROOT.joinpath('.env')
# Installed systemd unit, and the unit file shipped next to this script.
SERVICE_FILE = Path('/etc/systemd/system', 'dataflow.service')
SERVICE_SRC = ROOT.joinpath('dataflow.service')
# Directory scanned (and written to) for nginx site configs.
NGINX_DIR = Path('/etc/nginx', 'sites-enabled')
# ── Terminal helpers ──────────────────────────────────────────────────────────
# ANSI SGR escape sequences used to style terminal output.
BOLD = '\033[1m'
DIM = '\033[2m'
GREEN = '\033[0;32m'
YELLOW = '\033[0;33m'
RED = '\033[0;31m'
CYAN = '\033[0;36m'
RESET = '\033[0m'


def bold(s):
    """Return *s* wrapped in the bold sequence and a trailing reset."""
    return ''.join((BOLD, format(s), RESET))


def dim(s):
    """Return *s* wrapped in the dim sequence and a trailing reset."""
    return ''.join((DIM, format(s), RESET))


def green(s):
    """Return *s* wrapped in the green sequence and a trailing reset."""
    return ''.join((GREEN, format(s), RESET))


def yellow(s):
    """Return *s* wrapped in the yellow sequence and a trailing reset."""
    return ''.join((YELLOW, format(s), RESET))


def red(s):
    """Return *s* wrapped in the red sequence and a trailing reset."""
    return ''.join((RED, format(s), RESET))


def cyan(s):
    """Return *s* wrapped in the cyan sequence and a trailing reset."""
    return ''.join((CYAN, format(s), RESET))
def header(title):
    """Print *title* as a bold section heading preceded by a blank line."""
    heading = f'{BOLD}── {title} ──{RESET}'
    print('\n' + heading)
def ok(msg=''):
    """Print a success line: a green check mark, then *msg* if one is given.

    The marker must be a visible glyph: colour applied to an empty string
    shows nothing, which would make success and error lines look the same.
    """
    mark = green('✓')
    print(f' {mark} {msg}' if msg else f' {mark}')
def warn(msg):
    """Print *msg* behind a yellow "!" marker."""
    marker = yellow("!")
    print(f' {marker} {msg}')
def err(msg):
    """Print an error line: a red cross followed by *msg*.

    The marker must be a visible glyph: colour applied to an empty string
    shows nothing, which would make error and success lines look the same.
    """
    mark = red('✗')
    print(f' {mark} {msg}')
def info(msg):
    """Print *msg* in dim text."""
    print(' ' + dim(msg))
def prompt(label, default=None, secret=False):
    """Ask the operator for a value and return it.

    Args:
        label: Text shown before the input cursor.
        default: Value returned when nothing is entered.
        secret: Read the value with ``getpass`` so typing is not echoed.

    Returns:
        The entered text (whitespace-stripped for non-secret input), or
        ``default`` (``''`` when there is no default) if nothing was entered.
    """
    if not default:
        suffix = ''
    elif secret:
        # Never print an existing secret as the "[default]" hint; only
        # signal that pressing Enter keeps a stored value.
        suffix = ' [hidden]'
    else:
        suffix = f' [{default}]'
    text = f' {label}{suffix}: '
    if secret:
        val = getpass.getpass(text)
    else:
        val = input(text).strip()
    return val if val else (default or '')
def confirm(label, default_yes=True):
    """Ask a yes/no question and return the answer as a bool.

    An empty answer returns ``default_yes``. Any other answer is true only
    when it starts with "y" (case-insensitive).
    """
    hint = '[y/N]'
    if default_yes:
        hint = '[Y/n]'
    answer = input(f' {label} {hint}: ').strip().lower()
    if answer:
        return answer.startswith('y')
    return default_yes
def pause():
    """Wait until the operator presses Enter."""
    message = dim("Press Enter to continue...")
    input(f'\n {message}')
# ── Env file ──────────────────────────────────────────────────────────────────
def load_env():
    """Parse the ``.env`` file into a dict of stripped keys and values.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored. Returns an empty dict when the file does not exist.
    """
    if not ENV_FILE.exists():
        return {}
    env = {}
    for raw in ENV_FILE.read_text().splitlines():
        entry = raw.strip()
        if not entry or entry.startswith('#') or '=' not in entry:
            continue
        # Split on the first "=" only, so values may contain "=".
        key, _, value = entry.partition('=')
        env[key.strip()] = value.strip()
    return env
def write_env(cfg):
    """Write the settings in *cfg* to the ``.env`` file.

    Args:
        cfg: Mapping with ``DB_HOST``, ``DB_PORT``, ``DB_NAME``, ``DB_USER``
            and ``DB_PASSWORD``. ``API_PORT`` and ``NODE_ENV`` are optional
            and default to ``'3020'`` and ``'production'``.

    Raises:
        KeyError: If one of the required ``DB_*`` keys is missing.
    """
    content = f"""# Database Configuration
DB_HOST={cfg['DB_HOST']}
DB_PORT={cfg['DB_PORT']}
DB_NAME={cfg['DB_NAME']}
DB_USER={cfg['DB_USER']}
DB_PASSWORD={cfg['DB_PASSWORD']}
# API Configuration
API_PORT={cfg.get('API_PORT', '3020')}
NODE_ENV={cfg.get('NODE_ENV', 'production')}
"""
    # The file holds the database password, so a newly created file is
    # readable by its owner only. The mode argument applies only on
    # creation: an existing file keeps whatever mode it already has.
    fd = os.open(ENV_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as fh:
        fh.write(content)
# ── Database helpers ──────────────────────────────────────────────────────────
def psql_env(cfg):
    """Return a copy of the process environment with ``PGPASSWORD`` set.

    The password is taken from ``cfg['DB_PASSWORD']``; ``os.environ`` itself
    is not modified.
    """
    return {**os.environ, 'PGPASSWORD': cfg['DB_PASSWORD']}
def psql_run(cfg, sql, db=None, check=False):
    """Run one SQL command through ``psql`` with the application credentials.

    Args:
        cfg: Mapping with ``DB_USER``, ``DB_HOST``, ``DB_PORT``, ``DB_NAME``
            and ``DB_PASSWORD``.
        sql: Command passed to ``psql -tAc`` (tuples only, unaligned output).
        db: Database to connect to; defaults to ``cfg['DB_NAME']``.
        check: When true, a non-zero psql exit raises
            ``subprocess.CalledProcessError``.

    Returns:
        The ``subprocess.CompletedProcess`` with captured text output.
    """
    db = db or cfg['DB_NAME']
    cmd = ['psql', '-U', cfg['DB_USER'], '-h', cfg['DB_HOST'],
           '-p', str(cfg['DB_PORT']), '-d', db, '-tAc', sql]
    # Forward ``check`` so the parameter actually takes effect.
    return subprocess.run(cmd, capture_output=True, text=True,
                          env=psql_env(cfg), check=check)
def psql_admin(admin_cfg, sql, db='postgres'):
    """Run one SQL command through ``psql -c`` with admin credentials.

    ``admin_cfg`` supplies ``user``, ``password``, ``host`` and ``port``.
    Returns the ``subprocess.CompletedProcess`` with captured text output.
    """
    env = dict(os.environ, PGPASSWORD=admin_cfg['password'])
    cmd = [
        'psql',
        '-U', admin_cfg['user'],
        '-h', admin_cfg['host'],
        '-p', str(admin_cfg['port']),
        '-d', db,
        '-c', sql,
    ]
    return subprocess.run(cmd, capture_output=True, text=True, env=env)
def psql_file(cfg, filepath, db=None):
    """Execute a SQL script with ``psql -f`` using the application credentials.

    Args:
        cfg: Mapping with ``DB_USER``, ``DB_HOST``, ``DB_PORT``, ``DB_NAME``
            and ``DB_PASSWORD``.
        filepath: Path of the SQL file to run.
        db: Database to connect to; defaults to ``cfg['DB_NAME']``.

    Returns:
        The ``subprocess.CompletedProcess`` with captured text output. The
        return code is non-zero if any statement in the script fails.
    """
    db = db or cfg['DB_NAME']
    # Without ON_ERROR_STOP psql keeps going after a failed statement and
    # still exits 0, so callers testing ``returncode`` would report success
    # for a script that did not apply cleanly.
    cmd = ['psql', '-U', cfg['DB_USER'], '-h', cfg['DB_HOST'],
           '-p', str(cfg['DB_PORT']), '-d', db,
           '-v', 'ON_ERROR_STOP=1', '-f', str(filepath), '-q']
    return subprocess.run(cmd, capture_output=True, text=True, env=psql_env(cfg))
def can_connect(cfg):
    """Return True when ``SELECT 1`` succeeds against the configured database."""
    probe = psql_run(cfg, 'SELECT 1', db=cfg['DB_NAME'])
    succeeded = probe.returncode == 0
    return succeeded
def schema_deployed(cfg):
    """Return True when a schema named ``dataflow`` exists in the database."""
    query = "SELECT 1 FROM information_schema.schemata WHERE schema_name='dataflow'"
    result = psql_run(cfg, query)
    if result.returncode != 0:
        return False
    return '1' in result.stdout
def functions_deployed(cfg):
    """Return True when ``pg_proc`` lists a function named ``apply_transformations``."""
    query = "SELECT 1 FROM pg_proc WHERE proname='apply_transformations'"
    result = psql_run(cfg, query)
    if result.returncode != 0:
        return False
    return '1' in result.stdout
# ── System helpers ────────────────────────────────────────────────────────────
def service_installed():
    """Return True when the systemd unit file exists at ``SERVICE_FILE``."""
    installed = SERVICE_FILE.exists()
    return installed
def service_running():
    """Return True when ``systemctl is-active dataflow`` prints ``active``."""
    probe = subprocess.run(
        ['systemctl', 'is-active', 'dataflow'],
        capture_output=True,
        text=True,
    )
    state = probe.stdout.strip()
    return state == 'active'
def ui_built():
    """Return True when ``public/index.html`` exists under the project root."""
    return ROOT.joinpath('public', 'index.html').exists()
def ui_build_time():
    """Return the mtime of ``public/index.html`` as ``YYYY-MM-DD HH:MM``.

    Returns None when the file does not exist.
    """
    index = ROOT.joinpath('public', 'index.html')
    if not index.exists():
        return None
    modified = datetime.fromtimestamp(index.stat().st_mtime)
    return modified.strftime('%Y-%m-%d %H:%M')
def nginx_domain(port):
    """Find the nginx site that proxies to *port*.

    Files in ``NGINX_DIR`` are scanned in sorted order. The first file that
    mentions ``:<port>`` and has a ``server_name`` directive decides the
    result.

    Args:
        port: Port number (str or int) the API listens on.

    Returns:
        The first name of that file's first ``server_name`` directive, or
        None when the directory is missing or no file matches.
    """
    import re

    if not NGINX_DIR.exists():
        return None
    # Reject a longer port that merely starts with ours (":3020" in ":30200").
    port_ref = re.compile(rf':{re.escape(str(port))}(?!\d)')
    for site in sorted(NGINX_DIR.iterdir()):
        try:
            text = site.read_text()
        except (OSError, UnicodeDecodeError):
            # Directories, dangling symlinks, unreadable or non-text entries
            # are not usable site configs; skip them.
            continue
        if not port_ref.search(text):
            continue
        for line in text.splitlines():
            # Ignore trailing comments, and require the directive itself so
            # "# server_name ..." or "server_name_in_redirect" do not match.
            parts = line.split('#', 1)[0].split()
            if len(parts) >= 2 and parts[0] == 'server_name':
                return parts[1].rstrip(';')
    return None
def sudo_run(args, **kwargs):
    """Run the command list *args* under ``sudo``.

    Keyword arguments are forwarded to ``subprocess.run``, whose result is
    returned.
    """
    command = ['sudo'] + args
    return subprocess.run(command, **kwargs)
# ── Status ────────────────────────────────────────────────────────────────────
def show_status(cfg):
    """Print a status summary: database, schema, functions, UI, service, nginx.

    With an empty *cfg* only a "not configured" warning is printed.
    """
    header('Status')
    if not cfg:
        warn('No .env — not configured')
        return

    port = cfg.get('API_PORT', '3020')

    # Database
    connected = can_connect(cfg)
    db_label = f"{cfg['DB_USER']}@{cfg['DB_HOST']}:{cfg['DB_PORT']}/{cfg['DB_NAME']}"
    if connected:
        status = green('connected')
    else:
        status = red('cannot connect')
    print(f' Database {dim(db_label)} {status}')

    # Schema and functions can only be inspected over a working connection.
    if connected:
        has_schema = schema_deployed(cfg)
        has_functions = functions_deployed(cfg)
        schema_state = green("deployed") if has_schema else red("not deployed")
        functions_state = green("deployed") if has_functions else red("not deployed")
    else:
        schema_state = dim("unknown")
        functions_state = dim("unknown")
    print(f' Schema {schema_state}')
    print(f' Functions {functions_state}')

    # UI
    if not ui_built():
        print(f' UI {red("not built")}')
    else:
        print(f' UI {green("built")} {dim(ui_build_time())}')

    # Service
    if not service_installed():
        service_state = dim("not installed")
    elif service_running():
        service_state = green("running")
    else:
        service_state = yellow("stopped")
    print(f' Service {service_state}')

    # Nginx
    domain = nginx_domain(port)
    nginx_state = green(domain) if domain else dim("not configured")
    print(f' Nginx {nginx_state}')
    print()
# ── Actions ───────────────────────────────────────────────────────────────────
def _sql_literal(value):
    """Quote *value* as a SQL string literal by doubling single quotes."""
    # NOTE(review): relies on standard_conforming_strings being on, so that
    # backslashes are not escape characters — confirm for the target server.
    return "'" + str(value).replace("'", "''") + "'"


def _sql_ident(name):
    """Quote *name* as a SQL identifier by doubling double quotes."""
    return '"' + str(name).replace('"', '""') + '"'


def action_configure(cfg):
    """Set up or reconfigure .env — handles new and existing databases.

    Prompts for connection settings and tests them. If the connection fails,
    optionally creates the role and database with admin credentials.

    Args:
        cfg: Current settings dict (may be empty or None); used as defaults.

    Returns:
        The new settings dict after ``.env`` has been written, or *cfg*
        unchanged when the operator cancels or a step fails.
    """
    header('Configure Database')
    existing = cfg.copy() if cfg else {}
    print(' Enter connection details. If the database does not exist, you will be')
    print(' prompted for admin credentials to create it.')
    print()
    new_cfg = {}
    new_cfg['DB_HOST'] = prompt('Host', existing.get('DB_HOST', 'localhost'))
    new_cfg['DB_PORT'] = prompt('Port', existing.get('DB_PORT', '5432'))
    new_cfg['DB_NAME'] = prompt('Database', existing.get('DB_NAME', 'dataflow'))
    new_cfg['DB_USER'] = prompt('User', existing.get('DB_USER', 'dataflow'))
    new_cfg['DB_PASSWORD'] = prompt('Password', existing.get('DB_PASSWORD', ''), secret=True)
    new_cfg['API_PORT'] = prompt('API port', existing.get('API_PORT', '3020'))
    new_cfg['NODE_ENV'] = prompt('Environment', existing.get('NODE_ENV', 'production'))
    print()
    print(f' Testing connection to {new_cfg["DB_NAME"]}...')
    if can_connect(new_cfg):
        ok('Connected')
    else:
        warn('Cannot connect with those credentials.')
        if not confirm('Create user/database using admin credentials?', default_yes=False):
            info('Cancelled — .env not written')
            return cfg
        print()
        admin = {}
        admin['user'] = prompt('Admin username', 'postgres')
        admin['password'] = prompt('Admin password', secret=True)
        admin['host'] = new_cfg['DB_HOST']
        admin['port'] = new_cfg['DB_PORT']
        # Test admin connection
        r = psql_admin(admin, 'SELECT 1')
        if r.returncode != 0:
            err(f'Cannot connect as {admin["user"]}')
            return cfg

        # Operator-typed values go into SQL text, so quote every one of
        # them. Quoted identifiers also keep the exact spelling that psql
        # later uses for -U / -d, which are not case-folded.
        user_ident = _sql_ident(new_cfg['DB_USER'])
        db_ident = _sql_ident(new_cfg['DB_NAME'])
        user_lit = _sql_literal(new_cfg['DB_USER'])
        db_lit = _sql_literal(new_cfg['DB_NAME'])
        password_lit = _sql_literal(new_cfg['DB_PASSWORD'])

        # Create user
        r = psql_admin(admin, f"SELECT 1 FROM pg_roles WHERE rolname={user_lit}")
        if '1' in r.stdout:
            info(f'User {new_cfg["DB_USER"]} already exists')
        else:
            r = psql_admin(admin, f"CREATE USER {user_ident} WITH PASSWORD {password_lit}")
            if r.returncode == 0:
                ok(f'User {new_cfg["DB_USER"]} created')
            else:
                err(f'Could not create user: {r.stderr.strip()}')
                return cfg
        # Create or grant access to database
        r = psql_admin(admin, f"SELECT 1 FROM pg_database WHERE datname={db_lit}")
        if '1' in r.stdout:
            info(f'Database {new_cfg["DB_NAME"]} already exists — granting access')
            r = psql_admin(admin, f"GRANT CREATE ON DATABASE {db_ident} TO {user_ident}",
                           db=new_cfg['DB_NAME'])
            if r.returncode != 0:
                # Not fatal on its own: the connection test below decides.
                warn(f'Could not grant access: {r.stderr.strip()}')
        else:
            r = psql_admin(admin, f"CREATE DATABASE {db_ident} OWNER {user_ident}")
            if r.returncode == 0:
                ok(f'Database {new_cfg["DB_NAME"]} created')
            else:
                err(f'Could not create database: {r.stderr.strip()}')
                return cfg
        if not can_connect(new_cfg):
            err('Still cannot connect after setup — check credentials')
            return cfg
        ok('Connection verified')
    write_env(new_cfg)
    ok('.env written')
    return new_cfg
def action_deploy_schema(cfg):
    """Run ``database/schema.sql`` against the configured database.

    Asks for confirmation first when the schema is already deployed.
    """
    header('Deploy Schema')
    if not cfg:
        err('No .env — run Configure first')
        return
    if not can_connect(cfg):
        err('Cannot connect to database')
        return

    if schema_deployed(cfg):
        warn('Schema already deployed')
        redeploy = confirm('Redeploy (this will reset all data)?', default_yes=False)
        if not redeploy:
            info('Cancelled')
            return

    print(f' Deploying schema to {cfg["DB_NAME"]}...')
    result = psql_file(cfg, ROOT / 'database' / 'schema.sql')
    if result.returncode != 0:
        err(f'Failed:\n{result.stderr}')
        return
    ok('Schema deployed')
def action_deploy_functions(cfg):
    """Run ``database/functions.sql`` against the configured database."""
    header('Deploy Functions')
    if not cfg:
        err('No .env — run Configure first')
        return
    if not can_connect(cfg):
        err('Cannot connect to database')
        return

    print(f' Deploying functions to {cfg["DB_NAME"]}...')
    result = psql_file(cfg, ROOT / 'database' / 'functions.sql')
    if result.returncode != 0:
        err(f'Failed:\n{result.stderr}')
        return
    ok('Functions deployed')
def action_build_ui():
    """Build the UI by running ``npm run build`` in the ``ui`` directory.

    Reports an error instead of running when ``ui/package.json`` or the
    ``npm`` executable is missing.
    """
    header('Build UI')
    ui_dir = ROOT / 'ui'
    if not (ui_dir / 'package.json').exists():
        err('ui/package.json not found')
        return
    # Without this check a missing npm raises FileNotFoundError from
    # subprocess.run instead of producing a readable message.
    if not shutil.which('npm'):
        err('npm not found')
        return
    print(' Building...')
    r = subprocess.run(['npm', 'run', 'build'], cwd=ui_dir,
                       capture_output=True, text=True)
    if r.returncode == 0:
        ok('UI built')
    else:
        # Fall back to stdout so a failure with empty stderr is not silent.
        err(f'Build failed:\n{r.stderr or r.stdout}')
def action_setup_nginx(cfg):
    """Write an nginx site for a domain, validate it, reload nginx, offer certbot.

    The site proxies to ``API_PORT`` from *cfg* (``'3020'`` when unset). An
    HTTPS config is generated when a Let's Encrypt certificate for the
    domain is found, otherwise a plain HTTP one.

    Args:
        cfg: Settings dict; may be empty or None.
    """
    import tempfile

    header('Nginx')
    if not shutil.which('nginx'):
        err('nginx not found')
        return
    port = cfg.get('API_PORT', '3020') if cfg else '3020'
    domain = prompt('Domain (e.g. dataflow.example.com)')
    if not domain:
        info('No domain — skipped')
        return
    # The domain is written into a root-owned config and used in file paths,
    # so refuse characters that could add directives or change the path.
    if any(ch.isspace() or ch in ';{}/\\' for ch in domain):
        err(f'Invalid domain: {domain}')
        return
    conf_name = domain.split('.')[0]
    conf_path = NGINX_DIR / conf_name
    # NOTE(review): if the letsencrypt directory is not readable by the
    # invoking user, exists() reports False even when a certificate is
    # present — confirm how this tool is normally run.
    cert_path = Path(f'/etc/letsencrypt/live/{domain}/fullchain.pem')
    if cert_path.exists():
        conf = f"""server {{
listen 80;
listen [::]:80;
server_name {domain};
location / {{ return 301 https://$host$request_uri; }}
}}
server {{
listen 443 ssl http2;
listen [::]:443 ssl http2;
server_name {domain};
ssl_certificate {cert_path};
ssl_certificate_key /etc/letsencrypt/live/{domain}/privkey.pem;
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers HIGH:!MEDIUM:!LOW:!aNULL:!NULL:!SHA;
ssl_prefer_server_ciphers on;
ssl_session_cache shared:SSL:10m;
keepalive_timeout 70;
sendfile on;
client_max_body_size 80m;
location / {{
proxy_pass http://localhost:{port};
}}
}}
"""
    else:
        conf = f"""server {{
listen 80;
listen [::]:80;
server_name {domain};
location / {{
proxy_pass http://localhost:{port};
}}
}}
"""
    # Write via sudo: stage the text in a temp file, then copy it into place.
    with tempfile.NamedTemporaryFile('w', suffix='.conf', delete=False) as f:
        f.write(conf)
        tmp = f.name
    try:
        r = sudo_run(['cp', tmp, str(conf_path)])
    finally:
        # Remove the staging file even if running sudo raised.
        os.unlink(tmp)
    if r.returncode != 0:
        err('Could not write nginx config (check sudo)')
        return
    ok(f'Config written to {conf_path}')
    r = sudo_run(['nginx', '-t'], capture_output=True)
    if r.returncode != 0:
        err('nginx config invalid — run: sudo nginx -t')
        return
    ok('Config valid')
    r = sudo_run(['systemctl', 'reload', 'nginx'])
    if r.returncode != 0:
        # Do not claim success (or go on to certbot) when the reload failed.
        err('nginx reload failed — check: systemctl status nginx')
        return
    ok('nginx reloaded')
    if not cert_path.exists():
        warn(f'No SSL cert for {domain}')
        if confirm('Run certbot now?'):
            r = sudo_run(['certbot', '--nginx', '-d', domain,
                          '--non-interactive', '--agree-tos', '--redirect',
                          '-m', f'admin@{domain}'])
            if r.returncode == 0:
                ok('SSL configured')
            else:
                err(f'certbot failed — run manually: sudo certbot --nginx -d {domain}')
def action_install_service():
    """Install the systemd unit and enable it on boot.

    Copies ``SERVICE_SRC`` to ``SERVICE_FILE`` with sudo, reloads systemd
    and enables the ``dataflow`` unit. Does nothing when the unit file is
    already installed. Each step is reported only if its command succeeded.
    """
    header('Systemd Service')
    if service_installed():
        info('Already installed')
        return
    if not SERVICE_SRC.exists():
        err(f'{SERVICE_SRC} not found')
        return
    r = sudo_run(['cp', str(SERVICE_SRC), str(SERVICE_FILE)])
    if r.returncode != 0:
        err('Could not install service (check sudo)')
        return
    ok('Service file installed')
    r = sudo_run(['systemctl', 'daemon-reload'])
    if r.returncode != 0:
        err('systemctl daemon-reload failed')
        return
    r = sudo_run(['systemctl', 'enable', 'dataflow'], capture_output=True)
    if r.returncode != 0:
        err('Could not enable service on boot')
        return
    ok('Enabled on boot')
def action_restart_service():
    """Start the ``dataflow`` service, or restart it if it is already running.

    After one second the service state is checked again and the outcome is
    reported.
    """
    import time

    header('API Server')
    if not service_installed():
        err('Service not installed — run Install Service first')
        return

    if service_running():
        action = 'restart'
    else:
        action = 'start'
    result = sudo_run(['systemctl', action, 'dataflow'])
    if result.returncode != 0:
        err('Failed — check: journalctl -u dataflow -n 30')
        return

    # Give the unit a moment before checking that it stayed up.
    time.sleep(1)
    if not service_running():
        err('Service failed to start — check: journalctl -u dataflow -n 30')
        return
    ok(f'Service {action}ed and running')
def action_stop_service():
    """Stop the ``dataflow`` service if it is running.

    Success is reported only when ``systemctl stop`` exits with status 0.
    """
    header('Stop Service')
    if not service_running():
        info('Service is not running')
        return
    r = sudo_run(['systemctl', 'stop', 'dataflow'])
    if r.returncode != 0:
        err('Failed to stop service — check: journalctl -u dataflow -n 30')
        return
    ok('Service stopped')
# ── Main menu ─────────────────────────────────────────────────────────────────
# Menu entries in display order, as (label, handler) pairs. main() numbers
# them from 1 and passes the loaded config to handlers that take a parameter.
MENU = [
    ('Configure database connection', action_configure),
    ('Deploy schema', action_deploy_schema),
    ('Deploy functions', action_deploy_functions),
    ('Build UI', action_build_ui),
    ('Set up nginx', action_setup_nginx),
    ('Install service', action_install_service),
    ('Start / restart service', action_restart_service),
    ('Stop service', action_stop_service),
]
def main():
    """Run the interactive menu loop until the operator quits.

    Each pass clears the screen, reloads ``.env``, prints the status and the
    menu, then runs the chosen action. ``q``, ``quit``, ``exit``, end of
    input or Ctrl-C at the menu prompt ends the loop.
    """
    import inspect

    while True:
        os.system('clear')
        print(bold('Dataflow Manager'))
        print('=' * 40)
        cfg = load_env()
        show_status(cfg)
        print(bold('Actions'))
        for i, (label, _) in enumerate(MENU, 1):
            print(f' {cyan(str(i))}. {label}')
        print(f' {cyan("q")}. Quit')
        print()
        try:
            choice = input(' Choice: ').strip().lower()
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or Ctrl-C at the menu means "quit", not a crash.
            print()
            break
        if choice in ('q', 'quit', 'exit'):
            print()
            break
        # Guard only the number parsing, so an exception raised inside an
        # action is never mistaken for a bad menu choice.
        try:
            idx = int(choice) - 1
        except ValueError:
            idx = -1
        if not 0 <= idx < len(MENU):
            warn('Invalid choice')
            # Pause, otherwise the next screen clear wipes the warning.
            pause()
            continue
        _, fn = MENU[idx]
        # Some actions need cfg, some don't. A returned config is not kept:
        # cfg is reloaded from .env at the top of every pass.
        if inspect.signature(fn).parameters:
            fn(cfg)
        else:
            fn()
        pause()
# Start the interactive menu only when run as a script, not when imported.
if __name__ == '__main__':
    main()