#!/usr/bin/env python3
"""
Dataflow Manager

Interactive tool for configuring, deploying, and managing the dataflow service.
"""

import getpass
import inspect
import os
import re
import shutil
import subprocess
import sys
import tempfile
import time
from datetime import datetime
from pathlib import Path

ROOT = Path(__file__).parent.resolve()
ENV_FILE = ROOT / '.env'
SERVICE_FILE = Path('/etc/systemd/system/dataflow.service')
SERVICE_SRC = ROOT / 'dataflow.service'
NGINX_DIR = Path('/etc/nginx/sites-enabled')

# Keys that write_env() always emits and that the database helpers index directly.
REQUIRED_KEYS = ('DB_HOST', 'DB_PORT', 'DB_NAME', 'DB_USER', 'DB_PASSWORD')

# Accepted domain names for the nginx setup: letters, digits, dots and hyphens,
# not starting or ending with a dot/hyphen (keeps the value safe to embed in the
# nginx config and in the config file name).
_DOMAIN_RE = re.compile(r'[A-Za-z0-9](?:[A-Za-z0-9.-]*[A-Za-z0-9])?')

# ── Terminal helpers ──────────────────────────────────────────────────────────

BOLD = '\033[1m'
DIM = '\033[2m'
GREEN = '\033[0;32m'
YELLOW = '\033[0;33m'
RED = '\033[0;31m'
CYAN = '\033[0;36m'
RESET = '\033[0m'


def bold(s):
    """Wrap *s* in the ANSI bold escape."""
    return f'{BOLD}{s}{RESET}'


def dim(s):
    """Wrap *s* in the ANSI dim escape."""
    return f'{DIM}{s}{RESET}'


def green(s):
    """Wrap *s* in the ANSI green escape."""
    return f'{GREEN}{s}{RESET}'


def yellow(s):
    """Wrap *s* in the ANSI yellow escape."""
    return f'{YELLOW}{s}{RESET}'


def red(s):
    """Wrap *s* in the ANSI red escape."""
    return f'{RED}{s}{RESET}'


def cyan(s):
    """Wrap *s* in the ANSI cyan escape."""
    return f'{CYAN}{s}{RESET}'


def header(title):
    """Print a bold section header."""
    print(f'\n{BOLD}── {title} ──{RESET}')


def ok(msg=''):
    """Print a green check mark, optionally followed by *msg*."""
    print(f' {green("✓")} {msg}' if msg else f' {green("✓")}')


def warn(msg):
    """Print *msg* with a yellow '!' marker."""
    print(f' {yellow("!")} {msg}')


def err(msg):
    """Print *msg* with a red '✗' marker."""
    print(f' {red("✗")} {msg}')


def info(msg):
    """Print *msg* dimmed."""
    print(f' {dim(msg)}')


def _prompt_text(label, default=None, secret=False):
    """Build the text shown by prompt().

    A secret default (e.g. the current password) is never echoed; the user only
    sees that pressing Enter keeps the current value.
    """
    if not default:
        suffix = ''
    elif secret:
        suffix = ' [keep current]'
    else:
        suffix = f' [{default}]'
    return f' {label}{suffix}: '


def prompt(label, default=None, secret=False):
    """Ask the user for a value.

    Args:
        label: Text describing the value.
        default: Returned when the user enters nothing.
        secret: Read without echo via getpass (input is not stripped).

    Returns:
        The entered value, else *default*, else ''.
    """
    text = _prompt_text(label, default, secret)
    if secret:
        val = getpass.getpass(text)
    else:
        val = input(text).strip()
    return val if val else (default or '')


def confirm(label, default_yes=True):
    """Ask a yes/no question; an empty answer returns *default_yes*."""
    hint = '[Y/n]' if default_yes else '[y/N]'
    val = input(f' {label} {hint}: ').strip().lower()
    if not val:
        return default_yes
    return val.startswith('y')


def pause():
    """Wait for Enter; Ctrl-C or EOF also just continue."""
    try:
        input(f'\n {dim("Press Enter to continue...")}')
    except (EOFError, KeyboardInterrupt):
        print()


# ── Env file ──────────────────────────────────────────────────────────────────

def load_env():
    """Parse ENV_FILE into a dict of KEY -> value.

    Blank lines, '#' comments and lines without '=' are skipped. Keys and values
    are whitespace-stripped but otherwise taken verbatim (no quote removal).
    Returns an empty dict when the file does not exist.
    """
    env = {}
    if ENV_FILE.exists():
        for line in ENV_FILE.read_text(encoding='utf-8').splitlines():
            line = line.strip()
            if line and not line.startswith('#') and '=' in line:
                k, _, v = line.partition('=')
                env[k.strip()] = v.strip()
    return env


def write_env(cfg):
    """Overwrite ENV_FILE with the settings in *cfg*.

    The DB_* keys are required (KeyError if absent). API_PORT and NODE_ENV
    fall back to '3020' and 'production'.
    """
    # NOTE(review): DB_PASSWORD is stored in plain text and the file is created
    # with the process umask; consider restricting it (e.g. chmod 600) if the
    # account running the service can still read it.
    content = f"""# Database Configuration
DB_HOST={cfg['DB_HOST']}
DB_PORT={cfg['DB_PORT']}
DB_NAME={cfg['DB_NAME']}
DB_USER={cfg['DB_USER']}
DB_PASSWORD={cfg['DB_PASSWORD']}

# API Configuration
API_PORT={cfg.get('API_PORT', '3020')}
NODE_ENV={cfg.get('NODE_ENV', 'production')}
"""
    ENV_FILE.write_text(content, encoding='utf-8')


def _missing_keys(cfg):
    """Return the REQUIRED_KEYS absent from *cfg*, in REQUIRED_KEYS order."""
    return [k for k in REQUIRED_KEYS if k not in cfg]


# ── Database helpers ──────────────────────────────────────────────────────────

def _run(cmd, **kwargs):
    """subprocess.run(cmd, **kwargs), turning a missing executable into a failure.

    Instead of raising FileNotFoundError, returns a CompletedProcess with
    returncode 127 (the shell's "command not found" status) and a message on
    stderr. This lets status checks degrade gracefully when psql or systemctl
    is not installed.
    """
    try:
        return subprocess.run(cmd, **kwargs)
    except FileNotFoundError:
        return subprocess.CompletedProcess(
            cmd, 127, stdout='', stderr=f'{cmd[0]}: command not found')


def _psql_cmd(user, host, port, db, *args):
    """Build a psql argv.

    -w makes psql fail rather than block on an interactive password prompt
    when no usable password is available.
    """
    return ['psql', '-w', '-U', user, '-h', host, '-p', str(port), '-d', db, *args]


def _sql_literal(value):
    """Quote *value* as a SQL string literal by doubling single quotes.

    Assumes standard_conforming_strings=on (PostgreSQL's default since 9.1),
    under which backslashes are not escape characters.
    """
    return "'" + str(value).replace("'", "''") + "'"


def _sql_ident(name):
    """Quote *name* as a SQL identifier so it is used verbatim.

    Case and special characters are preserved, matching how psql -U/-d
    interpret the same names.
    """
    return '"' + str(name).replace('"', '""') + '"'


def psql_env(cfg):
    """Return a copy of os.environ with PGPASSWORD set from cfg['DB_PASSWORD']."""
    e = os.environ.copy()
    e['PGPASSWORD'] = cfg['DB_PASSWORD']
    return e


def psql_run(cfg, sql, db=None, check=False):
    """Run one SQL command as the app user in unaligned, tuples-only mode (-tAc).

    *db* defaults to cfg['DB_NAME']. *check* is forwarded to subprocess.run.
    Returns the CompletedProcess with text stdout/stderr.
    """
    db = db or cfg['DB_NAME']
    cmd = _psql_cmd(cfg['DB_USER'], cfg['DB_HOST'], cfg['DB_PORT'], db, '-tAc', sql)
    return _run(cmd, capture_output=True, text=True, env=psql_env(cfg), check=check)


def psql_admin(admin_cfg, sql, db='postgres'):
    """Run *sql* as the admin role described by admin_cfg (user/password/host/port).

    The SQL is fed on stdin rather than via -c, so statements carrying secrets
    (CREATE USER ... PASSWORD) do not show up in the process list.
    ON_ERROR_STOP=1 gives a non-zero exit status when a statement fails.
    -tA prints bare values, so a matching 'SELECT 1' outputs just '1'.
    """
    e = os.environ.copy()
    e['PGPASSWORD'] = admin_cfg['password']
    cmd = _psql_cmd(admin_cfg['user'], admin_cfg['host'], admin_cfg['port'], db,
                    '-tA', '-v', 'ON_ERROR_STOP=1')
    return _run(cmd, input=sql, capture_output=True, text=True, env=e)


def psql_file(cfg, filepath, db=None):
    """Execute the SQL script *filepath* quietly as the app user.

    ON_ERROR_STOP=1 makes psql stop at the first failing statement and exit
    non-zero. Without it, psql -f returns 0 even when statements fail.
    """
    db = db or cfg['DB_NAME']
    cmd = _psql_cmd(cfg['DB_USER'], cfg['DB_HOST'], cfg['DB_PORT'], db,
                    '-v', 'ON_ERROR_STOP=1', '-f', str(filepath), '-q')
    return _run(cmd, capture_output=True, text=True, env=psql_env(cfg))


def can_connect(cfg):
    """True when 'SELECT 1' succeeds against cfg's database."""
    return psql_run(cfg, 'SELECT 1').returncode == 0


def schema_deployed(cfg):
    """True when a schema named 'dataflow' exists."""
    r = psql_run(cfg, "SELECT 1 FROM information_schema.schemata WHERE schema_name='dataflow'")
    return r.returncode == 0 and '1' in r.stdout.split()


def functions_deployed(cfg):
    """True when any function named apply_transformations exists (any schema)."""
    r = psql_run(cfg, "SELECT 1 FROM pg_proc WHERE proname='apply_transformations'")
    return r.returncode == 0 and '1' in r.stdout.split()


# ── System helpers ────────────────────────────────────────────────────────────

def service_installed():
    """True when the systemd unit file is present."""
    return SERVICE_FILE.exists()


def service_running():
    """True when `systemctl is-active dataflow` reports 'active'."""
    r = _run(['systemctl', 'is-active', 'dataflow'], capture_output=True, text=True)
    return r.stdout.strip() == 'active'


def ui_built():
    """True when public/index.html exists."""
    index = ROOT / 'public' / 'index.html'
    return index.exists()


def ui_build_time():
    """Return public/index.html's mtime as 'YYYY-MM-DD HH:MM' (local time), or None."""
    index = ROOT / 'public' / 'index.html'
    if index.exists():
        ts = index.stat().st_mtime
        return datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M')
    return None


def _server_name_for_port(text, port):
    """Return the first server_name in nginx config *text* if it references :port.

    The port must not be followed by another digit, so 3020 does not match 30201.
    Only real `server_name` directives count (not comments or
    server_name_in_redirect). Returns None when nothing matches.
    """
    if not re.search(rf':{re.escape(str(port))}(?!\d)', text):
        return None
    for line in text.splitlines():
        parts = line.split()
        if len(parts) >= 2 and parts[0] == 'server_name':
            return parts[1].rstrip(';')
    return None


def nginx_domain(port):
    """Find the server_name of an nginx site proxying to our port, or None."""
    if not NGINX_DIR.exists():
        return None
    try:
        entries = sorted(NGINX_DIR.iterdir())
    except OSError:
        return None
    for f in entries:
        try:
            text = f.read_text()
        except (OSError, UnicodeDecodeError):
            continue  # directory, unreadable or non-text entry: not a candidate
        domain = _server_name_for_port(text, port)
        if domain:
            return domain
    return None


def sudo_run(args, **kwargs):
    """subprocess.run(['sudo', *args], **kwargs)."""
    return subprocess.run(['sudo'] + args, **kwargs)


# ── Status ────────────────────────────────────────────────────────────────────

def show_status(cfg):
    """Print database, UI, service and nginx status for *cfg* (the parsed .env)."""
    header('Status')
    if not cfg:
        warn('No .env — not configured')
        return

    port = cfg.get('API_PORT', '3020')

    # Database (skipped when .env lacks keys the helpers need)
    missing = _missing_keys(cfg)
    if missing:
        warn(f'.env is missing {", ".join(missing)} — run Configure')
    else:
        connected = can_connect(cfg)
        db_label = f"{cfg['DB_USER']}@{cfg['DB_HOST']}:{cfg['DB_PORT']}/{cfg['DB_NAME']}"
        status = green('connected') if connected else red('cannot connect')
        print(f' Database {dim(db_label)} {status}')
        if connected:
            sd = schema_deployed(cfg)
            fn = functions_deployed(cfg)
            print(f' Schema {green("deployed") if sd else red("not deployed")}')
            print(f' Functions {green("deployed") if fn else red("not deployed")}')
        else:
            print(f' Schema {dim("unknown")}')
            print(f' Functions {dim("unknown")}')

    # UI
    if ui_built():
        print(f' UI {green("built")} {dim(ui_build_time())}')
    else:
        print(f' UI {red("not built")}')

    # Service
    if service_installed():
        running = service_running()
        print(f' Service {green("running") if running else yellow("stopped")}')
    else:
        print(f' Service {dim("not installed")}')

    # Nginx
    domain = nginx_domain(port)
    if domain:
        print(f' Nginx {green(domain)}')
    else:
        print(f' Nginx {dim("not configured")}')
    print()


# ── Actions ───────────────────────────────────────────────────────────────────

def _require_db(cfg):
    """Print an error and return False unless *cfg* is complete and connectable."""
    if not cfg:
        err('No .env — run Configure first')
        return False
    missing = _missing_keys(cfg)
    if missing:
        err(f'.env is missing {", ".join(missing)} — run Configure')
        return False
    if not can_connect(cfg):
        err('Cannot connect to database')
        return False
    return True


def _provision_role_and_db(new_cfg, admin):
    """Create the app role and database, or grant access to an existing database.

    Runs as *admin*. Names and the password are quoted, so any characters are
    safe. Returns True on success and False after reporting a fatal error.
    """
    user = new_cfg['DB_USER']
    dbname = new_cfg['DB_NAME']

    r = psql_admin(admin, f"SELECT 1 FROM pg_roles WHERE rolname={_sql_literal(user)}")
    if r.stdout.strip() == '1':
        info(f'User {user} already exists')
    else:
        r = psql_admin(admin, f"CREATE USER {_sql_ident(user)} "
                              f"WITH PASSWORD {_sql_literal(new_cfg['DB_PASSWORD'])}")
        if r.returncode == 0:
            ok(f'User {user} created')
        else:
            err(f'Could not create user: {r.stderr.strip()}')
            return False

    r = psql_admin(admin, f"SELECT 1 FROM pg_database WHERE datname={_sql_literal(dbname)}")
    if r.stdout.strip() == '1':
        info(f'Database {dbname} already exists — granting access')
        r = psql_admin(admin, f"GRANT CREATE ON DATABASE {_sql_ident(dbname)} "
                              f"TO {_sql_ident(user)}", db=dbname)
        if r.returncode != 0:
            warn(f'Could not grant access: {r.stderr.strip()}')
    else:
        r = psql_admin(admin, f"CREATE DATABASE {_sql_ident(dbname)} OWNER {_sql_ident(user)}")
        if r.returncode == 0:
            ok(f'Database {dbname} created')
        else:
            err(f'Could not create database: {r.stderr.strip()}')
            return False
    return True


def action_configure(cfg):
    """Set up or reconfigure .env — handles new and existing databases.

    Prompts for connection settings, seeded from *cfg*. If they do not work,
    offers to create the role and database with admin credentials. Writes .env
    only after a successful connection.

    Returns:
        The new config dict on success, otherwise the unchanged *cfg*.
    """
    header('Configure Database')
    existing = cfg.copy() if cfg else {}
    print(' Configure connection to an existing database, or create a new one.')
    print()

    new_cfg = {}
    new_cfg['DB_HOST'] = prompt('Host', existing.get('DB_HOST', 'localhost'))
    new_cfg['DB_PORT'] = prompt('Port', existing.get('DB_PORT', '5432'))
    new_cfg['DB_NAME'] = prompt('Database', existing.get('DB_NAME', 'dataflow'))
    new_cfg['DB_USER'] = prompt('User', existing.get('DB_USER', 'dataflow'))
    new_cfg['DB_PASSWORD'] = prompt('Password', existing.get('DB_PASSWORD', ''), secret=True)
    new_cfg['API_PORT'] = prompt('API port', existing.get('API_PORT', '3020'))
    new_cfg['NODE_ENV'] = prompt('Environment', existing.get('NODE_ENV', 'production'))
    print()

    print(f' Testing connection to {new_cfg["DB_NAME"]}...')
    if can_connect(new_cfg):
        ok('Connected')
    else:
        warn('Cannot connect with those credentials.')
        if not confirm('Create user/database using admin credentials?', default_yes=False):
            info('Cancelled — .env not written')
            return cfg
        print()
        admin = {}
        admin['user'] = prompt('Admin username', 'postgres')
        admin['password'] = prompt('Admin password', secret=True)
        admin['host'] = new_cfg['DB_HOST']
        admin['port'] = new_cfg['DB_PORT']

        # Test admin connection
        r = psql_admin(admin, 'SELECT 1')
        if r.returncode != 0:
            err(f'Cannot connect as {admin["user"]}')
            return cfg

        if not _provision_role_and_db(new_cfg, admin):
            return cfg

        if not can_connect(new_cfg):
            err('Still cannot connect after setup — check credentials')
            return cfg
        ok('Connection verified')

    write_env(new_cfg)
    ok('.env written')
    return new_cfg


def action_deploy_schema(cfg):
    """Run database/schema.sql, asking first if the schema already exists."""
    header('Deploy Schema')
    if not _require_db(cfg):
        return
    if schema_deployed(cfg):
        warn('Schema already deployed')
        if not confirm('Redeploy (this will reset all data)?', default_yes=False):
            info('Cancelled')
            return
    print(f' Deploying schema to {cfg["DB_NAME"]}...')
    r = psql_file(cfg, ROOT / 'database' / 'schema.sql')
    if r.returncode == 0:
        ok('Schema deployed')
    else:
        err(f'Failed:\n{r.stderr}')


def action_deploy_functions(cfg):
    """Run database/functions.sql against the configured database."""
    header('Deploy Functions')
    if not _require_db(cfg):
        return
    print(f' Deploying functions to {cfg["DB_NAME"]}...')
    r = psql_file(cfg, ROOT / 'database' / 'functions.sql')
    if r.returncode == 0:
        ok('Functions deployed')
    else:
        err(f'Failed:\n{r.stderr}')


def action_build_ui():
    """Run `npm run build` in ui/."""
    header('Build UI')
    ui_dir = ROOT / 'ui'
    if not (ui_dir / 'package.json').exists():
        err('ui/package.json not found')
        return
    if not shutil.which('npm'):
        err('npm not found')
        return
    print(' Building...')
    r = subprocess.run(['npm', 'run', 'build'], cwd=ui_dir, capture_output=True, text=True)
    if r.returncode == 0:
        ok('UI built')
    else:
        # Build tools may write errors to stdout rather than stderr; show whichever is non-empty.
        err(f'Build failed:\n{r.stderr.strip() or r.stdout.strip()}')


def action_setup_nginx(cfg):
    """Write an nginx site proxying a domain to API_PORT, reload nginx, offer certbot.

    Uses an HTTPS config when a Let's Encrypt certificate already exists for
    the domain, otherwise a plain HTTP one.
    """
    header('Nginx')
    if not shutil.which('nginx'):
        err('nginx not found')
        return

    port = cfg.get('API_PORT', '3020') if cfg else '3020'
    domain = prompt('Domain (e.g. dataflow.example.com)')
    if not domain:
        info('No domain — skipped')
        return
    if not _DOMAIN_RE.fullmatch(domain):
        err(f'Invalid domain: {domain}')
        return

    conf_name = domain.split('.')[0]
    conf_path = NGINX_DIR / conf_name
    cert_path = Path(f'/etc/letsencrypt/live/{domain}/fullchain.pem')

    if cert_path.exists():
        conf = f"""server {{
    listen 80;
    listen [::]:80;
    server_name {domain};

    location / {{
        return 301 https://$host$request_uri;
    }}
}}

server {{
    listen 443 ssl http2;
    listen [::]:443 ssl http2;
    server_name {domain};

    ssl_certificate {cert_path};
    ssl_certificate_key /etc/letsencrypt/live/{domain}/privkey.pem;
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers HIGH:!MEDIUM:!LOW:!aNULL:!NULL:!SHA;
    ssl_prefer_server_ciphers on;
    ssl_session_cache shared:SSL:10m;

    keepalive_timeout 70;
    sendfile on;
    client_max_body_size 80m;

    location / {{
        proxy_pass http://localhost:{port};
    }}
}}
"""
    else:
        conf = f"""server {{
    listen 80;
    listen [::]:80;
    server_name {domain};

    location / {{
        proxy_pass http://localhost:{port};
    }}
}}
"""

    # Write to a temp file, then copy into place via sudo; always remove the temp file.
    with tempfile.NamedTemporaryFile('w', suffix='.conf', delete=False) as f:
        f.write(conf)
        tmp = f.name
    try:
        r = sudo_run(['cp', tmp, str(conf_path)])
    finally:
        os.unlink(tmp)
    if r.returncode != 0:
        err('Could not write nginx config (check sudo)')
        return
    ok(f'Config written to {conf_path}')

    r = sudo_run(['nginx', '-t'], capture_output=True)
    if r.returncode != 0:
        err('nginx config invalid — run: sudo nginx -t')
        return
    ok('Config valid')

    r = sudo_run(['systemctl', 'reload', 'nginx'])
    if r.returncode != 0:
        err('Could not reload nginx — check: sudo systemctl status nginx')
        return
    ok('nginx reloaded')

    if not cert_path.exists():
        warn(f'No SSL cert for {domain}')
        if confirm('Run certbot now?'):
            r = sudo_run(['certbot', '--nginx', '-d', domain, '--non-interactive',
                          '--agree-tos', '--redirect', '-m', f'admin@{domain}'])
            if r.returncode == 0:
                ok('SSL configured')
            else:
                err(f'certbot failed — run manually: sudo certbot --nginx -d {domain}')


def action_install_service():
    """Copy dataflow.service into systemd, reload units and enable it on boot."""
    header('Systemd Service')
    if service_installed():
        info('Already installed')
        return
    if not SERVICE_SRC.exists():
        err(f'{SERVICE_SRC} not found')
        return
    r = sudo_run(['cp', str(SERVICE_SRC), str(SERVICE_FILE)])
    if r.returncode != 0:
        err('Could not install service (check sudo)')
        return
    ok('Service file installed')
    sudo_run(['systemctl', 'daemon-reload'])
    r = sudo_run(['systemctl', 'enable', 'dataflow'], capture_output=True)
    if r.returncode == 0:
        ok('Enabled on boot')
    else:
        err('Could not enable service on boot')


def action_restart_service():
    """Start the service, or restart it if running, then check it stays up."""
    header('API Server')
    if not service_installed():
        err('Service not installed — run Install Service first')
        return
    action = 'restart' if service_running() else 'start'
    r = sudo_run(['systemctl', action, 'dataflow'])
    if r.returncode != 0:
        err('Failed — check: journalctl -u dataflow -n 30')
        return
    time.sleep(1)  # give the unit a moment to fail fast before re-checking
    if service_running():
        ok(f'Service {action}ed and running')
    else:
        err('Service failed to start — check: journalctl -u dataflow -n 30')


def action_stop_service():
    """Stop the service if it is running."""
    header('Stop Service')
    if not service_running():
        info('Service is not running')
        return
    r = sudo_run(['systemctl', 'stop', 'dataflow'])
    if r.returncode != 0:
        err('Could not stop service (check sudo)')
        return
    ok('Service stopped')


# ── Main menu ─────────────────────────────────────────────────────────────────

MENU = [
    ('Configure database', action_configure),
    ('Deploy schema', action_deploy_schema),
    ('Deploy functions', action_deploy_functions),
    ('Build UI', action_build_ui),
    ('Set up nginx', action_setup_nginx),
    ('Install service', action_install_service),
    ('Start / restart service', action_restart_service),
    ('Stop service', action_stop_service),
]


def _menu_index(choice):
    """Map a 1-based menu choice string to a MENU index, or None if invalid."""
    try:
        idx = int(choice) - 1
    except ValueError:
        return None
    return idx if 0 <= idx < len(MENU) else None


def main():
    """Interactive loop: redraw status and menu, run the chosen action, repeat.

    'q'/'quit'/'exit', Ctrl-C or EOF at the menu prompt exits. Ctrl-C during
    an action returns to the menu.
    """
    while True:
        os.system('clear')
        print(bold('Dataflow Manager'))
        print('=' * 40)
        cfg = load_env()  # re-read every loop so changes made by actions show up
        show_status(cfg)
        print(bold('Actions'))
        for i, (label, _) in enumerate(MENU, 1):
            print(f' {cyan(str(i))}. {label}')
        print(f' {cyan("q")}. Quit')
        print()

        try:
            choice = input(' Choice: ').strip().lower()
        except (EOFError, KeyboardInterrupt):
            print()
            break
        if choice in ('q', 'quit', 'exit'):
            print()
            break
        if not choice:
            continue  # bare Enter just redraws

        idx = _menu_index(choice)
        if idx is None:
            warn('Invalid choice')
            pause()  # otherwise the next clear wipes the message immediately
            continue

        _, fn = MENU[idx]
        try:
            # Some actions need cfg, some don't
            if inspect.signature(fn).parameters:
                fn(cfg)
            else:
                fn()
        except (KeyboardInterrupt, EOFError):
            print()
            warn('Interrupted')
        pause()


if __name__ == '__main__':
    main()