metadata-ingestion/setup.py: 10 additions, 0 deletions
@@ -616,6 +616,14 @@
"sac": sac,
"neo4j": {"pandas", "neo4j"},
"vertexai": {"google-cloud-aiplatform>=1.80.0"},
# Debug/utility plugins
"debug-recording": {
# VCR.py for HTTP recording/replay - industry standard
# vcrpy 7.x manages urllib3 compatibility automatically based on Python version
"vcrpy>=7.0.0",
# AES-256 encrypted zip files
"pyzipper>=0.3.6",
},
}

# This is mainly used to exclude plugins from the Docker image.
@@ -635,6 +643,8 @@
# Feast tends to have overly restrictive dependencies and hence doesn't
# play nice with the "all" installation.
"feast",
# Debug recording is an optional debugging tool.
"debug-recording",
}

mypy_stubs = {
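For reference, a quick install sketch for the new extra; this assumes the usual acryl-datahub distribution name and that the extra is available in the release being installed:

pip install 'acryl-datahub[debug-recording]'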
metadata-ingestion/src/datahub/cli/ingest_cli.py: 285 additions, 11 deletions
@@ -5,7 +5,10 @@
import sys
import textwrap
from datetime import datetime
from typing import Optional
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
from datahub.ingestion.recording.recorder import IngestionRecorder

import click
import click_spinner
@@ -103,6 +106,36 @@ def ingest() -> None:
default=False,
help="If enabled, mute intermediate progress ingestion reports",
)
@click.option(
"--record",
type=bool,
is_flag=True,
default=False,
help="Enable recording of ingestion run for debugging. "
"Requires recording config in recipe or --record-password.",
)
@click.option(
"--record-password",
type=str,
default=None,
help="Password for encrypting the recording archive. "
"Can also be set via DATAHUB_RECORDING_PASSWORD env var.",
)
@click.option(
"--no-s3-upload",
type=bool,
is_flag=True,
default=False,
help="Disable S3 upload of recording (for testing).",
)
@click.option(
"--no-secret-redaction",
type=bool,
is_flag=True,
default=False,
help="Disable secret redaction in recordings (for local debugging). "
"WARNING: Recording will contain actual secrets. Use with caution.",
)
@telemetry.with_telemetry(
capture_kwargs=[
"dry_run",
@@ -112,6 +145,7 @@ def ingest() -> None:
"no_default_report",
"no_spinner",
"no_progress",
"record",
]
)
@upgrade.check_upgrade
@@ -126,6 +160,10 @@ def run(
no_default_report: bool,
no_spinner: bool,
no_progress: bool,
record: bool,
record_password: Optional[str],
no_s3_upload: bool,
no_secret_redaction: bool,
) -> None:
"""Ingest metadata into DataHub."""

@@ -172,17 +210,37 @@ def run_pipeline_to_completion(pipeline: Pipeline) -> int:
# The default is "datahub" reporting. The extra flag will disable it.
report_to = None

# logger.debug(f"Using config: {pipeline_config}")
pipeline = Pipeline.create(
pipeline_config,
dry_run=dry_run,
preview_mode=preview,
preview_workunits=preview_workunits,
report_to=report_to,
no_progress=no_progress,
raw_config=raw_pipeline_config,
# Helper function to create and run the pipeline
def create_and_run_pipeline() -> int:
pipeline = Pipeline.create(
pipeline_config,
dry_run=dry_run,
preview_mode=preview,
preview_workunits=preview_workunits,
report_to=report_to,
no_progress=no_progress,
raw_config=raw_pipeline_config,
)
return run_pipeline_to_completion(pipeline)

# Handle recording if enabled (via --record flag or recording.enabled in recipe)
# IMPORTANT: Pipeline.create() must happen INSIDE the recording context
# so that SDK initialization (including auth) is captured by VCR.
recording_enabled = record or pipeline_config.get("recording", {}).get(
"enabled", False
)
ret = run_pipeline_to_completion(pipeline)
if recording_enabled:
recorder = _setup_recording(
pipeline_config,
record_password,
no_s3_upload,
no_secret_redaction,
raw_pipeline_config,
)
with recorder:
ret = create_and_run_pipeline()
else:
ret = create_and_run_pipeline()

if ret:
sys.exit(ret)
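A usage sketch of the new recording flags added above; the recipe path and password are placeholders, not values from this change:

# Record an ingestion run and keep the encrypted archive local (no S3 upload)
datahub ingest run -c recipe.yml --record --record-password 'example-password' --no-s3-upload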
@@ -309,6 +367,92 @@ def deploy(
click.echo(response)


def _setup_recording(
pipeline_config: dict,
record_password: Optional[str],
no_s3_upload: bool,
no_secret_redaction: bool,
raw_config: dict,
) -> "IngestionRecorder":
"""Setup recording for the ingestion run."""
from datahub.ingestion.recording.config import (
RecordingConfig,
check_recording_dependencies,
get_recording_password_from_env,
)
from datahub.ingestion.recording.recorder import IngestionRecorder

# Check dependencies first
check_recording_dependencies()

# Build recording config from recipe, with CLI overrides
recording_config_dict = pipeline_config.get("recording", {}).copy()

# CLI password takes precedence, then env var, then recipe
password = record_password or get_recording_password_from_env()
if password:
recording_config_dict["password"] = password

# CLI --no-s3-upload flag overrides recipe
if no_s3_upload:
recording_config_dict["s3_upload"] = False

# Ensure enabled is set (we're here because recording should be enabled)
recording_config_dict["enabled"] = True

# Validate config using pydantic model
try:
recording_config = RecordingConfig.model_validate(recording_config_dict)
except Exception as e:
click.secho(f"Error in recording configuration: {e}", fg="red", err=True)
sys.exit(1)

# Get password as string for recorder
if not recording_config.password:
click.secho(
"Error: Recording password required. Provide via --record-password, "
"DATAHUB_RECORDING_PASSWORD env var, or recipe recording.password.",
fg="red",
err=True,
)
sys.exit(1)
password_str = recording_config.password.get_secret_value()

# Get run_id from pipeline config or generate one
run_id = pipeline_config.get("run_id")
if not run_id:
from datahub.ingestion.run.pipeline_config import _generate_run_id

run_id = _generate_run_id(
pipeline_config.get("source", {}).get("type", "unknown")
)

# Get source and sink types for metadata
source_type = pipeline_config.get("source", {}).get("type")
sink_type = pipeline_config.get("sink", {}).get("type", "datahub-rest")

logger.info(f"Recording enabled for run_id: {run_id}")
logger.info(f"S3 upload: {'enabled' if recording_config.s3_upload else 'disabled'}")
if recording_config.output_path:
logger.info(f"Output path: {recording_config.output_path}")
if no_secret_redaction:
logger.warning(
"Secret redaction is DISABLED - recording will contain actual secrets. "
"Use this only for local debugging and NEVER commit recordings to source control."
)

return IngestionRecorder(
run_id=run_id,
password=password_str,
redact_secrets=not no_secret_redaction,
recipe=raw_config,
output_path=recording_config.output_path,
s3_upload=recording_config.s3_upload,
source_type=source_type,
sink_type=sink_type,
)

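Since the CLI flag takes precedence over the DATAHUB_RECORDING_PASSWORD environment variable, which in turn takes precedence over recording.password in the recipe, a hypothetical invocation that relies on the environment variable alone looks like this (values are placeholders):

export DATAHUB_RECORDING_PASSWORD='example-password'
datahub ingest run -c recipe.yml --record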

def _test_source_connection(report_to: Optional[str], pipeline_config: dict) -> int:
connection_report = None
try:
@@ -676,3 +820,133 @@ def rollback(
except OSError as e:
logger.exception(f"Unable to save rollback failure report: {e}")
sys.exit(f"Unable to write reports to {report_dir}")


@ingest.command()
@click.argument("archive_path", type=str)
@click.option(
"--password",
type=str,
required=False,
help="Password for decrypting the recording archive. "
"Can also be set via DATAHUB_RECORDING_PASSWORD env var.",
)
@click.option(
"--live-sink",
type=bool,
is_flag=True,
default=False,
help="If enabled, emit to real GMS server instead of using recorded responses.",
)
@click.option(
"--server",
type=str,
default=None,
help="GMS server URL when using --live-sink. Defaults to value in recorded recipe.",
)
@click.option(
"--report-to",
type=str,
default=None,
help="Path to write the report file.",
)
@click.option(
"--no-spinner",
type=bool,
is_flag=True,
default=False,
help="Turn off spinner",
)
@telemetry.with_telemetry(capture_kwargs=["live_sink"])
@upgrade.check_upgrade
def replay(
archive_path: str,
password: Optional[str],
live_sink: bool,
server: Optional[str],
report_to: Optional[str],
no_spinner: bool,
) -> None:
"""
Replay a recorded ingestion run for debugging.

ARCHIVE_PATH can be a local file path or an S3 URL (s3://bucket/path/to/recording.zip).

This command runs an ingestion using recorded HTTP and database responses,
allowing offline debugging without network access.

Examples:
# Replay from local file
datahub ingest replay ./recording.zip --password secret

# Replay from S3
datahub ingest replay s3://bucket/recordings/run-id.zip --password secret

# Replay with live sink (emit to real GMS)
datahub ingest replay ./recording.zip --password secret --live-sink
"""
from datahub.ingestion.recording.config import (
check_recording_dependencies,
get_recording_password_from_env,
)
from datahub.ingestion.recording.replay import IngestionReplayer

# Check dependencies
try:
check_recording_dependencies()
except ImportError as e:
click.secho(str(e), fg="red", err=True)
sys.exit(1)

# Get password
password = password or get_recording_password_from_env()
if not password:
click.secho(
"Error: Password required. Provide via --password or "
"DATAHUB_RECORDING_PASSWORD env var.",
fg="red",
err=True,
)
sys.exit(1)

logger.info("DataHub CLI version: %s", nice_version_name())
logger.info(f"Replaying recording from: {archive_path}")
logger.info(f"Mode: {'live sink' if live_sink else 'air-gapped'}")

with IngestionReplayer(
archive_path=archive_path,
password=password,
live_sink=live_sink,
gms_server=server,
) as replayer:
recipe = replayer.get_recipe()
logger.info(f"Loaded recording: run_id={replayer.run_id}")

# Create and run pipeline
pipeline = Pipeline.create(recipe)

logger.info("Starting replay...")
with click_spinner.spinner(disable=no_spinner):
try:
pipeline.run()
except Exception as e:
logger.info(
f"Source ({pipeline.source_type}) report:\n"
f"{pipeline.source.get_report().as_string()}"
)
logger.info(
f"Sink ({pipeline.sink_type}) report:\n"
f"{pipeline.sink.get_report().as_string()}"
)
raise e
else:
logger.info("Replay complete")
pipeline.log_ingestion_stats()
ret = pipeline.pretty_print_summary()

if report_to:
with open(report_to, "w") as f:
@aikido-pr-checks bot commented on Dec 4, 2025:
Potential file inclusion attack via reading file (low severity). If an attacker can control the input leading into the open function, they might be able to read sensitive files and launch further attacks with that information.
Remediation: Ignore this issue only after you've verified or sanitized the input going into this function.
f.write(pipeline.source.get_report().as_string())

if ret:
sys.exit(ret)
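One more hedged example combining the remaining replay options; the server URL and report path are placeholders:

# Replay against a live GMS and write the source report to a file
datahub ingest replay ./recording.zip --password 'example-password' --live-sink --server http://localhost:8080 --report-to ./replay-report.txt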