Skip to content

[stubsabot] Support "Removal" PRs #14401

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Jul 16, 2025
1 change: 1 addition & 0 deletions lib/ts_utils/paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
PYPROJECT_PATH: Final = TS_BASE_PATH / "pyproject.toml"
REQUIREMENTS_PATH: Final = TS_BASE_PATH / "requirements-tests.txt"
GITIGNORE_PATH: Final = TS_BASE_PATH / ".gitignore"
PYRIGHT_CONFIG: Final = TS_BASE_PATH / "pyrightconfig.stricter.json"

TESTS_DIR: Final = "@tests"
TEST_CASES_DIR: Final = "test_cases"
Expand Down
4 changes: 1 addition & 3 deletions scripts/create_baseline_stubs.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
import aiohttp
import termcolor

from ts_utils.paths import STDLIB_PATH, STUBS_PATH

PYRIGHT_CONFIG = Path("pyrightconfig.stricter.json")
from ts_utils.paths import PYRIGHT_CONFIG, STDLIB_PATH, STUBS_PATH


def search_pip_freeze_output(project: str, output: str) -> tuple[str, str] | None:
Expand Down
211 changes: 181 additions & 30 deletions scripts/stubsabot.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,46 @@

import argparse
import asyncio
import calendar
import contextlib
import datetime
import enum
import functools
import io
import os
import re
import shutil
import subprocess
import sys
import tarfile
import textwrap
import urllib.parse
import zipfile
from collections.abc import Iterator, Mapping, Sequence
from collections.abc import Callable, Iterator, Mapping, Sequence
from dataclasses import dataclass, field
from http import HTTPStatus
from pathlib import Path
from typing import Annotated, Any, ClassVar, NamedTuple
from typing import Annotated, Any, ClassVar, NamedTuple, TypeVar
from typing_extensions import Self, TypeAlias

import aiohttp
import packaging.version
import tomli
import tomlkit
from packaging.specifiers import Specifier
from termcolor import colored
from tomlkit.items import String

from ts_utils.metadata import StubMetadata, read_metadata, update_metadata
from ts_utils.paths import STUBS_PATH, distribution_path
from ts_utils.metadata import NoSuchStubError, StubMetadata, metadata_path, read_metadata, update_metadata
from ts_utils.paths import PYRIGHT_CONFIG, STUBS_PATH, distribution_path

# GitHub account that owns the upstream typeshed repository.
TYPESHED_OWNER = "python"
# Base URL of the GitHub REST API for the upstream typeshed repository.
TYPESHED_API_URL = f"https://api.github.com/repos/{TYPESHED_OWNER}/typeshed"

# NOTE(review): presumably the label attached to pull requests opened by stubsabot; its use is not visible here.
STUBSABOT_LABEL = "bot: stubsabot"

# Number of months added to an obsolete stub's recorded release date to obtain its removal date.
POLICY_MONTHS_DELTA = 6


class ActionLevel(enum.IntEnum):
def __new__(cls, value: int, doc: str) -> Self:
Expand Down Expand Up @@ -149,6 +155,16 @@ def __str__(self) -> str:
return f"Marking {self.distribution} as obsolete since {self.obsolete_since_version!r}"


@dataclass
class Remove:
    """Action describing the removal of a distribution's stubs from typeshed."""

    # Name of the distribution whose stubs are to be removed.
    distribution: str
    # Short human-readable justification; interpolated into the summary text.
    reason: str
    # Mapping of link label to URL.
    links: dict[str, str]

    def __str__(self) -> str:
        summary = f"Removing {self.distribution} as {self.reason}"
        return summary


@dataclass
class NoUpdate:
distribution: str
Expand All @@ -158,6 +174,38 @@ def __str__(self) -> str:
return f"Skipping {self.distribution}: {self.reason}"


_T = TypeVar("_T")


async def with_extracted_archive(
    release_to_download: PypiReleaseDownload,
    *,
    session: aiohttp.ClientSession,
    handler: Callable[[zipfile.ZipFile | tarfile.TarFile], _T],
) -> _T:
    """
    Download a release file, open it as an archive and return `handler(archive)`.

    The whole response body is read into memory. Wheels and `.zip` sdists are
    opened with `zipfile.ZipFile`, `.tar.gz` sdists with `tarfile.open`. The
    archive is closed again once `handler` has returned.

    Raises `AssertionError` for an unknown package type, for a wheel whose
    filename does not end in `.whl`, or for an sdist whose filename ends in
    neither `.tar.gz` nor `.zip`.
    """
    async with session.get(release_to_download.url) as response:
        raw_bytes = await response.read()
    body = io.BytesIO(raw_bytes)

    packagetype = release_to_download.packagetype

    if packagetype == "bdist_wheel":
        assert release_to_download.filename.endswith(".whl")
        with zipfile.ZipFile(body) as wheel:
            return handler(wheel)

    if packagetype != "sdist":
        raise AssertionError(f"Unknown package type for {release_to_download.distribution}: {packagetype!r}")

    # sdist defaults to `.tar.gz` on Linux and to `.zip` on Windows:
    # https://docs.python.org/3.11/distutils/sourcedist.html
    if release_to_download.filename.endswith(".tar.gz"):
        with tarfile.open(fileobj=body, mode="r:gz") as tarball:
            return handler(tarball)

    if release_to_download.filename.endswith(".zip"):
        with zipfile.ZipFile(body) as zipped_sdist:
            return handler(zipped_sdist)

    raise AssertionError(f"Package file {release_to_download.filename!r} does not end with '.tar.gz' or '.zip'")


def all_py_files_in_source_are_in_py_typed_dirs(source: zipfile.ZipFile | tarfile.TarFile) -> bool:
py_typed_dirs: list[Path] = []
all_python_files: list[Path] = []
Expand Down Expand Up @@ -207,27 +255,7 @@ def all_py_files_in_source_are_in_py_typed_dirs(source: zipfile.ZipFile | tarfil


async def release_contains_py_typed(release_to_download: PypiReleaseDownload, *, session: aiohttp.ClientSession) -> bool:
async with session.get(release_to_download.url) as response:
body = io.BytesIO(await response.read())

packagetype = release_to_download.packagetype
if packagetype == "bdist_wheel":
assert release_to_download.filename.endswith(".whl")
with zipfile.ZipFile(body) as zf:
return all_py_files_in_source_are_in_py_typed_dirs(zf)
elif packagetype == "sdist":
# sdist defaults to `.tar.gz` on Linux and to `.zip` on Windows:
# https://docs.python.org/3.11/distutils/sourcedist.html
if release_to_download.filename.endswith(".tar.gz"):
with tarfile.open(fileobj=body, mode="r:gz") as zf:
return all_py_files_in_source_are_in_py_typed_dirs(zf)
elif release_to_download.filename.endswith(".zip"):
with zipfile.ZipFile(body) as zf:
return all_py_files_in_source_are_in_py_typed_dirs(zf)
else:
raise AssertionError(f"Package file {release_to_download.filename!r} does not end with '.tar.gz' or '.zip'")
else:
raise AssertionError(f"Unknown package type for {release_to_download.distribution}: {packagetype!r}")
return await with_extracted_archive(release_to_download, session=session, handler=all_py_files_in_source_are_in_py_typed_dirs)


async def find_first_release_with_py_typed(pypi_info: PypiInfo, *, session: aiohttp.ClientSession) -> PypiReleaseDownload | None:
Expand Down Expand Up @@ -470,12 +498,94 @@ async def analyze_diff(
return DiffAnalysis(py_files=py_files, py_files_stubbed_in_typeshed=py_files_stubbed_in_typeshed)


async def determine_action(distribution: str, session: aiohttp.ClientSession) -> Update | NoUpdate | Obsolete:
def _add_months(date: datetime.date, months: int) -> datetime.date:
month = date.month - 1 + months
year = date.year + month // 12
month = month % 12 + 1
day = min(date.day, calendar.monthrange(year, month)[1])
return datetime.date(year, month, day)


def obsolete_more_than_6_months(distribution: str) -> bool:
    """
    Return `True` if the removal date of the obsolete stubs for *distribution* has been reached.

    The removal date is the release date recorded in the trailing
    `# Released on YYYY-MM-DD` comment of the `obsolete_since` entry in the
    distribution's metadata file, plus `POLICY_MONTHS_DELTA` months.

    Returns `False` if there is no `obsolete_since` value or if it carries no
    comment.

    Raises `NoSuchStubError` if the metadata file does not exist, and
    `ValueError` if the comment does not contain an ISO formatted date.
    """
    # TODO: Loading the metadata file like this is used several times in a few files.
    # It should probably be extracted into a separate function in ts_utils.metadata
    # (in a later PR).
    try:
        with metadata_path(distribution).open("rb") as file:
            data = tomlkit.load(file)
    except FileNotFoundError:
        raise NoSuchStubError(f"Typeshed has no stubs for {distribution!r}!") from None

    # Use `.get()` so that metadata without an `obsolete_since` key is treated
    # as "not obsolete" instead of raising a `KeyError`.
    obsolete_since = data.get("obsolete_since")
    if not obsolete_since:
        return False

    assert type(obsolete_since) is String
    comment: str | None = obsolete_since.trivia.comment
    if not comment:
        return False

    # TODO: The release date is probably also something that should be put into the
    # metadata (possibly as a structured obsolete field with since_version and
    # since_date fields), but again in a later PR.
    release_date_string = comment.removeprefix("# Released on ")
    release_date = datetime.date.fromisoformat(release_date_string)
    remove_date = _add_months(release_date, POLICY_MONTHS_DELTA)
    today = datetime.datetime.now(tz=datetime.timezone.utc).date()

    # The stubs are due for removal once the removal date is today or in the
    # past. (The previous `>=` comparison was inverted: it reported stubs whose
    # removal date had not yet been reached.)
    return remove_date <= today


def parse_no_longer_updated_from_archive(source: zipfile.ZipFile | tarfile.TarFile) -> bool:
if isinstance(source, zipfile.ZipFile):
try:
file = source.open("METADATA.toml", "r")
except KeyError:
return False
else:
try:
tarinfo = source.getmember("METADATA.toml")
file = source.extractfile(tarinfo) # type: ignore[assignment]
if file is None:
return False
except KeyError:
return False

with file as f:
toml_data: dict[str, object] = tomli.load(f)

no_longer_updated = toml_data.get("no_longer_updated", False)
assert type(no_longer_updated) is bool
return bool(no_longer_updated)


async def has_no_longer_updated_release(release_to_download: PypiReleaseDownload, *, session: aiohttp.ClientSession) -> bool:
    """
    Download `release_to_download` and report whether the `METADATA.toml` file
    inside it has a `no_longer_updated` field whose value is `True`.

    Intended to be called with the latest `types-{distribution}` pypi release.
    """
    is_marked_no_longer_updated = await with_extracted_archive(
        release_to_download, session=session, handler=parse_no_longer_updated_from_archive
    )
    return is_marked_no_longer_updated


async def determine_action(distribution: str, session: aiohttp.ClientSession) -> Update | NoUpdate | Obsolete | Remove:
stub_info = read_metadata(distribution)
if stub_info.is_obsolete:
return NoUpdate(stub_info.distribution, "obsolete")
if obsolete_more_than_6_months(stub_info.distribution):
pypi_info = await fetch_pypi_info(f"types-{stub_info.distribution}", session)
latest_release = pypi_info.get_latest_release()
links = {
"Typeshed release": f"{pypi_info.pypi_root}",
"Typeshed stubs": f"https://github.com/{TYPESHED_OWNER}/typeshed/tree/main/stubs/{stub_info.distribution}",
}
return Remove(stub_info.distribution, reason="older than 6 months", links=links)
else:
return NoUpdate(stub_info.distribution, "obsolete")
if stub_info.no_longer_updated:
return NoUpdate(stub_info.distribution, "no longer updated")
pypi_info = await fetch_pypi_info(f"types-{stub_info.distribution}", session)
latest_release = pypi_info.get_latest_release()

if await has_no_longer_updated_release(latest_release, session=session):
links = {
"Typeshed release": f"{pypi_info.pypi_root}",
"Typeshed stubs": f"https://github.com/{TYPESHED_OWNER}/typeshed/tree/main/stubs/{stub_info.distribution}",
}
return Remove(stub_info.distribution, reason="no longer updated", links=links)
else:
return NoUpdate(stub_info.distribution, "no longer updated")

pypi_info = await fetch_pypi_info(stub_info.distribution, session)
latest_release = pypi_info.get_latest_release()
Expand Down Expand Up @@ -683,6 +793,22 @@ def get_update_pr_body(update: Update, metadata: Mapping[str, Any]) -> str:
return body


def remove_stubs(distribution: str) -> None:
    """
    Delete the stubs directory of *distribution* and drop its entries from the pyright config.

    The stubs directory (as given by `distribution_path`) is removed
    recursively if it exists. Every line of `PYRIGHT_CONFIG` that refers to
    the distribution's stubs directory, or to a path inside it, is then
    removed and the file is rewritten in place.
    """
    stub_path = distribution_path(distribution)
    # Match the distribution's own entry (`"stubs/foo"`) and paths inside it
    # (`"stubs/foo/...`), but not other distributions that merely share the
    # same name prefix (e.g. `"stubs/foo-bar"`), which a bare prefix match
    # would wrongly delete as well.
    entry_prefixes = (f'"stubs/{distribution}"', f'"stubs/{distribution}/')

    # `is_dir()` is already False for a path that does not exist.
    if stub_path.is_dir():
        shutil.rmtree(stub_path)

    with PYRIGHT_CONFIG.open("r", encoding="UTF-8") as f:
        lines = f.readlines()

    kept_lines = [line for line in lines if not line.lstrip().startswith(entry_prefixes)]

    with PYRIGHT_CONFIG.open("w", encoding="UTF-8") as f:
        f.writelines(kept_lines)


async def suggest_typeshed_update(update: Update, session: aiohttp.ClientSession, action_level: ActionLevel) -> None:
if action_level <= ActionLevel.nothing:
return
Expand Down Expand Up @@ -729,6 +855,28 @@ async def suggest_typeshed_obsolete(obsolete: Obsolete, session: aiohttp.ClientS
await create_or_update_pull_request(title=title, body=body, branch_name=branch_name, session=session)


async def suggest_typeshed_remove(remove: Remove, session: aiohttp.ClientSession, action_level: ActionLevel) -> None:
    """
    Create a commit that removes the stubs described by *remove* and, depending
    on *action_level*, push it and open or update a pull request.

    Nothing is done at `ActionLevel.nothing` or below. At `ActionLevel.local`
    or below only the local branch and commit are created. At
    `ActionLevel.fork` or below the branch is additionally pushed. Above that,
    `create_or_update_pull_request` is called.
    """
    if action_level <= ActionLevel.nothing:
        return
    title = f"[stubsabot] Remove {remove.distribution} as {remove.reason}"
    # NOTE(review): `_repo_lock` presumably serialises git working-tree operations
    # between concurrently running tasks; its definition is not visible here.
    async with _repo_lock:
        branch_name = f"{BRANCH_PREFIX}/{normalize(remove.distribution)}"
        # `-B` creates the branch, or resets it if it already exists, starting from origin/main.
        subprocess.check_call(["git", "checkout", "-B", branch_name, "origin/main"])
        remove_stubs(remove.distribution)
        # One "label: url" line per link; used both as the commit message body and the PR body.
        body = "\n".join(f"{k}: {v}" for k, v in remove.links.items())
        subprocess.check_call(["git", "commit", "--all", "-m", f"{title}\n\n{body}"])
        if action_level <= ActionLevel.local:
            return
        if not latest_commit_is_different_to_last_commit_on_origin(branch_name):
            print(f"No pushing to origin required: origin/{branch_name} exists and requires no changes!")
            return
        somewhat_safe_force_push(branch_name)
        if action_level <= ActionLevel.fork:
            return

    await create_or_update_pull_request(title=title, body=body, branch_name=branch_name, session=session)


async def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument(
Expand Down Expand Up @@ -803,10 +951,13 @@ async def main() -> None:
if isinstance(update, Update):
await suggest_typeshed_update(update, session, action_level=args.action_level)
continue
# Redundant, but keeping for extra runtime validation
if isinstance(update, Obsolete): # pyright: ignore[reportUnnecessaryIsInstance]
if isinstance(update, Obsolete):
await suggest_typeshed_obsolete(update, session, action_level=args.action_level)
continue
# Redundant, but keeping for extra runtime validation
if isinstance(update, Remove): # pyright: ignore[reportUnnecessaryIsInstance]
await suggest_typeshed_remove(update, session, action_level=args.action_level)
continue
except RemoteConflictError as e:
print(colored(f"... but ran into {type(e).__qualname__}: {e}", "red"))
continue
Expand Down