Skip to content

Added local run options #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,10 @@ jobs:
security/detect-non-literal-regexp,
security/detect-object-injection

# Log forwarding
# Log output
sumo_logic_enabled: true
sumo_logic_http_source_url: https://example/url
ms_sentinel_enabled: true
ms_sentinel_workspace_id: REPLACE_ME
ms_sentinel_shared_key: REPLACE_ME

```
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
mdutils~=1.6.0
PyGithub~=2.4.0
PyGithub~=2.4.0
requests~=2.32.3
tabulate~=0.9.0
42 changes: 42 additions & 0 deletions src/core/connectors/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,52 @@ def __init__(self, **kwargs):
class TrivyImageTestResult(BaseTestResult):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.target = kwargs.get("Target", "")
self.Class = kwargs.get("Class", "")
self.Type = kwargs.get("Type", "")
self.description = kwargs.get("Description", "")
self.title = kwargs.get("Title", "")
self.severity = kwargs.get("Severity", "UNKNOWN")
self.package = kwargs.get("Package", "Unknown")
self.file = kwargs.get("File", "Unknown")
self.url = kwargs.get("URL", "")
self.issue_text = kwargs.get("IssueText", "")

# CVSS
self.cvss_nvd_v2_score = kwargs.get("CVSS", {}).get("nvd", {}).get("V2Score", None)
self.cvss_nvd_v2_vector = kwargs.get("CVSS", {}).get("nvd", {}).get("V2Vector", "")
self.cvss_nvd_v3_score = kwargs.get("CVSS", {}).get("nvd", {}).get("V3Score", None)
self.cvss_nvd_v3_vector = kwargs.get("CVSS", {}).get("nvd", {}).get("V3Vector", "")

# CWE and IDs
self.cwe_ids = kwargs.get("CweIDs", [])
self.vendor_ids = kwargs.get("VendorIDs", [])
self.vulnerability_id = kwargs.get("VulnerabilityID", "")
self.status = kwargs.get("Status", "")
self.severity_source = kwargs.get("SeveritySource", "")
self.vendor_severity = kwargs.get("VendorSeverity", {})

# Package data
self.pkg_id = kwargs.get("PkgID", "")
self.pkg_name = kwargs.get("PkgName", "")
self.pkg_identifier = kwargs.get("PkgIdentifier", {})
self.installed_version = kwargs.get("InstalledVersion", "")
self.fixed_version = kwargs.get("FixedVersion", "")

# Dates
self.published_date = kwargs.get("PublishedDate", "")
self.last_modified_date = kwargs.get("LastModifiedDate", "")

# References
self.references = kwargs.get("References", [])
self.primary_url = kwargs.get("PrimaryURL", "")

# Layer metadata
self.layer_digest = kwargs.get("Layer", {}).get("Digest", "")
self.layer_diff_id = kwargs.get("Layer", {}).get("DiffID", "")

# Data source
data_source = kwargs.get("DataSource", {})
self.data_source_id = data_source.get("ID", "")
self.data_source_name = data_source.get("Name", "")
self.data_source_url = data_source.get("URL", "")
8 changes: 7 additions & 1 deletion src/core/connectors/trivy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def process_output(cls, data: dict, cwd: str, plugin_name: str = "Trivy Image")
continue # No vulnerabilities, so no events
item_type = result.get("Type", "Unknown")
item_class = result.get("Class", "Unknown")
item_target = result.get("Target", "Unknown")
item_key = f"{item_class}_{item_type}"
vuln_results = {}
if item_key not in grouped_vulnerabilities:
Expand All @@ -43,6 +44,11 @@ def process_output(cls, data: dict, cwd: str, plugin_name: str = "Trivy Image")
]
else:
vuln_results[package][severity].append(cve_title)
output_result = cls.result_class(**vuln)
output_result.Class = item_class
output_result.Type = item_type
output_result.Target = item_target
metrics["output"].append(output_result)
grouped_vulnerabilities[item_key].update(vuln_results)
results = []
for description in grouped_vulnerabilities:
Expand All @@ -68,7 +74,7 @@ def process_output(cls, data: dict, cwd: str, plugin_name: str = "Trivy Image")
test_result.plugin_name = plugin_name
cls.extract_additional_data(test_result, cwd)
metrics["events"].append(test_result)
metrics["output"].append(test_result)
# metrics["output"].append(test_result)
return metrics

@staticmethod
Expand Down
18 changes: 17 additions & 1 deletion src/core/load_plugins.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
from core.plugins.sumologic import Sumologic
from core.plugins.microsoft_sentinel import Sentinel
from core.plugins.console import Console


def load_sumo_logic_plugin():
Expand Down Expand Up @@ -40,4 +41,19 @@ def load_ms_sentinel_plugin():
print("Microsoft Sentinel environment variables are not properly configured!")
return None

return Sentinel(MS_SENTINEL_WORKSPACE_ID, MS_SENTINEL_SHARED_KEY)
return Sentinel(MS_SENTINEL_WORKSPACE_ID, MS_SENTINEL_SHARED_KEY)

def load_console_plugin():
"""
Loads the Microsoft Sentinel plugin if it is enabled and properly configured.

:return: Instance of the Microsoft Sentinel class or None if not enabled/configured.
"""
console_enabled = os.getenv("INPUT_CONSOLE_ENABLED", "false").lower() == "true"
if not console_enabled:
print("Console Output integration is disabled.")
return None

SOCKET_CONSOLE_MODE = os.getenv("INPUT_SOCKET_CONSOLE_MODE", "console").lower()

return Console(mode=SOCKET_CONSOLE_MODE)
1 change: 1 addition & 0 deletions src/core/plugins/console/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .console import Console
186 changes: 186 additions & 0 deletions src/core/plugins/console/console.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
import json
from datetime import datetime, timezone
from tabulate import tabulate
from core import log


default_log_type = 'SocketSecurityTool'


class BaseEvent:
def __init__(self, **kwargs):
self.Severity = kwargs.get("Severity", "Unknown")
self.issue_text = kwargs.get("issue_text", "Unknown")
self.test_name = kwargs.get("test_name", "Unknown")
self.more_info = kwargs.get("more_info", None)
self.Message = kwargs.get("Message", "Unknown")
self.filename = kwargs.get("filename", "Unknown")
self.URL = kwargs.get("URL", "N/A")
self.Timestamp = kwargs.get("Timestamp", datetime.now(timezone.utc).isoformat())
self.Plugin = kwargs.get("Plugin", "Unknown")

def as_dict(self):
return dict(self.__dict__)

def as_row(self):
return list(self.as_dict().values())

def render(self, output_type='console'):
row = self.as_row()
if output_type == 'markdown':
return "| " + " | ".join(f"`{str(v)}`" for v in row) + " |"
elif output_type == 'json':
return self.as_dict()
else:
return " | ".join(str(v) for v in row)


class TrufflehogEvent(BaseEvent):
def __init__(self, event: dict):
super().__init__(
issue_text=event.get("DetectorName", "Unknown Detection"),
test_name=event.get("SourceName", "Secret Scanning"),
more_info=event.get("ExtraData", {}).get("rotation_guide", "No remediation guide available"),
Message=event.get("Raw", "Potential secret detected"),
FilePath=event.get("file", None),
Timestamp=event.get("timestamp", datetime.now(timezone.utc).isoformat()),
Plugin="Trufflehog",
Severity="HIGH" if not event.get("Verified", False) else "LOW"
)
self.SourceType = event.get("SourceType", "Unknown")
self.DetectorType = event.get("DetectorType", "Unknown")



class BanditEvent(BaseEvent):
def __init__(self, event: dict):
super().__init__(
issue_text=event.get("issue_text", "Unknown"),
test_name=event.get("test_name", "Static Analysis"),
more_info=event.get("more_info", "No remediation guide available"),
Message=event.get("issue_text", "Unknown issue"),
FilePath=event.get("filename", "Unknown"),
Timestamp=event.get("timestamp", datetime.now(timezone.utc).isoformat()),
Plugin="Bandit",
Severity=event.get("issue_severity", "Unknown")
)
self.test_id = event.get("test_id", "Unknown")
self.code = event.get("code", None)
self.line_number = event.get("line_number", None)
self.line_range = event.get("line_range", None)


class GosecEvent(BaseEvent):
def __init__(self, event: dict):
super().__init__(
issue_text=event.get("issue_text", "Unknown"),
test_name=event.get("rule_id", "Unknown"),
more_info=event.get("cwe", {}).get("url", "No remediation guide available"),
Message=event.get("details", "Unknown"),
FilePath=event.get("file", "Unknown"),
URL=event.get("url", "N/A"),
Timestamp=event.get("timestamp", datetime.now(timezone.utc).isoformat()),
Plugin="Gosec",
Severity=event.get("severity", "Unknown")
)
self.CWE_ID = event.get("cwe", {}).get("id", "Unknown")
self.code = event.get("code", None)
self.line_number = event.get("line", None)


class ESLintEvent(BaseEvent):
def __init__(self, event: dict):
super().__init__(
issue_text=event.get("issue_text", "Unknown File"),
test_name=event.get("rule_id"),
more_info="Review ESLint rules and fix reported issues",
Message=f"ESLint detected issues in {event.get('file', 'Unknown')}.",
FilePath=event.get("file_path", "Unknown"),
Timestamp=event.get("timestamp", datetime.now(timezone.utc).isoformat()),
Plugin="ESLint",
Severity=event.get("severity", "LOW")
)
self.messages = event.get("messages", [])


class TrivyEvent(BaseEvent):
def __init__(self, event: dict):
super().__init__(
issue_text=event.get("Title", ""),
test_name=event.get("Class", ""),
more_info=event.get("PrimaryURL", ""),
Message=f"Trivy scan detected issues in {event.get('PkgID', 'Unknown')}.",
FilePath=event.get("Target", "Unknown"),
Timestamp=event.get("PublishedDate", datetime.now(timezone.utc).isoformat()),
Plugin="Trivy",
Severity=event.get("severity", "LOW")
)
self.CweIDs = event.get("CweIDs", [])


class Console:
def __init__(self, mode: str = 'console'):
"""
Initializes the Console client with credentials and HTTP source URL.

:param mode:
"""

self.mode = mode

@staticmethod
def normalize_events(raw_events: list, plugin: str) -> list:
events = []
for event in raw_events:
if plugin == 'bandit':
events.append(BanditEvent(event.__dict__))
elif plugin == 'trufflehog':
events.append(TrufflehogEvent(event.__dict__))
elif plugin == 'gosec':
events.append(GosecEvent(event.__dict__))
elif plugin == 'eslint':
events.append(ESLintEvent(event.__dict__))
elif 'trivy' in plugin:
events.append(TrivyEvent(event.__dict__))
else:
print(f"Unknown event type {plugin}")
return events

def print_events(self, events: list, plugin: str) -> None:
"""
Processes events and outputs them based on the selected output type.

:param events: List of event objects (subclasses of BaseEvent)
:param plugin: Optional log type string
:param output_type: 'console', 'markdown', or 'json'
:return: Formatted string (markdown/console) or JSON array (str)
"""
msg = f"No events to process for {plugin} plugin. Skipping output."
if not events or len(events) == 0:
print(msg)
return

normalized = Console.normalize_events(events, plugin)

if len(normalized) == 0:
print(msg)
return
print(f"{plugin} issues detected:")
if self.mode == 'json':
json_output = [event.render('json') for event in normalized]
print(json.dumps(json_output, indent=2))
return

headers = list(normalized[0].as_dict().keys())
rows = [event.as_row() for event in normalized]

if self.mode == 'markdown':
header_line = "| " + " | ".join(f"`{h}`" for h in headers) + " |"
divider_line = "| " + " | ".join("---" for _ in headers) + " |"
body_lines = ["| " + " | ".join(f"`{v}`" for v in row) + " |" for row in rows]
print("\n".join([header_line, divider_line] + body_lines))
return

# Default to console table
print(tabulate(rows, headers=headers, tablefmt="grid"))

Loading