perf: speed up uuid column generation (#11209)

This commit is contained in:
Jesse Yang 2020-10-13 13:29:49 -07:00 committed by GitHub
parent 6d8541e3b6
commit d7eb1d476f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 101 additions and 41 deletions

View File

@ -22,12 +22,18 @@ Create Date: 2020-09-28 17:57:23.128142
"""
import json
import logging
import uuid
import os
import time
from json.decoder import JSONDecodeError
from uuid import uuid4
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects.mysql.base import MySQLDialect
from sqlalchemy.dialects.postgresql.base import PGDialect
from sqlalchemy.exc import OperationalError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import load_only
from sqlalchemy_utils import UUIDType
from superset import db
@ -43,7 +49,7 @@ Base = declarative_base()
class ImportMixin:
id = sa.Column(sa.Integer, primary_key=True)
uuid = sa.Column(UUIDType(binary=True), primary_key=False, default=uuid.uuid4)
uuid = sa.Column(UUIDType(binary=True), primary_key=False, default=uuid4)
table_names = [
@ -71,26 +77,56 @@ models = {
models["dashboards"].position_json = sa.Column(utils.MediumText())
default_batch_size = int(os.environ.get("BATCH_SIZE", 200))
def add_uuids(objects, session, batch_size=100):
uuid_map = {}
count = len(objects)
for i, object_ in enumerate(objects):
object_.uuid = uuid.uuid4()
uuid_map[object_.id] = object_.uuid
session.merge(object_)
if (i + 1) % batch_size == 0:
session.commit()
print(f"uuid assigned to {i + 1} out of {count}")
# Add uuids directly using built-in SQL uuid function
add_uuids_by_dialect = {
MySQLDialect: """UPDATE %s SET uuid = UNHEX(REPLACE(uuid(), "-", ""));""",
PGDialect: """UPDATE %s SET uuid = uuid_in(md5(random()::text || clock_timestamp()::text)::cstring);""",
}
session.commit()
print(f"Done! Assigned {count} uuids")
return uuid_map
def add_uuids(table_name, session, batch_size=default_batch_size):
"""Populate columns with pre-computed uuids"""
bind = op.get_bind()
objects_query = session.query(models[table_name])
count = objects_query.count()
# silently skip if the table is empty (suitable for db initialization)
if count == 0:
return
print(f"\nAdding uuids for `{table_name}`...")
start_time = time.time()
# Use dialect specific native SQL queries if possible
for dialect, sql in add_uuids_by_dialect.items():
if isinstance(bind.dialect, dialect):
op.execute(sql % table_name)
print(f"Done. Assigned {count} uuids in {time.time() - start_time:.3f}s.")
return
# Othwewise Use Python uuid function
start = 0
while start < count:
end = min(start + batch_size, count)
for obj, uuid in map(lambda obj: (obj, uuid4()), objects_query[start:end]):
obj.uuid = uuid
session.merge(obj)
session.commit()
if start + batch_size < count:
print(f" uuid assigned to {end} out of {count}\r", end="")
start += batch_size
print(f"Done. Assigned {count} uuids in {time.time() - start_time:.3f}s.")
def update_position_json(dashboard, session, uuid_map):
layout = json.loads(dashboard.position_json or "{}")
try:
layout = json.loads(dashboard.position_json or "{}")
except JSONDecodeError:
layout = {}
for object_ in layout.values():
if (
isinstance(object_, dict)
@ -105,37 +141,63 @@ def update_position_json(dashboard, session, uuid_map):
dashboard.position_json = json.dumps(layout, indent=4)
session.merge(dashboard)
def update_dashboards(session, uuid_map):
message = (
"Updating dasboard position json with slice uuid.."
if uuid_map
else "Cleaning up slice uuid from dashboard position json.."
)
print(f"\n{message}\r", end="")
query = session.query(models["dashboards"])
dashboard_count = query.count()
for i, dashboard in enumerate(query.all()):
update_position_json(dashboard, session, uuid_map)
if i and i % default_batch_size == 0:
session.commit()
print(f"{message} {i+1}/{dashboard_count}\r", end="")
session.commit()
# Extra whitespace to override very long numbers, e.g. 99999/99999.
print(f"{message} Done. \n")
def upgrade():
bind = op.get_bind()
session = db.Session(bind=bind)
uuid_maps = {}
for table_name, model in models.items():
with op.batch_alter_table(table_name) as batch_op:
batch_op.add_column(
sa.Column(
"uuid",
UUIDType(binary=True),
primary_key=False,
default=uuid.uuid4,
for table_name in models.keys():
try:
with op.batch_alter_table(table_name) as batch_op:
batch_op.add_column(
sa.Column(
"uuid", UUIDType(binary=True), primary_key=False, default=uuid4,
),
)
)
except OperationalError:
# ignore collumn update errors so that we can run upgrade multiple times
pass
# populate column
objects = session.query(model).all()
uuid_maps[table_name] = add_uuids(objects, session)
add_uuids(table_name, session)
# add uniqueness constraint
with op.batch_alter_table(table_name) as batch_op:
batch_op.create_unique_constraint(f"uq_{table_name}_uuid", ["uuid"])
try:
# add uniqueness constraint
with op.batch_alter_table(table_name) as batch_op:
# batch mode is required for sqllite
batch_op.create_unique_constraint(f"uq_{table_name}_uuid", ["uuid"])
except OperationalError:
pass
# add UUID to Dashboard.position_json
Dashboard = models["dashboards"]
for dashboard in session.query(Dashboard).all():
update_position_json(dashboard, session, uuid_maps["slices"])
slice_uuid_map = {
slc.id: slc.uuid
for slc in session.query(models["slices"])
.options(load_only("id", "uuid"))
.all()
}
update_dashboards(session, slice_uuid_map)
def downgrade():
@ -143,12 +205,10 @@ def downgrade():
session = db.Session(bind=bind)
# remove uuid from position_json
Dashboard = models["dashboards"]
for dashboard in session.query(Dashboard).all():
update_position_json(dashboard, session, {})
update_dashboards(session, {})
# remove uuid column
for table_name, model in models.items():
with op.batch_alter_table(model) as batch_op:
batch_op.drop_constraint(f"uq_{table_name}_uuid")
with op.batch_alter_table(table_name) as batch_op:
batch_op.drop_constraint(f"uq_{table_name}_uuid", type_="unique")
batch_op.drop_column("uuid")