Skip to content

feat: Rework PAT to use headers only #507

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions src/gitingest/clone.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
create_git_auth_header,
create_git_command,
ensure_git_installed,
is_github_host,
resolve_commit,
run_command,
)
Expand Down Expand Up @@ -40,7 +39,7 @@ async def clone_repo(config: CloneConfig, *, token: str | None = None) -> None:
config : CloneConfig
The configuration for cloning the repository.
token : str | None
GitHub personal access token (PAT) for accessing private repositories.
Personal access token (PAT) for accessing private repositories.

Raises
------
Expand Down Expand Up @@ -84,7 +83,7 @@ async def clone_repo(config: CloneConfig, *, token: str | None = None) -> None:
logger.debug("Resolved commit", extra={"commit": commit})

clone_cmd = ["git"]
if token and is_github_host(url):
if token:
clone_cmd += ["-c", create_git_auth_header(token, url=url)]

clone_cmd += ["clone", "--single-branch", "--no-checkout", "--depth=1"]
Expand Down
8 changes: 2 additions & 6 deletions src/gitingest/utils/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,20 @@

import os

from gitingest.utils.git_utils import validate_github_token


def resolve_token(token: str | None) -> str | None:
"""Resolve the token to use for the query.

Parameters
----------
token : str | None
GitHub personal access token (PAT) for accessing private repositories.
Personal access token (PAT) for accessing private repositories.

Returns
-------
str | None
The resolved token.

"""
token = token or os.getenv("GITHUB_TOKEN")
if token:
validate_github_token(token)
token = token or os.getenv("GITHUB_TOKEN") # Keep env var name for backward compatibility
return token
10 changes: 1 addition & 9 deletions src/gitingest/utils/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,4 @@ def __init__(self, message: str) -> None:
super().__init__(message)


class InvalidGitHubTokenError(ValueError):
"""Exception raised when a GitHub Personal Access Token is malformed."""

def __init__(self) -> None:
msg = (
"Invalid GitHub token format. To generate a token, go to "
"https://github.com/settings/tokens/new?description=gitingest&scopes=repo."
)
super().__init__(msg)

149 changes: 27 additions & 122 deletions src/gitingest/utils/git_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,13 @@

import asyncio
import base64
import re
import sys
from pathlib import Path
from typing import TYPE_CHECKING, Final, Iterable
from typing import TYPE_CHECKING, Iterable
from urllib.parse import urlparse

import httpx
from starlette.status import HTTP_200_OK, HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN, HTTP_404_NOT_FOUND

from gitingest.utils.compat_func import removesuffix
from gitingest.utils.exceptions import InvalidGitHubTokenError

from gitingest.utils.logging_config import get_logger

if TYPE_CHECKING:
Expand All @@ -23,28 +19,8 @@
# Initialize logger for this module
logger = get_logger(__name__)

# GitHub Personal-Access tokens (classic + fine-grained).
# - ghp_ / gho_ / ghu_ / ghs_ / ghr_ → 36 alphanumerics
# - github_pat_ → 22 alphanumerics + "_" + 59 alphanumerics
_GITHUB_PAT_PATTERN: Final[str] = r"^(?:gh[pousr]_[A-Za-z0-9]{36}|github_pat_[A-Za-z0-9]{22}_[A-Za-z0-9]{59})$"


def is_github_host(url: str) -> bool:
"""Check if a URL is from a GitHub host (github.com or GitHub Enterprise).

Parameters
----------
url : str
The URL to check

Returns
-------
bool
True if the URL is from a GitHub host, False otherwise

"""
hostname = urlparse(url).hostname or ""
return hostname.startswith("github.")


async def run_command(*args: str) -> tuple[bytes, bytes]:
Expand Down Expand Up @@ -119,80 +95,27 @@ async def check_repo_exists(url: str, token: str | None = None) -> bool:
url : str
URL of the Git repository to check.
token : str | None
GitHub personal access token (PAT) for accessing private repositories.
Personal access token (PAT) for accessing private repositories.

Returns
-------
bool
``True`` if the repository exists, ``False`` otherwise.

Raises
------
RuntimeError
If the host returns an unrecognised status code.

"""
headers = {}

if token and is_github_host(url):
host, owner, repo = _parse_github_url(url)
# Public GitHub vs. GitHub Enterprise
base_api = "https://api.github.com" if host == "github.com" else f"https://{host}/api/v3"
url = f"{base_api}/repos/{owner}/{repo}"
headers["Authorization"] = f"Bearer {token}"

async with httpx.AsyncClient(follow_redirects=True) as client:
try:
response = await client.head(url, headers=headers)
except httpx.RequestError:
return False

status_code = response.status_code

if status_code == HTTP_200_OK:
try:
# Use git ls-remote to check if repository exists
cmd = ["git"]
if token:
cmd += ["-c", create_git_auth_header(token, url=url)]
cmd += ["ls-remote", "--heads", url]

await run_command(*cmd)
return True
if status_code in {HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN, HTTP_404_NOT_FOUND}:
except Exception:
return False
msg = f"Unexpected HTTP status {status_code} for {url}"
raise RuntimeError(msg)


def _parse_github_url(url: str) -> tuple[str, str, str]:
"""Parse a GitHub URL and return (hostname, owner, repo).

Parameters
----------
url : str
The URL of the GitHub repository to parse.

Returns
-------
tuple[str, str, str]
A tuple containing the hostname, owner, and repository name.

Raises
------
ValueError
If the URL is not a valid GitHub repository URL.

"""
parsed = urlparse(url)
if parsed.scheme not in {"http", "https"}:
msg = f"URL must start with http:// or https://: {url!r}"
raise ValueError(msg)

if not parsed.hostname or not parsed.hostname.startswith("github."):
msg = f"Un-recognised GitHub hostname: {parsed.hostname!r}"
raise ValueError(msg)

parts = removesuffix(parsed.path, ".git").strip("/").split("/")
expected_path_length = 2
if len(parts) != expected_path_length:
msg = f"Path must look like /<owner>/<repo>: {parsed.path!r}"
raise ValueError(msg)

owner, repo = parts
return parsed.hostname, owner, repo


async def fetch_remote_branches_or_tags(url: str, *, ref_type: str, token: str | None = None) -> list[str]:
Expand All @@ -205,7 +128,7 @@ async def fetch_remote_branches_or_tags(url: str, *, ref_type: str, token: str |
ref_type: str
The type of reference to fetch. Can be "branches" or "tags".
token : str | None
GitHub personal access token (PAT) for accessing private repositories.
Personal access token (PAT) for accessing private repositories.

Returns
-------
Expand All @@ -225,7 +148,7 @@ async def fetch_remote_branches_or_tags(url: str, *, ref_type: str, token: str |
cmd = ["git"]

# Add authentication if needed
if token and is_github_host(url):
if token:
cmd += ["-c", create_git_auth_header(token, url=url)]

cmd += ["ls-remote"]
Expand Down Expand Up @@ -263,9 +186,9 @@ def create_git_command(base_cmd: list[str], local_path: str, url: str, token: st
local_path : str
The local path where the git command should be executed.
url : str
The repository URL to check if it's a GitHub repository.
The repository URL for authentication.
token : str | None
GitHub personal access token (PAT) for accessing private repositories.
Personal access token (PAT) for accessing private repositories.

Returns
-------
Expand All @@ -274,21 +197,20 @@ def create_git_command(base_cmd: list[str], local_path: str, url: str, token: st

"""
cmd = [*base_cmd, "-C", local_path]
if token and is_github_host(url):
if token:
cmd += ["-c", create_git_auth_header(token, url=url)]
return cmd


def create_git_auth_header(token: str, url: str = "https://github.com") -> str:
"""Create a Basic authentication header for GitHub git operations.
def create_git_auth_header(token: str, url: str) -> str:
"""Create a Basic authentication header for git operations.

Parameters
----------
token : str
GitHub personal access token (PAT) for accessing private repositories.
Personal access token (PAT) for accessing private repositories.
url : str
The GitHub URL to create the authentication header for.
Defaults to "https://github.com" if not provided.
The repository URL to create the authentication header for.

Returns
-------
Expand All @@ -298,35 +220,18 @@ def create_git_auth_header(token: str, url: str = "https://github.com") -> str:
Raises
------
ValueError
If the URL is not a valid GitHub repository URL.
If the URL is not a valid repository URL.

"""
hostname = urlparse(url).hostname
if not hostname:
msg = f"Invalid GitHub URL: {url!r}"
msg = f"Invalid repository URL: {url!r}"
raise ValueError(msg)

basic = base64.b64encode(f"x-oauth-basic:{token}".encode()).decode()
return f"http.https://{hostname}/.extraheader=Authorization: Basic {basic}"


def validate_github_token(token: str) -> None:
"""Validate the format of a GitHub Personal Access Token.

Parameters
----------
token : str
GitHub personal access token (PAT) for accessing private repositories.

Raises
------
InvalidGitHubTokenError
If the token format is invalid.

"""
if not re.fullmatch(_GITHUB_PAT_PATTERN, token):
raise InvalidGitHubTokenError


async def checkout_partial_clone(config: CloneConfig, token: str | None) -> None:
"""Configure sparse-checkout for a partially cloned repository.
Expand All @@ -336,7 +241,7 @@ async def checkout_partial_clone(config: CloneConfig, token: str | None) -> None
config : CloneConfig
The configuration for cloning the repository, including subpath and blob flag.
token : str | None
GitHub personal access token (PAT) for accessing private repositories.
Personal access token (PAT) for accessing private repositories.

"""
subpath = config.subpath.lstrip("/")
Expand All @@ -355,7 +260,7 @@ async def resolve_commit(config: CloneConfig, token: str | None) -> str:
config : CloneConfig
The configuration for cloning the repository.
token : str | None
GitHub personal access token (PAT) for accessing private repositories.
Personal access token (PAT) for accessing private repositories.

Returns
-------
Expand Down Expand Up @@ -387,7 +292,7 @@ async def _resolve_ref_to_sha(url: str, pattern: str, token: str | None = None)
pattern : str
The pattern to use to resolve the commit SHA.
token : str | None
GitHub personal access token (PAT) for accessing private repositories.
Personal access token (PAT) for accessing private repositories.

Returns
-------
Expand All @@ -402,7 +307,7 @@ async def _resolve_ref_to_sha(url: str, pattern: str, token: str | None = None)
"""
# Build: git [-c http.<host>/.extraheader=Auth...] ls-remote <url> <pattern>
cmd: list[str] = ["git"]
if token and is_github_host(url):
if token:
cmd += ["-c", create_git_auth_header(token, url=url)]

cmd += ["ls-remote", url, pattern]
Expand Down
5 changes: 1 addition & 4 deletions src/server/query_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from gitingest.clone import clone_repo
from gitingest.ingestion import ingest_query
from gitingest.query_parser import parse_remote_repo
from gitingest.utils.git_utils import resolve_commit, validate_github_token
from gitingest.utils.git_utils import resolve_commit
from gitingest.utils.logging_config import get_logger
from gitingest.utils.pattern_utils import process_patterns
from server.models import IngestErrorResponse, IngestResponse, IngestSuccessResponse, PatternType, S3Metadata
Expand Down Expand Up @@ -262,9 +262,6 @@ async def process_query(
If the commit hash is not found (should never happen).

"""
if token:
validate_github_token(token)

try:
query = await parse_remote_repo(input_text, token=token)
except Exception as exc:
Expand Down
Loading
Loading