diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
index 93142a347ca0e6..9c251c040bed13 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
@@ -115,26 +115,30 @@ class GEProfilingConfig(GEProfilingBaseConfig):
     )
     max_number_of_fields_to_profile: Optional[pydantic.PositiveInt] = Field(
         default=None,
-        description="A positive integer that specifies the maximum number of columns to profile for any table. `None` implies all columns. The cost of profiling goes up significantly as the number of columns to profile goes up.",
+        description="A positive integer that specifies the maximum number of columns to profile for "
+        "any table. `None` implies all columns. The cost of profiling goes up significantly as the "
+        "number of columns to profile goes up.",
     )

     profile_if_updated_since_days: Optional[pydantic.PositiveFloat] = Field(
         default=None,
-        description="Profile table only if it has been updated since these many number of days. If set to `null`, no constraint of last modified time for tables to profile. Supported only in `snowflake` and `BigQuery`.",
+        description="Profile table only if it has been updated since these many number of days. "
+        "If set to `null`, no constraint of last modified time for tables to profile. "
+        "Supported only in `Snowflake` and `BigQuery`.",
     )

     profile_table_size_limit: Optional[int] = Field(
         default=5,
         description="Profile tables only if their size is less than specified GBs. If set to `null`, "
-        "no limit on the size of tables to profile. Supported only in `snowflake` and `BigQuery`"
-        "Supported for `oracle` based on calculated size from gathered stats.",
+        "no limit on the size of tables to profile. Supported only in `Snowflake`, `BigQuery` and "
+        "`Databricks`. Supported for `Oracle` based on calculated size from gathered stats.",
     )

     profile_table_row_limit: Optional[int] = Field(
         default=5000000,
-        description="Profile tables only if their row count is less than specified count. If set to `null`, "
-        "no limit on the row count of tables to profile. Supported only in `snowflake` and `BigQuery`"
-        "Supported for `oracle` based on gathered stats.",
+        description="Profile tables only if their row count is less than specified count. "
+        "If set to `null`, no limit on the row count of tables to profile. Supported only in "
+        "`Snowflake` and `BigQuery`. Supported for `Oracle` based on gathered stats.",
     )

     profile_table_row_count_estimate_only: bool = Field(
diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py
index e24ca8330777ed..276990ba1bbad4 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py
@@ -117,6 +117,7 @@ def get_unity_profile_request(

         if table.size_in_bytes is None:
             self.report.num_profile_missing_size_in_bytes += 1
+
         if not self.is_dataset_eligible_for_profiling(
             dataset_name,
             size_in_bytes=table.size_in_bytes,
@@ -143,6 +144,17 @@ def get_unity_profile_request(
             self.report.report_dropped(dataset_name)
             return None

+        if profile_table_level_only and table.size_in_bytes is not None:
+            # For requests with profile_table_level_only set, the dataset profile is generated
+            # by looking at table.rows_count. For delta tables (a typical Databricks table),
+            # count(*) is an efficient query to compute the row count.
+            # Presence of size_in_bytes confirms this is a DELTA table and that we have
+            # SELECT permission on this table.
+            try:
+                table.rows_count = _get_dataset_row_count(table, conn)
+            except Exception as e:
+                logger.warning(f"Failed to get table row count for {dataset_name}: {e}")
+
         self.report.report_entity_profiled(dataset_name)
         logger.debug(f"Preparing profiling request for {dataset_name}")
         return TableProfilerRequest(
@@ -160,6 +172,9 @@ def _get_dataset_size_in_bytes(
         conn.dialect.identifier_preparer.quote(c)
         for c in [table.ref.catalog, table.ref.schema, table.ref.table]
     )
+    # This query only works for delta tables.
+    # Ref: https://docs.databricks.com/en/delta/table-details.html
+    # Note: Any change here should also update _get_dataset_row_count.
     row = conn.execute(f"DESCRIBE DETAIL {name}").fetchone()
     if row is None:
         return None
@@ -168,3 +183,21 @@ def _get_dataset_size_in_bytes(
             return int(row._asdict()["sizeInBytes"])
         except Exception:
             return None
+
+
+def _get_dataset_row_count(
+    table: UnityCatalogSQLGenericTable, conn: Connection
+) -> Optional[int]:
+    name = ".".join(
+        conn.dialect.identifier_preparer.quote(c)
+        for c in [table.ref.catalog, table.ref.schema, table.ref.table]
+    )
+    # This query only works efficiently for delta tables.
+    row = conn.execute(f"select count(*) as numRows from {name}").fetchone()
+    if row is None:
+        return None
+    else:
+        try:
+            return int(row._asdict()["numRows"])
+        except Exception:
+            return None
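For reference, here is a minimal standalone sketch (not part of this diff) of the same `count(*)` lookup that `_get_dataset_row_count` performs, run through a plain SQLAlchemy connection. The `fetch_row_count` helper, the connection URL, and the `main.default.my_table` identifiers are hypothetical placeholders, and it assumes SQLAlchemy 1.x-style raw-string execution, matching the profiler code above.

```python
# Hedged sketch: mirrors the _get_dataset_row_count query outside the profiler.
# Assumptions: SQLAlchemy 1.4 (raw string execute), a Databricks SQLAlchemy dialect
# installed, and placeholder credentials/identifiers below.
from typing import Optional

from sqlalchemy import create_engine
from sqlalchemy.engine import Connection


def fetch_row_count(conn: Connection, catalog: str, schema: str, table: str) -> Optional[int]:
    # Quote each identifier part with the dialect's preparer, as the profiler does.
    name = ".".join(
        conn.dialect.identifier_preparer.quote(part)
        for part in (catalog, schema, table)
    )
    row = conn.execute(f"select count(*) as numRows from {name}").fetchone()
    if row is None:
        return None
    try:
        return int(row._asdict()["numRows"])
    except Exception:
        return None


if __name__ == "__main__":
    # Placeholder SQL warehouse URL; replace token, host, and http_path for a real run.
    engine = create_engine(
        "databricks://token:<token>@<workspace-host>?http_path=<http-path>"
    )
    with engine.connect() as conn:
        print(fetch_row_count(conn, "main", "default", "my_table"))
```

As in the profiler, `count(*)` is only cheap when Delta metadata can answer it; for non-Delta tables it may trigger a full scan, which is why the diff gates the call on `size_in_bytes` being present.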