- pivot_layouts table (source_name, layout_name, config JSONB) - list/save/delete SQL functions and API routes - Pivot toolbar above viewer: layout chips, save-as inline input, delete per layout, reset to default - Applying a named layout also updates localStorage working state - Layouts reload on source change Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
224 lines
9.5 KiB
PL/PgSQL
224 lines
9.5 KiB
PL/PgSQL
--
|
|
-- Sources queries
|
|
-- All SQL for api/routes/sources.js
|
|
--
|
|
|
|
SET search_path TO dataflow, public;
|
|
|
|
-- ── CRUD ─────────────────────────────────────────────────────────────────────
|
|
|
|
CREATE OR REPLACE FUNCTION list_sources()
|
|
RETURNS SETOF dataflow.sources AS $$
|
|
SELECT * FROM dataflow.sources ORDER BY name;
|
|
$$ LANGUAGE sql STABLE;
|
|
|
|
CREATE OR REPLACE FUNCTION get_source(p_name TEXT)
|
|
RETURNS dataflow.sources AS $$
|
|
SELECT * FROM dataflow.sources WHERE name = p_name;
|
|
$$ LANGUAGE sql STABLE;
|
|
|
|
CREATE OR REPLACE FUNCTION create_source(p_name TEXT, p_constraint_fields TEXT[], p_config JSONB DEFAULT '{}', p_global_picklist BOOLEAN DEFAULT true)
|
|
RETURNS dataflow.sources AS $$
|
|
INSERT INTO dataflow.sources (name, constraint_fields, config, global_picklist)
|
|
VALUES (p_name, p_constraint_fields, p_config, p_global_picklist)
|
|
RETURNING *;
|
|
$$ LANGUAGE sql;
|
|
|
|
CREATE OR REPLACE FUNCTION update_source(p_name TEXT, p_constraint_fields TEXT[] DEFAULT NULL, p_config JSONB DEFAULT NULL, p_global_picklist BOOLEAN DEFAULT NULL)
|
|
RETURNS dataflow.sources AS $$
|
|
UPDATE dataflow.sources
|
|
SET constraint_fields = COALESCE(p_constraint_fields, constraint_fields),
|
|
config = COALESCE(p_config, config),
|
|
global_picklist = COALESCE(p_global_picklist, global_picklist),
|
|
updated_at = CURRENT_TIMESTAMP
|
|
WHERE name = p_name
|
|
RETURNING *;
|
|
$$ LANGUAGE sql;
|
|
|
|
CREATE OR REPLACE FUNCTION delete_source(p_name TEXT)
|
|
RETURNS TEXT AS $$
|
|
DELETE FROM dataflow.sources WHERE name = p_name RETURNING name;
|
|
$$ LANGUAGE sql;
|
|
|
|
-- ── Import log ────────────────────────────────────────────────────────────────
|
|
|
|
-- ── Stats ─────────────────────────────────────────────────────────────────────
|
|
|
|
CREATE OR REPLACE FUNCTION get_source_stats(p_source_name TEXT)
|
|
RETURNS TABLE (total_records BIGINT, transformed_records BIGINT, pending_records BIGINT) AS $$
|
|
SELECT
|
|
COUNT(*) AS total_records,
|
|
COUNT(*) FILTER (WHERE transformed IS NOT NULL) AS transformed_records,
|
|
COUNT(*) FILTER (WHERE transformed IS NULL) AS pending_records
|
|
FROM dataflow.records
|
|
WHERE source_name = p_source_name;
|
|
$$ LANGUAGE sql STABLE;
|
|
|
|
-- ── Fields ────────────────────────────────────────────────────────────────────
|
|
|
|
CREATE OR REPLACE FUNCTION get_source_fields(p_source_name TEXT)
|
|
RETURNS TABLE (key TEXT, origins TEXT[]) AS $$
|
|
SELECT key, array_agg(DISTINCT origin ORDER BY origin) AS origins
|
|
FROM (
|
|
SELECT f->>'name' AS key, 'schema' AS origin
|
|
FROM dataflow.sources, jsonb_array_elements(config->'fields') f
|
|
WHERE name = p_source_name AND config ? 'fields'
|
|
UNION ALL
|
|
SELECT jsonb_object_keys(data) AS key, 'raw' AS origin
|
|
FROM dataflow.records WHERE source_name = p_source_name
|
|
UNION ALL
|
|
SELECT output_field AS key, 'rule: ' || name AS origin
|
|
FROM dataflow.rules WHERE source_name = p_source_name
|
|
UNION ALL
|
|
SELECT jsonb_object_keys(output) AS key, 'mapping' AS origin
|
|
FROM dataflow.mappings WHERE source_name = p_source_name
|
|
) keys
|
|
GROUP BY key
|
|
ORDER BY key;
|
|
$$ LANGUAGE sql STABLE;
|
|
|
|
-- ── View data (dynamic sort via EXECUTE) ──────────────────────────────────────
|
|
|
|
CREATE OR REPLACE FUNCTION get_view_data(
|
|
p_source_name TEXT,
|
|
p_limit INT DEFAULT 100,
|
|
p_offset INT DEFAULT 0,
|
|
p_sort_col TEXT DEFAULT NULL,
|
|
p_sort_dir TEXT DEFAULT 'asc',
|
|
p_filters JSONB DEFAULT NULL -- [{col, pattern}, ...] — postgres regex (~*)
|
|
)
|
|
RETURNS JSON AS $$
|
|
DECLARE
|
|
v_exists BOOLEAN;
|
|
v_where TEXT := '';
|
|
v_order TEXT := '';
|
|
v_rows JSON;
|
|
v_filter JSONB;
|
|
v_col TEXT;
|
|
v_pattern TEXT;
|
|
BEGIN
|
|
SELECT EXISTS (
|
|
SELECT 1 FROM information_schema.views
|
|
WHERE table_schema = 'dfv' AND table_name = p_source_name
|
|
) INTO v_exists;
|
|
|
|
IF NOT v_exists THEN
|
|
RETURN json_build_object('exists', FALSE, 'rows', '[]'::json);
|
|
END IF;
|
|
|
|
-- Build WHERE from filters (validate each column exists in the view)
|
|
IF p_filters IS NOT NULL THEN
|
|
FOR v_filter IN SELECT value FROM jsonb_array_elements(p_filters) LOOP
|
|
v_col := v_filter->>'col';
|
|
v_pattern := v_filter->>'pattern';
|
|
IF v_pattern IS NOT NULL AND v_pattern <> '' AND EXISTS (
|
|
SELECT 1 FROM information_schema.columns
|
|
WHERE table_schema = 'dfv'
|
|
AND table_name = p_source_name
|
|
AND column_name = v_col
|
|
) THEN
|
|
v_where := v_where ||
|
|
CASE WHEN v_where = '' THEN ' WHERE ' ELSE ' AND ' END ||
|
|
quote_ident(v_col) || '::text ~* ' || quote_literal(v_pattern);
|
|
END IF;
|
|
END LOOP;
|
|
END IF;
|
|
|
|
IF p_sort_col IS NOT NULL AND EXISTS (
|
|
SELECT 1 FROM information_schema.columns
|
|
WHERE table_schema = 'dfv'
|
|
AND table_name = p_source_name
|
|
AND column_name = p_sort_col
|
|
) THEN
|
|
v_order := ' ORDER BY ' || quote_ident(p_sort_col)
|
|
|| CASE WHEN lower(p_sort_dir) = 'desc' THEN ' DESC' ELSE ' ASC' END
|
|
|| ' NULLS LAST';
|
|
END IF;
|
|
|
|
EXECUTE format(
|
|
'SELECT COALESCE(json_agg(row_to_json(t)), ''[]''::json) FROM (SELECT * FROM dfv.%I%s%s LIMIT %s OFFSET %s) t',
|
|
p_source_name, v_where, v_order, p_limit, p_offset
|
|
) INTO v_rows;
|
|
|
|
RETURN json_build_object('exists', TRUE, 'rows', v_rows);
|
|
END;
|
|
$$ LANGUAGE plpgsql STABLE;
|
|
|
|
-- ── View generation ───────────────────────────────────────────────────────────
|
|
|
|
CREATE OR REPLACE FUNCTION generate_source_view(p_source_name TEXT)
|
|
RETURNS JSON AS $$
|
|
DECLARE
|
|
v_config JSONB;
|
|
v_field JSONB;
|
|
v_cols TEXT := '';
|
|
v_sql TEXT;
|
|
v_view TEXT;
|
|
BEGIN
|
|
SELECT config INTO v_config FROM dataflow.sources WHERE name = p_source_name;
|
|
|
|
IF v_config IS NULL OR NOT (v_config ? 'fields') OR jsonb_array_length(v_config->'fields') = 0 THEN
|
|
RETURN json_build_object('success', false, 'error', 'No schema fields defined for this source');
|
|
END IF;
|
|
|
|
FOR v_field IN SELECT * FROM jsonb_array_elements(v_config->'fields') LOOP
|
|
IF v_cols != '' THEN v_cols := v_cols || ', '; END IF;
|
|
|
|
IF v_field->>'expression' IS NOT NULL THEN
|
|
DECLARE
|
|
v_expr TEXT := v_field->>'expression';
|
|
v_ref TEXT;
|
|
BEGIN
|
|
WHILE v_expr ~ '\{[^}]+\}' LOOP
|
|
v_ref := substring(v_expr FROM '\{([^}]+)\}');
|
|
v_expr := replace(v_expr, '{' || v_ref || '}', format('(transformed->>%L)::numeric', v_ref));
|
|
END LOOP;
|
|
v_cols := v_cols || format('%s AS %I', v_expr, v_field->>'name');
|
|
END;
|
|
ELSE
|
|
CASE v_field->>'type'
|
|
WHEN 'date' THEN v_cols := v_cols || format('(transformed->>%L)::date AS %I', v_field->>'name', v_field->>'name');
|
|
WHEN 'numeric' THEN v_cols := v_cols || format('(transformed->>%L)::numeric AS %I', v_field->>'name', v_field->>'name');
|
|
ELSE v_cols := v_cols || format('transformed->>%L AS %I', v_field->>'name', v_field->>'name');
|
|
END CASE;
|
|
END IF;
|
|
END LOOP;
|
|
|
|
CREATE SCHEMA IF NOT EXISTS dfv;
|
|
v_view := 'dfv.' || quote_ident(p_source_name);
|
|
EXECUTE format('DROP VIEW IF EXISTS %s', v_view);
|
|
v_sql := format(
|
|
'CREATE VIEW %s AS SELECT %s FROM dataflow.records WHERE source_name = %L AND transformed IS NOT NULL',
|
|
v_view, v_cols, p_source_name
|
|
);
|
|
EXECUTE v_sql;
|
|
|
|
RETURN json_build_object('success', true, 'view', v_view, 'sql', v_sql);
|
|
END;
|
|
$$ LANGUAGE plpgsql;
|
|
|
|
-- List saved pivot layouts for a source
|
|
CREATE OR REPLACE FUNCTION list_pivot_layouts(p_source_name TEXT)
|
|
RETURNS TABLE(id INT, source_name TEXT, layout_name TEXT, config JSONB, created_at TIMESTAMPTZ) AS $$
|
|
SELECT id, source_name, layout_name, config, created_at
|
|
FROM dataflow.pivot_layouts
|
|
WHERE source_name = p_source_name
|
|
ORDER BY layout_name;
|
|
$$ LANGUAGE sql;
|
|
|
|
-- Save (upsert) a named pivot layout
|
|
CREATE OR REPLACE FUNCTION save_pivot_layout(p_source_name TEXT, p_layout_name TEXT, p_config JSONB)
|
|
RETURNS TABLE(id INT, source_name TEXT, layout_name TEXT, config JSONB, created_at TIMESTAMPTZ) AS $$
|
|
INSERT INTO dataflow.pivot_layouts (source_name, layout_name, config)
|
|
VALUES (p_source_name, p_layout_name, p_config)
|
|
ON CONFLICT (source_name, layout_name) DO UPDATE
|
|
SET config = EXCLUDED.config
|
|
RETURNING id, source_name, layout_name, config, created_at;
|
|
$$ LANGUAGE sql;
|
|
|
|
-- Delete a named pivot layout
|
|
CREATE OR REPLACE FUNCTION delete_pivot_layout(p_id INT)
|
|
RETURNS TABLE(id INT) AS $$
|
|
DELETE FROM dataflow.pivot_layouts WHERE id = p_id RETURNING id;
|
|
$$ LANGUAGE sql;
|