PRD: EvidenceForge Canonical Event Model

Status: ✅ COMPLETE (Phase 7, implemented 2026-03-19)

Addendum to: docs/PRD.md (main EvidenceForge PRD)

Scope: Architectural refactor of the generation engine to use a canonical event model for cross-log consistency by construction.

Results: All 12 generate_* methods migrated. A/B eval: 82.3→83.7 (+1.4). Expert panel: 6 tells fixed, 0 regressions.

2026-04 Addendum: Phase 8.5 added WorldModel / WorldPlanner above ActivityGenerator. The canonical event model and dispatcher remain the rendering backbone, but planner-owned session bootstrap now means session IDs may be allocated before ActivityGenerator.generate_logon() emits the corresponding host/network evidence.

2026-05 Addendum: Source-native render timestamps are now planned by SourceTimingPlanner. SecurityEvent.timestamp remains canonical world time; emitters use planned source times with explicit causal bounds for migrated timing surfaces.

2026-05 Architecture Reset Addendum: Action bundles now sit above SecurityEvent for multi-phase activities. A bundle represents one real-world activity that may produce multiple canonical events; each SecurityEvent remains one logical evidence-producing occurrence with contexts for facets of that occurrence.

1. Overview

Problem

EvidenceForge currently generates multi-format log events through procedural coordination — each generate_* method in ActivityGenerator manually emits to multiple emitters, passing shared fields (timestamps, LogonIDs, PIDs, UIDs, ports) as individual parameters. Consistency between formats relies on the developer remembering to pass the right values to every emitter for every activity type.

This approach has a ceiling. The improvement loop identified multiple P0/P1 consistency bugs:

  • SSH activity in syslog with no corresponding Zeek port 22 traffic
  • Missing DNS queries before exfiltration connections
  • Wrong source ports between DNS and TCP connection logs
  • Non-monotonic kernel uptime in syslog

Each new log format or activity type adds more places for these inconsistencies to appear. The bug surface grows multiplicatively (activity types x log formats).

Solution

Introduce a canonical event model — an intermediate representation layer between activity generation and log rendering. Instead of ActivityGenerator calling each emitter separately with manually-coordinated fields, it builds a single, rich SecurityEvent object that carries all shared metadata. An EventDispatcher then routes the event to StateManager (for state bookkeeping) and to relevant emitters (for rendering). Each emitter renders its format-specific view of the same object.

Core principle: consistency by construction, not by coordination. Two emitters cannot disagree about a port number because there is only one port number — on the event object.

Architecture: Two-Phase Build + Dispatcher

WorldPlanner / ActivityGenerator
  1. Allocates IDs from StateManager  -> StateManager returns logon_id / pid / zeek_uid
  2. Builds SecurityEvent with all    -> Complete event object (all IDs populated)
     contexts and allocated IDs
  3. Dispatches event                 -> EventDispatcher
                                          |-> StateManager.apply(event)  [record session/process/connection]
                                          |-> for emitter in matching:
                                                emitter.emit(event)      [format rendering]

Why two-phase: SecurityEvent fields like logon_id, pid, and zeek_uid are generated by StateManager. The event must be fully constructed before dispatch, so the responsible planning/generation layer allocates these IDs first (WorldPlanner for planner-owned session bootstrap, ActivityGenerator for process/connection allocation and direct event paths), then ActivityGenerator builds the complete event, then dispatches. StateManager.apply() records the already-allocated state (session, process, connection) — it does not allocate new IDs.

2. Goals & Non-Goals

Goals

  • Eliminate cross-format consistency bugs structurally (no field can be generated twice with different values)
  • Migrate all ~14 existing activity types to the event model
  • Support a direct-emission escape hatch (RawLogEntry) for rare single-format log entries (anonymous logon, kernel messages only)
  • Maintain or improve eval framework scores (same scenario, same or better quality)
  • Make adding new log formats cheaper (write a renderer, not emission logic across every activity method)
  • Integrate with existing network visibility model (NetworkVisibilityEngine) in dispatcher
  • Zero regression in all existing tests
  • Negligible memory overhead (< 2 KB per transient event, immediately GC'd)

Non-Goals

  • Deep domain ontology (no modeling of TLS handshakes, Kerberos ticket flows, etc. — keep it flat + composable)
  • Persisting event objects beyond emission (events are transient, StateManager owns durable state)
  • Changing the scenario YAML schema (this is an internal engine refactor)
  • Changing the output log formats (same files, same content, same structure)
  • Replacing StateManager (it continues to own sessions/processes/connections — event model feeds into it)
  • Removing Jinja2 templates (templates remain for final string rendering; only the dict-building moves into emitter render methods)

3. Data Model

3.1 Event Contexts (Composable Building Blocks)

All contexts are Python @dataclass(slots=True) for memory efficiency. slots=True prevents adding dynamic attributes but does NOT make instances immutable — fields remain mutable to support two-phase construction where IDs are allocated before the full event is built. Fields are populated by ActivityGenerator; emitters and StateManager read from them.

@dataclass(slots=True)
class HostContext:
    """The system where this event occurs."""
    hostname: str
    ip: str
    os: str                          # e.g., "Windows Server 2019", "Ubuntu 22.04"
    os_category: str                 # "windows" | "linux"
    system_type: str                 # "workstation" | "server" | "domain_controller"
    domain: str = ""                 # AD domain name, if applicable

@dataclass(slots=True)
class AuthContext:
    """Authentication/session details."""
    username: str
    full_name: str = ""
    user_sid: str = ""               # Windows SID (S-1-5-21-...)
    logon_id: str = ""               # Hex logon ID (0x3e7) -- allocated by StateManager.create_session()
    logon_type: int = 2              # Windows logon type (2=Interactive, 3=Network, etc.)
    auth_package: str = "Negotiate"  # Negotiate, NTLM, Kerberos
    result: str = "success"          # "success" | "failure"
    failure_reason: str = ""         # For failed logons (e.g., "0xC000006D")
    source_ip: str = ""              # Where the logon came from
    source_port: int = 0
    elevated: bool = False           # Special privileges (4672)

@dataclass(slots=True)
class ProcessContext:
    """Process creation/termination details."""
    pid: int                         # Allocated by StateManager.create_process()
    parent_pid: int
    image: str                       # Full path (e.g., "C:\\Windows\\System32\\cmd.exe")
    command_line: str
    username: str
    integrity_level: str = "Medium"  # Low, Medium, High, System
    start_time: datetime | None = None  # Creation time for stable process GUIDs

@dataclass(slots=True)
class RemoteThreadContext:
    """Remote thread creation details shared by Sysmon Event 8 and eCAR."""
    target_pid: int
    target_image: str
    new_thread_id: int
    start_address: int
    start_module: str = ""
    start_function: str = ""

@dataclass(slots=True)
class NetworkContext:
    """Network connection details -- shared across Zeek, eCAR, Snort."""
    src_ip: str
    src_port: int
    dst_ip: str
    dst_port: int
    protocol: str                    # "tcp" | "udp" | "icmp"
    service: str = ""                # "http" | "https" | "dns" | "ssh" | etc.
    zeek_uid: str = ""               # Zeek connection UID (C-prefix, 18 chars) -- from StateManager.open_connection()
    conn_id: str = ""                # Internal connection ID from StateManager.open_connection()
    duration: float = 0.0
    orig_bytes: int = 0
    resp_bytes: int = 0
    orig_pkts: int = 0
    resp_pkts: int = 0
    conn_state: str = ""             # Zeek conn_state (SF, S0, REJ, etc.)
    history: str = ""                # Zeek history string (ShADadfF, etc.)
    local_orig: bool = True
    local_resp: bool = False

@dataclass(slots=True)
class DnsContext:
    """DNS query/response details."""
    query: str                       # Domain queried
    query_type: str = "A"            # A, AAAA, CNAME, TXT, etc.
    response_ip: str = ""            # Resolved IP
    rcode: str = "NOERROR"           # DNS response code

@dataclass(slots=True)
class FileContext:
    """File operation details."""
    path: str
    action: str                      # "create" | "modify" | "delete" | "read"
    pid: int = 0                     # Process performing the operation

@dataclass(slots=True)
class RegistryContext:
    """Windows registry operation details."""
    key: str
    value: str = ""
    action: str = ""                 # "create" | "modify" | "delete"
    pid: int = 0

@dataclass(slots=True)
class IdsContext:
    """IDS/IPS alert details for Snort."""
    sid: int                         # Snort rule SID
    message: str                     # Alert message
    classification: str              # Alert classification
    priority: int = 2                # 1=high, 2=medium, 3=low

@dataclass(slots=True)
class SyslogContext:
    """Syslog message fields for Linux system/daemon/kernel logs.
    Callers provide the exact app_name, message, facility, and severity.
    The syslog emitter renders directly from this context."""
    app_name: str                    # "sshd", "kernel", "systemd", "snapd", etc.
    message: str                     # The syslog message body
    pid: int | None = None           # None for kernel messages
    facility: int = 3                # 3=daemon, 0=kernel, 10=auth/security
    severity: int = 6                # 6=info, 5=notice, 4=warning

3.2 Security Event (Base + Escape Hatch)

@dataclass(slots=True)
class SecurityEvent:
    """Canonical event -- carries all shared metadata for a single logical event."""
    timestamp: datetime
    event_type: str                  # Canonical type name (see table below)

    # Composable contexts -- populated as needed
    host: HostContext | None = None
    auth: AuthContext | None = None
    process: ProcessContext | None = None
    network: NetworkContext | None = None
    dns: DnsContext | None = None
    file: FileContext | None = None
    registry: RegistryContext | None = None
    ids: IdsContext | None = None
    source_timing: SourceTimingPlan | None = None  # Planned source-native timestamps

@dataclass(slots=True)
class RawLogEntry:
    """Escape hatch -- bypass the event model for user-defined raw events.

    Used solely by the `raw` event type in scenario YAML, allowing users
    to emit arbitrary fields to a specific log format. All internal engine
    code uses canonical SecurityEvent dispatch exclusively.
    """
    timestamp: datetime
    target_emitter: str              # Emitter dict key (e.g., "syslog", "zeek_conn", "windows_event_security")
    data: dict[str, Any]             # Raw field dict, passed directly to emitter's emit_raw()

SourceTimingPlan is internal render metadata. It records the canonical timestamp and deterministic source timestamps keyed by source profile/seed. Emitters must treat the canonical timestamp as the truth of when the activity happened and use source times only for source-native observation/rendering. The ordering guarantee is per source stream: declared causal edges render in order using deterministic epsilon spacing, while unrelated events are not forced into a global total order and may share equal timestamps.
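One way to picture the per-stream ordering guarantee is a pass that pushes each effect just past its cause while leaving unrelated events alone. This is a sketch under stated assumptions: order_stream, the string event keys, and the one-microsecond EPSILON are hypothetical, and the real SourceTimingPlanner interface is not shown in this document.

```python
from datetime import datetime, timedelta

EPSILON = timedelta(microseconds=1)  # assumed spacing; the real value is not stated


def order_stream(
    planned: dict[str, datetime],
    edges: list[tuple[str, str]],
) -> dict[str, datetime]:
    """Return source times where every (cause, effect) edge renders in order.

    `planned` maps event keys to provisional source times for ONE source stream.
    `edges` must be listed in causal order, so a cause's time is settled
    before it is used to bound its effect.
    """
    result = dict(planned)
    for cause, effect in edges:
        earliest = result[cause] + EPSILON
        if result[effect] < earliest:
            result[effect] = earliest  # push the effect just past its cause
    return result


t0 = datetime(2026, 5, 1, 12, 0, 0)
planned = {
    "logon": t0,
    "process_create": t0,  # same instant as its cause
    "unrelated": t0,       # no declared edge, so it is left untouched
}
ordered = order_stream(planned, [("logon", "process_create")])
```

Only the declared edge is separated; the unrelated event keeps a timestamp equal to the logon, which matches the "no global total order" rule.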

3.3 Event Type Catalog

Each event_type string determines which contexts must or may be populated. "required" = must be non-None. "optional" = may be populated for richer output. "-" = not applicable.

| event_type | host | auth | process | network | dns | file | registry | ids | Description |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| logon | required | required | - | - | - | - | - | - | User authentication (4624, syslog auth, eCAR) |
| logon_failed | required | required | - | - | - | - | - | - | Failed authentication (4625) |
| logoff | required | required | - | - | - | - | - | - | Session end (4634, syslog, eCAR) |
| process_create | required | - | required | - | - | - | - | - | Process creation (4688, syslog, eCAR) |
| process_terminate | required | - | required | - | - | - | - | - | Process termination (4689) |
| system_process_create | required | - | required | - | - | - | - | - | OS boot process tree (svchost chains, systemd) |
| connection | - | - | - | required | optional | - | - | optional | Network connection (Zeek conn, eCAR FLOW, Snort) |
| dns_query | - | - | - | required | required | - | - | - | DNS lookup (Zeek dns) |
| bash_command | required | required | - | - | - | - | - | - | Bash history entry |
| file_operation | required | - | - | - | - | required | - | - | File create/modify/delete (eCAR FILE) |
| registry_operation | required | - | - | - | - | - | required | - | Registry modification (eCAR REGISTRY) |
| image_load | required | - | optional | - | - | - | - | - | Module/DLL load (Sysmon Event 7, eCAR MODULE/LOAD) |
| machine_logon | required | required | - | - | - | - | - | - | Machine account auth (4624 Type 3) |
| kerberos_tgt | required | required | - | - | - | - | - | - | Kerberos TGT request (4768) |
| kerberos_service | required | required | - | - | - | - | - | - | Kerberos service ticket (4769) |
| ntlm_validation | required | required | - | - | - | - | - | - | NTLM authentication (4776) |
| web_request | - | - | - | required | - | - | - | - | Web access log entry |

Notes:

  • connection and dns_query have no host context because the current generate_connection() works purely with IP addresses, not host objects. This is preserved as-is (not new behavior).
  • system_process_create uses the same contexts as process_create but is a distinct type so emitters can apply different logic (e.g., no syslog emission for Windows boot processes).
  • image_load carries canonical DLL/module-load data shared by Sysmon Event 7 and eCAR MODULE/LOAD. process is optional (module loads can be attributed to a process or standalone). module_load remains accepted by eCAR as a legacy compatibility alias.
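The catalog lends itself to a mechanical check that an event has its required contexts before dispatch. The sketch below is illustrative only: missing_contexts and REQUIRED_CONTEXTS are hypothetical names, the mapping transcribes just a few catalog rows, and SimpleNamespace stands in for SecurityEvent to keep the example self-contained.

```python
from types import SimpleNamespace

# Required contexts per event_type, transcribed from a few rows of the catalog.
REQUIRED_CONTEXTS: dict[str, tuple[str, ...]] = {
    "logon": ("host", "auth"),
    "process_create": ("host", "process"),
    "connection": ("network",),
    "dns_query": ("network", "dns"),
    "file_operation": ("host", "file"),
}


def missing_contexts(event) -> list[str]:
    """Return the names of required contexts that are None on the event."""
    required = REQUIRED_CONTEXTS.get(event.event_type)
    if required is None:
        raise ValueError(f"unknown event_type: {event.event_type!r}")
    return [name for name in required if getattr(event, name, None) is None]


# Placeholder objects stand in for populated context dataclasses.
ok = SimpleNamespace(event_type="dns_query", network=object(), dns=object())
bad = SimpleNamespace(event_type="logon", host=object(), auth=None)

print(missing_contexts(ok))   # []
print(missing_contexts(bad))  # ['auth']
```

A check like this could run in tests or at the top of dispatch(); whether the engine performs such validation is not stated here.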

4. Dispatcher Design

4.1 EventDispatcher

class EventDispatcher:
    """Routes SecurityEvents to StateManager and emitters.

    Two-layer filtering for emitter selection:
    1. Format eligibility: emitter.can_handle(event) -- does this emitter support this event type?
    2. Network visibility: for network events, check NetworkVisibilityEngine to see if
       the connection is visible to sensors that produce this emitter's format.
    """

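    def __init__(
        self,
        state_manager: StateManager,
        emitters: dict[str, LogEmitter],
        visibility_engine: NetworkVisibilityEngine | None = None,
        source_timing_planner: SourceTimingPlanner | None = None,
    ):
        self.state_manager = state_manager
        self.emitters = emitters
        self.visibility_engine = visibility_engine
        # Assumed wiring: dispatch() uses the planner, so it must be stored here.
        self.source_timing_planner = source_timing_planner

    def dispatch(self, event: SecurityEvent) -> None:
        """Route a structured event to StateManager + matching emitters."""
        self.state_manager.apply(event)
        # Plan source-native timestamps once, so every emitter renders the same plan.
        if self.source_timing_planner is not None:
            event = self.source_timing_planner.plan_event(event)
        for emitter in self._get_matching_emitters(event):
            emitter.emit(event)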

    def dispatch_raw(self, entry: RawLogEntry) -> None:
        """Route a raw log entry directly to a specific emitter (escape hatch).

        target_emitter must match a key in self.emitters dict
        (e.g., "syslog", "zeek_conn", "windows_event_security").
        """
        emitter = self.emitters[entry.target_emitter]
        emitter.emit_raw(entry.data)

    def _get_matching_emitters(self, event: SecurityEvent) -> list[LogEmitter]:
        """Two-layer filtering: format eligibility + network visibility."""
        # For network events, determine which formats can see this traffic
        visible_formats: set[str] | None = None
        if event.network and self.visibility_engine:
            visible_formats = self.visibility_engine.get_log_formats_for_connection(
                event.network.src_ip, event.network.dst_ip
            )

        matched = []
        for format_name, emitter in self.emitters.items():
            if not emitter.can_handle(event):
                continue
            # Network visibility filter: only applies to network-aware event types
            if visible_formats is not None and format_name in _NETWORK_FORMATS:
                if format_name not in visible_formats:
                    continue
            matched.append(emitter)
        return matched

# Formats subject to network visibility filtering
_NETWORK_FORMATS = {"zeek_conn", "zeek_dns", "snort_alert"}
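The two filtering layers can be exercised in isolation with stubs. This is a simplified sketch: StubEmitter and matching_formats are hypothetical, can_handle() takes a bare event type rather than a SecurityEvent, and the visible-format set is passed in directly instead of coming from NetworkVisibilityEngine.

```python
NETWORK_FORMATS = {"zeek_conn", "zeek_dns", "snort_alert"}


class StubEmitter:
    """Hypothetical emitter stub that only knows its supported event types."""

    def __init__(self, supported: set[str]) -> None:
        self.supported = supported

    def can_handle(self, event_type: str) -> bool:
        return event_type in self.supported


def matching_formats(emitters, event_type, visible_formats):
    """Layer 1: can_handle(). Layer 2: visibility, applied to network formats only."""
    matched = []
    for name, emitter in emitters.items():
        if not emitter.can_handle(event_type):
            continue
        if (
            visible_formats is not None
            and name in NETWORK_FORMATS
            and name not in visible_formats
        ):
            continue
        matched.append(name)
    return matched


emitters = {
    "zeek_conn": StubEmitter({"connection"}),
    "snort_alert": StubEmitter({"connection"}),
    "ecar": StubEmitter({"connection", "logon"}),
    "syslog": StubEmitter({"logon"}),
}

# A sensor sees the traffic for Zeek but not Snort; eCAR is host-based and unaffected.
seen = matching_formats(emitters, "connection", visible_formats={"zeek_conn"})
# No visibility information (None) means no network filtering at all.
unfiltered = matching_formats(emitters, "connection", visible_formats=None)
```

The host-based eCAR emitter passes in both cases because it is not in the network format set, which is the point of keeping the two layers separate.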

4.2 StateManager Changes

New method: apply(event) -- records already-allocated state from a SecurityEvent. Does NOT allocate IDs (those are allocated in the two-phase build before dispatch).

def apply(self, event: SecurityEvent) -> None:
    """Record state from a fully-constructed SecurityEvent.

    IDs (logon_id, pid, conn_id, zeek_uid) are already allocated by the caller
    via create_session(), create_process(), open_connection() before building
    the SecurityEvent. This method records the session/process/connection in
    the state tracking structures using those pre-allocated IDs.
    """
    if event.event_type == "logon" and event.auth:
        # Session already created via create_session() during two-phase build.
        # apply() is a no-op for logon -- state was recorded at ID allocation time.
        pass
    elif event.event_type == "logoff" and event.auth:
        self.end_session(event.auth.logon_id)
    elif event.event_type == "process_terminate" and event.process and event.host:
        self.end_process(event.host.hostname, event.process.pid)
    elif event.event_type == "connection" and event.network:
        # Connection already opened via open_connection() during two-phase build.
        # apply() updates bytes if provided.
        if event.network.conn_id and (event.network.orig_bytes or event.network.resp_bytes):
            self.update_connection_bytes(
                event.network.conn_id,
                event.network.orig_bytes,
                event.network.resp_bytes,
            )
    # logon, process_create, system_process_create, connection: state already
    # recorded during the ID allocation phase. apply() handles only teardown
    # and updates.

Existing methods preserved: create_session(), create_process(), open_connection(), end_session(), end_process(), close_connection(), etc. all remain unchanged. They are called by ActivityGenerator during the ID allocation phase (step 1 of two-phase build).

4.3 Emitter Changes

Base class gains new abstract methods alongside the existing interface:

class LogEmitter(ABC):
    # === New interface (SecurityEvent-based) ===

    @abstractmethod
    def can_handle(self, event: SecurityEvent) -> bool:
        """Return True if this emitter can render this event type."""
        ...

    @abstractmethod
    def emit(self, event: SecurityEvent) -> None:
        """Render this event to the emitter's format.

        Implementations build a field dict from SecurityEvent contexts,
        then pass it to the existing Jinja2 template for final string rendering.
        """
        ...

    # === Escape hatch (raw dict path) ===

    def emit_raw(self, event_data: dict[str, Any]) -> None:
        """Emit from raw dict -- used by RawLogEntry escape hatch.

        This is the current emit_event() logic, renamed.
        """
        # Delegates to existing _render_event() + buffer pipeline
        ...

    # === Existing methods (unchanged) ===
    # _render_event(), flush(), barrier_flush(), close(), etc.

Each emitter defines _supported_types as a class-level constant:

| Emitter | _supported_types |
| --- | --- |
| WindowsEventEmitter | {logon, logon_failed, logoff, process_create, process_terminate, system_process_create, machine_logon, kerberos_tgt, kerberos_service, ntlm_validation} |
| SysmonEventEmitter | {process_create, system_process_create, process_terminate, create_remote_thread, process_access} |
| SyslogEmitter | {logon, logon_failed, logoff, process_create, bash_command} |
| ZeekEmitter (conn) | {connection} |
| ZeekDnsEmitter | {dns_query} |
| EcarEmitter | {logon, logoff, process_create, process_terminate, system_process_create, ssh_session, connection, file_create, file_modify, file_delete, registry_modify, image_load, module_load, create_remote_thread, process_access, service_installed} |
| SnortEmitter | {connection} (only when event.ids is populated) |
| BashHistoryEmitter | {bash_command} |
| WebEmitter | {web_request} |

can_handle() pattern (common to all emitters):

def can_handle(self, event: SecurityEvent) -> bool:
    if event.event_type not in self._supported_types:
        return False
    # OS-specific emitters add additional checks:
    # WindowsEventEmitter: event.host and event.host.os_category == "windows"
    # SyslogEmitter: event.host and event.host.os_category == "linux"
    # BashHistoryEmitter: event.host and event.host.os_category == "linux"
    # Network emitters (Zeek, Snort): no OS check (work with IPs)
    return True

Rendering flow (Jinja2 templates preserved):

Each emitter's emit() method:

  1. Builds a field dict from SecurityEvent contexts (explicit render method per event type)
  2. Passes the dict to the existing self._template.render(**dict) pipeline for final string formatting
  3. Buffers the rendered string (or raw dict for WindowsEventEmitter's deferred rendering)

# Example: WindowsEventEmitter
class WindowsEventEmitter(LogEmitter):
    _supported_types = {
        "logon", "logon_failed", "logoff", "process_create",
        "process_terminate", "system_process_create", "machine_logon",
        "kerberos_tgt", "kerberos_service", "ntlm_validation",
    }

    def can_handle(self, event: SecurityEvent) -> bool:
        return (
            event.event_type in self._supported_types
            and event.host is not None
            and event.host.os_category == "windows"
        )

    def emit(self, event: SecurityEvent) -> None:
        if event.event_type == "logon":
            self._render_logon(event)
        elif event.event_type == "process_create":
            self._render_process_create(event)
        # ... dispatch to per-type render method

    def _render_logon(self, event: SecurityEvent) -> None:
        """Build Windows 4624 dict from SecurityEvent, buffer for deferred rendering."""
        event_data = {
            "EventID": 4624,
            "TimeCreated": event.timestamp,
            "Computer": event.host.hostname,
            "TargetUserName": event.auth.username,
            "TargetUserSid": event.auth.user_sid,
            "TargetLogonId": event.auth.logon_id,
            "LogonType": event.auth.logon_type,
            "IpAddress": event.auth.source_ip or "-",
            # ... all fields from single source (event object)
        }
        # Buffer raw dict for deferred chronological sorting + RecordID assignment
        self._event_dicts.append(event_data)

WindowsEventEmitter special handling preserved: Continues to buffer raw dicts in _event_dicts, sort chronologically at flush time, assign per-computer monotonic EventRecordIDs, then render to XML via template. The _flush_unlocked() override is unchanged.
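The deferred sort and per-computer numbering can be sketched as follows. This is an illustration rather than the emitter's actual flush logic: assign_record_ids is a hypothetical name, and starting each computer's counter at 1 is an assumption.

```python
from collections import defaultdict
from datetime import datetime


def assign_record_ids(event_dicts: list[dict]) -> list[dict]:
    """Sort buffered events by time, then number them per computer."""
    # sorted() is stable, so events with equal timestamps keep their buffer order.
    ordered = sorted(event_dicts, key=lambda e: e["TimeCreated"])
    counters: dict[str, int] = defaultdict(int)
    for event in ordered:
        counters[event["Computer"]] += 1  # assumed to start at 1 per computer
        event["EventRecordID"] = counters[event["Computer"]]
    return ordered


buffered = [
    {"Computer": "DC01", "EventID": 4624, "TimeCreated": datetime(2026, 3, 19, 9, 5)},
    {"Computer": "WS01", "EventID": 4688, "TimeCreated": datetime(2026, 3, 19, 9, 1)},
    {"Computer": "DC01", "EventID": 4768, "TimeCreated": datetime(2026, 3, 19, 9, 0)},
]
flushed = assign_record_ids(buffered)
```

Numbering after the sort is what keeps EventRecordID monotonic with time on each computer, even when events were buffered out of order.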

BashHistoryEmitter special handling preserved: Continues to multiplex per-user-per-host via _SingleHistoryWriter instances.
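Per-user-per-host multiplexing amounts to keying writers on a (hostname, username) pair. The sketch below keeps commands in memory; HistoryMultiplexer is a hypothetical stand-in and does not reflect the real _SingleHistoryWriter interface, which is not shown in this document.

```python
from collections import defaultdict


class HistoryMultiplexer:
    """Hypothetical stand-in: one history buffer per (hostname, username)."""

    def __init__(self) -> None:
        self._writers: dict[tuple[str, str], list[str]] = defaultdict(list)

    def emit(self, hostname: str, username: str, command: str) -> None:
        self._writers[(hostname, username)].append(command)

    def history_for(self, hostname: str, username: str) -> list[str]:
        # .get() avoids creating an empty buffer as a side effect of reading.
        return list(self._writers.get((hostname, username), []))


mux = HistoryMultiplexer()
mux.emit("web01", "alice", "ls -la")
mux.emit("web01", "bob", "whoami")
mux.emit("web01", "alice", "cat /etc/hosts")
```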

4.4 ActivityGenerator Changes

Constructor change: Accept dispatcher: EventDispatcher instead of separate state_manager + emitters:

class ActivityGenerator:
    def __init__(
        self,
        dispatcher: EventDispatcher,
        scenario: Scenario,
        # ... other existing params
    ):
        self.dispatcher = dispatcher
        self.state_manager = dispatcher.state_manager  # convenience reference
        self.emitters = dispatcher.emitters              # convenience reference (used during migration)
        # ... rest unchanged

Migrated method pattern (example: generate_logon()):

def generate_logon(self, user, system, time, logon_type=2, source_ip=None):
    # Phase 1: Allocate IDs from StateManager (existing call, unchanged)
    logon_id = self.state_manager.create_session(
        username=user.username, system=system.hostname,
        logon_type=logon_type, source_ip=source_ip or "",
    )

    # Phase 2: Build complete SecurityEvent
    event = SecurityEvent(
        timestamp=time,
        event_type="logon",
        host=self._build_host_context(system),
        auth=AuthContext(
            username=user.username,
            user_sid=self._get_sid(user),
            logon_id=logon_id,           # From StateManager
            logon_type=logon_type,
            source_ip=source_ip or "",
            # ... etc
        ),
    )

    # Phase 3: Dispatch (routes to matching emitters)
    self.dispatcher.dispatch(event)

    return logon_id

Helper method: _build_host_context(system) -- builds a HostContext from a System model object. Shared across all generate_* methods to avoid duplication.

Retiring _emit_ecar_* helpers: Each migrated generate_* method's corresponding _emit_ecar_* helper (e.g., _emit_ecar_logon()) is deleted. The eCAR rendering logic moves into EcarEmitter._render_{event_type}(). These helpers are:

  • _emit_ecar_logon() -> EcarEmitter._render_logon()
  • _emit_ecar_process() -> EcarEmitter._render_process_create()
  • _emit_ecar_file_event() -> EcarEmitter._render_file_operation()
  • _emit_ecar_registry_event() -> EcarEmitter._render_registry_operation()
  • _emit_ecar_module_event() -> EcarEmitter._render_module_load() via canonical image_load
  • _emit_ecar_flow_event() -> EcarEmitter._render_connection()

4.5 eCAR Format Improvements (Post-Migration)

With the canonical event model in place, EcarEmitter._render_event() builds JSON directly in Python (no Jinja2 template), producing spec-compliant eCAR records:

Top-level fields (all records):

  • timestamp_ms (int): Milliseconds since epoch
  • id (UUID): Unique event record ID
  • hostname (string): Target system
  • object / action (string): Entity type and operation
  • objectID (UUID): Persistent entity ID — same across lifecycle (e.g., CREATE and TERMINATE for one process)
  • actorID (UUID, optional): ID of the entity that performed the action (parent process on PROCESS/CREATE, initiating process on FILE/REGISTRY/MODULE/FLOW)
  • pid (int): Always present, -1 if unavailable
  • tid (int): Always present, -1 if unavailable
  • ppid (int): PROCESS events only
  • principal (string, optional): Username
  • properties (dict): Event-specific key-value pairs, all values are strings per eCAR spec
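The field list above can be turned into a small record builder to show the shape of one record. This is a sketch, not EcarEmitter's code: build_ecar_record and the property keys (image_path, exit_code) are hypothetical, and only the top-level field names come from the list.

```python
import json
import uuid
from datetime import datetime, timezone


def build_ecar_record(ts, hostname, obj, action, object_id, properties, *,
                      actor_id=None, pid=-1, tid=-1, ppid=None, principal=None):
    """Assemble one eCAR-style record as a plain dict (illustrative only)."""
    record = {
        "timestamp_ms": int(ts.timestamp() * 1000),
        "id": str(uuid.uuid4()),          # unique per record
        "hostname": hostname,
        "object": obj,
        "action": action,
        "objectID": str(object_id),       # persistent across the entity lifecycle
        "pid": pid,                       # -1 when unavailable
        "tid": tid,                       # -1 when unavailable
        "properties": {k: str(v) for k, v in properties.items()},
    }
    if actor_id is not None:
        record["actorID"] = str(actor_id)
    if obj == "PROCESS" and ppid is not None:
        record["ppid"] = ppid             # PROCESS events only
    if principal is not None:
        record["principal"] = principal
    return record


proc_id = uuid.uuid4()
ts = datetime(2026, 3, 19, 9, 0, tzinfo=timezone.utc)
create = build_ecar_record(ts, "ws01", "PROCESS", "CREATE", proc_id,
                           {"image_path": "C:\\Windows\\System32\\cmd.exe"},
                           pid=4242, ppid=880, principal="alice")
terminate = build_ecar_record(ts, "ws01", "PROCESS", "TERMINATE", proc_id,
                              {"exit_code": 0}, pid=4242, ppid=880)
line = json.dumps(create)  # one JSON object per record
```

The CREATE and TERMINATE records share an objectID but have distinct id values, and the integer exit code is stringified on the way into properties.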

objectID/actorID graph: Managed via EdrContext on SecurityEvent. UUIDs are allocated by StateManager at entity creation time (create_session(), create_process()), looked up via get_session_object_id() / get_process_object_id(), and attached by ActivityGenerator to each event.

FLOW pid mapping: Baseline connections carry the PID of the realistic initiating system process: svchost for DNS/NTP, lsass for Kerberos/LDAP, System (PID 4) for SMB, and mstsc.exe for RDP. The mapping is distro-aware: Ubuntu uses systemd-resolved for DNS, while RHEL applications resolve directly, so the pid is -1. Storyline connections carry _last_storyline_pid.
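The baseline mapping reads naturally as a lookup table. The sketch below is an assumption-heavy illustration: baseline_flow_pid, the lowercase service keys, the distro strings, and the `running` name-to-PID dict are all hypothetical; only the process choices come from the paragraph above.

```python
UNAVAILABLE_PID = -1

# Initiating process per service on Windows, as described above.
WINDOWS_INITIATORS = {
    "dns": "svchost.exe",
    "ntp": "svchost.exe",
    "kerberos": "lsass.exe",
    "ldap": "lsass.exe",
    "smb": "System",
    "rdp": "mstsc.exe",
}


def baseline_flow_pid(os_category: str, distro: str, service: str,
                      running: dict[str, int]) -> int:
    """Pick the PID to attach to a baseline FLOW record, or -1 if none applies."""
    if os_category == "windows":
        name = WINDOWS_INITIATORS.get(service)
    elif distro == "ubuntu" and service == "dns":
        name = "systemd-resolved"
    else:
        name = None  # e.g. RHEL applications resolving DNS directly
    if name is None:
        return UNAVAILABLE_PID
    return running.get(name, UNAVAILABLE_PID)


smb_pid = baseline_flow_pid("windows", "", "smb", {"System": 4, "lsass.exe": 700})
ubuntu_dns = baseline_flow_pid("linux", "ubuntu", "dns", {"systemd-resolved": 612})
rhel_dns = baseline_flow_pid("linux", "rhel", "dns", {})
```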

5. Migration Strategy

5.1 Approach: All Activity Types, One at a Time

Migrate the 12 generate_* methods in ActivityGenerator listed below, one method at a time, validating each before moving to the next. Order by cross-format complexity (most emitters first):

  1. generate_logon() -- Windows + syslog + eCAR (3 formats, highest P0 bug surface)
  2. generate_connection() -- Zeek conn + Zeek DNS + eCAR FLOW + Snort (4 formats)
  3. generate_process() -- Windows + syslog + eCAR + bash_history (4 formats)
  4. generate_logoff() -- Windows + syslog + eCAR (3 formats)
  5. generate_failed_logon() -- Windows + eCAR (2 formats)
  6. generate_process_termination() -- Windows + eCAR (2 formats)
  7. generate_bash_command() -- bash_history (1 format)
  8. generate_system_process() -- Windows + eCAR (2 formats)
  9. generate_machine_account_logon() -- Windows (1 format)
  10. generate_kerberos_tgt() -- Windows (1 format)
  11. generate_kerberos_service_ticket() -- Windows (1 format)
  12. generate_ntlm_validation() -- Windows (1 format)

5.2 Per-Method Migration Steps

For each generate_* method:

  1. Refactor method body -- Keep the existing StateManager ID allocation calls at the top. Replace the per-emitter emit_event() calls with SecurityEvent construction + self.dispatcher.dispatch(event).
  2. Implement emitter render methods -- For each emitter this event type touches, implement _render_{event_type}() that builds a field dict from SecurityEvent contexts, then passes to the existing template rendering pipeline.
  3. Retire eCAR helpers -- Delete the corresponding _emit_ecar_* helper; its logic moves into EcarEmitter._render_{event_type}().
  4. Run tests -- All existing tests must pass. Run eforge evaluate on the reference scenario.
  5. Commit -- One commit per migrated method for clean git bisection.

5.3 Backward Compatibility

  • emit_raw() on emitters preserves the dict-based path for the user-facing raw event type in scenario YAML. All internal engine code uses SecurityEvent + EventDispatcher exclusively.
  • StateManager's existing methods remain unchanged; apply() is purely additive.
  • Engine orchestration (_generate_baseline(), _execute_storyline_events_in_hour(), etc.) is unchanged -- only ActivityGenerator internals change.

6. File Changes

New Files

| File | Purpose |
| --- | --- |
| src/evidenceforge/events/__init__.py | Package init, re-exports SecurityEvent, RawLogEntry, all context types |
| src/evidenceforge/events/base.py | SecurityEvent, RawLogEntry dataclasses |
| src/evidenceforge/events/contexts.py | All *Context dataclasses (HostContext, AuthContext, etc.) |
| src/evidenceforge/events/dispatcher.py | EventDispatcher class |
| tests/unit/test_events.py | Unit tests for event/context construction |
| tests/unit/test_dispatcher.py | Unit tests for dispatcher routing + visibility filtering |

Modified Files

| File | Changes |
| --- | --- |
| src/evidenceforge/generation/activity.py | Constructor accepts dispatcher; each generate_* method builds SecurityEvent + dispatches; _emit_ecar_* helpers retired |
| src/evidenceforge/generation/state_manager.py | Add apply(event) method |
| src/evidenceforge/generation/emitters/base.py | Add can_handle(), emit(), emit_raw() to base class |
| src/evidenceforge/generation/emitters/windows.py | Add _supported_types, can_handle(), emit(), per-type render methods |
| src/evidenceforge/generation/emitters/zeek.py | Add _supported_types, can_handle(), emit(), _render_connection() |
| src/evidenceforge/generation/emitters/zeek_dns.py | Add _supported_types, can_handle(), emit(), _render_dns_query() |
| src/evidenceforge/generation/emitters/ecar.py | Add _supported_types, can_handle(), emit(), per-type render methods |
| src/evidenceforge/generation/emitters/syslog.py | Add _supported_types, can_handle(), emit(), per-type render methods |
| src/evidenceforge/generation/emitters/bash_history.py | Add _supported_types, can_handle(), emit(), _render_bash_command() |
| src/evidenceforge/generation/emitters/snort.py | Add _supported_types, can_handle(), emit(), _render_connection() |
| src/evidenceforge/generation/emitters/web.py | Add _supported_types, can_handle(), emit(), _render_web_request() |
| src/evidenceforge/generation/engine.py | Create EventDispatcher, pass to ActivityGenerator |
| docs/PRD.md | Add reference to this document in Post-MVP section |

Preserved (No Changes)

| File | Reason |
| --- | --- |
| Scenario YAML schema | Internal refactor, no user-facing changes |
| Format definition YAML files | Templates still used for final string rendering |
| CLI commands | No interface changes |
| network_visibility.py | Used by dispatcher via existing get_log_formats_for_connection() API |
| Ground truth generation | Reads from StateManager, not from events |

7. Testing Strategy

7.1 Validation Criteria

Primary: Generate logs from tests/fixtures/scenarios/retail-store-ftp-attack.yaml before and after the migration. Run eforge evaluate OUTPUT_DIR --report before.json on the pre-migration output and eforge evaluate OUTPUT_DIR --report after.json on the post-migration output. Every score in after.json must be equal to or better than its counterpart in before.json, with no new warnings or failures.

Secondary:

  • All existing tests pass (zero regressions)
  • New unit tests for event construction, dispatcher routing, and emitter rendering
  • Coverage maintained at current level
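
The primary criterion above can be checked mechanically. The following is a minimal sketch, assuming each report is a flat JSON object that maps metric names to numeric scores; the actual schema written by eforge evaluate --report is not specified here, and find_regressions and load_scores are hypothetical helper names.

```python
import json
from pathlib import Path
from typing import Dict, Optional, Tuple


def load_scores(path: str) -> Dict[str, float]:
    # Assumption: the report is a flat JSON object of metric name -> score.
    return json.loads(Path(path).read_text())


def find_regressions(
    before: Dict[str, float], after: Dict[str, float]
) -> Dict[str, Tuple[float, Optional[float]]]:
    """Return every metric whose score dropped or disappeared after migration."""
    regressions = {}
    for metric, old_score in before.items():
        new_score = after.get(metric)
        # A missing metric counts as a regression, as does a lower score.
        if new_score is None or new_score < old_score:
            regressions[metric] = (old_score, new_score)
    return regressions
```

An empty result from find_regressions(load_scores("before.json"), load_scores("after.json")) would correspond to "equal or better" on every score. Warnings and failures would need a separate check.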

7.2 New Tests

| Test | What It Validates |
|---|---|
| test_security_event_construction | SecurityEvent with various context combinations creates valid objects |
| test_context_no_dynamic_attributes | slots=True prevents adding undeclared attributes (e.g., event.host.bogus = 1 raises AttributeError) |
| test_dispatcher_routes_to_correct_emitters | Format self-selection works for each event type (verify correct emitter set) |
| test_dispatcher_applies_visibility_filtering | Network events filtered using get_log_formats_for_connection() -- only visible formats receive events |
| test_dispatcher_host_events_skip_visibility | Host-based events (logon, process) bypass visibility checks entirely |
| test_dispatcher_state_manager_apply | apply() records sessions, ends sessions on logoff, ends processes on terminate |
| test_raw_log_entry_bypasses_model | dispatch_raw() routes directly to named emitter via emit_raw() |
| test_emitter_can_handle_{emitter} | Each emitter correctly accepts/rejects event types per _supported_types |
| test_emitter_render_{emitter}_{event_type} | Each emitter produces correct output fields from SecurityEvent |
| test_two_phase_build_ids_match | IDs allocated by StateManager appear on the SecurityEvent and in rendered output |
| test_migration_parity_{method} | For each migrated method: rendered output is structurally equivalent to pre-migration output |

7.3 Reference Scenario

Use tests/fixtures/scenarios/retail-store-ftp-attack.yaml as the primary comparison scenario -- 23 users, 24-hour window, storyline events, and full network topology with sensors.

8. Non-Functional Requirements

Performance

  • Event object construction: negligible compared to I/O cost (dataclass with slots, no validation)
  • Memory overhead: < 2 KB per transient event object, which becomes eligible for garbage collection as soon as dispatch returns
  • Peak memory: No measurable increase (events don't persist, same buffer strategy)
  • Generation time: No measurable increase (event construction is trivial vs. I/O and template rendering)

Maintainability

  • Adding a new log format: Implement _supported_types, can_handle(), emit(), and per-type render methods on a new emitter. No changes to ActivityGenerator or other emitters.
  • Adding a new activity type: Implement a generate_* method that builds a SecurityEvent, add its event_type to the catalog, and add render methods to the relevant emitters. The dispatcher handles routing automatically.
  • Adding a new context type: Add a @dataclass(slots=True) class to contexts.py and add a matching optional field to SecurityEvent. No changes to existing contexts or emitters (they ignore unknown contexts).

9. Execution Plan

Phase 1: Foundation

  • Create src/evidenceforge/events/ package with all dataclasses (SecurityEvent, RawLogEntry, all contexts)
  • Implement EventDispatcher with NetworkVisibilityEngine integration using existing get_log_formats_for_connection() API
  • Add apply() to StateManager
  • Add can_handle(), emit(), emit_raw() to base emitter; implement _supported_types on all 8 emitter subclasses
  • Update engine.py to create EventDispatcher and pass to ActivityGenerator
  • Update ActivityGenerator.__init__() to accept dispatcher
  • Write unit tests for events, contexts, dispatcher, and emitter can_handle() logic

Phase 2: Migrate Activity Types

  • Migrate methods one at a time, in the order given in Section 5.1
  • For each method: refactor to two-phase build + dispatch, implement the emitter render methods, retire the corresponding _emit_ecar_* helper, and run the tests
  • One commit per migrated method

Phase 3: Cleanup

  • Remove ActivityGenerator.network_visibility (now handled by dispatcher)
  • Remove ActivityGenerator.emitters convenience reference (all access through dispatcher)
  • Remove any remaining direct emitter.emit_event(dict) calls
  • Rename old emit_event() to emit_raw() if not already done
  • Update docs/PRD.md with reference to this document in Post-MVP section
  • Final evaluation comparison: run eforge evaluate on the reference scenario output from before and after the migration and compare the two reports