#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Command line tool that compares the git history of two refs by pull request
number and renders a changelog (markdown on stdout, or a CSV file).
"""
import csv as lib_csv
import os
import re
import subprocess
import sys
from collections.abc import Iterator
from dataclasses import dataclass
from typing import Any, Optional, Union

import click
from click.core import Context

try:
    from github import BadCredentialsException, Github, PullRequest, Repository
except ModuleNotFoundError:
    print("PyGitHub is a required package for this script")
    # sys.exit instead of the site-provided ``exit`` helper, which is not
    # guaranteed to exist (e.g. when running with ``python -S``).
    sys.exit(1)

SUPERSET_REPO = "apache/superset"
# NOTE(review): the trailing "/gmi" alternative looks like a leftover
# JavaScript regex flag suffix; it only matches a title that literally starts
# with "/gmi". Kept as-is because only "fix" and "feat" are acted upon below.
SUPERSET_PULL_REQUEST_TYPES = r"^(fix|feat|chore|refactor|docs|build|ci|/gmi)"
SUPERSET_RISKY_LABELS = r"^(blocking|risk|hold|revert|security vulnerability)"


@dataclass
class GitLog:
    """
    Represents a git log entry
    """

    # Fields as filled in by GitLogs._parse_log from one `git log` line.
    sha: str
    author: str
    time: str
    message: str
    # PR number parsed from a "(#1234)" marker in the message, if any.
    pr_number: Union[int, None] = None
    author_email: str = ""

    def __eq__(self, other: object) -> bool:
        """A log entry is considered equal if it has the same PR number"""
        if isinstance(other, self.__class__):
            return other.pr_number == self.pr_number
        return False

    def __repr__(self) -> str:
        return f"[{self.pr_number}]: {self.message} {self.time} {self.author}"


class GitChangeLog:
    """
    Helper class to output a list of logs entries on a superset changelog format

    We want to map a git author to a github login, for that we call github's API
    """

    def __init__(
        self,
        version: str,
        logs: list[GitLog],
        access_token: Optional[str] = None,
        risk: Optional[bool] = False,
    ) -> None:
        """
        :param version: version label printed in the changelog heading
        :param logs: git log entries to render
        :param access_token: github token; falls back to the GITHUB_TOKEN
            environment variable when not provided
        :param risk: when truthy, only risky pull requests are rendered
        """
        self._version = version
        self._logs = logs
        # Per-PR caches so each pull request is only resolved once.
        self._pr_logs_with_details: dict[int, dict[str, Any]] = {}
        self._github_login_cache: dict[str, Optional[str]] = {}
        self._github_prs: dict[int, Any] = {}
        self._wait = 10
        github_token = access_token or os.environ.get("GITHUB_TOKEN")
        self._github = Github(github_token)
        self._show_risk = risk
        # Resolved lazily by _get_superset_repo().
        self._superset_repo: Repository = None

    def _get_superset_repo(self) -> Repository:
        """
        Returns the github repository handle, fetching it only on first use
        """
        if self._superset_repo is None:
            self._superset_repo = self._github.get_repo(SUPERSET_REPO)
        return self._superset_repo

    def _fetch_github_pr(self, pr_number: int) -> PullRequest:
        """
        Fetches a github PR info

        Results are cached by PR number. Exits the process with status 1 when
        github rejects the credentials.
        """
        try:
            github_repo = self._get_superset_repo()
            pull_request = self._github_prs.get(pr_number)
            if not pull_request:
                pull_request = github_repo.get_pull(pr_number)
                self._github_prs[pr_number] = pull_request
        except BadCredentialsException:
            print(
                "Bad credentials to github provided"
                " use access_token parameter or set GITHUB_TOKEN"
            )
            sys.exit(1)

        return pull_request

    def _get_github_login(self, git_log: GitLog) -> Optional[str]:
        """
        Tries to fetch a github login (username) from a git author

        Returns None when the log entry has no PR number and the author has
        not been resolved before; callers fall back to the git author name.
        """
        author_name = git_log.author
        github_login = self._github_login_cache.get(author_name)
        if github_login:
            return github_login
        if git_log.pr_number:
            pr_info = self._fetch_github_pr(git_log.pr_number)
            if pr_info:
                github_login = pr_info.user.login
            else:
                github_login = author_name
        # set cache
        self._github_login_cache[author_name] = github_login
        return github_login

    def _has_commit_migrations(self, git_sha: str) -> bool:
        """
        Tells whether the commit touches a file under the migrations folder
        """
        # Go through the lazy accessor: the repository may not have been
        # fetched yet when the first log entry has no PR number.
        commit = self._get_superset_repo().get_commit(sha=git_sha)
        return any(
            "superset/migrations/versions/" in file.filename for file in commit.files
        )

    def _get_pull_request_details(self, git_log: GitLog) -> dict[str, Any]:
        """
        Builds (and caches by PR number) the changelog details of a log entry

        Commits without a PR number fall back to the commit message as title
        and carry no labels.
        """
        pr_number = git_log.pr_number
        pr_info = None
        if pr_number:
            detail = self._pr_logs_with_details.get(pr_number)
            if detail:
                return detail
            pr_info = self._fetch_github_pr(pr_number)

        has_migrations = self._has_commit_migrations(git_log.sha)
        title = pr_info.title if pr_info else git_log.message
        type_match = re.match(SUPERSET_PULL_REQUEST_TYPES, title)
        pr_type = type_match.group().strip('"') if type_match else None

        pr_labels = list(pr_info.labels) if pr_info else []
        labels = " | ".join(label.name for label in pr_labels)
        is_risky = self._is_risk_pull_request(pr_labels)
        detail = {
            "id": pr_number,
            "has_migrations": has_migrations,
            "labels": labels,
            "title": title,
            "type": pr_type,
            "is_risky": is_risky or has_migrations,
        }

        if pr_number:
            self._pr_logs_with_details[pr_number] = detail

        return detail

    def _is_risk_pull_request(self, labels: list[Any]) -> bool:
        """
        Tells whether any label name matches SUPERSET_RISKY_LABELS
        """
        return any(
            re.match(SUPERSET_RISKY_LABELS, label.name) is not None
            for label in labels
        )

    def _get_changelog_version_head(self) -> str:
        """
        Returns the changelog heading; exits with status 1 when there are no logs
        """
        if not self._logs:
            print(
                "No changes found between revisions. "
                "Make sure your branch is up to date."
            )
            sys.exit(1)
        return f"### {self._version} ({self._logs[0].time})"

    def _parse_change_log(
        self,
        changelog: dict[str, str],
        pr_info: dict[str, str],
        github_login: str,
    ) -> None:
        """
        Appends the formatted PR line to the matching section of `changelog`

        Migrations take precedence over the PR type.
        """
        formatted_pr = (
            f"- [#{pr_info.get('id')}]"
            f"(https://github.com/{SUPERSET_REPO}/pull/{pr_info.get('id')}) "
            f"{pr_info.get('title')} (@{github_login})\n"
        )
        if pr_info.get("has_migrations"):
            changelog["Database Migrations"] += formatted_pr
        elif pr_info.get("type") == "fix":
            changelog["Fixes"] += formatted_pr
        elif pr_info.get("type") == "feat":
            changelog["Features"] += formatted_pr
        else:
            changelog["Others"] += formatted_pr

    def __repr__(self) -> str:
        """
        Renders the changelog as markdown

        Calls the github API for every log entry and prints a progress
        counter to stdout while doing so.
        """
        result = f"\n{self._get_changelog_version_head()}\n"
        changelog = {
            "Database Migrations": "\n",
            "Features": "\n",
            "Fixes": "\n",
            "Others": "\n",
        }
        for i, log in enumerate(self._logs):
            github_login = self._get_github_login(log)
            pr_info = self._get_pull_request_details(log)

            if not github_login:
                github_login = log.author

            if self._show_risk:
                if pr_info.get("is_risky"):
                    result += (
                        f"- [#{log.pr_number}]"
                        f"(https://github.com/{SUPERSET_REPO}/pull/{log.pr_number}) "
                        f"{pr_info.get('title')} (@{github_login}) "
                        f"{pr_info.get('labels')} \n"
                    )
            else:
                self._parse_change_log(changelog, pr_info, github_login)

            print(f"\r {i}/{len(self._logs)}", end="", flush=True)

        if self._show_risk:
            return result

        for key, section in changelog.items():
            result += f"**{key}** {section}\n"
        return result

    def __iter__(self) -> Iterator[dict[str, Any]]:
        """
        Yields one flat dict per log entry (used for the CSV export)
        """
        for log in self._logs:
            yield {
                "pr_number": log.pr_number,
                "pr_link": f"https://github.com/{SUPERSET_REPO}/pull/"
                f"{log.pr_number}",
                "message": log.message,
                "time": log.time,
                "author": log.author,
                "email": log.author_email,
                "sha": log.sha,
            }


class GitLogs:
    """
    Manages git log entries from a specific branch/tag

    Can compare git log entries by PR number
    """

    def __init__(self, git_ref: str) -> None:
        self._git_ref = git_ref
        self._logs: list[GitLog] = []

    @property
    def git_ref(self) -> str:
        return self._git_ref

    @property
    def logs(self) -> list[GitLog]:
        return self._logs

    def fetch(self) -> None:
        """
        Loads the log entries of the git ref, in reverse `git log` order
        """
        self._logs = list(map(self._parse_log, self._git_logs()))[::-1]

    def diff(self, git_logs: "GitLogs") -> list[GitLog]:
        """
        Returns the entries of `git_logs` whose PR number is not in this log
        """
        # GitLog equality only compares pr_number, so a set of PR numbers
        # gives the same answer as list membership without the O(n*m) scan.
        known_pr_numbers = {log.pr_number for log in self._logs}
        return [
            log for log in git_logs.logs if log.pr_number not in known_pr_numbers
        ]

    def __repr__(self) -> str:
        return f"{self._git_ref}, Log count:{len(self._logs)}"

    @staticmethod
    def _run_git(*args: str) -> str:
        """
        Runs `git` with the given arguments and returns its standard output

        Arguments are passed as a list (no shell), so refs are never
        interpreted by a shell. Stderr is left attached to the terminal.
        """
        completed = subprocess.run(
            ["git", *args],
            stdout=subprocess.PIPE,
            text=True,
            check=False,
        )
        return completed.stdout

    @staticmethod
    def _git_get_current_head() -> str:
        """
        Returns the current branch/ref name from `git status`, or "" if unknown
        """
        output = GitLogs._run_git("status")
        # re.match anchors at the start and "." stops at the newline, so only
        # the first line of the status output is considered.
        match = re.match(r"(?:HEAD detached at|On branch) (.*)", output)
        if not match:
            return ""
        return match.group(1)

    def _git_checkout(self, git_ref: str) -> None:
        """
        Checks out `git_ref`; exits with status 1 when HEAD does not end up there
        """
        # An empty ref (unknown current head) is passed as "no argument",
        # mirroring what the previous shell command line did.
        self._run_git("checkout", *([git_ref] if git_ref else []))
        current_head = self._git_get_current_head()
        if current_head != git_ref:
            print(f"Could not checkout {git_ref}")
            sys.exit(1)

    def _git_logs(self) -> list[str]:
        """
        Returns the raw, non-empty `git log` lines of the managed git ref
        """
        # let's get current git ref so we can revert it back
        current_git_ref = self._git_get_current_head()
        self._git_checkout(self._git_ref)
        try:
            output = self._run_git(
                "--no-pager", "log", "--pretty=format:%h|%an|%ae|%ad|%s|"
            )
        finally:
            # revert to git ref, let's be nice
            self._git_checkout(current_git_ref)
        # Drop blank lines so an empty history does not reach _parse_log.
        return [line for line in output.split("\n") if line.strip()]

    @staticmethod
    def _parse_log(log_item: str) -> GitLog:
        """
        Parses one "sha|author|email|date|subject|" line into a GitLog

        :raises ValueError: when the line has fewer than five fields
        """
        # Only split on the first four separators so a "|" inside the commit
        # subject stays part of the message.
        split_log_item = log_item.split("|", 4)
        if len(split_log_item) < 5:
            raise ValueError(f"Unexpected git log line: {log_item!r}")
        # The log format appends one trailing "|" after the subject.
        message = split_log_item[4].removesuffix("|")

        pr_number = None
        # parse the PR number from the log message; the greedy prefix picks
        # the last "(#N)" marker and at least one digit is required.
        match = re.match(r".*\(\#(\d+)\)", message)
        if match:
            pr_number = int(match.group(1))
        return GitLog(
            sha=split_log_item[0],
            author=split_log_item[1],
            author_email=split_log_item[2],
            time=split_log_item[3],
            message=message,
            pr_number=pr_number,
        )


@dataclass
class BaseParameters:
    # Shared state handed from the click group to its sub commands.
    previous_logs: GitLogs
    current_logs: GitLogs


def print_title(message: str) -> None:
    """Prints `message` between two 50 character dashed rules"""
    rule = "-" * 50
    print(rule)
    print(message)
    print(rule)


# NOTE: the docstrings of the click callbacks below are shown verbatim as the
# command help text, so they are intentionally left untouched.
@click.group()
@click.pass_context
@click.option("--previous_version", help="The previous release version", required=True)
@click.option("--current_version", help="The current release version", required=True)
def cli(ctx: Context, previous_version: str, current_version: str) -> None:
    """Welcome to change log generator"""
    previous_logs = GitLogs(previous_version)
    current_logs = GitLogs(current_version)
    previous_logs.fetch()
    current_logs.fetch()
    base_parameters = BaseParameters(previous_logs, current_logs)
    ctx.obj = base_parameters


@cli.command("compare")
@click.pass_obj
def compare(base_parameters: BaseParameters) -> None:
    """Compares both versions (by PR)"""
    previous_logs = base_parameters.previous_logs
    current_logs = base_parameters.current_logs
    print_title(
        f"Pull requests from " f"{current_logs.git_ref} not in {previous_logs.git_ref}"
    )
    previous_diff_logs = previous_logs.diff(current_logs)
    for diff_log in previous_diff_logs:
        print(f"{diff_log}")

    print_title(
        f"Pull requests from " f"{previous_logs.git_ref} not in {current_logs.git_ref}"
    )
    current_diff_logs = current_logs.diff(previous_logs)
    for diff_log in current_diff_logs:
        print(f"{diff_log}")


@cli.command("changelog")
@click.option(
    "--csv",
    help="The csv filename to export the changelog to",
)
@click.option(
    "--access_token",
    help="The github access token,"
    " if not provided will try to fetch from GITHUB_TOKEN env var",
)
@click.option("--risk", is_flag=True, help="show all pull requests with risky labels")
@click.pass_obj
def change_log(
    base_parameters: BaseParameters, csv: str, access_token: str, risk: bool
) -> None:
    """Outputs a changelog (by PR)"""
    previous_logs = base_parameters.previous_logs
    current_logs = base_parameters.current_logs
    previous_diff_logs = previous_logs.diff(current_logs)
    logs = GitChangeLog(
        current_logs.git_ref,
        previous_diff_logs[::-1],
        access_token=access_token,
        risk=risk,
    )
    if csv:
        log_items = list(logs)
        if not log_items:
            # Same outcome as the markdown path on an empty diff, and checked
            # before opening so an existing file is not truncated.
            print(
                "No changes found between revisions. "
                "Make sure your branch is up to date."
            )
            sys.exit(1)
        # newline="" is what the csv module requires of files it writes to.
        with open(csv, "w", newline="", encoding="utf-8") as csv_file:
            writer = lib_csv.DictWriter(
                csv_file,
                delimiter=",",
                quotechar='"',
                quoting=lib_csv.QUOTE_ALL,
                fieldnames=list(log_items[0].keys()),
            )
            writer.writeheader()
            writer.writerows(log_items)
    else:
        print("Fetching github usernames, this may take a while:")
        print(logs)


cli()