From 1f72dcf2f62e53e95db8dfe4545e3ad83d49d6cd Mon Sep 17 00:00:00 2001 From: "Jeffrey A. Clark" Date: Mon, 28 Jul 2025 16:29:24 -0400 Subject: [PATCH] Refactor with typer - Add delete command method to Repo - Add branch command method to Repo - Add origin command method to Repo - Show help when no args - Add Package class to utils - Factor out logic if all_repos, elif, else - Alpha sort class methods - Add -l, --list-repos, to Repo main --- django_mongodb_cli/__init__.py | 8 + django_mongodb_cli/repo.py | 241 +++++++++++------- django_mongodb_cli/utils.py | 429 +++++++++++++++++++++++++-------- 3 files changed, 498 insertions(+), 180 deletions(-) diff --git a/django_mongodb_cli/__init__.py b/django_mongodb_cli/__init__.py index c60b68d..a4ab4e7 100644 --- a/django_mongodb_cli/__init__.py +++ b/django_mongodb_cli/__init__.py @@ -18,4 +18,12 @@ context_settings={"help_option_names": ["-h", "--help"]}, ) + +@dm.callback(invoke_without_command=True) +def main(ctx: typer.Context): + if ctx.invoked_subcommand is None: + typer.echo(ctx.get_help()) + raise typer.Exit() + + dm.add_typer(repo, name="repo") diff --git a/django_mongodb_cli/repo.py b/django_mongodb_cli/repo.py index 66830b8..e1c1b8a 100644 --- a/django_mongodb_cli/repo.py +++ b/django_mongodb_cli/repo.py @@ -1,10 +1,47 @@ import typer -from .utils import Repo, Test +from .utils import Package, Repo, Test repo = typer.Typer() +def repo_command( + all_repos: bool, + repo_name: str, + all_msg: str, + missing_msg: str, + single_func, + all_func, + fg=typer.colors.CYAN, + repo_list=None, +): + if all_repos: + if all_msg: + typer.echo(typer.style(all_msg, fg=fg)) + for name in repo_list if repo_list is not None else Repo().map: + all_func(name) + elif repo_name: + single_func(repo_name) + else: + typer.echo(typer.style(missing_msg, fg=typer.colors.YELLOW)) + + +@repo.callback(invoke_without_command=True) +def main( + ctx: typer.Context, + list_repos: bool = typer.Option( + False, "--list-repos", "-l", help="List available repositories." + ), +): + if list_repos: + Repo().list_repos() + raise typer.Exit() # End, no further action + + if ctx.invoked_subcommand is None: + typer.echo(ctx.get_help()) + raise typer.Exit() + + @repo.command() def status( repo_name: str = typer.Argument(None), @@ -12,22 +49,36 @@ def status( False, "--all-repos", "-a", help="Show status of all repos" ), ): - """Show the status of the specified Git repository.""" - if all_repos: - typer.echo( - typer.style("Showing status for all repositories...", fg=typer.colors.CYAN) - ) - for repo_name in Repo().map: - Repo().get_repo_status(repo_name) - elif repo_name: - Repo().get_repo_status(repo_name) - else: - typer.echo( - typer.style( - "Please specify a repository name or use --all-repos to show all repositories.", - fg=typer.colors.YELLOW, - ) - ) + repo_command( + all_repos, + repo_name, + all_msg="Showing status for all repositories...", + missing_msg="Please specify a repository name or use --all-repos to show all repositories.", + single_func=lambda name: Repo().get_repo_status(name), + all_func=lambda name: Repo().get_repo_status(name), + ) + + +@repo.command() +def branch( + repo_name: str = typer.Argument(None), + branch_name: str = typer.Argument(None), + all_repos: bool = typer.Option( + False, "--all-repos", "-a", help="Show branches of all repositories" + ), +): + repo = Repo() + if branch_name: + repo.set_branch(branch_name) + + repo_command( + all_repos, + repo_name, + all_msg="Showing branches for all repositories...", + missing_msg="Please specify a repository name or use --all-repos to show branches of all repositories.", + single_func=lambda name: repo.get_repo_branches(name), + all_func=lambda name: repo.get_repo_branches(name), + ) @repo.command() @@ -40,24 +91,45 @@ def clone( False, "--install", "-i", help="Install after cloning" ), ): - """Clone the specified Git repository.""" - if all_repos: - typer.echo(typer.style("Cloning all repositories...", fg=typer.colors.CYAN)) - for repo_name in Repo().map: - Repo().clone_repo(repo_name) - if install: - Repo().install_package(repo_name) - elif repo_name: - Repo().clone_repo(repo_name) + def clone_repo(name): + Repo().clone_repo(name) if install: - Repo().install_package(repo_name) - else: - typer.echo( - typer.style( - "Please specify a repository name or use --all-repos to clone all repositories.", - fg=typer.colors.YELLOW, - ) - ) + Package().install_package(name) + + repo_command( + all_repos, + repo_name, + all_msg="Cloning all repositories...", + missing_msg="Please specify a repository name or use --all-repos to clone all repositories.", + single_func=clone_repo, + all_func=clone_repo, + ) + + +@repo.command() +def delete( + repo_name: str = typer.Argument(None), + all_repos: bool = typer.Option( + False, "--all-repos", "-a", help="Delete all repositories" + ), + uninstall: bool = typer.Option( + False, "--uninstall", "-u", help="Uninstall after deleting" + ), +): + def do_delete(name): + if uninstall: + Package().uninstall_package(name) + Repo().delete_repo(name) + + repo_command( + all_repos, + repo_name, + all_msg="Deleting all repositories...", + missing_msg="Please specify a repository name or use --all-repos to delete all repositories.", + single_func=do_delete, + all_func=do_delete, + fg=typer.colors.RED, # Red for delete + ) @repo.command() @@ -67,20 +139,31 @@ def install( False, "--all-repos", "-a", help="Install all repositories" ), ): - """Install the specified Git repository.""" - if all_repos: - typer.echo(typer.style("Installing all repositories...", fg=typer.colors.CYAN)) - for repo_name in Repo().map: - Repo().install_package(repo_name) - elif repo_name: - Repo().install_package(repo_name) - else: - typer.echo( - typer.style( - "Please specify a repository name or use --all-repos to install all repositories.", - fg=typer.colors.YELLOW, - ) - ) + repo_command( + all_repos, + repo_name, + all_msg="Installing all repositories...", + missing_msg="Please specify a repository name or use --all-repos to install all repositories.", + single_func=lambda name: Package().install_package(name), + all_func=lambda name: Package().install_package(name), + ) + + +@repo.command() +def origin( + repo_name: str = typer.Argument(None), + all_repos: bool = typer.Option( + False, "--all-repos", "-a", help="Show origin of all repositories" + ), +): + repo_command( + all_repos, + repo_name, + all_msg="Showing origin for all repositories...", + missing_msg="Please specify a repository name or use --all-repos to show origins of all repositories.", + single_func=lambda name: Repo().get_repo_origin(name), + all_func=lambda name: Repo().get_repo_origin(name), + ) @repo.command() @@ -90,20 +173,14 @@ def sync( False, "--all-repos", "-a", help="Sync all repositories" ), ): - """Sync the specified Git repository.""" - if all_repos: - typer.echo(typer.style("Syncing all repositories...", fg=typer.colors.CYAN)) - for repo_name in Repo().map: - Repo().sync_repo(repo_name) - elif repo_name: - Repo().sync_repo(repo_name) - else: - typer.echo( - typer.style( - "Please specify a repository name or use --all-repos to sync all repositories.", - fg=typer.colors.YELLOW, - ) - ) + repo_command( + all_repos, + repo_name, + all_msg="Syncing all repositories...", + missing_msg="Please specify a repository name or use --all-repos to sync all repositories.", + single_func=lambda name: Repo().sync_repo(name), + all_func=lambda name: Repo().sync_repo(name), + ) @repo.command() @@ -111,7 +188,7 @@ def test( repo_name: str = typer.Argument(None), modules: list[str] = typer.Argument(None), all_repos: bool = typer.Option( - False, "--all-repos", "-a", help="Sync all repositories" + False, "--all-repos", "-a", help="Run tests for all repositories" ), keep_db: bool = typer.Option( False, "--keepdb", help="Keep the database after tests" @@ -123,31 +200,25 @@ def test( False, "--setenv", "-s", - help="Set DJANGO_SETTINGS_MODULE" " environment variable", + help="Set DJANGO_SETTINGS_MODULE environment variable", ), ): - """Run tests for the specified Git repository.""" - repo = Test() + test_runner = Test() if modules: - repo.set_modules(modules) + test_runner.set_modules(modules) if keep_db: - repo.set_keep_db(keep_db) + test_runner.set_keep_db(keep_db) if keyword: - repo.set_keyword(keyword) + test_runner.set_keyword(keyword) if setenv: - repo.set_env(setenv) - if all_repos: - typer.echo( - typer.style("Running tests for all repositories...", fg=typer.colors.CYAN) - ) - for repo_name in repo.map: - repo.run_tests(repo_name) - elif repo_name: - repo.run_tests(repo_name) - else: - typer.echo( - typer.style( - "Please specify a repository name or use --all-repos to run tests for all repositories.", - fg=typer.colors.YELLOW, - ) - ) + test_runner.set_env(setenv) + + repo_command( + all_repos, + repo_name, + all_msg="Running tests for all repositories...", + missing_msg="Please specify a repository name or use --all-repos to run tests for all repositories.", + single_func=lambda name: test_runner.run_tests(name), + all_func=lambda name: test_runner.run_tests(name), + repo_list=test_runner.map, # Use Test().map instead of Repo().map + ) diff --git a/django_mongodb_cli/utils.py b/django_mongodb_cli/utils.py index 57f7d86..52824fb 100644 --- a/django_mongodb_cli/utils.py +++ b/django_mongodb_cli/utils.py @@ -26,10 +26,111 @@ def __init__(self, pyproject_file: Path = Path("pyproject.toml")): self.config = self._load_config() self.path = Path(self.config["tool"]["django_mongodb_cli"]["path"]) self.map = self.get_map() + self.branch = None def _load_config(self) -> dict: return toml.load(self.pyproject_file) + def clone_repo(self, repo_name: str) -> None: + """ + Clone a repository into the specified path. + If the repository already exists, it will skip cloning. + """ + typer.echo( + typer.style(f"Cloning repository: {repo_name}", fg=typer.colors.CYAN) + ) + + if repo_name not in self.map: + typer.echo( + typer.style( + f"Repository '{repo_name}' not found in configuration.", + fg=typer.colors.RED, + ) + ) + return + + url = self.map[repo_name] + path = self.get_repo_path(repo_name) + branch = ( + BRANCH_PATTERN.search(url).group(1) + if BRANCH_PATTERN.search(url) + else "main" + ) + url = URL_PATTERN.search(url).group(0) + if os.path.exists(path): + typer.echo( + typer.style( + f"Repository '{repo_name}' already exists at path: {path}", + fg=typer.colors.YELLOW, + ) + ) + return + + typer.echo( + typer.style( + f"Cloning {url} into {path} (branch: {branch})", fg=typer.colors.CYAN + ) + ) + GitRepo.clone_from(url, path, branch=branch) + + # Install pre-commit hooks if config exists + pre_commit_config = os.path.join(path, ".pre-commit-config.yaml") + if os.path.exists(pre_commit_config): + typer.echo( + typer.style("Installing pre-commit hooks...", fg=typer.colors.CYAN) + ) + try: + subprocess.run(["pre-commit", "install"], cwd=path, check=True) + typer.echo( + typer.style("Pre-commit hooks installed!", fg=typer.colors.GREEN) + ) + except subprocess.CalledProcessError as e: + typer.echo( + typer.style( + f"Failed to install pre-commit hooks for {repo_name}: {e}", + fg=typer.colors.RED, + ) + ) + else: + typer.echo( + typer.style( + "No .pre-commit-config.yaml found. Skipping pre-commit hook installation.", + fg=typer.colors.YELLOW, + ) + ) + + def delete_repo(self, repo_name: str) -> None: + """ + Delete the specified repository. + """ + typer.echo( + typer.style(f"Deleting repository: {repo_name}", fg=typer.colors.CYAN) + ) + + path = self.get_repo_path(repo_name) + if not os.path.exists(path): + typer.echo( + typer.style( + f"Repository '{repo_name}' not found at path: {path}", + fg=typer.colors.RED, + ) + ) + return + + try: + shutil.rmtree(path) + typer.echo( + typer.style( + f"✅ Successfully deleted {repo_name}.", fg=typer.colors.GREEN + ) + ) + except Exception as e: + typer.echo( + typer.style( + f"❌ Failed to delete {repo_name}: {e}", fg=typer.colors.RED + ) + ) + def get_map(self) -> dict: """ Return a dict mapping repo_name to repo_url from repos in @@ -41,6 +142,98 @@ def get_map(self) -> dict: if "@" in repo } + def get_repo_branches(self, repo_name: str) -> list: + """ + Get a list of both local and remote branches for the specified repository. + Optionally, if self.branch is set, switch to it (existing) or create new (checkout -b). + """ + typer.echo( + typer.style( + f"Getting branches for repository: {repo_name}", fg=typer.colors.CYAN + ) + ) + + path = self.get_repo_path(repo_name) + if not os.path.exists(path): + typer.echo( + typer.style( + f"Repository '{repo_name}' not found at path: {path}", + fg=typer.colors.RED, + ) + ) + return [] + + repo = self.get_repo(path) + + # Get local branches + local_branches = [branch.name for branch in repo.branches] + + # Get remote branches; skip HEAD pointer + remote_branches = [ + ref.name.replace("origin/", "") + for ref in repo.remotes.origin.refs + if ref.name != "origin/HEAD" + ] + + # Merge, deduplicate, and sort + all_branches = sorted(set(local_branches + remote_branches)) + + typer.echo( + typer.style( + f"Branches in {repo_name}: {', '.join(all_branches)}", + fg=typer.colors.GREEN, + ) + ) + + if getattr(self, "branch", None): + if self.branch in local_branches: + typer.echo( + typer.style( + f"Checking out existing branch '{self.branch}'", + fg=typer.colors.YELLOW, + ) + ) + repo.git.checkout(self.branch) + else: + typer.echo( + typer.style( + f"Branch '{self.branch}' does not exist. Creating and checking out new branch.", + fg=typer.colors.YELLOW, + ) + ) + repo.git.checkout("-b", self.branch) + + return all_branches + + def get_repo_origin(self, repo_name: str) -> str: + """ + Get the origin URL of the specified repository. + """ + typer.echo( + typer.style( + f"Getting origin for repository: {repo_name}", fg=typer.colors.CYAN + ) + ) + + path = self.get_repo_path(repo_name) + if not os.path.exists(path): + typer.echo( + typer.style( + f"Repository '{repo_name}' not found at path: {path}", + fg=typer.colors.RED, + ) + ) + return "" + + repo = self.get_repo(path) + origin_url = repo.remotes.origin.url + typer.echo( + typer.style( + f"Origin URL for {repo_name}: {origin_url}", fg=typer.colors.GREEN + ) + ) + return origin_url + def get_repo_path(self, name: str) -> Path: return (self.path / name).resolve() @@ -102,81 +295,72 @@ def get_repo_status(self, repo_name: str) -> str: ) ) - def clone_repo(self, repo_name: str) -> None: + def list_repos(self) -> None: """ - Clone a repository into the specified path. - If the repository already exists, it will skip cloning. + List all repositories found either in self.map or as directories in self.path. """ - typer.echo( - typer.style(f"Cloning repository: {repo_name}", fg=typer.colors.CYAN) - ) + typer.echo(typer.style("Listing repositories...", fg=typer.colors.CYAN)) - if repo_name not in self.map: + # Set from self.map + map_repos = set(self.map.keys()) + + # Set from filesystem + try: + fs_entries = os.listdir(self.path) + fs_repos = { + entry + for entry in fs_entries + if os.path.isdir(os.path.join(self.path, entry)) + } + except Exception as e: typer.echo( typer.style( - f"Repository '{repo_name}' not found in configuration.", + f"❌ Failed to list repositories in filesystem: {e}", fg=typer.colors.RED, ) ) return - url = self.map[repo_name] - path = self.get_repo_path(repo_name) - branch = ( - BRANCH_PATTERN.search(url).group(1) - if BRANCH_PATTERN.search(url) - else "main" - ) - url = URL_PATTERN.search(url).group(0) - if os.path.exists(path): + # Compute differences + only_in_map = map_repos - fs_repos + only_in_fs = fs_repos - map_repos + in_both = map_repos & fs_repos + + # Output + if in_both: typer.echo( typer.style( - f"Repository '{repo_name}' already exists at path: {path}", - fg=typer.colors.YELLOW, + "Repositories in both self.map and filesystem:", + fg=typer.colors.GREEN, ) ) - return - - typer.echo( - typer.style( - f"Cloning {url} into {path} (branch: {branch})", fg=typer.colors.CYAN - ) - ) - GitRepo.clone_from(url, path, branch=branch) + for name in sorted(in_both): + typer.echo(f" - {name}") - # Install pre-commit hooks if config exists - pre_commit_config = os.path.join(path, ".pre-commit-config.yaml") - if os.path.exists(pre_commit_config): + if only_in_map: typer.echo( - typer.style("Installing pre-commit hooks...", fg=typer.colors.CYAN) + typer.style("Repositories only in self.map:", fg=typer.colors.YELLOW) ) - try: - subprocess.run(["pre-commit", "install"], cwd=path, check=True) - typer.echo( - typer.style("Pre-commit hooks installed!", fg=typer.colors.GREEN) - ) - except subprocess.CalledProcessError as e: - typer.echo( - typer.style( - f"Failed to install pre-commit hooks for {repo_name}: {e}", - fg=typer.colors.RED, - ) - ) - else: + for name in sorted(only_in_map): + typer.echo(f" - {name}") + + if only_in_fs: typer.echo( - typer.style( - "No .pre-commit-config.yaml found. Skipping pre-commit hook installation.", - fg=typer.colors.YELLOW, - ) + typer.style("Repositories only in filesystem:", fg=typer.colors.MAGENTA) ) + for name in sorted(only_in_fs): + typer.echo(f" - {name}") - def install_package(self, repo_name: str) -> None: + if not (in_both or only_in_map or only_in_fs): + typer.echo("No repositories found.") + + def run_tests(self, repo_name: str) -> None: """ - Install a package from the cloned repository. + Run tests for the specified repository. """ typer.echo( typer.style( - f"Installing package from repository: {repo_name}", fg=typer.colors.CYAN + f"Running tests for repository: {repo_name}", fg=typer.colors.CYAN ) ) @@ -190,24 +374,16 @@ def install_package(self, repo_name: str) -> None: ) return - try: - subprocess.run( - [os.sys.executable, "-m", "pip", "install", "-e", path], - check=True, - ) - typer.echo( - typer.style( - f"✅ Successfully installed package from {repo_name}.", - fg=typer.colors.GREEN, - ) - ) - except subprocess.CalledProcessError as e: - typer.echo( - typer.style( - f"❌ Failed to install package from {repo_name}: {e}", - fg=typer.colors.RED, - ) + Test().run_tests(repo_name) + typer.echo( + typer.style( + f"✅ Tests completed successfully for {repo_name}.", + fg=typer.colors.GREEN, ) + ) + + def set_branch(self, branch: str) -> None: + self.branch = branch def sync_repo(self, repo_name: str) -> None: """ @@ -243,13 +419,15 @@ def sync_repo(self, repo_name: str) -> None: ) ) - def run_tests(self, repo_name: str) -> None: + +class Package(Repo): + def install_package(self, repo_name: str) -> None: """ - Run tests for the specified repository. + Install a package from the cloned repository. """ typer.echo( typer.style( - f"Running tests for repository: {repo_name}", fg=typer.colors.CYAN + f"Installing package from repository: {repo_name}", fg=typer.colors.CYAN ) ) @@ -263,14 +441,65 @@ def run_tests(self, repo_name: str) -> None: ) return - Test().run_tests(repo_name) + try: + subprocess.run( + [os.sys.executable, "-m", "pip", "install", "-e", path], + check=True, + ) + typer.echo( + typer.style( + f"✅ Successfully installed package from {repo_name}.", + fg=typer.colors.GREEN, + ) + ) + except subprocess.CalledProcessError as e: + typer.echo( + typer.style( + f"❌ Failed to install package from {repo_name}: {e}", + fg=typer.colors.RED, + ) + ) + + def uninstall_package(self, repo_name: str) -> None: + """ + Uninstall a package from the cloned repository. + """ typer.echo( typer.style( - f"✅ Tests completed successfully for {repo_name}.", - fg=typer.colors.GREEN, + f"Uninstalling package from repository: {repo_name}", + fg=typer.colors.CYAN, ) ) + path = self.get_repo_path(repo_name) + if not os.path.exists(path): + typer.echo( + typer.style( + f"Repository '{repo_name}' not found at path: {path}", + fg=typer.colors.RED, + ) + ) + return + + try: + subprocess.run( + [os.sys.executable, "-m", "pip", "uninstall", "-y", repo_name], + check=True, + ) + typer.echo( + typer.style( + f"✅ Successfully uninstalled package from {repo_name}.", + fg=typer.colors.GREEN, + ) + ) + except subprocess.CalledProcessError as e: + typer.echo( + typer.style( + f"❌ Failed to uninstall package from {repo_name}: {e}", + fg=typer.colors.RED, + ) + ) + class Test(Repo): """ @@ -287,21 +516,6 @@ def __init__(self, pyproject_file: Path = Path("pyproject.toml")): self.keyword = None self.setenv = False - def set_modules(self, modules: list) -> None: - self.modules = modules - - def set_keep_db(self, keep_db: bool) -> None: - """Set whether to keep the database after tests.""" - self.keep_db = keep_db - - def set_keyword(self, keyword: str) -> None: - """Set a keyword to filter tests.""" - self.keyword = keyword - - def set_env(self, setenv: bool) -> None: - """Set whether to set DJANGO_SETTINGS_MODULE environment variable.""" - self.setenv = setenv - def copy_settings(self, repo_name: str) -> None: """ Copy test settings from this repository to the repository @@ -322,6 +536,14 @@ def copy_apps(self, repo_name: str) -> None: Copy test settings from this repository to the repository specified by repo_name. """ + if "apps_file" not in self.test_settings: + typer.echo( + typer.style( + f"No apps_file settings found for {repo_name}.", + fg=typer.colors.YELLOW, + ) + ) + return source = self.test_settings["apps_file"]["source"] target = self.test_settings["apps_file"]["target"] shutil.copyfile(source, target) @@ -339,13 +561,15 @@ def copy_migrations(self, repo_name: str) -> None: """ source = self.test_settings["migrations_dir"]["source"] target = self.test_settings["migrations_dir"]["target"] - shutil.copytree(source, target) - typer.echo( - typer.style( - f"Copied apps from {source} to {target} for {repo_name}.", - fg=typer.colors.CYAN, + + if not os.path.exists(target): + shutil.copytree(source, target) + typer.echo( + typer.style( + f"Copied migrations from {source} to {target} for {repo_name}.", + fg=typer.colors.CYAN, + ) ) - ) def run_tests(self, repo_name: str) -> None: self.test_settings = ( @@ -392,3 +616,18 @@ def run_tests(self, repo_name: str) -> None: test_command, cwd=test_dir, ) + + def set_modules(self, modules: list) -> None: + self.modules = modules + + def set_keep_db(self, keep_db: bool) -> None: + """Set whether to keep the database after tests.""" + self.keep_db = keep_db + + def set_keyword(self, keyword: str) -> None: + """Set a keyword to filter tests.""" + self.keyword = keyword + + def set_env(self, setenv: bool) -> None: + """Set whether to set DJANGO_SETTINGS_MODULE environment variable.""" + self.setenv = setenv