From 9a5e63b330842cef471ab25be7b6653399495506 Mon Sep 17 00:00:00 2001 From: Paul Larson Date: Wed, 5 Feb 2025 16:46:56 -0600 Subject: [PATCH] Cleanups and argparse * Some pep8 cleanups * use argparse so that user gets a nice help message instead of silently failing --- grummage | 244 +++++++++++++++++++++++++++++++++---------------------- 1 file changed, 147 insertions(+), 97 deletions(-) diff --git a/grummage b/grummage index a657ca3..55d24d4 100755 --- a/grummage +++ b/grummage @@ -1,4 +1,5 @@ #!/usr/bin/env python +import argparse import json import os import subprocess @@ -11,15 +12,38 @@ from textual.containers import Container, Horizontal, VerticalScroll from textual.widgets import Tree, Footer, Static from textual.widgets import Markdown + +def parse_args(): + """Parse command-line arguments.""" + parser = argparse.ArgumentParser( + description="An interactive terminal frontend to Grype." + ) + parser.add_argument( + "sbom_file", + type=argparse.FileType("r"), + help="The path to the SBOM file to analyze.", + ) + return parser.parse_args() + + +def load_sbom(file): + """Load the SBOM from a file or stdin.""" + try: + return json.load(file) + except json.JSONDecodeError as exc: + sys.exit(f"Error reading SBOM from stdin: {exc}") + + def format_urls_as_markdown(text): """Convert plain URLs in text to markdown links, skipping already formatted markdown links.""" # Skip URLs that are already in markdown format [text](url) - if re.search(r'\[.+?\]\(.+?\)', text): + if re.search(r"\[.+?\]\(.+?\)", text): return text - + # Convert plain URLs to markdown links - url_pattern = r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+(?:/[^)\s]*)?' - return re.sub(url_pattern, lambda m: f'[{m.group()}]({m.group()})', text) + url_pattern = r"https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+(?:/[^)\s]*)?" + return re.sub(url_pattern, lambda m: f"[{m.group()}]({m.group()})", text) + def is_grype_installed(): """Check if the grype binary is available in the system's PATH.""" @@ -29,13 +53,19 @@ def is_grype_installed(): for path in os.environ["PATH"].split(os.pathsep) ) + def prompt_install_grype(): """Prompt the user to install grype if it's not installed.""" - response = input( - "The grype binary is not located in $PATH. Would you like to install it now? [y/N]: " - ).strip().lower() + response = ( + input( + "The grype binary is not located in $PATH. Would you like to install it now? [y/N]: " + ) + .strip() + .lower() + ) return response == "y" + def install_grype(): """Run the one-liner command to install grype.""" try: @@ -50,6 +80,7 @@ def install_grype(): print(f"Failed to install grype: {e}") sys.exit(1) + class Grummage(App): BINDINGS = [ ("v", "load_tree_by_vulnerability", "by Vuln"), @@ -106,74 +137,53 @@ class Grummage(App): await self.mount(Footer()) self.debug_log("on_mount: Layout mounted") - # Load the SBOM from file or stdin - await self.load_sbom() + self.call_grype() - async def load_sbom(self): - """Load the SBOM from a file or stdin.""" - if self.sbom_file: - # Load SBOM from the provided file path - self.debug_log(f"Loading SBOM from file: {self.sbom_file}") - sbom_json = self.load_json(self.sbom_file) - else: - # Read SBOM from stdin - self.debug_log("Loading SBOM from stdin") - try: - sbom_json = json.load(sys.stdin) - except json.JSONDecodeError as e: - self.debug_log(f"Error reading SBOM from stdin: {e}") - self.status_bar.update("Status: Failed to read SBOM from stdin.") - return - - # Run Grype analysis on the loaded SBOM JSON - self.vulnerability_report = self.call_grype(sbom_json) - if self.vulnerability_report and "matches" in self.vulnerability_report: - self.load_tree_by_package_name() - self.status_bar.update("Status: Vulnerability data loaded. Press N, T, V, S to change views, or E to explain.") - self.debug_log("Vulnerability data loaded into tree") - else: - self.status_bar.update("Status: No vulnerabilities found or unable to load data.") - self.debug_log("No vulnerability data found") - - def load_json(self, file_path): - """Load SBOM JSON from a file.""" - try: - with open(file_path, "r") as file: - return json.load(file) - except Exception as e: - self.debug_log(f"Error loading SBOM JSON: {e}") - return None - - def call_grype(self, sbom_json): + def call_grype(self): """Call Grype with the SBOM JSON to generate a vulnerability report.""" try: # Create a temporary file to store the SBOM JSON - with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.json') as temp_file: - json.dump(sbom_json, temp_file) + with tempfile.NamedTemporaryFile( + delete=False, mode="w", suffix=".json" + ) as temp_file: + json.dump(self.sbom_file, temp_file) temp_file_path = temp_file.name # Call Grype using the temporary JSON file path result = subprocess.run( ["grype", temp_file_path, "-o", "json"], capture_output=True, - text=True + text=True, ) # Print stdout and stderr for debugging - #print("Grype STDOUT:", result.stdout) - #print("Grype STDERR:", result.stderr) + # print("Grype STDOUT:", result.stdout) + # print("Grype STDERR:", result.stderr) if result.returncode != 0: print("Grype encountered an error:", result.stderr) - return None + return - # Return the parsed JSON if no errors occurred - return json.loads(result.stdout) + self.vulnerability_report = json.loads(result.stdout) + + except Exception as exc: + sys.exit(f"Error running Grype: {exc}") + + if ( + self.vulnerability_report + and "matches" in self.vulnerability_report + ): + self.load_tree_by_package_name() + self.status_bar.update( + "Status: Vulnerability data loaded. Press N, T, V, S to change views, or E to explain." + ) + self.debug_log("Vulnerability data loaded into tree") + else: + self.status_bar.update( + "Status: No vulnerabilities found or unable to load data." + ) + self.debug_log("No vulnerability data found") - except Exception as e: - print("Error running Grype:", e) - return None - def load_tree_by_package_name(self): """Display vulnerabilities organized by package name.""" self.tree_view.clear() @@ -187,15 +197,19 @@ class Grummage(App): for match in matches: vuln_id = match["vulnerability"]["id"] vuln_node = file_node.add_leaf(f"{vuln_id}") - + # Store detailed info for right-hand pane display vuln_node.data = { "id": vuln_id, "package_name": match["artifact"]["name"], "package_version": match["artifact"]["version"], - "severity": match["vulnerability"].get("severity", "Unknown"), - "fix_version": match["vulnerability"].get("fix", {}).get("versions", ["None"]), - "related": match.get("relatedVulnerabilities", []) + "severity": match["vulnerability"].get( + "severity", "Unknown" + ), + "fix_version": match["vulnerability"] + .get("fix", {}) + .get("versions", ["None"]), + "related": match.get("relatedVulnerabilities", []), } def load_tree_by_type(self): @@ -207,28 +221,37 @@ class Grummage(App): for match in self.vulnerability_report["matches"]: pkg_type = match["artifact"]["type"] pkg_name = match["artifact"]["name"] - type_map.setdefault(pkg_type, {}).setdefault(pkg_name, []).append(match) + type_map.setdefault(pkg_type, {}).setdefault(pkg_name, []).append( + match + ) # Build the tree view with the new structure for pkg_type, packages in type_map.items(): - type_node = self.tree_view.root.add(pkg_type) # Add package type node + type_node = self.tree_view.root.add( + pkg_type + ) # Add package type node for pkg_name, matches in packages.items(): package_node = type_node.add(pkg_name) # Add package name node for match in matches: vuln_id = match["vulnerability"]["id"] - vuln_node = package_node.add_leaf(f"{vuln_id}") # Add vulnerability ID under package name - + vuln_node = package_node.add_leaf( + f"{vuln_id}" + ) # Add vulnerability ID under package name + # Store detailed info for right-hand pane display vuln_node.data = { "id": vuln_id, "package_name": match["artifact"]["name"], "package_version": match["artifact"]["version"], - "severity": match["vulnerability"].get("severity", "Unknown"), - "fix_version": match["vulnerability"].get("fix", {}).get("versions", ["None"]), - "related": match.get("relatedVulnerabilities", []) + "severity": match["vulnerability"].get( + "severity", "Unknown" + ), + "fix_version": match["vulnerability"] + .get("fix", {}) + .get("versions", ["None"]), + "related": match.get("relatedVulnerabilities", []), } - def load_tree_by_vulnerability(self): """Display vulnerabilities organized by vulnerability ID.""" self.tree_view.clear() @@ -242,23 +265,34 @@ class Grummage(App): for match in matches: pkg_name = match["artifact"]["name"] package_node = vuln_node.add_leaf(f"{pkg_name}") - + # Store detailed info for right-hand pane display package_node.data = { "id": vuln_id, "package_name": match["artifact"]["name"], "package_version": match["artifact"]["version"], - "severity": match["vulnerability"].get("severity", "Unknown"), - "fix_version": match["vulnerability"].get("fix", {}).get("versions", ["None"]), - "related": match.get("relatedVulnerabilities", []) + "severity": match["vulnerability"].get( + "severity", "Unknown" + ), + "fix_version": match["vulnerability"] + .get("fix", {}) + .get("versions", ["None"]), + "related": match.get("relatedVulnerabilities", []), } def load_tree_by_severity(self): """Display vulnerabilities organized by severity, in fixed order.""" self.tree_view.clear() # Define the desired order for severities - severity_order = ["Critical", "High", "Medium", "Low", "Negligible", "Unknown"] - + severity_order = [ + "Critical", + "High", + "Medium", + "Low", + "Negligible", + "Unknown", + ] + # Create a dictionary mapping each severity to its matches severity_map = {severity: [] for severity in severity_order} for match in self.vulnerability_report["matches"]: @@ -266,7 +300,7 @@ class Grummage(App): if severity not in severity_map: severity = "Unknown" # Assign unknown severity if it's not one of the predefined categories severity_map[severity].append(match) - + # Add nodes in the specified order with full vulnerability data for each node for severity in severity_order: if severity_map[severity]: # Only add if there are matches @@ -274,20 +308,22 @@ class Grummage(App): for match in severity_map[severity]: vuln_id = match["vulnerability"]["id"] package_name = match["artifact"]["name"] - vuln_node = severity_node.add_leaf(f"{vuln_id} ({package_name})") - + vuln_node = severity_node.add_leaf( + f"{vuln_id} ({package_name})" + ) + # Store detailed info in each node for later access in the right-hand pane vuln_node.data = { "id": vuln_id, "package_name": match["artifact"]["name"], "package_version": match["artifact"]["version"], "severity": severity, - "fix_version": match["vulnerability"].get("fix", {}).get("versions", ["None"]), - "related": match.get("relatedVulnerabilities", []) + "fix_version": match["vulnerability"] + .get("fix", {}) + .get("versions", ["None"]), + "related": match.get("relatedVulnerabilities", []), } - - async def on_key(self, event): """Handle key press events to switch views.""" key = event.key.lower() @@ -304,9 +340,10 @@ class Grummage(App): self.load_tree_by_severity() self.status_bar.update("Status: Viewing by severity.") elif key == "e" and self.selected_vuln_id and self.detailed_text: - self.status_bar.update(f"Status: Explaining {self.selected_vuln_id} in {self.selected_package_name} ({self.selected_package_version})") - await self.explain_vulnerability(self.selected_vuln_id) - + self.status_bar.update( + f"Status: Explaining {self.selected_vuln_id} in {self.selected_package_name} ({self.selected_package_version})" + ) + await self.explain_vulnerability(self.selected_vuln_id) async def on_tree_node_selected(self, event): """Show detailed information for the selected vulnerability.""" @@ -316,7 +353,9 @@ class Grummage(App): self.selected_vuln_id = details["id"] self.selected_package_name = details["package_name"] self.selected_package_version = details["package_version"] - self.status_bar.update(f"Status: Selected {details['id']} in {details['package_name']} ({details['package_version']})") + self.status_bar.update( + f"Status: Selected {details['id']} in {details['package_name']} ({details['package_version']})" + ) detail_text = ( f"# {details['id']}\n\n" f"**Package:** {details['package_name']} ({details['package_version']})\n\n" @@ -326,10 +365,10 @@ class Grummage(App): ) for related in details["related"]: detail_text += f"* {related['id']} ({related['dataSource']})\n" - + # Convert URLs to markdown links detail_text = format_urls_as_markdown(detail_text) - + self.debug_log(f"Displaying details for {details['id']}") self.details_display.update(detail_text) self.detailed_text = detail_text @@ -351,13 +390,17 @@ class Grummage(App): analyze_result = subprocess.run( ["grype", self.sbom_file, "-o", "json"], capture_output=True, - text=True + text=True, ) # Check if the SBOM analysis was successful if analyze_result.returncode != 0: - self.details_display.update(f"Error analyzing SBOM: {analyze_result.stderr}") - self.debug_log(f"Error analyzing SBOM for explanation: {analyze_result.stderr}") + self.details_display.update( + f"Error analyzing SBOM: {analyze_result.stderr}" + ) + self.debug_log( + f"Error analyzing SBOM for explanation: {analyze_result.stderr}" + ) return # Run Grype's explain command with the specific vulnerability ID @@ -365,7 +408,7 @@ class Grummage(App): ["grype", "explain", "--id", vuln_id], input=analyze_result.stdout, # Pass the JSON output from the first run as input capture_output=True, - text=True + text=True, ) # Check and display the result in the details pane @@ -380,16 +423,23 @@ class Grummage(App): self.details_display.update(combined_text) self.debug_log(f"Displaying explanation for {vuln_id}") else: - self.details_display.update(f"# Error\n\nFailed to explain {vuln_id}.\n\nError: {explain_result.stderr}") - self.debug_log(f"Error explaining {vuln_id}: {explain_result.stderr}") - + self.details_display.update( + f"# Error\n\nFailed to explain {vuln_id}.\n\nError: {explain_result.stderr}" + ) + self.debug_log( + f"Error explaining {vuln_id}: {explain_result.stderr}" + ) + except Exception as e: - self.details_display.update(f"# Error\n\nError explaining {vuln_id}: {e}") + self.details_display.update( + f"# Error\n\nError explaining {vuln_id}: {e}" + ) self.debug_log(f"Exception in explain_vulnerability: {e}") if __name__ == "__main__": - sbom_file = sys.argv[1] if len(sys.argv) > 1 else sys.exit() + args = parse_args() + sbom_file = load_sbom(args.sbom_file) if not is_grype_installed(): if prompt_install_grype(): install_grype()