Commits
21 commits
e449fc2
Added support for user-configurable LOOKUP_DAYS and updated Time Rang…
omkarj-metron Oct 17, 2025
cafe608
Corrected GitHub link for Azure Function code ZIP folder.
omkarj-metron Oct 17, 2025
5efa5cb
corrected the typo in environment variable
omkarj-metron Oct 17, 2025
6f557c1
Updated the ARM template to deploy Azure Function on Flex Consumption…
omkarj-metron Oct 30, 2025
172ce65
Completed pending state reset for some functions.
omkarj-metron Oct 30, 2025
3c66801
Added logic to store data in Azure Blob Storage.
omkarj-metron Nov 13, 2025
de9b615
Resolved a token ID-related bug in the utils file.
omkarj-metron Nov 13, 2025
c10693a
Added required role assignments in ARM template to automatically dep…
omkarj-metron Nov 13, 2025
e789636
pinned all dependency versions in requirements.txt file in data conne…
omkarj-metron Nov 17, 2025
9e986a2
token related bug is resolved in utils.py file in data connector zip …
omkarj-metron Nov 17, 2025
3db0e24
updated data connector zip file to add batching in api calls.
omkarj-metron Nov 25, 2025
8cbc3c2
Added configurable API page size, batch size, and max retries with dy…
omkarj-metron Nov 25, 2025
fc5cd67
added a dynamic rate limiter in data connector.
omkarj-metron Dec 12, 2025
2500090
Improved error logging in data connector
omkarj-metron Dec 12, 2025
1b25e13
added a missing comma in data connector arm template.
omkarj-metron Dec 15, 2025
1f54059
Refactored rate limiting in BloodhoundManager to use global rate limi…
omkarj-metron Dec 15, 2025
31f3280
Updated rate limiter to enforce minimum time between requests and imp…
omkarj-metron Dec 15, 2025
edf27fb
Refined error handling and retry logic in BloodhoundManager for Azure…
omkarj-metron Dec 16, 2025
88ab33f
Added functionTimeout setting in host.json and enhanced BloodhoundMan…
omkarj-metron Dec 16, 2025
30c0020
Enhanced data connector by implementing Azure Blob Storage for state …
omkarj-metron Dec 29, 2025
4da060d
Merge branch 'master' into bloodhound
omkarj-metron Dec 29, 2025
Binary file not shown.
@@ -1,9 +1,10 @@
import logging
import time
import datetime
import json
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass
from ..utility.utils import load_environment_configs, EnvironmentConfig, AzureConfig
from ..utility.utils import load_environment_configs, EnvironmentConfig, AzureConfig, get_lookup_days, get_azure_batch_size
from ..utility.bloodhound_manager import BloodhoundManager

@dataclass
@@ -104,12 +105,12 @@ def collect_attack_paths(
bloodhound_manager: BloodhoundManager,
domains: List[Dict[str, Any]],
tenant_domain: str,
last_attack_path_timestamps: Dict[str, Dict[str, str]],
default_lookback_days: int = 1
last_attack_path_timestamps: Dict[str, Dict[str, str]]
) -> Tuple[List[Dict[str, Any]], Dict[str, str]]:
"""Collect attack paths for each domain and finding type."""
all_collected_paths = []
domain_latest_timestamps = {}
default_lookback_days = get_lookup_days()

for domain in domains:
domain_id = domain.get("id")
@@ -153,6 +154,64 @@ def collect_attack_paths(

return all_collected_paths, domain_latest_timestamps
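
# get_lookup_days() and get_azure_batch_size() are imported from ..utility.utils,
# which is not part of this diff. A minimal sketch of how such environment-driven
# getters might be implemented (the AZURE_BATCH_SIZE variable name and both
# defaults are assumptions; only LOOKUP_DAYS is confirmed by the commit history):
import logging
import os

def get_lookup_days(default: int = 1) -> int:
    """Lookback window in days, read from the LOOKUP_DAYS environment variable."""
    try:
        return max(1, int(os.environ.get("LOOKUP_DAYS", default)))
    except ValueError:
        logging.warning("Invalid LOOKUP_DAYS value; falling back to %s", default)
        return default

def get_azure_batch_size(default: int = 100) -> int:
    """Azure Monitor batch size, read from the AZURE_BATCH_SIZE environment variable."""
    try:
        return max(1, int(os.environ.get("AZURE_BATCH_SIZE", default)))
    except ValueError:
        logging.warning("Invalid AZURE_BATCH_SIZE value; falling back to %s", default)
        return default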

def _prepare_attack_path_log_entry(attack_data: Dict[str, Any], unique_finding_types_data: Dict[str, Any],
tenant_domain: str, domains_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Helper function to prepare a single attack path log entry."""
domain_name = ""
for domain in domains_data:
if domain.get("id") == attack_data.get("DomainSID"):
domain_name = domain.get("name", "")
break

finding_type = attack_data.get("Finding", "")
path_title = unique_finding_types_data.get(finding_type, "")
short_description = unique_finding_types_data.get(f"{finding_type}_short_description", "")
short_remediation = unique_finding_types_data.get(f"{finding_type}_short_remediation", "")
long_remediation = unique_finding_types_data.get(f"{finding_type}_long_remediation", "")

return {
"Accepted": attack_data.get("Accepted", False),
"AcceptedUntil": attack_data.get("AcceptedUntil", ""),
"ComboGraphRelationID": str(attack_data.get("ComboGraphRelationID", "")),
"created_at": attack_data.get("created_at", ""),
"deleted_at": json.dumps(attack_data.get("deleted_at", {})),
"DomainSID": attack_data.get("DomainSID", ""),
"Environment": attack_data.get("Environment", ""),
"ExposureCount": attack_data.get("ExposureCount", 0),
"ExposurePercentage": str(round(float(attack_data.get("ExposurePercentage", "0")) * 100, 2)),
"Finding": attack_data.get("Finding", ""),
"NonTierZeroPrincipalEnvironment": attack_data.get("FromEnvironment", ""),
"NonTierZeroPrincipalEnvironmentID": attack_data.get("FromEnvironmentID", ""),
"NonTierZeroPrincipal": attack_data.get("FromPrincipal", ""),
"NonTierZeroPrincipalKind": attack_data.get("FromPrincipalKind", ""),
"NonTierZeroPrincipalName": attack_data.get("FromPrincipalName", ""),
"NonTierZeroPrincipalProps": json.dumps(attack_data.get("FromPrincipalProps", {})),
"id": int(attack_data.get("id", 0)),
"ImpactCount": attack_data.get("ImpactCount", 0),
"ImpactPercentage": str(round(float(attack_data.get("ImpactPercentage", "0")) * 100, 2)),
"IsInherited": str(attack_data.get("IsInherited", "")),
"Principal": attack_data.get("ToPrincipal", ""),
"PrincipalHash": attack_data.get("PrincipalHash", ""),
"PrincipalKind": attack_data.get("ToPrincipalKind", ""),
"PrincipalName": attack_data.get("ToPrincipalName", ""),
"RelProps": json.dumps(attack_data.get("RelProps", {})),
"Severity": attack_data.get("Severity", ""),
"ImpactedPrincipalEnvironment": attack_data.get("ToEnvironment", attack_data.get("Environment")),
"ImpactedPrincipalEnvironmentID": attack_data.get("ToEnvironmentID", ""),
"ImpactedPrincipal": attack_data.get("ToPrincipal", attack_data.get("Principal")),
"ImpactedPrincipalKind": attack_data.get("ToPrincipalKind", attack_data.get("PrincipalKind")),
"ImpactedPrincipalName": attack_data.get("ToPrincipalName", attack_data.get("PrincipalName")),
"ImpactedPrincipalProps": json.dumps(attack_data.get("ToPrincipalProps", attack_data.get("Props"))),
"updated_at": attack_data.get("updated_at", ""),
"PathTitle": path_title,
"ShortDescription": short_description,
"ShortRemediation": short_remediation,
"LongRemediation": long_remediation,
"tenant_url": tenant_domain,
"domain_name": domain_name,
"Remediation": f"{tenant_domain}ui/remediation?findingType={finding_type}",
}
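
# Illustrative example of the flattening performed above; the values are
# invented sample data, not output from a real BloodHound tenant:
#
#     sample = {"id": 42, "Finding": "LargeDefaultGroups", "DomainSID": "S-1-5-21-1",
#               "ExposurePercentage": "0.25", "ImpactPercentage": "0.10"}
#     row = _prepare_attack_path_log_entry(
#         sample, {}, "https://tenant.example/", [{"id": "S-1-5-21-1", "name": "corp.local"}]
#     )
#     # row["ExposurePercentage"] == "25.0", row["domain_name"] == "corp.local",
#     # and nested property dicts are serialized to JSON strings.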

def send_attack_paths_to_azure_monitor(
attack_paths: List[Dict[str, Any]],
bloodhound_manager: BloodhoundManager,
@@ -161,34 +220,58 @@ def send_attack_paths_to_azure_monitor(
tenant_domain: str,
domains_data: List[Dict[str, Any]]
) -> Tuple[int, int]:
"""Send collected attack paths to Azure Monitor."""
"""Send collected attack paths to Azure Monitor in batches."""
successful_submissions = 0
failed_submissions = 0
batch_size = get_azure_batch_size()

if not attack_paths:
logging.info("No attack path details to send to Azure Monitor.")
return successful_submissions, failed_submissions

logging.info(f"Sending {len(attack_paths)} collected attack path details to Azure Monitor.")
for i, data_item in enumerate(attack_paths, 1):
result = bloodhound_manager.send_attack_data(
data_item,
logging.info(f"Sending {len(attack_paths)} collected attack path details to Azure Monitor in batches of {batch_size}.")

# Process in batches
for batch_start in range(0, len(attack_paths), batch_size):
batch_end = min(batch_start + batch_size, len(attack_paths))
batch = attack_paths[batch_start:batch_end]

# Prepare log entries for this batch
log_entries = []
for data_item in batch:
try:
log_entry = _prepare_attack_path_log_entry(
data_item, finding_types_data, tenant_domain, domains_data
)
log_entries.append(log_entry)
except Exception as e:
failed_submissions += 1
logging.error(f"Failed to prepare attack path log entry for ID {data_item.get('id')}: {str(e)}")

if not log_entries:
continue

# Send batch to Azure Monitor
logging.info(f"Sending batch {batch_start//batch_size + 1} ({len(log_entries)} entries): IDs {[item.get('id') for item in batch[:5]]}...")
result = bloodhound_manager._send_to_azure_monitor(
log_entries,
azure_monitor_token,
finding_types_data,
tenant_domain,
domains_data
bloodhound_manager.dce_uri,
bloodhound_manager.dcr_immutable_id,
bloodhound_manager.table_name
)
logging.info(f"Processing attack path log entry {i}/{len(attack_paths)}: {data_item.get('id')}")


if result and result.get("status") == "success":
successful_submissions += 1
logging.info(f"Successfully sent attack path for '{data_item.get('id')}'")
entries_sent = result.get("entries_sent", len(log_entries))
successful_submissions += entries_sent
logging.info(f"Successfully sent batch of {entries_sent} attack paths")
else:
failed_submissions += 1
logging.error(f"Failed to send attack path for '{data_item.get('id')}': {result.get('message', 'Unknown error')}")
failed_submissions += len(log_entries)
logging.error(f"Failed to send batch: {result.get('message', 'Unknown error')}")

time.sleep(0.1) # Rate limiting between requests
# Rate limiting is handled automatically by azure_monitor_rate_limiter in _send_to_azure_monitor()

logging.info(f"Attack path sending complete. Successful: {successful_submissions}, Failed: {failed_submissions}")
return successful_submissions, failed_submissions
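
# The same range-based chunking appears in the attack path, attack path timeline,
# and audit log modules. A generic helper along these lines could factor it out;
# this is a sketch, not existing connector code:
from typing import Iterator

def _chunked(items: List[Dict[str, Any]], size: int) -> Iterator[List[Dict[str, Any]]]:
    """Yield consecutive slices of at most `size` items from `items`."""
    step = max(1, size)
    for start in range(0, len(items), step):
        yield items[start:start + step]

# Usage sketch:
#     for batch in _chunked(attack_paths, get_azure_batch_size()):
#         entries = [_prepare_attack_path_log_entry(p, finding_types_data, tenant_domain, domains_data)
#                    for p in batch]
#         bloodhound_manager._send_to_azure_monitor(entries, azure_monitor_token,
#                                                   bloodhound_manager.dce_uri,
#                                                   bloodhound_manager.dcr_immutable_id,
#                                                   bloodhound_manager.table_name)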

def process_environment(
@@ -1,8 +1,7 @@
import logging
import time
import json
import datetime
from ..utility.utils import load_environment_configs
from ..utility.utils import load_environment_configs, get_azure_batch_size
from ..utility.bloodhound_manager import BloodhoundManager


@@ -111,14 +110,37 @@ def update_timestamps(domain_entries, current_tenant_domain, domain_name, last_t
logging.info(f"Updated last_attack_path_timeline_timestamps for {current_tenant_domain}/{domain_name} to {latest_timestamp}")


def submit_attack_path_data(bloodhound_manager, attack_data, token, unique_finding_types_data, final_domains):
"""Submit a single attack path data entry to Azure Monitor."""
logging.info(f"Sending attack data: ID {attack_data.get('id')}")
result = bloodhound_manager.send_attack_path_timeline_data(
attack_data, token, unique_finding_types_data, final_domains
)
logging.info(f"Result of sending attack data ID {attack_data.get('id')} is {result}")
return {"id": attack_data.get("id"), "status": "success", "response": result}
def _prepare_attack_path_timeline_log_entry(attack_data: dict, unique_finding_types_data: dict,
tenant_domain: str, domains_data: list) -> dict:
"""Helper function to prepare a single attack path timeline log entry."""
domain_name = ""
# Find the domain name from the domains_data based on DomainSID
for domain in domains_data:
if domain.get("id") == attack_data.get("DomainSID"):
domain_name = domain.get("name", "")
break

finding_type = attack_data.get("Finding", "")
path_title = unique_finding_types_data.get(finding_type, "")

return {
"CompositeRisk": str(round(float(attack_data.get("CompositeRisk")), 2)),
"FindingCount": attack_data.get("FindingCount"),
"ExposureCount": attack_data.get("ExposureCount"),
"ImpactCount": attack_data.get("ImpactCount"),
"ImpactedAssetCount": attack_data.get("ImpactedAssetCount"),
"DomainSID": attack_data.get("DomainSID"),
"Finding": attack_data.get("Finding"),
"id": attack_data.get("id"),
"created_at": attack_data.get("created_at"),
"updated_at": attack_data.get("updated_at"),
"deleted_at": attack_data.get("deleted_at"),
"tenant_url": tenant_domain,
"domain_name": domain_name,
"path_title": path_title,
"finding_type": finding_type,
"TimeGenerated": datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="milliseconds") + "Z",
}
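
# A small helper like this could replace the inline TimeGenerated expression
# above; it is a sketch, not part of the existing connector code.
def _utc_now_iso() -> str:
    """Current UTC time as ISO 8601 with a trailing 'Z', e.g. '2025-12-29T10:15:30.123Z'."""
    return (
        datetime.datetime.now(datetime.timezone.utc)
        .isoformat(timespec="milliseconds")
        .replace("+00:00", "Z")
    )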


def process_environment(bloodhound_manager, env_config, tenant_domain, last_timestamps):
@@ -161,17 +183,59 @@ def process_environment(bloodhound_manager, env_config, tenant_domain, last_time
logging.info("No attack path timeline data to send to Azure Monitor.")
return last_timestamps

# Submit data to Azure Monitor
# Submit data to Azure Monitor in batches
token = bloodhound_manager.get_bearer_token()
if not token:
logging.error("Failed to obtain Bearer token for Azure Monitor.")
return last_timestamps

for i, attack in enumerate(consolidated_timeline, 1):
logging.info(f"Processing attack path log entry {i}/{len(consolidated_timeline)}: {attack.get('id')}")
submit_attack_path_data(bloodhound_manager, attack, token, unique_finding_types_data, final_domains)
time.sleep(0.1)
batch_size = get_azure_batch_size()
logging.info(f"Sending {len(consolidated_timeline)} attack path timeline records to Azure Monitor in batches of {batch_size}.")

successful_submissions = 0
failed_submissions = 0

# Process in batches
for batch_start in range(0, len(consolidated_timeline), batch_size):
batch_end = min(batch_start + batch_size, len(consolidated_timeline))
batch = consolidated_timeline[batch_start:batch_end]

# Prepare log entries for this batch
log_entries = []
for attack_data in batch:
try:
log_entry = _prepare_attack_path_timeline_log_entry(
attack_data, unique_finding_types_data, tenant_domain, final_domains
)
log_entries.append(log_entry)
except Exception as e:
failed_submissions += 1
logging.error(f"Failed to prepare attack path timeline log entry for ID {attack_data.get('id')}: {str(e)}")

if not log_entries:
continue

# Send batch to Azure Monitor
logging.info(f"Sending batch {batch_start//batch_size + 1} ({len(log_entries)} entries): IDs {[entry.get('id') for entry in log_entries[:5]]}...")
result = bloodhound_manager._send_to_azure_monitor(
log_entries,
token,
bloodhound_manager.dce_uri,
bloodhound_manager.dcr_immutable_id,
bloodhound_manager.table_name
)

if result and result.get("status") == "success":
entries_sent = result.get("entries_sent", len(log_entries))
successful_submissions += entries_sent
logging.info(f"Successfully sent batch of {entries_sent} attack path timeline records")
else:
failed_submissions += len(log_entries)
logging.error(f"Failed to send batch: {result.get('message', 'Unknown error')}")

# Rate limiting is handled automatically by azure_monitor_rate_limiter in _send_to_azure_monitor()

logging.info(f"Attack path timeline sending complete. Successful: {successful_submissions}, Failed: {failed_submissions}")
return last_timestamps
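
# The azure_monitor_rate_limiter referenced in the comments above is defined in
# the BloodhoundManager module and is not shown in this diff. A minimal sketch of
# a minimum-interval limiter of that kind (class name and the 0.5s interval are
# assumptions):
import threading
import time

class MinIntervalRateLimiter:
    """Ensure successive wait() calls are at least `min_interval` seconds apart."""

    def __init__(self, min_interval: float = 0.5):
        self._min_interval = min_interval
        self._last_call = 0.0
        self._lock = threading.Lock()

    def wait(self) -> None:
        with self._lock:
            elapsed = time.monotonic() - self._last_call
            if elapsed < self._min_interval:
                time.sleep(self._min_interval - elapsed)
            self._last_call = time.monotonic()

# Usage sketch:
#     azure_monitor_rate_limiter = MinIntervalRateLimiter(min_interval=0.5)
#     azure_monitor_rate_limiter.wait()  # before each request to the logs ingestion endpoint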


@@ -1,7 +1,8 @@
from typing import Dict, List, Tuple, Optional, Any
import logging
import time
from ..utility.utils import load_environment_configs, EnvironmentConfig, AzureConfig
import json
from ..utility.utils import load_environment_configs, EnvironmentConfig, AzureConfig, get_azure_batch_size
from ..utility.bloodhound_manager import BloodhoundManager

def process_environment(
@@ -126,7 +127,7 @@ def send_audit_logs_to_azure_monitor(
current_tenant_domain: str
) -> Tuple[int, int]:
"""
Sends audit logs to Azure Monitor.
Sends audit logs to Azure Monitor in configurable batches.

Args:
audit_logs: List of audit log entries to process
@@ -141,22 +142,53 @@
Exception: If there's an error sending logs to Azure Monitor
"""
successful_submissions = failed_submissions = 0
logging.info(f"Processing {len(audit_logs)} audit logs for '{current_tenant_domain}'")

for log_entry in audit_logs:
log_id = log_entry.get('id', 'unknown')
logging.info(f"Processing log entry: ID {log_id}")
batch_size = get_azure_batch_size()
logging.info(f"Processing {len(audit_logs)} audit logs for '{current_tenant_domain}' in batches of {batch_size}")

# Process in batches
for batch_start in range(0, len(audit_logs), batch_size):
batch_end = min(batch_start + batch_size, len(audit_logs))
batch = audit_logs[batch_start:batch_end]

# Transform batch entries to the expected schema for Azure Monitor
log_entries = []
for data in batch:
log_entry = {
"action": data.get("action", ""),
"actor_email": data.get("actor_email", ""),
"actor_id": data.get("actor_id", ""),
"actor_name": data.get("actor_name", ""),
"commit_id": data.get("commit_id", ""),
"created_at": data.get("created_at", ""),
"fields": json.dumps(data.get("fields", {})), # fields should be a string in Log Analytics
"id": data.get("id", ""),
"request_id": data.get("request_id", ""),
"source_ip_address": data.get("source_ip_address", ""),
"status": data.get("status", ""),
"tenant_url": current_tenant_domain,
}
log_entries.append(log_entry)

result = bloodhound_manager.send_audit_logs_data(log_entry, azure_monitor_token)
logging.info(f"Sending batch {batch_start//batch_size + 1} ({len(log_entries)} entries): IDs {[log.get('id', 'unknown') for log in log_entries[:5]]}...")

# Send batch to Azure Monitor
result = bloodhound_manager._send_to_azure_monitor(
log_entries,
azure_monitor_token,
bloodhound_manager.dce_uri,
bloodhound_manager.dcr_immutable_id,
bloodhound_manager.table_name
)

if result.get("status") == "success":
successful_submissions += 1
if result and result.get("status") == "success":
entries_sent = result.get("entries_sent", len(log_entries))
successful_submissions += entries_sent
logging.info(f"Successfully sent batch of {entries_sent} audit logs")
else:
failed_submissions += 1
logging.error(f"Failed to send audit log ID {log_id}: {result.get('message', 'Unknown error')}")
failed_submissions += len(log_entries)
logging.error(f"Failed to send batch: {result.get('message', 'Unknown error') if result else 'No response'}")

# Rate limiting to prevent overwhelming the API
time.sleep(0.1)
# Rate limiting is handled automatically by azure_monitor_rate_limiter in _send_to_azure_monitor()

logging.info(
f"Audit log processing for '{current_tenant_domain}' complete. "