diff --git a/datasets/flwr_datasets/partitioner/vertical_even_partitioner.py b/datasets/flwr_datasets/partitioner/vertical_even_partitioner.py deleted file mode 100644 index 180c4bd07347..000000000000 --- a/datasets/flwr_datasets/partitioner/vertical_even_partitioner.py +++ /dev/null @@ -1,227 +0,0 @@ -# Copyright 2024 Flower Labs GmbH. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============================================================================== -"""VerticalEvenPartitioner class.""" -# flake8: noqa: E501 -# pylint: disable=C0301, R0902, R0913 -from typing import Literal, Optional, Union - -import numpy as np - -import datasets -from flwr_datasets.partitioner.partitioner import Partitioner -from flwr_datasets.partitioner.vertical_partitioner_utils import ( - _add_active_party_columns, - _list_split, -) - - -class VerticalEvenPartitioner(Partitioner): - """Partitioner that splits features (columns) evenly into vertical partitions. - - Enables selection of "active party" column(s) and palcement into - a specific partition or creation of a new partition just for it. - Also enables droping columns and sharing specified columns across - all partitions. - - The number and nature of partitions can be defined in various ways: - - By specifying a simple integer for even splitting. - - By providing ratios or absolute counts for each partition. - - By explicitly listing the columns for each partition. - (see `column_distribution` and `mode` parameters for more details) - - Parameters - ---------- - num_partitions : int - Number of partitions to create. - active_party_columns : Optional[list[str]] - Columns associated with the "active party" (which can be the server). - active_party_columns_mode : Union[Literal[["add_to_first", "add_to_last", "create_as_first", "create_as_last", "add_to_all"], int] - Determines how to assign the active party columns: - - "add_to_first": Append active party columns to the first partition. - - "add_to_last": Append active party columns to the last partition. - - int: Append active party columns to the specified partition index. - - "create_as_first": Create a new partition at the start containing only - these columns. - - "create_as_last": Create a new partition at the end containing only - these columns. - - "add_to_all": Append active party columns to all partitions. - drop_columns : Optional[list[str]] - Columns to remove entirely from the dataset before partitioning. - shared_columns : Optional[list[str]] - Columns to duplicate into every partition after initial partitioning. - shuffle : bool - Whether to shuffle the order of columns before partitioning. - seed : Optional[int] - Random seed for shuffling columns. Has no effect if `shuffle=False`. - - Examples - -------- - >>> partitioner = VerticalEvenPartitioner( - ... num_partitions=3, - ... active_party_columns=["income"], - ... active_party_columns_mode="add_to_last", - ... shuffle=True, - ... seed=42 - ... ) - >>> fds = FederatedDataset( - ... dataset="scikit-learn/adult-census-income", - ... partitioners={"train": partitioner} - ... ) - >>> partitions = [fds.load_partition(i) for i in range(partitioner.num_partitions)] - >>> print([partition.column_names for partition in partitions]) - """ - - def __init__( - self, - num_partitions: int, - active_party_columns: Optional[list[str]] = None, - active_party_columns_mode: Union[ - Literal[ - "add_to_first", - "add_to_last", - "create_as_first", - "create_as_last", - "add_to_all", - ], - int, - ] = "add_to_last", - drop_columns: Optional[list[str]] = None, - shared_columns: Optional[list[str]] = None, - shuffle: bool = True, - seed: Optional[int] = 42, - ) -> None: - super().__init__() - - self._num_partitions = num_partitions - self._active_party_columns = active_party_columns or [] - self._active_party_columns_mode = active_party_columns_mode - self._drop_columns = drop_columns or [] - self._shared_columns = shared_columns or [] - self._shuffle = shuffle - self._seed = seed - self._rng = np.random.default_rng(seed=self._seed) - - self._partition_columns: Optional[list[list[str]]] = None - self._partitions_determined = False - - self._validate_parameters_in_init() - - def _determine_partitions_if_needed(self) -> None: - if self._partitions_determined: - return - - if self.dataset is None: - raise ValueError("No dataset is set for this partitioner.") - - all_columns = list(self.dataset.column_names) - self._validate_parameters_while_partitioning( - all_columns, self._shared_columns, self._active_party_columns - ) - columns = [column for column in all_columns if column not in self._drop_columns] - columns = [column for column in columns if column not in self._shared_columns] - columns = [ - column for column in columns if column not in self._active_party_columns - ] - - if self._shuffle: - self._rng.shuffle(columns) - partition_columns = _list_split(columns, self._num_partitions) - partition_columns = _add_active_party_columns( - self._active_party_columns, - self._active_party_columns_mode, - partition_columns, - ) - - # Add shared columns to all partitions - for partition in partition_columns: - for column in self._shared_columns: - partition.append(column) - - self._partition_columns = partition_columns - self._partitions_determined = True - - def load_partition(self, partition_id: int) -> datasets.Dataset: - """Load a partition based on the partition index. - - Parameters - ---------- - partition_id : int - The index that corresponds to the requested partition. - - Returns - ------- - dataset_partition : Dataset - Single partition of a dataset. - """ - self._determine_partitions_if_needed() - assert self._partition_columns is not None - if partition_id < 0 or partition_id >= len(self._partition_columns): - raise ValueError(f"Invalid partition_id {partition_id}.") - columns = self._partition_columns[partition_id] - return self.dataset.select_columns(columns) - - @property - def num_partitions(self) -> int: - """Number of partitions.""" - self._determine_partitions_if_needed() - assert self._partition_columns is not None - return len(self._partition_columns) - - def _validate_parameters_in_init(self) -> None: - if self._num_partitions < 1: - raise ValueError("column_distribution as int must be >= 1.") - - # Validate columns lists - for parameter_name, parameter_list in [ - ("drop_columns", self._drop_columns), - ("shared_columns", self._shared_columns), - ("active_party_columns", self._active_party_columns), - ]: - if not all(isinstance(column, str) for column in parameter_list): - raise ValueError(f"All entries in {parameter_name} must be strings.") - - valid_modes = { - "add_to_first", - "add_to_last", - "create_as_first", - "create_as_last", - "add_to_all", - } - if not ( - isinstance(self._active_party_columns_mode, int) - or self._active_party_columns_mode in valid_modes - ): - raise ValueError( - "active_party_columns_mode must be an int or one of " - "'add_to_first', 'add_to_last', 'create_as_first', 'create_as_last', " - "'add_to_all'." - ) - - def _validate_parameters_while_partitioning( - self, - all_columns: list[str], - shared_columns: list[str], - active_party_columns: list[str], - ) -> None: - # Shared columns existance check - for column in shared_columns: - if column not in all_columns: - raise ValueError(f"Shared column '{column}' not found in the dataset.") - # Active party columns existence check - for column in active_party_columns: - if column not in all_columns: - raise ValueError( - f"Active party column '{column}' not found in the dataset." - ) diff --git a/datasets/flwr_datasets/partitioner/vertical_even_partitioner_test.py b/datasets/flwr_datasets/partitioner/vertical_even_partitioner_test.py deleted file mode 100644 index 8e766617d609..000000000000 --- a/datasets/flwr_datasets/partitioner/vertical_even_partitioner_test.py +++ /dev/null @@ -1,202 +0,0 @@ -# Copyright 2024 Flower Labs GmbH. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============================================================================== -"""VerticalEvenPartitioner class tests.""" -# mypy: disable-error-code=list-item -import unittest - -import numpy as np - -from datasets import Dataset -from flwr_datasets.partitioner.vertical_even_partitioner import VerticalEvenPartitioner - - -def _create_dummy_dataset(column_names: list[str], num_rows: int = 100) -> Dataset: - """Create a dummy dataset with random data for testing.""" - data = {} - rng = np.random.default_rng(seed=42) - for col in column_names: - # Just numeric data; could also be strings, categoricals, etc. - data[col] = rng.integers(0, 100, size=num_rows).tolist() - return Dataset.from_dict(data) - - -class TestVerticalEvenPartitioner(unittest.TestCase): - """Unit tests for VerticalEvenPartitioner.""" - - def test_init_with_invalid_num_partitions(self) -> None: - """Test that initializing with an invalid number of partitions.""" - with self.assertRaises(ValueError): - VerticalEvenPartitioner(num_partitions=0) - - def test_init_with_invalid_active_party_mode(self) -> None: - """Test initialization with invalid active_party_columns_mode.""" - with self.assertRaises(ValueError): - VerticalEvenPartitioner( - num_partitions=2, - active_party_columns_mode="invalid_mode", # type: ignore[arg-type] - ) - - def test_init_with_non_string_drop_columns(self) -> None: - """Test initialization with non-string elements in drop_columns.""" - with self.assertRaises(ValueError): - VerticalEvenPartitioner(num_partitions=2, drop_columns=[1, "a", 3]) - - def test_init_with_non_string_shared_columns(self) -> None: - """Test initialization with non-string elements in shared_columns.""" - with self.assertRaises(ValueError): - VerticalEvenPartitioner(num_partitions=2, shared_columns=["col1", 123]) - - def test_init_with_non_string_active_party_columns(self) -> None: - """Test initialization with non-string elements in active_party_columns.""" - with self.assertRaises(ValueError): - VerticalEvenPartitioner( - num_partitions=2, active_party_columns=["col1", None] - ) - - def test_partitioning_basic(self) -> None: - """Test basic partitioning with no special columns or dropping.""" - columns = ["feature1", "feature2", "feature3", "feature4"] - dataset = _create_dummy_dataset(columns, num_rows=50) - partitioner = VerticalEvenPartitioner(num_partitions=2, shuffle=False) - partitioner.dataset = dataset - - self.assertEqual(partitioner.num_partitions, 2) - - p0 = partitioner.load_partition(0) - p1 = partitioner.load_partition(1) - - self.assertEqual(len(p0.column_names), 2) - self.assertEqual(len(p1.column_names), 2) - self.assertIn("feature1", p0.column_names) - self.assertIn("feature2", p0.column_names) - self.assertIn("feature3", p1.column_names) - self.assertIn("feature4", p1.column_names) - - def test_partitioning_with_drop_columns(self) -> None: - """Test partitioning while dropping some columns.""" - columns = ["feature1", "feature2", "drop_me", "feature3", "feature4"] - dataset = _create_dummy_dataset(columns, num_rows=50) - partitioner = VerticalEvenPartitioner( - num_partitions=2, drop_columns=["drop_me"], shuffle=False, seed=42 - ) - partitioner.dataset = dataset - - p0 = partitioner.load_partition(0) - p1 = partitioner.load_partition(1) - all_partition_columns = p0.column_names + p1.column_names - - # The drop_me should not be in any partition - self.assertNotIn("drop_me", all_partition_columns) - # The rest of columns should be distributed - self.assertIn("feature1", all_partition_columns) - self.assertIn("feature2", all_partition_columns) - self.assertIn("feature3", all_partition_columns) - self.assertIn("feature4", all_partition_columns) - - def test_partitioning_with_shared_columns(self) -> None: - """Test that shared columns are present in all partitions.""" - columns = ["f1", "f2", "f3", "f4", "shared_col"] - dataset = _create_dummy_dataset(columns, num_rows=50) - partitioner = VerticalEvenPartitioner( - num_partitions=2, shared_columns=["shared_col"], shuffle=False, seed=42 - ) - partitioner.dataset = dataset - - p0 = partitioner.load_partition(0) - p1 = partitioner.load_partition(1) - - self.assertIn("shared_col", p0.column_names) - self.assertIn("shared_col", p1.column_names) - - def test_partitioning_with_active_party_columns_add_to_last(self) -> None: - """Test active party columns are appended to the last partition.""" - columns = ["f1", "f2", "f3", "f4", "income"] - dataset = _create_dummy_dataset(columns, num_rows=50) - partitioner = VerticalEvenPartitioner( - num_partitions=2, - active_party_columns=["income"], - active_party_columns_mode="add_to_last", - shuffle=False, - seed=42, - ) - partitioner.dataset = dataset - - p0 = partitioner.load_partition(0) - p1 = partitioner.load_partition(1) - - # The income should be only in the last partition - self.assertNotIn("income", p0.column_names) - self.assertIn("income", p1.column_names) - - def test_partitioning_with_active_party_columns_create_as_first(self) -> None: - """Test creating a new partition solely for active party columns.""" - columns = ["f1", "f2", "f3", "f4", "income"] - dataset = _create_dummy_dataset(columns, num_rows=50) - partitioner = VerticalEvenPartitioner( - num_partitions=2, - active_party_columns=["income"], - active_party_columns_mode="create_as_first", - shuffle=False, - ) - partitioner.dataset = dataset - - # The first partition should be just the active party columns - # and then two more partitions from original splitting. - self.assertEqual(partitioner.num_partitions, 3) - - p0 = partitioner.load_partition(0) # active party partition - p1 = partitioner.load_partition(1) - p2 = partitioner.load_partition(2) - - self.assertEqual(p0.column_names, ["income"]) - self.assertIn("f1", p1.column_names) - self.assertIn("f2", p1.column_names) - self.assertIn("f3", p2.column_names) - self.assertIn("f4", p2.column_names) - - def test_partitioning_with_nonexistent_active_party_columns(self) -> None: - """Test that a ValueError is raised if active party column does not exist.""" - columns = ["f1", "f2", "f3", "f4"] - dataset = _create_dummy_dataset(columns, num_rows=50) - partitioner = VerticalEvenPartitioner( - num_partitions=2, - active_party_columns=["income"], # Not present in dataset - active_party_columns_mode="add_to_last", - shuffle=False, - ) - partitioner.dataset = dataset - - with self.assertRaises(ValueError) as context: - partitioner.load_partition(0) - self.assertIn("Active party column 'income' not found", str(context.exception)) - - def test_partitioning_with_nonexistent_shared_columns(self) -> None: - """Test that a ValueError is raised if shared column does not exist.""" - columns = ["f1", "f2", "f3"] - dataset = _create_dummy_dataset(columns, num_rows=50) - partitioner = VerticalEvenPartitioner( - num_partitions=2, shared_columns=["nonexistent_col"], shuffle=False - ) - partitioner.dataset = dataset - - with self.assertRaises(ValueError) as context: - partitioner.load_partition(0) - self.assertIn( - "Shared column 'nonexistent_col' not found", str(context.exception) - ) - - -if __name__ == "__main__": - unittest.main()