Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Granular, per-pixel statistics method. #477

Merged
merged 4 commits into from
Mar 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/.pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ score=yes
ignore-comments=yes

# Docstrings are removed from the similarity computation
ignore-docstrings=no
ignore-docstrings=yes

# Imports are removed from the similarity computation
ignore-imports=yes
Expand Down
37 changes: 36 additions & 1 deletion src/hats/catalog/dataset/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from hats.catalog.dataset.table_properties import TableProperties
from hats.io import file_io
from hats.io.parquet_metadata import aggregate_column_statistics
from hats.io.parquet_metadata import aggregate_column_statistics, per_pixel_statistics


# pylint: disable=too-few-public-methods
Expand Down Expand Up @@ -63,3 +63,38 @@
exclude_columns=exclude_columns,
include_columns=include_columns,
)

def per_pixel_statistics(
    self,
    exclude_hats_columns: bool = True,
    exclude_columns: list[str] | None = None,
    include_columns: list[str] | None = None,
    include_stats: list[str] | None = None,
    multiindex: bool = False,
):
    """Read footer statistics in parquet metadata, and report on statistics about
    each pixel partition.

    Args:
        exclude_hats_columns (bool): exclude HATS spatial and partitioning fields
            from the statistics. Defaults to True.
        exclude_columns (List[str]): additional columns to exclude from the statistics.
        include_columns (List[str]): if specified, only return statistics for the column
            names provided. Defaults to None, and returns all non-hats columns.
        include_stats (List[str]): if specified, only return the kinds of values from list
            (min_value, max_value, null_count, row_count). Defaults to None, and returns all values.
        multiindex (bool): should the returned frame be created with a multi-index, first on
            pixel, then on column name? Default is False, and instead indexes on pixel, with
            separate columns per-data-column and stat value combination.

    Returns:
        dataframe with granular per-pixel statistics; an empty dataframe (with a
        warning) when the catalog is in-memory and has no parquet metadata.
    """
    if not self.on_disk:
        warnings.warn("Calling per_pixel_statistics on an in-memory catalog. No results.")
        return pd.DataFrame()
    # Delegate to the module-level implementation, pointed at this catalog's
    # consolidated ``_metadata`` footer file.
    return per_pixel_statistics(
        self.catalog_base_dir / "dataset" / "_metadata",
        exclude_hats_columns=exclude_hats_columns,
        exclude_columns=exclude_columns,
        include_columns=include_columns,
        include_stats=include_stats,
        multiindex=multiindex,
    )
43 changes: 42 additions & 1 deletion src/hats/catalog/healpix_dataset/healpix_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from hats.catalog.partition_info import PartitionInfo
from hats.inspection import plot_pixels
from hats.inspection.visualize_catalog import plot_moc
from hats.io.parquet_metadata import aggregate_column_statistics
from hats.io.parquet_metadata import aggregate_column_statistics, per_pixel_statistics
from hats.pixel_math import HealpixPixel
from hats.pixel_math.box_filter import generate_box_moc, wrap_ra_angles
from hats.pixel_math.validators import (
Expand Down Expand Up @@ -288,3 +288,44 @@ def aggregate_column_statistics(
include_columns=include_columns,
include_pixels=include_pixels,
)

def per_pixel_statistics(
    self,
    exclude_hats_columns: bool = True,
    exclude_columns: list[str] | None = None,
    include_columns: list[str] | None = None,
    include_stats: list[str] | None = None,
    multiindex: bool = False,
    include_pixels: list[HealpixPixel] | None = None,
):
    """Read footer statistics in parquet metadata, and report on statistics about
    each pixel partition.

    Args:
        exclude_hats_columns (bool): exclude HATS spatial and partitioning fields
            from the statistics. Defaults to True.
        exclude_columns (List[str]): additional columns to exclude from the statistics.
        include_columns (List[str]): if specified, only return statistics for the column
            names provided. Defaults to None, and returns all non-hats columns.
        include_pixels (list[HealpixPixel]): if specified, only return statistics
            for the pixels indicated. Defaults to None, and returns statistics for
            all of this catalog's pixels.
        include_stats (List[str]): if specified, only return the kinds of values from list
            (min_value, max_value, null_count, row_count). Defaults to None, and returns all values.
        multiindex (bool): should the returned frame be created with a multi-index, first on
            pixel, then on column name?

    Returns:
        dataframe with granular per-pixel statistics; an empty dataframe (with a
        warning) when the catalog is in-memory and has no parquet metadata.
    """
    if not self.on_disk:
        warnings.warn("Calling per_pixel_statistics on an in-memory catalog. No results.")
        return pd.DataFrame()

    # Default to every pixel in this catalog, so the metadata reader does not
    # have to re-derive the pixel list.
    if include_pixels is None:
        include_pixels = self.get_healpix_pixels()
    return per_pixel_statistics(
        self.catalog_base_dir / "dataset" / "_metadata",
        exclude_hats_columns=exclude_hats_columns,
        exclude_columns=exclude_columns,
        include_columns=include_columns,
        include_stats=include_stats,
        multiindex=multiindex,
        include_pixels=include_pixels,
    )
153 changes: 136 additions & 17 deletions src/hats/io/parquet_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,26 +127,14 @@
return max(value1, value2)


def aggregate_column_statistics(
metadata_file: str | Path | UPath,
def _pick_columns(
first_row_group,
exclude_hats_columns: bool = True,
exclude_columns: list[str] = None,
include_columns: list[str] = None,
include_pixels: list[HealpixPixel] = None,
):
"""Read footer statistics in parquet metadata, and report on global min/max values.

Args:
metadata_file (str | Path | UPath): path to `_metadata` file
exclude_hats_columns (bool): exclude HATS spatial and partitioning fields
from the statistics. Defaults to True.
exclude_columns (List[str]): additional columns to exclude from the statistics.
include_columns (List[str]): if specified, only return statistics for the column
names provided. Defaults to None, and returns all non-hats columns.
"""
total_metadata = file_io.read_parquet_metadata(metadata_file)
num_row_groups = total_metadata.num_row_groups
first_row_group = total_metadata.row_group(0)
"""Convenience method to find the desired columns and their indexes, given
some conventional user preferences."""

if include_columns is None:
include_columns = []
Expand All @@ -166,9 +154,45 @@
if (len(include_columns) == 0 or name in include_columns)
and not (len(exclude_columns) > 0 and name in exclude_columns)
]
column_names = [column_names[i] for i in good_column_indexes]

return good_column_indexes, column_names


def aggregate_column_statistics(
metadata_file: str | Path | UPath,
exclude_hats_columns: bool = True,
exclude_columns: list[str] = None,
include_columns: list[str] = None,
include_pixels: list[HealpixPixel] = None,
):
"""Read footer statistics in parquet metadata, and report on global min/max values.

Args:
metadata_file (str | Path | UPath): path to `_metadata` file
exclude_hats_columns (bool): exclude HATS spatial and partitioning fields
from the statistics. Defaults to True.
exclude_columns (List[str]): additional columns to exclude from the statistics.
include_columns (List[str]): if specified, only return statistics for the column
names provided. Defaults to None, and returns all non-hats columns.
include_pixels (list[HealpixPixel]): if specified, only return statistics
for the pixels indicated. Defaults to none, and returns all pixels.
Returns:
dataframe with global summary statistics
"""
total_metadata = file_io.read_parquet_metadata(metadata_file)
num_row_groups = total_metadata.num_row_groups
first_row_group = total_metadata.row_group(0)

good_column_indexes, column_names = _pick_columns(
first_row_group=first_row_group,
exclude_hats_columns=exclude_hats_columns,
exclude_columns=exclude_columns,
include_columns=include_columns,
)
if not good_column_indexes:
return pd.DataFrame()
column_names = [column_names[i] for i in good_column_indexes]

extrema = None

for row_group_index in range(0, num_row_groups):
Expand Down Expand Up @@ -219,3 +243,98 @@
}
).set_index("column_names")
return frame


def per_pixel_statistics(
    metadata_file: str | Path | UPath,
    exclude_hats_columns: bool = True,
    exclude_columns: list[str] | None = None,
    include_columns: list[str] | None = None,
    include_stats: list[str] | None = None,
    multiindex: bool = False,
    include_pixels: list[HealpixPixel] | None = None,
):
    """Read footer statistics in parquet metadata, and report on statistics about
    each pixel partition.

    Args:
        metadata_file (str | Path | UPath): path to `_metadata` file
        exclude_hats_columns (bool): exclude HATS spatial and partitioning fields
            from the statistics. Defaults to True.
        exclude_columns (List[str]): additional columns to exclude from the statistics.
        include_columns (List[str]): if specified, only return statistics for the column
            names provided. Defaults to None, and returns all non-hats columns.
        include_pixels (list[HealpixPixel]): if specified, only return statistics
            for the pixels indicated. Defaults to none, and returns all pixels.
        include_stats (List[str]): if specified, only return the kinds of values from list
            (min_value, max_value, null_count, row_count). Defaults to None, and returns all values.
        multiindex (bool): should the returned frame be created with a multi-index, first on
            pixel, then on column name? Default is False, and instead indexes on pixel, with
            separate columns per-data-column and stat value combination.

    Returns:
        dataframe with granular per-pixel statistics; empty when no columns match
        or no row groups survive the ``include_pixels`` filter.

    Raises:
        ValueError: if ``include_stats`` contains an unrecognized statistic name.
    """
    total_metadata = file_io.read_parquet_metadata(metadata_file)
    num_row_groups = total_metadata.num_row_groups
    first_row_group = total_metadata.row_group(0)

    good_column_indexes, column_names = _pick_columns(
        first_row_group=first_row_group,
        exclude_hats_columns=exclude_hats_columns,
        exclude_columns=exclude_columns,
        include_columns=include_columns,
    )
    if not good_column_indexes:
        return pd.DataFrame()

    all_stats = ["min_value", "max_value", "null_count", "row_count"]

    if not include_stats:
        include_stats = all_stats
    else:
        for stat in include_stats:
            if stat not in all_stats:
                raise ValueError(f"include_stats must be from list {all_stats} (found {stat})")

    # Indexes into each column's [min, max, null_count, row_count] 4-tuple,
    # in the CALLER'S requested order. Building the mask in include_stats
    # order (not all_stats order) keeps the data aligned with the
    # include_stats labels used for the frame columns below.
    stat_mask = np.array([all_stats.index(stat) for stat in include_stats])
    pixels = []
    leaf_stats = []

    for row_group_index in range(num_row_groups):
        row_group = total_metadata.row_group(row_group_index)
        # Each row group maps to one pixel partition file.
        pixel = paths.get_healpix_from_path(row_group.column(0).file_path)
        if include_pixels is not None and pixel not in include_pixels:
            continue
        row_stats = [
            (
                # Columns with no statistics in the footer report empty values.
                [None, None, 0, 0]
                if row_group.column(col).statistics is None
                else [
                    row_group.column(col).statistics.min,
                    row_group.column(col).statistics.max,
                    row_group.column(col).statistics.null_count,
                    row_group.column(col).num_values,
                ]
            )
            for col in good_column_indexes
        ]
        row_stats = np.take(row_stats, stat_mask, axis=1)
        pixels.append(pixel)
        leaf_stats.append(row_stats)

    # include_pixels may have excluded every row group; bail out before the
    # reshape below, which would fail on a zero-dimensional array.
    if not pixels:
        return pd.DataFrame()

    stats_lists = np.array(leaf_stats)
    original_shape = stats_lists.shape

    if multiindex:
        # One row per (pixel, column) pair; one frame column per stat.
        stats_lists = stats_lists.reshape((original_shape[0] * original_shape[1], original_shape[2]))
        frame = pd.DataFrame(
            stats_lists,
            index=pd.MultiIndex.from_product([pixels, column_names], names=["pixel", "column"]),
            columns=include_stats,
        )
    else:
        # One row per pixel; one frame column per (data column, stat) combination.
        stats_lists = stats_lists.reshape((original_shape[0], original_shape[1] * original_shape[2]))
        mod_col_names = [[f"{col_name}: {stat}" for stat in include_stats] for col_name in column_names]
        mod_col_names = np.array(mod_col_names).flatten()
        frame = pd.DataFrame(stats_lists, index=pixels, columns=mod_col_names)
    return frame
25 changes: 23 additions & 2 deletions tests/hats/catalog/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def test_load_catalog_small_sky_order1(small_sky_order1_dir):
assert len(cat.get_healpix_pixels()) == 4


def test_aggregate_column_statistics(small_sky_order1_dir):
def test_catalog_statistics(small_sky_order1_dir):
def assert_column_stat_as_floats(
result_frame, column_name, min_value=None, max_value=None, row_count=None
):
Expand Down Expand Up @@ -121,13 +121,34 @@ def assert_column_stat_as_floats(
assert len(result_frame) == 5
assert_column_stat_as_floats(result_frame, "dec", min_value=-69.5, max_value=-47.5, row_count=42)

result_frame = cat.per_pixel_statistics()
# 20 = 5 columns * 4 pixels
assert result_frame.shape == (4, 20)

def test_aggregate_column_statistics_inmemory(catalog_info, catalog_pixels):
result_frame = cat.per_pixel_statistics(exclude_hats_columns=False)
# 36 = 9 columns * 4 stats per-column
assert result_frame.shape == (4, 36)

result_frame = cat.per_pixel_statistics(
include_columns=["ra", "dec"], include_stats=["min_value", "max_value"]
)
# 4 = 2 columns * 2 stats per-column
assert result_frame.shape == (4, 4)

result_frame = filtered_catalog.per_pixel_statistics()
assert result_frame.shape == (1, 20)


def test_catalog_statistics_inmemory(catalog_info, catalog_pixels):
    """Statistics methods on a catalog with no on-disk backing warn and return empty frames."""
    catalog = Catalog(catalog_info, catalog_pixels)

    with pytest.warns(UserWarning, match="in-memory"):
        stats = catalog.aggregate_column_statistics(include_columns=["ra", "dec"])
    assert len(stats) == 0

    with pytest.warns(UserWarning, match="in-memory"):
        stats = catalog.per_pixel_statistics(include_columns=["ra", "dec"])
    assert len(stats) == 0


def test_load_catalog_small_sky_order1_moc(small_sky_order1_dir):
"""Instantiate a catalog with 4 pixels"""
Expand Down
53 changes: 52 additions & 1 deletion tests/hats/io/test_parquet_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import pyarrow.parquet as pq

from hats.io import file_io, paths
from hats.io.parquet_metadata import aggregate_column_statistics, write_parquet_metadata
from hats.io.parquet_metadata import aggregate_column_statistics, per_pixel_statistics, write_parquet_metadata
from hats.pixel_math.healpix_pixel import HealpixPixel


Expand Down Expand Up @@ -232,3 +232,54 @@ def test_aggregate_column_statistics_with_nulls(tmp_path):

assert_column_stat_as_floats(result_frame, "data", min_value=-1, max_value=2, null_count=4, row_count=6)
assert_column_stat_as_floats(result_frame, "Npix", min_value=1, max_value=6, null_count=4, row_count=6)


def test_per_pixel_statistics(small_sky_order1_dir):
    """Check frame shapes for per-pixel statistics under column filtering options."""
    metadata_path = paths.get_parquet_metadata_pointer(small_sky_order1_dir)

    # Default: 5 non-hats columns * 4 stats each, one row per pixel.
    frame = per_pixel_statistics(metadata_path)
    assert frame.shape == (4, 20)

    # Keeping hats columns: 9 columns * 4 stats per-column.
    frame = per_pixel_statistics(metadata_path, exclude_hats_columns=False)
    assert frame.shape == (4, 36)

    # Restricting to two columns: 2 columns * 4 stats per-column.
    frame = per_pixel_statistics(metadata_path, include_columns=["ra", "dec"])
    assert frame.shape == (4, 8)

    # No matching columns yields an empty frame.
    frame = per_pixel_statistics(metadata_path, include_columns=["does", "not", "exist"])
    assert len(frame) == 0


def test_per_pixel_statistics_multiindex(small_sky_order1_dir):
    """Multi-indexed frames have one row per (pixel, column) pair and one column per stat."""
    partition_info_file = paths.get_parquet_metadata_pointer(small_sky_order1_dir)

    result_frame = per_pixel_statistics(partition_info_file, multiindex=True)
    # 20 rows = 5 columns * 4 pixels; 4 stat columns
    assert result_frame.shape == (20, 4)

    result_frame = per_pixel_statistics(partition_info_file, exclude_hats_columns=False, multiindex=True)
    # 36 rows = 9 columns * 4 pixels; 4 stat columns
    assert result_frame.shape == (36, 4)


def test_per_pixel_statistics_include_stats(small_sky_order1_dir):
    """Check frame shapes when restricting the reported statistic kinds."""
    metadata_path = paths.get_parquet_metadata_pointer(small_sky_order1_dir)

    # One stat for each of the 5 default (non-hats) columns.
    frame = per_pixel_statistics(metadata_path, include_stats=["row_count"])
    assert frame.shape == (4, 5)

    # Single column, single stat.
    frame = per_pixel_statistics(metadata_path, include_stats=["row_count"], include_columns=["id"])
    assert frame.shape == (4, 1)

    # Same restriction with a multi-index: one row per (pixel, column) pair.
    frame = per_pixel_statistics(
        metadata_path, include_stats=["row_count"], include_columns=["id"], multiindex=True
    )
    assert frame.shape == (4, 1)