fix(ingest/unity): add row count in table profile of delta tables #12480

Status: Open. Wants to merge 3 commits into base: master.
Changes from 1 commit.
@@ -115,26 +115,30 @@ class GEProfilingConfig(GEProfilingBaseConfig):
     )
     max_number_of_fields_to_profile: Optional[pydantic.PositiveInt] = Field(
         default=None,
-        description="A positive integer that specifies the maximum number of columns to profile for any table. `None` implies all columns. The cost of profiling goes up significantly as the number of columns to profile goes up.",
+        description="A positive integer that specifies the maximum number of columns to profile for "
+        "any table. `None` implies all columns. The cost of profiling goes up significantly as the "
+        "number of columns to profile goes up.",
     )

     profile_if_updated_since_days: Optional[pydantic.PositiveFloat] = Field(
         default=None,
-        description="Profile table only if it has been updated since these many number of days. If set to `null`, no constraint of last modified time for tables to profile. Supported only in `snowflake` and `BigQuery`.",
+        description="Profile a table only if it has been updated within the specified number of days. "
+        "If set to `null`, there is no constraint on the last modified time of tables to profile. "
+        "Supported only in `snowflake` and `BigQuery`.",
     )

     profile_table_size_limit: Optional[int] = Field(
         default=5,
         description="Profile tables only if their size is less than specified GBs. If set to `null`, "
-        "no limit on the size of tables to profile. Supported only in `snowflake` and `BigQuery`"
-        "Supported for `oracle` based on calculated size from gathered stats.",
+        "no limit on the size of tables to profile. Supported only in `Snowflake`, `BigQuery` and "
+        "`Databricks`. Supported for `oracle` based on calculated size from gathered stats.",
     )

     profile_table_row_limit: Optional[int] = Field(
         default=5000000,
-        description="Profile tables only if their row count is less than specified count. If set to `null`, "
-        "no limit on the row count of tables to profile. Supported only in `snowflake` and `BigQuery`"
-        "Supported for `oracle` based on gathered stats.",
+        description="Profile tables only if their row count is less than the specified count. "
+        "If set to `null`, no limit on the row count of tables to profile. Supported only in "
+        "`Snowflake` and `BigQuery`. Supported for `oracle` based on gathered stats.",
     )

     profile_table_row_count_estimate_only: bool = Field(
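The size and row-count limits above combine into a single eligibility decision before a table is profiled. A minimal sketch of that gating logic, under stated assumptions: `is_table_eligible` and its parameter names are illustrative stand-ins, not the actual DataHub API, and it only covers the two limits changed in this diff.

```python
from typing import Optional

def is_table_eligible(
    size_in_gb: Optional[float],
    row_count: Optional[int],
    size_limit_gb: Optional[int] = 5,      # mirrors profile_table_size_limit default
    row_limit: Optional[int] = 5_000_000,  # mirrors profile_table_row_limit default
) -> bool:
    """Hypothetical sketch: a None limit means 'no constraint', and an
    unknown statistic (None) cannot disqualify a table on its own."""
    if size_limit_gb is not None and size_in_gb is not None and size_in_gb > size_limit_gb:
        return False
    if row_limit is not None and row_count is not None and row_count > row_limit:
        return False
    return True
```

Setting either limit to `null` in the recipe disables that check entirely, which matches the "If set to `null`, no limit" wording in the field descriptions.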
@@ -117,6 +117,7 @@

         if table.size_in_bytes is None:
             self.report.num_profile_missing_size_in_bytes += 1
+
         if not self.is_dataset_eligible_for_profiling(
             dataset_name,
             size_in_bytes=table.size_in_bytes,
@@ -143,6 +144,17 @@
             self.report.report_dropped(dataset_name)
             return None

+        if profile_table_level_only and table.size_in_bytes is not None:

+            # For requests with profile_table_level_only set, the dataset profile is
+            # generated by looking at table.rows_count. For delta tables (the typical
+            # Databricks table format), count(*) is an efficient query to compute row count.
+            # Presence of size_in_bytes confirms this is a DELTA table and that we have
+            # SELECT permission on this table.
Comment on lines +151 to +152

Contributor:
I wonder if we may transform this comment into actual code. Something like:

    is_delta_table = table.size_in_bytes is not None

and then use the new flag accordingly to call delta-specific code. WDYT?

Collaborator (Author):
In fact, we can add an is_delta_table boolean in UnityCatalogSQLGenericTable based on the custom property data_source_format in the original table. Or we can add is_delta_table_with_select_enabled: table.size_in_bytes is not None as suggested. Maybe I can do both.
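The two flags discussed above might look like this in code. This is a sketch of the discussion only, not the merged change: `TableStub` stands in for `UnityCatalogSQLGenericTable`, and both helper names come from the review comments.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class TableStub:
    """Hypothetical stand-in for UnityCatalogSQLGenericTable."""
    size_in_bytes: Optional[int] = None
    data_source_format: Optional[str] = None  # custom property, e.g. "DELTA", "CSV"

def is_delta_table(table: TableStub) -> bool:
    # Derived from the data_source_format custom property, as the author suggests.
    return table.data_source_format == "DELTA"

def is_delta_table_with_select_enabled(table: TableStub) -> bool:
    # Presence of size_in_bytes (populated via DESCRIBE DETAIL) implies the table
    # is DELTA *and* that we hold SELECT permission on it, per the diff's comment.
    return table.size_in_bytes is not None
```

Making the condition a named predicate, as the reviewer proposes, turns the five-line code comment into self-documenting code.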

+            try:
+                table.rows_count = _get_dataset_row_count(table, conn)
+            except Exception as e:
+                logger.warning(f"Failed to get table row count for {dataset_name}: {e}")


         self.report.report_entity_profiled(dataset_name)
         logger.debug(f"Preparing profiling request for {dataset_name}")
         return TableProfilerRequest(
@@ -160,6 +172,9 @@
         conn.dialect.identifier_preparer.quote(c)
         for c in [table.ref.catalog, table.ref.schema, table.ref.table]
     )
+    # This query only works for delta table.
+    # Ref: https://docs.databricks.com/en/delta/table-details.html
+    # Note: Any change here should also update _get_dataset_row_count
     row = conn.execute(f"DESCRIBE DETAIL {name}").fetchone()
     if row is None:
         return None
@@ -168,3 +183,21 @@
         return int(row._asdict()["sizeInBytes"])
     except Exception:
         return None
+
+
+def _get_dataset_row_count(
+    table: UnityCatalogSQLGenericTable, conn: Connection
+) -> Optional[int]:
+    name = ".".join(

+        conn.dialect.identifier_preparer.quote(c)
+        for c in [table.ref.catalog, table.ref.schema, table.ref.table]
+    )
+    # This query only works efficiently for delta table
+    row = conn.execute(f"select count(*) as numRows from {name}").fetchone()
+    if row is None:
+        return None

+    else:
+        try:
+            return int(row._asdict()["numRows"])
+        except Exception:
+            return None

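Both helpers build the fully qualified table name by quoting each part through SQLAlchemy's `conn.dialect.identifier_preparer.quote`, which guards against identifiers containing dots or quote characters. A hand-rolled sketch of the equivalent backtick quoting for Databricks SQL (illustrative only; `quote_identifier` and `qualified_name` are hypothetical names, and the real code delegates to the dialect):

```python
def quote_identifier(part: str) -> str:
    # Databricks SQL quotes identifiers with backticks; a literal backtick
    # inside the name is escaped by doubling it.
    return "`" + part.replace("`", "``") + "`"

def qualified_name(catalog: str, schema: str, table: str) -> str:
    # Mirrors the ".".join(...) over catalog, schema, and table in the diff.
    return ".".join(quote_identifier(p) for p in (catalog, schema, table))
```

Using the dialect's preparer rather than string formatting alone keeps `DESCRIBE DETAIL {name}` and `select count(*) ... from {name}` safe for unusual table names.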