fix(migration): Ensure the paginated update is deterministic (#21778)

This commit is contained in:
John Bodley 2022-10-12 17:11:59 -07:00 committed by GitHub
parent 11d7d6e078
commit 88a89c9fd6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 20 additions and 14 deletions

View File

@ -100,22 +100,31 @@ def paginated_update(
""" """
Update models in small batches so we don't have to load everything in memory. Update models in small batches so we don't have to load everything in memory.
""" """
start = 0
count = query.count() total = query.count()
processed = 0
session: Session = inspect(query).session session: Session = inspect(query).session
result = session.execute(query)
if print_page_progress is None or print_page_progress is True: if print_page_progress is None or print_page_progress is True:
print_page_progress = lambda current, total: print( print_page_progress = lambda processed, total: print(
f" {current}/{total}", end="\r" f" {processed}/{total}", end="\r"
) )
while start < count:
end = min(start + batch_size, count) while True:
for obj in query[start:end]: rows = result.fetchmany(batch_size)
yield obj
session.merge(obj) if not rows:
break
for row in rows:
yield row[0]
session.commit() session.commit()
processed += len(rows)
if print_page_progress: if print_page_progress:
print_page_progress(end, count) print_page_progress(processed, total)
start += batch_size
def try_load_json(data: Optional[str]) -> Dict[str, Any]: def try_load_json(data: Optional[str]) -> Dict[str, Any]:

View File

@ -66,7 +66,6 @@ def upgrade():
state["anchor"] = state["hash"] state["anchor"] = state["hash"]
del state["hash"] del state["hash"]
entry.value = pickle.dumps(value) entry.value = pickle.dumps(value)
session.commit()
def downgrade(): def downgrade():
@ -87,5 +86,3 @@ def downgrade():
state["hash"] = state["anchor"] state["hash"] = state["anchor"]
del state["anchor"] del state["anchor"]
entry.value = pickle.dumps(value) entry.value = pickle.dumps(value)
session.merge(entry)
session.commit()