diff --git a/superset/migrations/versions/b56500de1855_add_uuid_column_to_import_mixin.py b/superset/migrations/versions/b56500de1855_add_uuid_column_to_import_mixin.py index 19903b94c4..bffa983c2c 100644 --- a/superset/migrations/versions/b56500de1855_add_uuid_column_to_import_mixin.py +++ b/superset/migrations/versions/b56500de1855_add_uuid_column_to_import_mixin.py @@ -22,12 +22,18 @@ Create Date: 2020-09-28 17:57:23.128142 """ import json -import logging -import uuid +import os +import time +from json.decoder import JSONDecodeError +from uuid import uuid4 import sqlalchemy as sa from alembic import op +from sqlalchemy.dialects.mysql.base import MySQLDialect +from sqlalchemy.dialects.postgresql.base import PGDialect +from sqlalchemy.exc import OperationalError from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import load_only from sqlalchemy_utils import UUIDType from superset import db @@ -43,7 +49,7 @@ Base = declarative_base() class ImportMixin: id = sa.Column(sa.Integer, primary_key=True) - uuid = sa.Column(UUIDType(binary=True), primary_key=False, default=uuid.uuid4) + uuid = sa.Column(UUIDType(binary=True), primary_key=False, default=uuid4) table_names = [ @@ -71,26 +77,56 @@ models = { models["dashboards"].position_json = sa.Column(utils.MediumText()) +default_batch_size = int(os.environ.get("BATCH_SIZE", 200)) -def add_uuids(objects, session, batch_size=100): - uuid_map = {} - count = len(objects) - for i, object_ in enumerate(objects): - object_.uuid = uuid.uuid4() - uuid_map[object_.id] = object_.uuid - session.merge(object_) - if (i + 1) % batch_size == 0: - session.commit() - print(f"uuid assigned to {i + 1} out of {count}") +# Add uuids directly using built-in SQL uuid function +add_uuids_by_dialect = { + MySQLDialect: """UPDATE %s SET uuid = UNHEX(REPLACE(uuid(), "-", ""));""", + PGDialect: """UPDATE %s SET uuid = uuid_in(md5(random()::text || clock_timestamp()::text)::cstring);""", +} - session.commit() - print(f"Done! Assigned {count} uuids") - return uuid_map +def add_uuids(table_name, session, batch_size=default_batch_size): + """Populate columns with pre-computed uuids""" + bind = op.get_bind() + objects_query = session.query(models[table_name]) + count = objects_query.count() + + # silently skip if the table is empty (suitable for db initialization) + if count == 0: + return + + print(f"\nAdding uuids for `{table_name}`...") + start_time = time.time() + + # Use dialect specific native SQL queries if possible + for dialect, sql in add_uuids_by_dialect.items(): + if isinstance(bind.dialect, dialect): + op.execute(sql % table_name) + print(f"Done. Assigned {count} uuids in {time.time() - start_time:.3f}s.") + return + + # Othwewise Use Python uuid function + start = 0 + while start < count: + end = min(start + batch_size, count) + for obj, uuid in map(lambda obj: (obj, uuid4()), objects_query[start:end]): + obj.uuid = uuid + session.merge(obj) + session.commit() + if start + batch_size < count: + print(f" uuid assigned to {end} out of {count}\r", end="") + start += batch_size + + print(f"Done. Assigned {count} uuids in {time.time() - start_time:.3f}s.") def update_position_json(dashboard, session, uuid_map): - layout = json.loads(dashboard.position_json or "{}") + try: + layout = json.loads(dashboard.position_json or "{}") + except JSONDecodeError: + layout = {} + for object_ in layout.values(): if ( isinstance(object_, dict) @@ -105,37 +141,63 @@ def update_position_json(dashboard, session, uuid_map): dashboard.position_json = json.dumps(layout, indent=4) session.merge(dashboard) + + +def update_dashboards(session, uuid_map): + message = ( + "Updating dasboard position json with slice uuid.." + if uuid_map + else "Cleaning up slice uuid from dashboard position json.." + ) + print(f"\n{message}\r", end="") + + query = session.query(models["dashboards"]) + dashboard_count = query.count() + for i, dashboard in enumerate(query.all()): + update_position_json(dashboard, session, uuid_map) + if i and i % default_batch_size == 0: + session.commit() + print(f"{message} {i+1}/{dashboard_count}\r", end="") + session.commit() + # Extra whitespace to override very long numbers, e.g. 99999/99999. + print(f"{message} Done. \n") def upgrade(): bind = op.get_bind() session = db.Session(bind=bind) - uuid_maps = {} - for table_name, model in models.items(): - with op.batch_alter_table(table_name) as batch_op: - batch_op.add_column( - sa.Column( - "uuid", - UUIDType(binary=True), - primary_key=False, - default=uuid.uuid4, + for table_name in models.keys(): + try: + with op.batch_alter_table(table_name) as batch_op: + batch_op.add_column( + sa.Column( + "uuid", UUIDType(binary=True), primary_key=False, default=uuid4, + ), ) - ) + except OperationalError: + # ignore collumn update errors so that we can run upgrade multiple times + pass - # populate column - objects = session.query(model).all() - uuid_maps[table_name] = add_uuids(objects, session) + add_uuids(table_name, session) - # add uniqueness constraint - with op.batch_alter_table(table_name) as batch_op: - batch_op.create_unique_constraint(f"uq_{table_name}_uuid", ["uuid"]) + try: + # add uniqueness constraint + with op.batch_alter_table(table_name) as batch_op: + # batch mode is required for sqllite + batch_op.create_unique_constraint(f"uq_{table_name}_uuid", ["uuid"]) + except OperationalError: + pass # add UUID to Dashboard.position_json - Dashboard = models["dashboards"] - for dashboard in session.query(Dashboard).all(): - update_position_json(dashboard, session, uuid_maps["slices"]) + slice_uuid_map = { + slc.id: slc.uuid + for slc in session.query(models["slices"]) + .options(load_only("id", "uuid")) + .all() + } + update_dashboards(session, slice_uuid_map) def downgrade(): @@ -143,12 +205,10 @@ def downgrade(): session = db.Session(bind=bind) # remove uuid from position_json - Dashboard = models["dashboards"] - for dashboard in session.query(Dashboard).all(): - update_position_json(dashboard, session, {}) + update_dashboards(session, {}) # remove uuid column for table_name, model in models.items(): - with op.batch_alter_table(model) as batch_op: - batch_op.drop_constraint(f"uq_{table_name}_uuid") + with op.batch_alter_table(table_name) as batch_op: + batch_op.drop_constraint(f"uq_{table_name}_uuid", type_="unique") batch_op.drop_column("uuid")