Skip to content

Commit 31d0617

Browse files
Mikaayensontradebot-elastic
authored andcommitted
[New Rules] Add MITRE ATLAS framework support and GenAI threat detection rules (#5352)
Co-authored-by: Samirbous <[email protected]> Co-authored-by: Ruben Groenewoud <[email protected]> (cherry picked from commit f40a383)
1 parent 9cc35aa commit 31d0617

13 files changed

+8457
-67
lines changed

detection_rules/atlas.py

Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
# or more contributor license agreements. Licensed under the Elastic License
3+
# 2.0; you may not use this file except in compliance with the Elastic License
4+
# 2.0.
5+
6+
"""Mitre ATLAS info."""
7+
8+
from collections import OrderedDict
9+
from pathlib import Path
10+
from typing import Any
11+
12+
import requests
13+
import yaml
14+
from semver import Version
15+
16+
from .utils import cached, clear_caches, get_etc_path
17+
18+
ATLAS_FILE = get_etc_path(["ATLAS.yaml"])
19+
20+
# Maps tactic name to tactic ID (e.g., "Collection" -> "AML.TA0009")
21+
tactics_map: dict[str, str] = {}
22+
technique_lookup: dict[str, dict[str, Any]] = {}
23+
matrix: dict[str, list[str]] = {} # Maps tactic name to list of technique IDs
24+
25+
26+
@cached
27+
def get_atlas_file_path() -> Path:
28+
"""Get the path to the ATLAS YAML file."""
29+
if not ATLAS_FILE.exists():
30+
# Try to download it if it doesn't exist
31+
_ = download_atlas_data()
32+
return ATLAS_FILE
33+
34+
35+
def download_atlas_data(save: bool = True) -> dict[str, Any] | None:
36+
"""Download ATLAS data from MITRE."""
37+
url = "https://raw.githubusercontent.com/mitre-atlas/atlas-data/main/dist/ATLAS.yaml"
38+
r = requests.get(url, timeout=30)
39+
r.raise_for_status()
40+
atlas_data = yaml.safe_load(r.text)
41+
42+
if save:
43+
_ = ATLAS_FILE.write_text(r.text)
44+
print(f"Downloaded ATLAS data to {ATLAS_FILE}")
45+
46+
return atlas_data
47+
48+
49+
@cached
50+
def load_atlas_yaml() -> dict[str, Any]:
51+
"""Load ATLAS data from YAML file."""
52+
atlas_file = get_atlas_file_path()
53+
return yaml.safe_load(atlas_file.read_text())
54+
55+
56+
atlas = load_atlas_yaml()
57+
58+
# Extract version
59+
CURRENT_ATLAS_VERSION = atlas.get("version", "unknown")
60+
61+
# Process the ATLAS matrix
62+
# Look for the specific ATLAS matrix by ID, fall back to first matrix if not found
63+
ATLAS_MATRIX_ID = "ATLAS"
64+
matrix_data = None
65+
66+
if "matrices" in atlas and len(atlas["matrices"]) > 0:
67+
# Try to find the ATLAS matrix by ID
68+
for m in atlas["matrices"]:
69+
if m.get("id") == ATLAS_MATRIX_ID:
70+
matrix_data = m
71+
break
72+
73+
# Fall back to first matrix if ATLAS matrix not found by ID
74+
if matrix_data is None:
75+
matrix_data = atlas["matrices"][0]
76+
77+
if matrix_data is not None:
78+
# Build tactics map
79+
if "tactics" in matrix_data:
80+
for tactic in matrix_data["tactics"]:
81+
tactic_id = tactic["id"]
82+
tactic_name = tactic["name"]
83+
tactics_map[tactic_name] = tactic_id
84+
85+
# Build technique lookup and matrix
86+
if "techniques" in matrix_data:
87+
for technique in matrix_data["techniques"]:
88+
technique_id = technique["id"]
89+
technique_name = technique["name"]
90+
technique_tactics = technique.get("tactics", [])
91+
92+
# Store technique info
93+
technique_lookup[technique_id] = {
94+
"name": technique_name,
95+
"id": technique_id,
96+
"tactics": technique_tactics,
97+
}
98+
99+
# Build matrix: map tactic IDs to technique IDs
100+
for tech_tactic_id in technique_tactics:
101+
# Find tactic name from ID
102+
tech_tactic_name = next((name for name, tid in tactics_map.items() if tid == tech_tactic_id), None)
103+
if tech_tactic_name:
104+
if tech_tactic_name not in matrix:
105+
matrix[tech_tactic_name] = []
106+
if technique_id not in matrix[tech_tactic_name]:
107+
matrix[tech_tactic_name].append(technique_id)
108+
109+
# Sort matrix values
110+
for val in matrix.values():
111+
val.sort(key=lambda tid: technique_lookup.get(tid, {}).get("name", "").lower())
112+
113+
technique_lookup = OrderedDict(sorted(technique_lookup.items()))
114+
techniques = sorted({v["name"] for _, v in technique_lookup.items()})
115+
technique_id_list = [t for t in technique_lookup if "." not in t]
116+
sub_technique_id_list = [t for t in technique_lookup if "." in t]
117+
tactics = list(tactics_map)
118+
119+
120+
def refresh_atlas_data(save: bool = True) -> dict[str, Any] | None:
121+
"""Refresh ATLAS data from MITRE."""
122+
atlas_file = get_atlas_file_path()
123+
current_version_str = CURRENT_ATLAS_VERSION
124+
125+
try:
126+
current_version = Version.parse(current_version_str, optional_minor_and_patch=True)
127+
except (ValueError, TypeError):
128+
# If version parsing fails, download anyway
129+
current_version = Version.parse("0.0.0", optional_minor_and_patch=True)
130+
131+
# Get latest version from GitHub
132+
r = requests.get("https://api.github.com/repos/mitre-atlas/atlas-data/tags", timeout=30)
133+
r.raise_for_status()
134+
releases = r.json()
135+
if not releases:
136+
print("No releases found")
137+
return None
138+
139+
# Find latest version (tags might be like "v5.1.0" or "5.1.0")
140+
latest_release = None
141+
latest_version = current_version
142+
for release in releases:
143+
tag_name = release["name"].lstrip("v")
144+
try:
145+
ver = Version.parse(tag_name, optional_minor_and_patch=True)
146+
if ver > latest_version:
147+
latest_version = ver
148+
latest_release = release
149+
except (ValueError, TypeError):
150+
continue
151+
152+
if latest_release is None:
153+
print(f"No versions newer than the current detected: {current_version_str}")
154+
return None
155+
156+
download = f"https://raw.githubusercontent.com/mitre-atlas/atlas-data/{latest_release['name']}/dist/ATLAS.yaml"
157+
r = requests.get(download, timeout=30)
158+
r.raise_for_status()
159+
atlas_data = yaml.safe_load(r.text)
160+
161+
if save:
162+
_ = atlas_file.write_text(r.text)
163+
print(f"Replaced file: {atlas_file} with version {latest_version}")
164+
165+
# Clear cache to reload
166+
clear_caches()
167+
168+
return atlas_data
169+
170+
171+
def build_threat_map_entry(tactic_name: str, *technique_ids: str) -> dict[str, Any]:
172+
"""Build rule threat map from ATLAS technique IDs."""
173+
url_base = "https://atlas.mitre.org/{type}/{id}/"
174+
tactic_id = tactics_map.get(tactic_name)
175+
if not tactic_id:
176+
raise ValueError(f"Unknown ATLAS tactic: {tactic_name}")
177+
178+
tech_entries: dict[str, Any] = {}
179+
180+
def make_entry(_id: str) -> dict[str, Any]:
181+
tech_info = technique_lookup.get(_id)
182+
if not tech_info:
183+
raise ValueError(f"Unknown ATLAS technique ID: {_id}")
184+
return {
185+
"id": _id,
186+
"name": tech_info["name"],
187+
"reference": url_base.format(type="techniques", id=_id.replace(".", "/")),
188+
}
189+
190+
for tid in technique_ids:
191+
if tid not in technique_lookup:
192+
raise ValueError(f"Unknown ATLAS technique ID: {tid}")
193+
194+
tech_info = technique_lookup[tid]
195+
tech_tactic_ids = tech_info.get("tactics", [])
196+
if tactic_id not in tech_tactic_ids:
197+
raise ValueError(f"ATLAS technique ID: {tid} does not fall under tactic: {tactic_name}")
198+
199+
# Handle sub-techniques (e.g., AML.T0000.000)
200+
if "." in tid and tid.count(".") > 1:
201+
# This is a sub-technique
202+
parts = tid.rsplit(".", 1)
203+
parent_technique = parts[0]
204+
tech_entries.setdefault(parent_technique, make_entry(parent_technique))
205+
tech_entries[parent_technique].setdefault("subtechnique", []).append(make_entry(tid))
206+
else:
207+
tech_entries.setdefault(tid, make_entry(tid))
208+
209+
entry: dict[str, Any] = {
210+
"framework": "MITRE ATLAS",
211+
"tactic": {
212+
"id": tactic_id,
213+
"name": tactic_name,
214+
"reference": url_base.format(type="tactics", id=tactic_id),
215+
},
216+
}
217+
218+
if tech_entries:
219+
entry["technique"] = sorted(tech_entries.values(), key=lambda x: x["id"])
220+
221+
return entry

0 commit comments

Comments
 (0)