feat(recording): introduce ingestion recording and replay functionality #15480
base: master
Conversation
- Added support for recording HTTP requests and database queries during ingestion runs.
- Implemented IngestionRecorder and IngestionReplayer classes for managing recordings and replays.
- Introduced a command-line interface for recording management, including options for password protection and S3 uploads.
- Added tests for recording and replay functionality, ensuring proper handling of secrets and query recordings.
- Updated documentation with usage examples and installation instructions for the new debug-recording plugin.
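The record/replay idea behind these changes can be sketched in miniature. The class and method names below are hypothetical, not the PR's actual IngestionRecorder/IngestionReplayer API: record mode stores each response keyed by its request, and replay mode serves stored responses instead of touching the network.

```python
import json
from typing import Dict


class ToyRecorder:
    """Toy illustration of request recording and replay (not the PR's API)."""

    def __init__(self) -> None:
        # Cassette maps "METHOD URL" keys to recorded response bodies.
        self.cassette: Dict[str, str] = {}

    def record(self, method: str, url: str, response_body: str) -> None:
        self.cassette[f"{method} {url}"] = response_body

    def replay(self, method: str, url: str) -> str:
        key = f"{method} {url}"
        if key not in self.cassette:
            raise KeyError(f"No recording for: {key}")
        return self.cassette[key]

    def dump(self) -> str:
        # Recordings serialize to JSON so they can be archived and shared.
        return json.dumps(self.cassette, indent=2)


rec = ToyRecorder()
rec.record("GET", "https://example.com/api/tables", '{"tables": ["t1"]}')
replayed = rec.replay("GET", "https://example.com/api/tables")
```

The real implementation additionally redacts secrets before writing the cassette and packages it into an archive, per the bullet list above.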
… (Option 4: SQL-only)

Implements a SQL-only recording strategy for database sources with vendored/non-standard HTTP libraries (Snowflake, Databricks).

Key features:
- Database connection patching with VCR interference recovery
- SQL query recording via DB-API cursor proxies
- Datetime serialization for JSON storage
- DictCursor detection and handling
- --no-secret-redaction flag for local debugging
- Test RSA key generation for replay validation

Critical bug fixes:
- Fixed cursor iteration exhaustion after fetchall()
- Fixed DictCursor double-conversion issue
- Added datetime serialization/deserialization
- Excluded non-secret auth fields from redaction

Validation:
- Snowflake: 40 MCPs recorded and replayed identically
- 23 SQL queries captured and replayed
- Perfect semantic match (metadata-diff exit code 0)
- 535x faster replay (0.17s vs 91s)

Benefits Snowflake, Databricks, and all DB-API 2.0 database sources.
…hub into debug/record-replay
…nd replay

- Introduced CaseInsensitiveRow for case-insensitive key access in recorded results.
- Updated CursorProxy to normalize row keys for consistent access across different database dialects.
- Implemented mock query results for SQLAlchemy initialization queries to improve replay accuracy.
- Enhanced ModulePatcher to handle SQLAlchemy connections and event listeners more effectively, ensuring proper recording and replay of queries.
- Added detailed logging for better debugging and tracking of recorded queries.

This update improves the robustness of the ingestion recording and replay functionality, particularly for SQLAlchemy-based interactions.
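The CaseInsensitiveRow idea can be illustrated with a minimal sketch. Database dialects disagree on result-key casing (e.g. `TABLE_NAME` from Snowflake vs `table_name` from Postgres), so recorded rows are wrapped in a mapping that normalizes lookups. The class shape here is an assumption for illustration, not the PR's implementation.

```python
from collections.abc import Iterator, Mapping
from typing import Any, Dict


class CaseInsensitiveRow(Mapping[str, Any]):
    """Read-only row wrapper whose key lookups ignore case."""

    def __init__(self, row: Dict[str, Any]) -> None:
        self._row = row
        # Secondary index with lowercased keys for case-insensitive access.
        self._lower = {k.lower(): v for k, v in row.items()}

    def __getitem__(self, key: str) -> Any:
        return self._lower[key.lower()]

    def __iter__(self) -> Iterator[str]:
        # Iteration preserves the original key casing.
        return iter(self._row)

    def __len__(self) -> int:
        return len(self._row)


row = CaseInsensitiveRow({"TABLE_NAME": "orders"})
print(row["table_name"])  # → orders
```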
Implements a SQL-only recording strategy for database sources with vendored/non-standard HTTP libraries (Snowflake, Databricks).

Core features:
- Database connection patching with VCR interference recovery
- SQL query recording via DB-API cursor proxies
- Datetime serialization for JSON storage
- DictCursor detection and handling
- --no-secret-redaction flag for local debugging
- Test RSA key generation for replay validation

Manifest enhancements:
- Added datahub_cli_version field (CLI version tracking)
- Added python_version field (Python runtime version)
- Added INGESTION_ARTIFACT_DIR env var support

Critical bug fixes:
- Fixed cursor iteration exhaustion after fetchall()
- Fixed DictCursor double-conversion issue
- Added datetime serialization/deserialization
- Excluded non-secret auth fields from redaction

Validation:
- Snowflake: 40 MCPs recorded and replayed identically
- 23 SQL queries captured and replayed
- Perfect semantic match (metadata-diff exit code 0)
- 1000x faster replay (0.1s vs 100s)

Benefits Snowflake, Databricks, and all DB-API 2.0 database sources.
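The "SQL query recording via DB-API cursor proxies" feature and the "cursor iteration exhaustion after fetchall()" fix can be sketched together. The proxy below is illustrative (names are assumptions, not the PR's classes): it logs each executed statement and caches fetched rows, so iterating the cursor after `fetchall()` serves from the cache instead of hitting an exhausted cursor.

```python
import sqlite3
from typing import Any, List


class RecordingCursorProxy:
    """Illustrative DB-API 2.0 cursor proxy that records queries and results."""

    def __init__(self, inner: Any, log: List[dict]) -> None:
        self._inner = inner
        self._log = log
        self._cached_rows: List[Any] = []

    def execute(self, sql: str, params: Any = None) -> "RecordingCursorProxy":
        # Record the statement before delegating to the real cursor.
        self._log.append({"sql": sql, "params": params})
        self._inner.execute(sql, params or [])
        return self

    def fetchall(self) -> List[Any]:
        # Cache rows so later iteration does not re-read an exhausted cursor.
        self._cached_rows = self._inner.fetchall()
        return self._cached_rows

    def __iter__(self):
        return iter(self._cached_rows)


# Demo against sqlite3, a stdlib DB-API 2.0 driver.
conn = sqlite3.connect(":memory:")
log: List[dict] = []
cur = RecordingCursorProxy(conn.cursor(), log)
cur.execute("SELECT 1 AS answer")
rows = cur.fetchall()
```

Because any DB-API 2.0 driver exposes the same `execute`/`fetchall` surface, this style of proxy works for Snowflake, Databricks, and sqlite alike, which matches the commit's claimed scope.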
bucket = parsed.netloc
key = parsed.path.lstrip("/")

local_path = Path(tempfile.mktemp(suffix=".zip"))
Use of known insecure function to create temp files - high severity
You're using a deprecated Python API tempfile.mktemp that does not guarantee atomicity during the creation and opening of this file. This can lead to unintended access of the file by different processes.
Remediation - medium confidence
This patch mitigates the use of an insecure temporary file creation function in '_ensure_local_archive' by replacing 'tempfile.mktemp' with 'tempfile.NamedTemporaryFile' for safer temporary file handling.
- local_path = Path(tempfile.mktemp(suffix=".zip"))
+ with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as tmpfile:
+     local_path = Path(tmpfile.name)
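The difference the suggested patch makes can be shown concretely. `tempfile.mktemp` only returns a name, leaving a window in which another process could create the file first; `tempfile.NamedTemporaryFile` (or `tempfile.mkstemp`) creates and opens the file atomically with owner-only permissions. `delete=False` keeps the file after the context exits so it can be written and then uploaded or cleaned up explicitly:

```python
import tempfile
from pathlib import Path

# Securely create the temp file: name allocation and creation are atomic,
# so no other process can squat on the path first.
with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as tmpfile:
    local_path = Path(tmpfile.name)

try:
    # The file already exists on disk; write the archive contents to it.
    local_path.write_bytes(b"archive contents")
    print(local_path.suffix)  # → .zip
finally:
    # delete=False means cleanup is our responsibility.
    local_path.unlink(missing_ok=True)
```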
def compute_checksum(file_path: Path) -> str:
    """Compute SHA-256 checksum of a file."""
    sha256 = hashlib.sha256()
    with open(file_path, "rb") as f:
Potential file inclusion attack via reading file - medium severity
If an attacker can control the input leading into the open function, they might be able to read sensitive files and launch further attacks with that information.
Remediation - high confidence
This patch mitigates potential file inclusion attacks by implementing path traversal checks on the parameters.
- with open(file_path, "rb") as f:
+ if "../" in str(file_path) or "..\\" in str(file_path):
+     raise Exception("Invalid file path")
+ with open(file_path, "rb") as f:
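The bot's substring check catches literal `../` sequences but not absolute paths or symlinks. A stricter alternative (a sketch, not part of the PR; `Path.is_relative_to` needs Python 3.9+) resolves the path and confirms it stays inside an allowed base directory before reading:

```python
import tempfile
from pathlib import Path


def safe_read_bytes(file_path: Path, base_dir: Path) -> bytes:
    """Read a file only if its resolved path stays under base_dir."""
    resolved = file_path.resolve()
    # resolve() collapses "..", symlinks, and relative segments, so the
    # containment check holds even for disguised traversal attempts.
    if not resolved.is_relative_to(base_dir.resolve()):
        raise ValueError(f"Path escapes {base_dir}: {file_path}")
    return resolved.read_bytes()


# Demo: a file inside the base directory is readable ...
base = Path(tempfile.mkdtemp())
(base / "archive.zip").write_bytes(b"ok")
data = safe_read_bytes(base / "archive.zip", base)
```

... while `safe_read_bytes(base / ".." / "elsewhere.zip", base)` raises `ValueError` before any `open` call happens.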
TEST_RSA_PRIVATE_KEY = """-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDHjpPXINuhTHs+
M+RS/mDapEmrr7SUiSdB4uzc1EfOSJiV44JzIzVxNIb6UCGilNvdC+xYoDTbkEUX
XPrdqMJFRcgeyd4AiynJzKFtkiJNnoOaa4FOCvFvmKOQWegrdkNOyTetdV+54vz/
LU33SZYWJKGPzQE9U4vioQy1Lsql9jXB3n83CIvo9jvR6oyS7e0v32OWRDnrjlzP
zOGjQz2VRVo8pCiw6HkPYj4A8wbTLuuKiowY2dJjs5eLwndOI1qc+iv87ksrwYjJ
kZiWBenhHsbh45v86QLUqlI5accPHvBb3fy1dininxmhRN1Z9lEhdBCGH0rj5FiY
Lik8/p5RAgMBAAECggEABGW0xgcqM8sTxaZ+0+HZWEQJwAvggKvnhq0Fj2Wpmebx
Xs8rORaH51FTHpmsrhCV6jBosjjAhWyPyzCgMgl1k3Fy1BPabZxjbLgSwA957Egv
ifPvvtygnpcIVqZWhnumBsrKDGtTU00oSkaxKr9vPFhtC3ZG3lc0lEc8eJMaAc9p
tLImxv17qU2jGFTJbF7Por65M10YbArQOdXdk5vsMbJyAPx+AQTlJyFvZ/d/bTyR
Js7zwjP75L86p92vUOn+5b+Zl+OkuJTluSEIuxSsVHLKJP8B/HPCM7cmXUnSeFcS
IRLrhOi7f1CP9iHsH/M5/Mfbh4VTQVDdprnWVYcrwQKBgQD44j8rvChj0d92OI9F
ll17mjRw/yBqKYdgroayIuHXEovd2c1Zj6DAqlfRltEooBLmzaB5MW7m25bJpA/R
M9Z4LfUi/cqF2l1v0B4180pXjgVVyzKSVlMV2GWwwHIqc8vkEe9yqjEql9jlGcU/
97FyPwXf/ZL8jxUS+URkGGoisQKBgQDNQ0X8nV/8V6ak/Ku8IlkAXZvjqYBXFELP
bnsuxlX1A7VDI9eszdkjyyefSUm0w/wE2UJXqD6QMlvL9d0HRgIy2UshEB2c/lGs
hlteLv4QTDWAGx5T6WrYDM14EZuhGS3ITWE9EpqRmRKami0/2iyYgc0saUkjYoEl
YrtnQgzdoQKBgH4WkQ5lKsk3YFCSYvNMNFwUSZEdj5x5IZ63jIHe7i95s+ZXG5PO
EhDJu+fw0lIUlr7bWftMMfU/Nms9dM31xyfnkJODpACgGko1U7jdYsJsrwNCCILe
vQUKNqqPNMeRFrCa7YZX9sSvXTDkF2xK3lkU2LMb0kWlb3XHVwCm5c5hAoGBAL1z
Af2OIzF8lMpiiv8xlIPJ4j/WCiZVBPT/O6KIXH2v1nUJd95+f5ORxhg2RFkbKlgv
ThQprNTaJe+yFTbJXu4fsD/r5+kmsatStrHPHZ9dN2Pto6g/H+YYquvPFJ0z6BWf
lcgQi6kmZw1aj7kHXXHFG+GJq3+FQz2GSwGa7NUBAoGAW8qpBFtG8ExEug7kTDNF
4Lgdyb2kyGtq8OGLgPctVhDGAv8zJeb3GbEtZbjhBEXzQ/kyCkVZEXpWGHwv1wrP
hxU8kG/Q3sbr9FMMD0iLakcoWOus3T1NY7GOlTo6hiAlkpJufU7jOLZgaci8+koQ
gu1Yi3GEOFR5fhCw7xjuO+E=
-----END PRIVATE KEY-----
Exposed secret in metadata-ingestion/src/datahub/ingestion/recording/archive.py - critical severity
Identified a Private Key, which may compromise cryptographic security and sensitive data encryption.
❌ 2 Tests Failed:
…ation validation

- Introduced `Path` handling for the output path in the recording configuration, allowing better path management.
- Updated validation logic to clarify that `output_path` is optional even when `s3_upload` is disabled, falling back to the `INGESTION_ARTIFACT_DIR` environment variable or a temporary directory.
- Added a `recording` configuration to `PipelineConfig` for improved debugging during ingestion runs.
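The fallback order described in that commit can be sketched as a small resolver (the function name is hypothetical): an explicit `output_path` wins, then the `INGESTION_ARTIFACT_DIR` environment variable, then a fresh temporary directory.

```python
import os
import tempfile
from pathlib import Path
from typing import Optional


def resolve_output_path(output_path: Optional[str]) -> Path:
    """Illustrative sketch of the output-path fallback chain."""
    if output_path:
        # Explicit configuration always takes precedence.
        return Path(output_path)
    artifact_dir = os.environ.get("INGESTION_ARTIFACT_DIR")
    if artifact_dir:
        # Orchestrators can point recordings at a shared artifact directory.
        return Path(artifact_dir)
    # Last resort: a fresh temporary directory (created on the spot).
    return Path(tempfile.mkdtemp(prefix="ingestion-recording-"))
```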
…oad logic

- Updated output path handling in the recording configuration to accept both local paths and S3 URLs, improving flexibility.
- Enhanced validation to ensure that when S3 upload is enabled, the output path is a valid S3 URL.
- Refined logic in the IngestionRecorder to clarify the conditions for using temporary files versus specified output paths.
- Removed the unused S3 prefix constant to streamline the codebase.
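The validation rule "when S3 upload is enabled, the output path must be a valid S3 URL" can be expressed with `urllib.parse.urlparse`; the helper name and error wording below are illustrative, not the PR's code:

```python
from urllib.parse import urlparse


def validate_output_path(output_path: str, s3_upload: bool) -> None:
    """Reject non-S3 output paths when S3 upload is requested (sketch)."""
    parsed = urlparse(output_path)
    # An S3 URL has scheme "s3" and a non-empty bucket (netloc).
    is_s3 = parsed.scheme == "s3" and bool(parsed.netloc)
    if s3_upload and not is_s3:
        raise ValueError(
            f"s3_upload is enabled but output_path is not an S3 URL: {output_path}"
        )


validate_output_path("s3://my-bucket/recordings/run.zip", s3_upload=True)  # ok
validate_output_path("/tmp/recordings/run.zip", s3_upload=False)  # ok
```

This also lines up with the diff fragments shown in this thread, where `parsed.netloc` becomes the bucket and `parsed.path.lstrip("/")` becomes the object key.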
# S3 upload uses temp file that gets uploaded then cleaned up
if self.s3_upload:
    # S3 upload - create temp archive, upload to output_path (S3 URL)
    archive_path = Path(tempfile.mktemp(suffix=".zip"))
Use of known insecure function to create temp files - high severity
You're using a deprecated Python API tempfile.mktemp that does not guarantee atomicity during the creation and opening of this file. This can lead to unintended access of the file by different processes.
Remediation - medium confidence
This patch mitigates the use of an insecure temporary file creation function in the '_create_archive' method of the 'recorder' module by replacing 'tempfile.mktemp()' with a more secure 'tempfile.NamedTemporaryFile()' approach.
- archive_path = Path(tempfile.mktemp(suffix=".zip"))
+ with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as tmpfile:
+     archive_path = Path(tmpfile.name)
…idation

- Enhanced the logic for enabling recording by consolidating the handling of the recording flag and configuration settings.
- Updated the setup process to build a comprehensive recording configuration, ensuring that CLI overrides are respected.
- Improved error handling for recording configuration validation, providing clearer feedback on misconfigurations.
- Streamlined the retrieval of the recording password and S3 upload settings, ensuring defaults are correctly applied.
ret = pipeline.pretty_print_summary()

if report_to:
    with open(report_to, "w") as f:
Potential file inclusion attack via reading file - low severity
If an attacker can control the input leading into the open function, they might be able to read sensitive files and launch further attacks with that information.
Remediation: Ignore this issue only after you've verified or sanitized the input going into this function.
…nk modes

- Implemented logic to replace the sink configuration with a file sink in air-gapped mode, preventing network connections.
- Added logging to inform users about the sink replacement and output file location.
- Disabled stateful ingestion in air-gapped mode due to the lack of a GMS connection.
- Introduced server override capability for live sink mode, allowing users to specify a GMS server.
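The air-gapped rewrite described in that commit can be sketched as a pure function over the pipeline config. The dict shapes and function name are assumptions for illustration: the configured sink is swapped for a file sink so replay never opens a network connection, and stateful ingestion is dropped because there is no GMS to talk to.

```python
from typing import Any, Dict


def apply_airgapped_mode(config: Dict[str, Any], output_file: str) -> Dict[str, Any]:
    """Return a copy of the pipeline config rewritten for air-gapped replay."""
    patched = dict(config)
    # Replace whatever sink was configured with a local file sink.
    patched["sink"] = {"type": "file", "config": {"filename": output_file}}
    # Stateful ingestion needs a GMS connection, so strip it in air-gapped mode.
    source_cfg = dict(patched.get("source", {}).get("config", {}))
    source_cfg.pop("stateful_ingestion", None)
    if "source" in patched:
        patched["source"] = {**patched["source"], "config": source_cfg}
    return patched


config = {
    "source": {
        "type": "snowflake",
        "config": {"stateful_ingestion": {"enabled": True}},
    },
    "sink": {"type": "datahub-rest", "config": {"server": "http://localhost:8080"}},
}
patched = apply_airgapped_mode(config, "/tmp/replay-output.json")
```

Returning a rewritten copy rather than mutating in place keeps the original config intact for logging the before/after difference to the user.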
bucket = parsed.netloc
key = parsed.path.lstrip("/")

local_path = Path(tempfile.mktemp(suffix=".zip"))
Use of known insecure function to create temp files - high severity
You're using a deprecated Python API tempfile.mktemp that does not guarantee atomicity during the creation and opening of this file. This can lead to unintended access of the file by different processes.
Remediation - medium confidence
This patch mitigates the use of an insecure temporary file creation function in the '_download_from_s3' method of the replay module by replacing 'tempfile.mktemp' with 'tempfile.NamedTemporaryFile' for secure temporary file handling.
- local_path = Path(tempfile.mktemp(suffix=".zip"))
+ with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as tmpfile:
+     local_path = Path(tmpfile.name)
- Introduced regex patterns for normalizing dynamic values in SQL queries, improving the ability to match queries with varying timestamps and date formats.
- Added functions to compute query similarity and generate normalized keys for fuzzy matching, enhancing the robustness of query recording and replay.
- Updated the QueryRecorder class to support multi-level matching strategies (exact, normalized, and fuzzy), allowing more flexible query retrieval.
- Improved logging to provide better insights during query matching.
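The normalization step of that matching pipeline can be sketched as follows. Dynamic literals (timestamps, dates) are replaced with placeholders so a query recorded yesterday still matches today's equivalent; the patterns and placeholder names here are illustrative, not the PR's actual regexes.

```python
import re

# Match full timestamps before bare dates, otherwise the date pattern would
# consume the date portion of a timestamp and leave the time dangling.
_TIMESTAMP = re.compile(r"\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}(\.\d+)?")
_DATE = re.compile(r"\d{4}-\d{2}-\d{2}")


def normalize_query(sql: str) -> str:
    """Produce a normalized key for matching recorded queries (sketch)."""
    sql = _TIMESTAMP.sub("<TS>", sql)
    sql = _DATE.sub("<DATE>", sql)
    # Collapse whitespace and case so formatting differences don't break matching.
    return " ".join(sql.split()).lower()


a = normalize_query("SELECT * FROM t WHERE ts > '2024-01-02 03:04:05'")
b = normalize_query("select * from t where ts > '2025-06-07 08:09:10'")
assert a == b  # same query modulo the dynamic timestamp
```

Exact matching would try the raw SQL first; only on a miss does the recorder fall back to this normalized key, and then to fuzzier similarity scoring.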
No description provided.