From 88a89c9fd683b50d8a81754199fba6dbb4c7bef3 Mon Sep 17 00:00:00 2001 From: John Bodley <4567245+john-bodley@users.noreply.github.com> Date: Wed, 12 Oct 2022 17:11:59 -0700 Subject: [PATCH] fix(migration): Ensure the paginated update is deterministic (#21778) --- superset/migrations/shared/utils.py | 31 ++++++++++++------- ...b8bca906d2_permalink_rename_filterstate.py | 3 -- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/superset/migrations/shared/utils.py b/superset/migrations/shared/utils.py index 14987ea0b4..e05b1d357f 100644 --- a/superset/migrations/shared/utils.py +++ b/superset/migrations/shared/utils.py @@ -100,22 +100,31 @@ def paginated_update( """ Update models in small batches so we don't have to load everything in memory. """ - start = 0 - count = query.count() + + total = query.count() + processed = 0 session: Session = inspect(query).session + result = session.execute(query) + if print_page_progress is None or print_page_progress is True: - print_page_progress = lambda current, total: print( - f" {current}/{total}", end="\r" + print_page_progress = lambda processed, total: print( + f" {processed}/{total}", end="\r" ) - while start < count: - end = min(start + batch_size, count) - for obj in query[start:end]: - yield obj - session.merge(obj) + + while True: + rows = result.fetchmany(batch_size) + + if not rows: + break + + for row in rows: + yield row[0] + session.commit() + processed += len(rows) + if print_page_progress: - print_page_progress(end, count) - start += batch_size + print_page_progress(processed, total) def try_load_json(data: Optional[str]) -> Dict[str, Any]: diff --git a/superset/migrations/versions/2022-06-27_14-59_7fb8bca906d2_permalink_rename_filterstate.py b/superset/migrations/versions/2022-06-27_14-59_7fb8bca906d2_permalink_rename_filterstate.py index ecd424d12a..0b76404dc9 100644 --- a/superset/migrations/versions/2022-06-27_14-59_7fb8bca906d2_permalink_rename_filterstate.py +++ b/superset/migrations/versions/2022-06-27_14-59_7fb8bca906d2_permalink_rename_filterstate.py @@ -66,7 +66,6 @@ def upgrade(): state["anchor"] = state["hash"] del state["hash"] entry.value = pickle.dumps(value) - session.commit() def downgrade(): @@ -87,5 +86,3 @@ def downgrade(): state["hash"] = state["anchor"] del state["anchor"] entry.value = pickle.dumps(value) - session.merge(entry) - session.commit()