Skip to content

Commit 652a1b5

Browse files
Committed (author and date not captured in this extraction)
Remove reading partition info pixels from Norder/Npix
1 parent: 86610eb · commit: 652a1b5

File tree

6 files changed

+22
-239
lines changed

6 files changed

+22
-239
lines changed

src/hats/catalog/partition_info.py

+15-48
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,7 @@
1111

1212
import hats.pixel_math.healpix_shim as hp
1313
from hats.io import file_io, paths
14-
from hats.io.parquet_metadata import read_row_group_fragments, row_group_stat_single_value
15-
from hats.pixel_math import HealpixPixel
14+
from hats.pixel_math.healpix_pixel import INVALID_PIXEL, HealpixPixel
1615

1716

1817
class PartitionInfo:
@@ -102,71 +101,39 @@ def read_from_dir(cls, catalog_base_dir: str | Path | UPath | None) -> Partition
102101
return cls(pixel_list, catalog_base_dir)
103102

104103
@classmethod
105-
def read_from_file(cls, metadata_file: str | Path | UPath, strict: bool = False) -> PartitionInfo:
104+
def read_from_file(cls, metadata_file: str | Path | UPath) -> PartitionInfo:
106105
"""Read partition info from a `_metadata` file to create an object
107106
108107
Args:
109108
metadata_file (UPath): path to the `_metadata` file
110-
strict (bool): use strict parsing of _metadata file. this is slower, but
111-
gives more helpful error messages in the case of invalid data.
112109
113110
Returns:
114111
A `PartitionInfo` object with the data from the file
115112
"""
116-
return cls(cls._read_from_metadata_file(metadata_file, strict))
113+
return cls(cls._read_from_metadata_file(metadata_file))
117114

118115
@classmethod
119-
def _read_from_metadata_file(
120-
cls, metadata_file: str | Path | UPath, strict: bool = False
121-
) -> list[HealpixPixel]:
116+
def _read_from_metadata_file(cls, metadata_file: str | Path | UPath) -> list[HealpixPixel]:
122117
"""Read partition info list from a `_metadata` file.
123118
124119
Args:
125120
metadata_file (UPath): path to the `_metadata` file
126-
strict (bool): use strict parsing of _metadata file. this is slower, but
127-
gives more helpful error messages in the case of invalid data.
128121
129122
Returns:
130123
A `PartitionInfo` object with the data from the file
131124
"""
132-
if strict:
133-
pixel_list = [
134-
HealpixPixel(
135-
row_group_stat_single_value(row_group, cls.METADATA_ORDER_COLUMN_NAME),
136-
row_group_stat_single_value(row_group, cls.METADATA_PIXEL_COLUMN_NAME),
137-
)
138-
for row_group in read_row_group_fragments(metadata_file)
139-
]
140-
else:
141-
total_metadata = file_io.read_parquet_metadata(metadata_file)
142-
num_row_groups = total_metadata.num_row_groups
143-
144-
first_row_group = total_metadata.row_group(0)
145-
norder_column = -1
146-
npix_column = -1
147-
148-
for i in range(0, first_row_group.num_columns):
149-
column = first_row_group.column(i)
150-
if column.path_in_schema == cls.METADATA_ORDER_COLUMN_NAME:
151-
norder_column = i
152-
elif column.path_in_schema == cls.METADATA_PIXEL_COLUMN_NAME:
153-
npix_column = i
154-
155-
if norder_column == -1 or npix_column == -1:
156-
raise ValueError("Metadata missing Norder or Npix column")
157-
158-
row_group_index = np.arange(0, num_row_groups)
159-
160-
pixel_list = [
161-
HealpixPixel(
162-
total_metadata.row_group(index).column(norder_column).statistics.min,
163-
total_metadata.row_group(index).column(npix_column).statistics.min,
164-
)
165-
for index in row_group_index
166-
]
125+
total_metadata = file_io.read_parquet_metadata(metadata_file)
126+
if total_metadata.num_row_groups == 0 or total_metadata.row_group(0).num_columns == 0:
127+
raise ValueError(f"Insufficient metadata in file {metadata_file}")
128+
129+
pixel_list = [
130+
paths.get_healpix_from_path(total_metadata.row_group(index).column(0).file_path)
131+
for index in range(0, total_metadata.num_row_groups)
132+
]
133+
pixel_list = [p for p in pixel_list if p != INVALID_PIXEL]
134+
if len(pixel_list) == 0:
135+
raise ValueError(f"Insufficient metadata in file {metadata_file}")
167136
## Remove duplicates, preserving order.
168-
## In the case of association partition join info, we may have multiple entries
169-
## for the primary order/pixels.
170137
return list(dict.fromkeys(pixel_list))
171138

172139
@classmethod

src/hats/io/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
"""Utilities for reading and writing catalog files"""
22

3-
from .parquet_metadata import read_row_group_fragments, row_group_stat_single_value, write_parquet_metadata
3+
from .parquet_metadata import write_parquet_metadata
44
from .paths import (
55
get_common_metadata_pointer,
66
get_parquet_metadata_pointer,

src/hats/io/parquet_metadata.py

-62
Original file line numberDiff line numberDiff line change
@@ -7,73 +7,13 @@
77
import numpy as np
88
import pandas as pd
99
import pyarrow.dataset as pds
10-
import pyarrow.parquet as pq
1110
from upath import UPath
1211

1312
from hats.io import file_io, paths
1413
from hats.io.file_io.file_pointer import get_upath
15-
from hats.pixel_math.healpix_pixel import INVALID_PIXEL, HealpixPixel
1614
from hats.pixel_math.healpix_pixel_function import get_pixel_argsort
1715

1816

19-
def row_group_stat_single_value(row_group, stat_key: str):
20-
"""Convenience method to find the min and max inside a statistics dictionary,
21-
and raise an error if they're unequal.
22-
23-
Args:
24-
row_group: dataset fragment row group
25-
stat_key (str): column name of interest.
26-
Returns:
27-
The value of the specified row group statistic
28-
"""
29-
if stat_key not in row_group.statistics:
30-
raise ValueError(f"row group doesn't have expected key {stat_key}")
31-
stat_dict = row_group.statistics[stat_key]
32-
min_val = stat_dict["min"]
33-
max_val = stat_dict["max"]
34-
if min_val != max_val:
35-
raise ValueError(f"stat min != max ({min_val} != {max_val})")
36-
return min_val
37-
38-
39-
def get_healpix_pixel_from_metadata(
40-
metadata: pq.FileMetaData, norder_column: str = "Norder", npix_column: str = "Npix"
41-
) -> HealpixPixel:
42-
"""Get the healpix pixel according to a parquet file's metadata.
43-
44-
This is determined by the value of Norder and Npix in the table's data
45-
46-
Args:
47-
metadata (pyarrow.parquet.FileMetaData): full metadata for a single file.
48-
49-
Returns:
50-
Healpix pixel representing the Norder and Npix from the first row group.
51-
"""
52-
if metadata.num_row_groups <= 0 or metadata.num_columns <= 0:
53-
raise ValueError("metadata is for empty table")
54-
order = -1
55-
pixel = -1
56-
first_row_group = metadata.row_group(0)
57-
for i in range(0, first_row_group.num_columns):
58-
column = first_row_group.column(i)
59-
if column.path_in_schema == norder_column:
60-
if column.statistics.min != column.statistics.max:
61-
raise ValueError(
62-
f"{norder_column} stat min != max ({column.statistics.min} != {column.statistics.max})"
63-
)
64-
order = column.statistics.min
65-
elif column.path_in_schema == npix_column:
66-
if column.statistics.min != column.statistics.max:
67-
raise ValueError(
68-
f"{npix_column} stat min != max ({column.statistics.min} != {column.statistics.max})"
69-
)
70-
pixel = column.statistics.min
71-
72-
if order == -1 or pixel == -1:
73-
raise ValueError(f"Metadata missing Norder ({norder_column}) or Npix ({npix_column}) column")
74-
return HealpixPixel(order, pixel)
75-
76-
7717
def write_parquet_metadata(
7818
catalog_path: str | Path | UPath,
7919
order_by_healpix=True,
@@ -122,8 +62,6 @@ def write_parquet_metadata(
12262

12363
if order_by_healpix:
12464
healpix_pixel = paths.get_healpix_from_path(relative_path)
125-
if healpix_pixel == INVALID_PIXEL:
126-
healpix_pixel = get_healpix_pixel_from_metadata(single_metadata)
12765

12866
healpix_pixels.append(healpix_pixel)
12967
metadata_collector.append(single_metadata)

src/hats/io/validation.py

+2-11
Original file line numberDiff line numberDiff line change
@@ -102,18 +102,9 @@ def handle_error(msg):
102102
print(f"Found {len(expected_pixels)} partitions.")
103103

104104
## Compare the pixels in _metadata with partition_info.csv
105-
# Use both strategies of reading the partition info: strict and !strict.
106-
metadata_pixels = sort_pixels(
107-
PartitionInfo.read_from_file(metadata_file, strict=True).get_healpix_pixels()
108-
)
109-
if not np.array_equal(expected_pixels, metadata_pixels):
110-
handle_error("Partition pixels differ between catalog and _metadata file (strict)")
111-
112-
metadata_pixels = sort_pixels(
113-
PartitionInfo.read_from_file(metadata_file, strict=False).get_healpix_pixels()
114-
)
105+
metadata_pixels = sort_pixels(PartitionInfo.read_from_file(metadata_file).get_healpix_pixels())
115106
if not np.array_equal(expected_pixels, metadata_pixels):
116-
handle_error("Partition pixels differ between catalog and _metadata file (non-strict)")
107+
handle_error("Partition pixels differ between catalog and _metadata file")
117108

118109
partition_info_file = get_partition_info_pointer(pointer)
119110
partition_info = PartitionInfo.read_from_csv(partition_info_file)

tests/hats/catalog/test_partition_info.py

+3-12
Original file line numberDiff line numberDiff line change
@@ -29,29 +29,20 @@ def test_load_partition_info_from_metadata(small_sky_dir, small_sky_source_dir,
2929
partitions = PartitionInfo.read_from_file(metadata_file)
3030
assert partitions.get_healpix_pixels() == small_sky_source_pixels
3131

32-
partitions = PartitionInfo.read_from_file(metadata_file, strict=True)
33-
assert partitions.get_healpix_pixels() == small_sky_source_pixels
34-
3532

3633
def test_load_partition_info_from_metadata_fail(tmp_path):
3734
empty_dataframe = pd.DataFrame()
3835
metadata_filename = tmp_path / "empty_metadata.parquet"
3936
empty_dataframe.to_parquet(metadata_filename)
40-
with pytest.raises(ValueError, match="missing Norder"):
37+
with pytest.raises(ValueError, match="Insufficient metadata"):
4138
PartitionInfo.read_from_file(metadata_filename)
4239

43-
with pytest.raises(ValueError, match="at least one column"):
44-
PartitionInfo.read_from_file(metadata_filename, strict=True)
45-
4640
non_healpix_dataframe = pd.DataFrame({"data": [0], "Npix": [45]})
4741
metadata_filename = tmp_path / "non_healpix_metadata.parquet"
4842
non_healpix_dataframe.to_parquet(metadata_filename)
49-
with pytest.raises(ValueError, match="missing Norder"):
43+
with pytest.raises(ValueError, match="Insufficient metadata"):
5044
PartitionInfo.read_from_file(metadata_filename)
5145

52-
with pytest.raises(ValueError, match="empty file path"):
53-
PartitionInfo.read_from_file(metadata_filename, strict=True)
54-
5546

5647
def test_load_partition_info_from_dir_fail(tmp_path):
5748
empty_dataframe = pd.DataFrame()
@@ -65,7 +56,7 @@ def test_load_partition_info_from_dir_fail(tmp_path):
6556
metadata_filename = tmp_path / "dataset" / "_metadata"
6657
empty_dataframe.to_parquet(metadata_filename)
6758
with pytest.warns(UserWarning, match="slow"):
68-
with pytest.raises(ValueError, match="missing Norder"):
59+
with pytest.raises(ValueError, match="Insufficient metadata"):
6960
PartitionInfo.read_from_dir(tmp_path)
7061

7162

tests/hats/io/test_parquet_metadata.py

+1-105
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,9 @@
55
import pandas as pd
66
import pyarrow as pa
77
import pyarrow.parquet as pq
8-
import pytest
98

109
from hats.io import file_io, paths
11-
from hats.io.parquet_metadata import (
12-
aggregate_column_statistics,
13-
read_row_group_fragments,
14-
row_group_stat_single_value,
15-
write_parquet_metadata,
16-
)
10+
from hats.io.parquet_metadata import aggregate_column_statistics, write_parquet_metadata
1711

1812

1913
def test_write_parquet_metadata(tmp_path, small_sky_dir, small_sky_schema, check_parquet_schema):
@@ -126,24 +120,6 @@ def test_write_index_parquet_metadata(tmp_path, check_parquet_schema):
126120
)
127121

128122

129-
def test_row_group_fragments(small_sky_order1_dir):
130-
partition_info_file = paths.get_parquet_metadata_pointer(small_sky_order1_dir)
131-
132-
num_row_groups = 0
133-
for _ in read_row_group_fragments(partition_info_file):
134-
num_row_groups += 1
135-
136-
assert num_row_groups == 4
137-
138-
139-
def test_row_group_fragments_with_dir(small_sky_order1_dir):
140-
num_row_groups = 0
141-
for _ in read_row_group_fragments(small_sky_order1_dir):
142-
num_row_groups += 1
143-
144-
assert num_row_groups == 4
145-
146-
147123
def test_aggregate_column_statistics(small_sky_order1_dir):
148124
partition_info_file = paths.get_parquet_metadata_pointer(small_sky_order1_dir)
149125

@@ -193,83 +169,3 @@ def test_aggregate_column_statistics_with_nulls(tmp_path):
193169
assert data_stats["min_value"] == 1
194170
assert data_stats["max_value"] == 6
195171
assert data_stats["null_count"] == 4
196-
197-
198-
def test_row_group_stats(small_sky_dir):
199-
partition_info_file = paths.get_parquet_metadata_pointer(small_sky_dir)
200-
first_row_group = next(read_row_group_fragments(partition_info_file))
201-
202-
assert row_group_stat_single_value(first_row_group, "Norder") == 0
203-
assert row_group_stat_single_value(first_row_group, "Npix") == 11
204-
205-
with pytest.raises(ValueError, match="doesn't have expected key"):
206-
row_group_stat_single_value(first_row_group, "NOT HERE")
207-
208-
with pytest.raises(ValueError, match="stat min != max"):
209-
row_group_stat_single_value(first_row_group, "ra")
210-
211-
212-
# def test_get_healpix_pixel_from_metadata(small_sky_dir):
213-
# partition_info_file = paths.get_parquet_metadata_pointer(small_sky_dir)
214-
# single_metadata = file_io.read_parquet_metadata(partition_info_file)
215-
# pixel = get_healpix_pixel_from_metadata(single_metadata)
216-
# assert pixel == HealpixPixel(0, 11)
217-
218-
219-
# def test_get_healpix_pixel_from_metadata_min_max(tmp_path):
220-
# good_healpix_dataframe = pd.DataFrame({"data": [0, 1], "Norder": [1, 1], "Npix": [44, 44]})
221-
# metadata_filename = tmp_path / "non_healpix_metadata.parquet"
222-
# good_healpix_dataframe.to_parquet(metadata_filename)
223-
# single_metadata = file_io.read_parquet_metadata(metadata_filename)
224-
# pixel = get_healpix_pixel_from_metadata(single_metadata)
225-
# assert pixel == HealpixPixel(1, 44)
226-
227-
# non_healpix_dataframe = pd.DataFrame({"data": [0, 1], "Npix": [45, 44]})
228-
# non_healpix_dataframe.to_parquet(metadata_filename)
229-
# single_metadata = file_io.read_parquet_metadata(metadata_filename)
230-
# with pytest.raises(ValueError, match="Npix stat min != max"):
231-
# get_healpix_pixel_from_metadata(single_metadata)
232-
233-
# non_healpix_dataframe = pd.DataFrame({"data": [0, 1], "Norder": [5, 6]})
234-
# non_healpix_dataframe.to_parquet(metadata_filename)
235-
# single_metadata = file_io.read_parquet_metadata(metadata_filename)
236-
# with pytest.raises(ValueError, match="Norder stat min != max"):
237-
# get_healpix_pixel_from_metadata(single_metadata)
238-
239-
240-
# def test_get_healpix_pixel_from_metadata_fail(tmp_path):
241-
# empty_dataframe = pd.DataFrame()
242-
# metadata_filename = tmp_path / "empty_metadata.parquet"
243-
# empty_dataframe.to_parquet(metadata_filename)
244-
# single_metadata = file_io.read_parquet_metadata(metadata_filename)
245-
# with pytest.raises(ValueError, match="empty table"):
246-
# get_healpix_pixel_from_metadata(single_metadata)
247-
248-
# non_healpix_dataframe = pd.DataFrame({"data": [0], "Npix": [45]})
249-
# metadata_filename = tmp_path / "non_healpix_metadata.parquet"
250-
# non_healpix_dataframe.to_parquet(metadata_filename)
251-
# single_metadata = file_io.read_parquet_metadata(metadata_filename)
252-
# with pytest.raises(ValueError, match="missing Norder"):
253-
# get_healpix_pixel_from_metadata(single_metadata)
254-
255-
256-
# def test_get_healpix_pixel_from_metadata_columns(tmp_path):
257-
# """Test fetching the healpix pixel from columns with non-default names."""
258-
# non_healpix_dataframe = pd.DataFrame({"data": [1], "Npix": [45], "join_Norder": [2], "join_Npix": [3]})
259-
# metadata_filename = tmp_path / "non_healpix_metadata.parquet"
260-
# non_healpix_dataframe.to_parquet(metadata_filename)
261-
# single_metadata = file_io.read_parquet_metadata(metadata_filename)
262-
# with pytest.raises(ValueError, match="missing Norder"):
263-
# get_healpix_pixel_from_metadata(single_metadata)
264-
265-
# pixel = get_healpix_pixel_from_metadata(single_metadata, norder_column="data")
266-
# assert pixel == HealpixPixel(1, 45)
267-
268-
# pixel = get_healpix_pixel_from_metadata(
269-
# single_metadata, norder_column="join_Norder", npix_column="join_Npix"
270-
# )
271-
# assert pixel == HealpixPixel(2, 3)
272-
273-
# ## People can do silly things!
274-
# pixel = get_healpix_pixel_from_metadata(single_metadata, norder_column="data", npix_column="join_Npix")
275-
# assert pixel == HealpixPixel(1, 3)

Comments: 0 commit comments