From 7a5ba4b8f145e1c1fedb26d5be4b9d1b56ee8c73 Mon Sep 17 00:00:00 2001 From: Dimitris Stripelis <11239849+dstripelis@users.noreply.github.com> Date: Mon, 16 Dec 2024 21:34:08 +0200 Subject: [PATCH 01/15] docs(framework) Fix typo (#4723) --- .../docs/source/how-to-run-flower-on-azure.rst | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/framework/docs/source/how-to-run-flower-on-azure.rst b/framework/docs/source/how-to-run-flower-on-azure.rst index dfeda022377f..d9b3305943ab 100644 --- a/framework/docs/source/how-to-run-flower-on-azure.rst +++ b/framework/docs/source/how-to-run-flower-on-azure.rst @@ -19,15 +19,16 @@ Run Flower on Azure .. note:: - There are many ways to deploy Flower on Microst Azure. The instructions provided in - this guide is just a basic walkthrough, step-by-step guide on how to quickly setup - and run a Flower application on a Federated Learning environment on Microst Azure. + There are many ways to deploy Flower on Microsoft Azure. The instructions provided + in this guide is just a basic walkthrough, step-by-step guide on how to quickly + setup and run a Flower application on a Federated Learning environment on Microsoft + Azure. -In this how-to guide, we want to create a Federated Learning environment on Microst +In this how-to guide, we want to create a Federated Learning environment on Microsoft Azure using three Virtual Machines (VMs). From the three machines, one machine will be used as the Federation server and two as the Federation clients. Our goal is to create a -Flower federation on Microst Azure where we can run Flower apps from our local machine, -e.g., laptop. +Flower federation on Microsoft Azure where we can run Flower apps from our local +machine, e.g., laptop. On the Federation server VM we will deploy the long-running Flower server (``SuperLink``) and on the two Federation client VMs we will deploy the long-running @@ -43,7 +44,7 @@ networking rules to allow cross-VM communication. 
VM Create ~~~~~~~~~ -Assuming we are already inside the Microst Azure portal, we navigate to the ``Create`` +Assuming we are already inside the Microsoft Azure portal, we navigate to the ``Create`` page and we select ``Azure virtual machine``. In the new page, for each VM we edit the properties as follows: From 2c2e11617e404f6eaa11da470d07e44e76d9f226 Mon Sep 17 00:00:00 2001 From: Chong Shen Ng Date: Mon, 16 Dec 2024 19:42:13 +0000 Subject: [PATCH 02/15] refactor(framework) Move `print_json_error` to utility function (#4711) --- src/py/flwr/cli/ls.py | 19 +++---------------- src/py/flwr/cli/run/run.py | 18 +++--------------- src/py/flwr/common/logger.py | 17 ++++++++++++++++- 3 files changed, 22 insertions(+), 32 deletions(-) diff --git a/src/py/flwr/cli/ls.py b/src/py/flwr/cli/ls.py index 1e8b56b576e4..e845583d61ac 100644 --- a/src/py/flwr/cli/ls.py +++ b/src/py/flwr/cli/ls.py @@ -19,13 +19,12 @@ import json from datetime import datetime, timedelta from pathlib import Path -from typing import Annotated, Optional, Union +from typing import Annotated, Optional import typer from rich.console import Console from rich.table import Table from rich.text import Text -from typer import Exit from flwr.cli.config_utils import ( exit_if_no_address, @@ -35,7 +34,7 @@ ) from flwr.common.constant import FAB_CONFIG_FILE, CliOutputFormat, SubStatus from flwr.common.date import format_timedelta, isoformat8601_utc -from flwr.common.logger import redirect_output, remove_emojis, restore_output +from flwr.common.logger import print_json_error, redirect_output, restore_output from flwr.common.serde import run_from_proto from flwr.common.typing import Run from flwr.proto.exec_pb2 import ( # pylint: disable=E0611 @@ -145,7 +144,7 @@ def ls( # pylint: disable=too-many-locals, too-many-branches if suppress_output: restore_output() e_message = captured_output.getvalue() - _print_json_error(e_message, err) + print_json_error(e_message, err) else: typer.secho( f"{err}", @@ -323,15 +322,3 @@ 
def _display_one_run( Console().print_json(_to_json(formatted_runs)) else: Console().print(_to_table(formatted_runs)) - - -def _print_json_error(msg: str, e: Union[Exit, Exception]) -> None: - """Print error message as JSON.""" - Console().print_json( - json.dumps( - { - "success": False, - "error-message": remove_emojis(str(msg) + "\n" + str(e)), - } - ) - ) diff --git a/src/py/flwr/cli/run/run.py b/src/py/flwr/cli/run/run.py index 34e7fc50e2f0..19db2ade39dc 100644 --- a/src/py/flwr/cli/run/run.py +++ b/src/py/flwr/cli/run/run.py @@ -19,7 +19,7 @@ import json import subprocess from pathlib import Path -from typing import Annotated, Any, Optional, Union +from typing import Annotated, Any, Optional import typer from rich.console import Console @@ -37,7 +37,7 @@ user_config_to_configsrecord, ) from flwr.common.constant import CliOutputFormat -from flwr.common.logger import redirect_output, remove_emojis, restore_output +from flwr.common.logger import print_json_error, redirect_output, restore_output from flwr.common.serde import ( configs_record_to_proto, fab_to_proto, @@ -122,7 +122,7 @@ def run( if suppress_output: restore_output() e_message = captured_output.getvalue() - _print_json_error(e_message, err) + print_json_error(e_message, err) else: typer.secho( f"{err}", @@ -239,15 +239,3 @@ def _run_without_exec_api( check=True, text=True, ) - - -def _print_json_error(msg: str, e: Union[typer.Exit, Exception]) -> None: - """Print error message as JSON.""" - Console().print_json( - json.dumps( - { - "success": False, - "error-message": remove_emojis(str(msg) + "\n" + str(e)), - } - ) - ) diff --git a/src/py/flwr/common/logger.py b/src/py/flwr/common/logger.py index 00395cacf2b8..64763faf740d 100644 --- a/src/py/flwr/common/logger.py +++ b/src/py/flwr/common/logger.py @@ -15,6 +15,7 @@ """Flower Logger.""" +import json as _json import logging import re import sys @@ -27,6 +28,8 @@ from typing import TYPE_CHECKING, Any, Optional, TextIO, Union import grpc +import typer 
+from rich.console import Console from flwr.proto.log_pb2 import PushLogsRequest # pylint: disable=E0611 from flwr.proto.node_pb2 import Node # pylint: disable=E0611 @@ -377,7 +380,7 @@ def stop_log_uploader( log_uploader.join() -def remove_emojis(text: str) -> str: +def _remove_emojis(text: str) -> str: """Remove emojis from the provided text.""" emoji_pattern = re.compile( "[" @@ -391,3 +394,15 @@ def remove_emojis(text: str) -> str: flags=re.UNICODE, ) return emoji_pattern.sub(r"", text) + + +def print_json_error(msg: str, e: Union[typer.Exit, Exception]) -> None: + """Print error message as JSON.""" + Console().print_json( + _json.dumps( + { + "success": False, + "error-message": _remove_emojis(str(msg) + "\n" + str(e)), + } + ) + ) From 0d1cff5a1f60482b3b0e66914201bb38458af8eb Mon Sep 17 00:00:00 2001 From: Vidit Khandelwal <34510012+TheVidz@users.noreply.github.com> Date: Tue, 17 Dec 2024 01:23:44 +0530 Subject: [PATCH 03/15] fix(examples) Remove `numpy` specification in `vertical-fl` (#4724) --- examples/vertical-fl/pyproject.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/vertical-fl/pyproject.toml b/examples/vertical-fl/pyproject.toml index 2376b55e1110..4efa726eb530 100644 --- a/examples/vertical-fl/pyproject.toml +++ b/examples/vertical-fl/pyproject.toml @@ -10,7 +10,6 @@ license = "Apache-2.0" dependencies = [ "flwr[simulation]>=1.13.1", "flwr-datasets>=0.3.0", - "numpy==1.24.4", "pandas==2.0.3", "scikit-learn==1.3.2", "torch==2.1.0", From 194cc9341bc311b546d2d44363430684c3ce0e9a Mon Sep 17 00:00:00 2001 From: Chong Shen Ng Date: Mon, 16 Dec 2024 20:34:10 +0000 Subject: [PATCH 04/15] refactor(framework) Add `try`-`except` block to `flwr stop` for `JSON` output (#4712) --- src/py/flwr/cli/stop.py | 52 ++++++++++++++++++++++++----------------- 1 file changed, 30 insertions(+), 22 deletions(-) diff --git a/src/py/flwr/cli/stop.py b/src/py/flwr/cli/stop.py index acfa818efb32..84002f0f6e27 100644 --- a/src/py/flwr/cli/stop.py +++ 
b/src/py/flwr/cli/stop.py @@ -48,34 +48,42 @@ def stop( ] = None, ) -> None: """Stop a run.""" - # Load and validate federation config - typer.secho("Loading project configuration... ", fg=typer.colors.BLUE) + try: + # Load and validate federation config + typer.secho("Loading project configuration... ", fg=typer.colors.BLUE) - pyproject_path = app / FAB_CONFIG_FILE if app else None - config, errors, warnings = load_and_validate(path=pyproject_path) - config = process_loaded_project_config(config, errors, warnings) - federation, federation_config = validate_federation_in_project_config( - federation, config - ) - exit_if_no_address(federation_config, "stop") + pyproject_path = app / FAB_CONFIG_FILE if app else None + config, errors, warnings = load_and_validate(path=pyproject_path) + config = process_loaded_project_config(config, errors, warnings) + federation, federation_config = validate_federation_in_project_config( + federation, config + ) + exit_if_no_address(federation_config, "stop") - try: - auth_plugin = try_obtain_cli_auth_plugin(app, federation) - channel = init_channel(app, federation_config, auth_plugin) - stub = ExecStub(channel) # pylint: disable=unused-variable # noqa: F841 + try: + auth_plugin = try_obtain_cli_auth_plugin(app, federation) + channel = init_channel(app, federation_config, auth_plugin) + stub = ExecStub(channel) # pylint: disable=unused-variable # noqa: F841 - typer.secho(f"✋ Stopping run ID {run_id}...", fg=typer.colors.GREEN) - _stop_run(stub, run_id=run_id) + typer.secho(f"✋ Stopping run ID {run_id}...", fg=typer.colors.GREEN) + _stop_run(stub, run_id=run_id) - except ValueError as err: - typer.secho( - f"❌ {err}", - fg=typer.colors.RED, - bold=True, - ) + except ValueError as err: + typer.secho( + f"❌ {err}", + fg=typer.colors.RED, + bold=True, + ) + raise typer.Exit(code=1) from err + finally: + channel.close() + except ( + typer.Exit, + Exception, + ) as err: # pylint: disable=broad-except, W0612 # noqa: F841 raise 
typer.Exit(code=1) from err finally: - channel.close() + pass def _stop_run( From 210ae959d0b7cd7db14f0550d4066db8a9101719 Mon Sep 17 00:00:00 2001 From: Chong Shen Ng Date: Mon, 16 Dec 2024 20:53:00 +0000 Subject: [PATCH 05/15] feat(framework) Add JSON output to `flwr stop` (#4710) --- src/py/flwr/cli/stop.py | 58 +++++++++++++++++++++++++++++++---------- 1 file changed, 44 insertions(+), 14 deletions(-) diff --git a/src/py/flwr/cli/stop.py b/src/py/flwr/cli/stop.py index 84002f0f6e27..4989d216f28f 100644 --- a/src/py/flwr/cli/stop.py +++ b/src/py/flwr/cli/stop.py @@ -15,10 +15,13 @@ """Flower command line interface `stop` command.""" +import io +import json from pathlib import Path from typing import Annotated, Optional import typer +from rich.console import Console from flwr.cli.config_utils import ( exit_if_no_address, @@ -26,14 +29,15 @@ process_loaded_project_config, validate_federation_in_project_config, ) -from flwr.common.constant import FAB_CONFIG_FILE +from flwr.common.constant import FAB_CONFIG_FILE, CliOutputFormat +from flwr.common.logger import print_json_error, redirect_output, restore_output from flwr.proto.exec_pb2 import StopRunRequest, StopRunResponse # pylint: disable=E0611 from flwr.proto.exec_pb2_grpc import ExecStub from .utils import init_channel, try_obtain_cli_auth_plugin -def stop( +def stop( # pylint: disable=R0914 run_id: Annotated[ # pylint: disable=unused-argument int, typer.Argument(help="The Flower run ID to stop"), @@ -46,9 +50,22 @@ def stop( Optional[str], typer.Argument(help="Name of the federation"), ] = None, + output_format: Annotated[ + str, + typer.Option( + "--format", + case_sensitive=False, + help="Format output using 'default' view or 'json'", + ), + ] = CliOutputFormat.DEFAULT, ) -> None: """Stop a run.""" + suppress_output = output_format == CliOutputFormat.JSON + captured_output = io.StringIO() try: + if suppress_output: + redirect_output(captured_output) + # Load and validate federation config typer.secho("Loading 
project configuration... ", fg=typer.colors.BLUE) @@ -66,7 +83,7 @@ def stop( stub = ExecStub(channel) # pylint: disable=unused-variable # noqa: F841 typer.secho(f"✋ Stopping run ID {run_id}...", fg=typer.colors.GREEN) - _stop_run(stub, run_id=run_id) + _stop_run(stub=stub, run_id=run_id, output_format=output_format) except ValueError as err: typer.secho( @@ -77,23 +94,36 @@ def stop( raise typer.Exit(code=1) from err finally: channel.close() - except ( - typer.Exit, - Exception, - ) as err: # pylint: disable=broad-except, W0612 # noqa: F841 - raise typer.Exit(code=1) from err + except (typer.Exit, Exception) as err: # pylint: disable=broad-except + if suppress_output: + restore_output() + e_message = captured_output.getvalue() + print_json_error(e_message, err) + else: + typer.secho( + f"{err}", + fg=typer.colors.RED, + bold=True, + ) finally: - pass + if suppress_output: + restore_output() + captured_output.close() -def _stop_run( - stub: ExecStub, # pylint: disable=unused-argument - run_id: int, # pylint: disable=unused-argument -) -> None: +def _stop_run(stub: ExecStub, run_id: int, output_format: str) -> None: """Stop a run.""" response: StopRunResponse = stub.StopRun(request=StopRunRequest(run_id=run_id)) - if response.success: typer.secho(f"✅ Run {run_id} successfully stopped.", fg=typer.colors.GREEN) + if output_format == CliOutputFormat.JSON: + run_output = json.dumps( + { + "success": True, + "run-id": run_id, + } + ) + restore_output() + Console().print_json(run_output) else: typer.secho(f"❌ Run {run_id} couldn't be stopped.", fg=typer.colors.RED) From 4f5e2cb0d71468d4cbc575d5b5ad7ace3418f80b Mon Sep 17 00:00:00 2001 From: Javier Date: Mon, 16 Dec 2024 21:06:41 +0000 Subject: [PATCH 06/15] fix(examples) Update pinned `numpy` in `quickstart-pandas` and `vertical-fl` examples (#4725) --- examples/quickstart-pandas/pyproject.toml | 2 +- examples/vertical-fl/pyproject.toml | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git 
a/examples/quickstart-pandas/pyproject.toml b/examples/quickstart-pandas/pyproject.toml index 986ae9abd0ac..8e3d8e46ecad 100644 --- a/examples/quickstart-pandas/pyproject.toml +++ b/examples/quickstart-pandas/pyproject.toml @@ -14,7 +14,7 @@ authors = [ dependencies = [ "flwr[simulation]>=1.13.1", "flwr-datasets[vision]>=0.3.0", - "numpy==1.24.4", + "numpy>=1.26.0", "pandas==2.0.0", ] diff --git a/examples/vertical-fl/pyproject.toml b/examples/vertical-fl/pyproject.toml index 4efa726eb530..a1d0a70afe61 100644 --- a/examples/vertical-fl/pyproject.toml +++ b/examples/vertical-fl/pyproject.toml @@ -11,6 +11,7 @@ dependencies = [ "flwr[simulation]>=1.13.1", "flwr-datasets>=0.3.0", "pandas==2.0.3", + "numpy>=1.26.0", "scikit-learn==1.3.2", "torch==2.1.0", ] From 69253c253d3e28c93a58ff408082b877c050a1dc Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Mon, 16 Dec 2024 23:25:12 -0800 Subject: [PATCH 07/15] docs(framework) Fix other versions (#4727) --- .github/workflows/docs.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index a0e4aea55a06..dfe40350f647 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -45,6 +45,7 @@ jobs: AWS_SECRET_ACCESS_KEY: ${{ secrets. 
AWS_SECRET_ACCESS_KEY }} DOCS_BUCKET: flower.ai run: | + aws s3 sync --delete --exclude ".*" --exclude "v/*" --cache-control "no-cache" ./doc/build/html/ s3://${{ env.DOCS_BUCKET }}/docs/framework aws s3 sync --delete --exclude ".*" --exclude "v/*" --cache-control "no-cache" ./framework/docs/build/html/ s3://${{ env.DOCS_BUCKET }}/docs/framework aws s3 sync --delete --exclude ".*" --exclude "v/*" --cache-control "no-cache" ./baselines/docs/build/html/ s3://${{ env.DOCS_BUCKET }}/docs/baselines aws s3 sync --delete --exclude ".*" --exclude "v/*" --cache-control "no-cache" ./examples/docs/build/html/ s3://${{ env.DOCS_BUCKET }}/docs/examples From 21e55772b5fddf4fab2e639bf6417291706193a0 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Tue, 17 Dec 2024 00:34:10 -0800 Subject: [PATCH 08/15] ci(framework) Fix docs deployement workflow (#4728) --- .github/workflows/docs.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index dfe40350f647..f98dfa43dd18 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -45,7 +45,7 @@ jobs: AWS_SECRET_ACCESS_KEY: ${{ secrets. 
AWS_SECRET_ACCESS_KEY }} DOCS_BUCKET: flower.ai run: | - aws s3 sync --delete --exclude ".*" --exclude "v/*" --cache-control "no-cache" ./doc/build/html/ s3://${{ env.DOCS_BUCKET }}/docs/framework + cp -r doc/build/html/v* framework/docs/build/html aws s3 sync --delete --exclude ".*" --exclude "v/*" --cache-control "no-cache" ./framework/docs/build/html/ s3://${{ env.DOCS_BUCKET }}/docs/framework aws s3 sync --delete --exclude ".*" --exclude "v/*" --cache-control "no-cache" ./baselines/docs/build/html/ s3://${{ env.DOCS_BUCKET }}/docs/baselines aws s3 sync --delete --exclude ".*" --exclude "v/*" --cache-control "no-cache" ./examples/docs/build/html/ s3://${{ env.DOCS_BUCKET }}/docs/examples From ea7c194860afdd72222c13c54202640a2302bfe2 Mon Sep 17 00:00:00 2001 From: Javier Date: Tue, 17 Dec 2024 14:20:23 +0000 Subject: [PATCH 09/15] break(framework) Remove setting `Context` as `Client` and `NumPyClient` attribute (#4652) --- e2e/e2e-bare/e2e_bare/client_app.py | 22 ++++++---- e2e/e2e-pytorch/e2e_pytorch/client_app.py | 20 +++++---- src/py/flwr/client/client.py | 32 -------------- .../client/message_handler/message_handler.py | 2 - src/py/flwr/client/numpy_client.py | 44 ------------------- 5 files changed, 24 insertions(+), 96 deletions(-) diff --git a/e2e/e2e-bare/e2e_bare/client_app.py b/e2e/e2e-bare/e2e_bare/client_app.py index 943e60d5db9f..3780774954d4 100644 --- a/e2e/e2e-bare/e2e_bare/client_app.py +++ b/e2e/e2e-bare/e2e_bare/client_app.py @@ -3,7 +3,7 @@ import numpy as np from flwr.client import ClientApp, NumPyClient, start_client -from flwr.common import ConfigsRecord, Context +from flwr.common import ConfigsRecord, Context, RecordSet SUBSET_SIZE = 1000 STATE_VAR = "timestamp" @@ -15,6 +15,9 @@ # Define Flower client class FlowerClient(NumPyClient): + def __init__(self, state: RecordSet): + self.state = state + def get_parameters(self, config): return model_params @@ -22,16 +25,14 @@ def _record_timestamp_to_state(self): """Record timestamp to client's 
state.""" t_stamp = datetime.now().timestamp() value = str(t_stamp) - if STATE_VAR in self.context.state.configs_records.keys(): - value = self.context.state.configs_records[STATE_VAR][STATE_VAR] # type: ignore + if STATE_VAR in self.state.configs_records.keys(): + value = self.state.configs_records[STATE_VAR][STATE_VAR] # type: ignore value += f",{t_stamp}" - self.context.state.configs_records[STATE_VAR] = ConfigsRecord( - {STATE_VAR: value} - ) + self.state.configs_records[STATE_VAR] = ConfigsRecord({STATE_VAR: value}) def _retrieve_timestamp_from_state(self): - return self.context.state.configs_records[STATE_VAR][STATE_VAR] + return self.state.configs_records[STATE_VAR][STATE_VAR] def fit(self, parameters, config): model_params = parameters @@ -52,7 +53,7 @@ def evaluate(self, parameters, config): def client_fn(context: Context): - return FlowerClient().to_client() + return FlowerClient(context.state).to_client() app = ClientApp( @@ -61,4 +62,7 @@ def client_fn(context: Context): if __name__ == "__main__": # Start Flower client - start_client(server_address="127.0.0.1:8080", client=FlowerClient().to_client()) + start_client( + server_address="127.0.0.1:8080", + client=FlowerClient(state=RecordSet()).to_client(), + ) diff --git a/e2e/e2e-pytorch/e2e_pytorch/client_app.py b/e2e/e2e-pytorch/e2e_pytorch/client_app.py index 988cd774018d..b7f1ce33b3f0 100644 --- a/e2e/e2e-pytorch/e2e_pytorch/client_app.py +++ b/e2e/e2e-pytorch/e2e_pytorch/client_app.py @@ -11,7 +11,7 @@ from tqdm import tqdm from flwr.client import ClientApp, NumPyClient, start_client -from flwr.common import ConfigsRecord, Context +from flwr.common import ConfigsRecord, Context, RecordSet # ############################################################################# # 1. 
Regular PyTorch pipeline: nn.Module, train, test, and DataLoader @@ -90,6 +90,10 @@ def load_data(): # Define Flower client class FlowerClient(NumPyClient): + + def __init__(self, state: RecordSet): + self.state = state + def get_parameters(self, config): return [val.cpu().numpy() for _, val in net.state_dict().items()] @@ -97,16 +101,14 @@ def _record_timestamp_to_state(self): """Record timestamp to client's state.""" t_stamp = datetime.now().timestamp() value = str(t_stamp) - if STATE_VAR in self.context.state.configs_records.keys(): - value = self.context.state.configs_records[STATE_VAR][STATE_VAR] # type: ignore + if STATE_VAR in self.state.configs_records.keys(): + value = self.state.configs_records[STATE_VAR][STATE_VAR] # type: ignore value += f",{t_stamp}" - self.context.state.configs_records[STATE_VAR] = ConfigsRecord( - {STATE_VAR: value} - ) + self.state.configs_records[STATE_VAR] = ConfigsRecord({STATE_VAR: value}) def _retrieve_timestamp_from_state(self): - return self.context.state.configs_records[STATE_VAR][STATE_VAR] + return self.state.configs_records[STATE_VAR][STATE_VAR] def fit(self, parameters, config): set_parameters(net, parameters) @@ -137,7 +139,7 @@ def set_parameters(model, parameters): def client_fn(context: Context): - return FlowerClient().to_client() + return FlowerClient(context.state).to_client() app = ClientApp( @@ -149,5 +151,5 @@ def client_fn(context: Context): # Start Flower client start_client( server_address="127.0.0.1:8080", - client=FlowerClient().to_client(), + client=FlowerClient(state=RecordSet()).to_client(), ) diff --git a/src/py/flwr/client/client.py b/src/py/flwr/client/client.py index c7a7f01b38ba..ec566b428041 100644 --- a/src/py/flwr/client/client.py +++ b/src/py/flwr/client/client.py @@ -22,7 +22,6 @@ from flwr.common import ( Code, - Context, EvaluateIns, EvaluateRes, FitIns, @@ -34,14 +33,11 @@ Parameters, Status, ) -from flwr.common.logger import warn_deprecated_feature_with_example class Client(ABC): 
"""Abstract base class for Flower clients.""" - _context: Context - def get_properties(self, ins: GetPropertiesIns) -> GetPropertiesRes: """Return set of client's properties. @@ -143,34 +139,6 @@ def evaluate(self, ins: EvaluateIns) -> EvaluateRes: metrics={}, ) - @property - def context(self) -> Context: - """Getter for `Context` client attribute.""" - warn_deprecated_feature_with_example( - "Accessing the context via the client's attribute is deprecated.", - example_message="Instead, pass it to the client's " - "constructor in your `client_fn()` which already " - "receives a context object.", - code_example="def client_fn(context: Context) -> Client:\n\n" - "\t\t# Your existing client_fn\n\n" - "\t\t# Pass `context` to the constructor\n" - "\t\treturn FlowerClient(context).to_client()", - ) - return self._context - - @context.setter - def context(self, context: Context) -> None: - """Setter for `Context` client attribute.""" - self._context = context - - def get_context(self) -> Context: - """Get the run context from this client.""" - return self.context - - def set_context(self, context: Context) -> None: - """Apply a run context to this client.""" - self.context = context - def to_client(self) -> Client: """Return client (itself).""" return self diff --git a/src/py/flwr/client/message_handler/message_handler.py b/src/py/flwr/client/message_handler/message_handler.py index cf4a4b006858..4f03559fa1e4 100644 --- a/src/py/flwr/client/message_handler/message_handler.py +++ b/src/py/flwr/client/message_handler/message_handler.py @@ -105,8 +105,6 @@ def handle_legacy_message_from_msgtype( "Please use `NumPyClient.to_client()` method to convert it to `Client`.", ) - client.set_context(context) - message_type = message.metadata.message_type # Handle GetPropertiesIns diff --git a/src/py/flwr/client/numpy_client.py b/src/py/flwr/client/numpy_client.py index 6a656cb661d2..eb3cac7c9af1 100644 --- a/src/py/flwr/client/numpy_client.py +++ b/src/py/flwr/client/numpy_client.py 
@@ -21,13 +21,11 @@ from flwr.client.client import Client from flwr.common import ( Config, - Context, NDArrays, Scalar, ndarrays_to_parameters, parameters_to_ndarrays, ) -from flwr.common.logger import warn_deprecated_feature_with_example from flwr.common.typing import ( Code, EvaluateIns, @@ -71,8 +69,6 @@ class NumPyClient(ABC): """Abstract base class for Flower clients using NumPy.""" - _context: Context - def get_properties(self, config: Config) -> dict[str, Scalar]: """Return a client's set of properties. @@ -175,34 +171,6 @@ def evaluate( _ = (self, parameters, config) return 0.0, 0, {} - @property - def context(self) -> Context: - """Getter for `Context` client attribute.""" - warn_deprecated_feature_with_example( - "Accessing the context via the client's attribute is deprecated.", - example_message="Instead, pass it to the client's " - "constructor in your `client_fn()` which already " - "receives a context object.", - code_example="def client_fn(context: Context) -> Client:\n\n" - "\t\t# Your existing client_fn\n\n" - "\t\t# Pass `context` to the constructor\n" - "\t\treturn FlowerClient(context).to_client()", - ) - return self._context - - @context.setter - def context(self, context: Context) -> None: - """Setter for `Context` client attribute.""" - self._context = context - - def get_context(self) -> Context: - """Get the run context from this client.""" - return self.context - - def set_context(self, context: Context) -> None: - """Apply a run context to this client.""" - self.context = context - def to_client(self) -> Client: """Convert to object to Client type and return it.""" return _wrap_numpy_client(client=self) @@ -299,21 +267,9 @@ def _evaluate(self: Client, ins: EvaluateIns) -> EvaluateRes: ) -def _get_context(self: Client) -> Context: - """Return context of underlying NumPyClient.""" - return self.numpy_client.get_context() # type: ignore - - -def _set_context(self: Client, context: Context) -> None: - """Apply context to underlying 
NumPyClient.""" - self.numpy_client.set_context(context) # type: ignore - - def _wrap_numpy_client(client: NumPyClient) -> Client: member_dict: dict[str, Callable] = { # type: ignore "__init__": _constructor, - "get_context": _get_context, - "set_context": _set_context, } # Add wrapper type methods (if overridden) From bdfcf1fd48c4f94ed8c32aa3fd2b2d9d2c6333ff Mon Sep 17 00:00:00 2001 From: Chong Shen Ng Date: Tue, 17 Dec 2024 14:41:21 +0000 Subject: [PATCH 10/15] docs(framework) Add how-to guide for using JSON output with CLI (#4714) --- .../source/how-to-use-cli-json-output.rst | 194 ++++++++++++++++++ framework/docs/source/index.rst | 1 + 2 files changed, 195 insertions(+) create mode 100644 framework/docs/source/how-to-use-cli-json-output.rst diff --git a/framework/docs/source/how-to-use-cli-json-output.rst b/framework/docs/source/how-to-use-cli-json-output.rst new file mode 100644 index 000000000000..d24940f5068e --- /dev/null +++ b/framework/docs/source/how-to-use-cli-json-output.rst @@ -0,0 +1,194 @@ +Use CLI JSON output +=================== + +The `Flower CLIs `_ come with a built-in JSON output mode. This mode +is useful when you want to consume the output of a Flower CLI programmatically. For +example, you might want to use the output of the ``flwr`` CLI in a script or a +continuous integration pipeline. + +.. note:: + + The JSON output mode is currently only available when using the Flower CLIs with a + `SuperLink `_. Learn more about the `SuperLink` + in the `Flower Architecture Overview `_ page. + +In this guide, we'll show you how to specify a JSON output with the ``flwr run``, ``flwr +ls``, and ``flwr stop`` commands. We will also provide examples of the JSON output for +each of these commands. + +.. |flwr_run| replace:: ``flwr run`` + +.. |flwr_ls| replace:: ``flwr ls`` + +.. |flwr_stop| replace:: ``flwr stop`` + +.. _flwr_ls: ref-api-cli.html#flwr-ls + +.. _flwr_run: ref-api-cli.html#flwr-run + +.. 
_flwr_stop: ref-api-cli.html#flwr-stop + +``flwr run`` JSON output +------------------------ + +The |flwr_run|_ command runs a Flower app from a provided directory. Note that if the +app path argument is not passed to ``flwr run``, the current working directory is used +as the default Flower app directory. By default, executing the ``flwr run`` command +prints the status of the app build and run process as follows: + +.. code-block:: bash + + $ flwr run + Loading project configuration... + Success + 🎊 Successfully built flwrlabs.myawesomeapp.1-0-0.014c8eb3.fab + 🎊 Successfully started run 1859953118041441032 + +To get the output in JSON format, pass an additional ``--format json`` flag: + +.. code-block:: bash + + $ flwr run --format json + { + "success": true, + "run-id": 1859953118041441032, + "fab-id": "flwrlabs/myawesomeapp", + "fab-name": "myawesomeapp", + "fab-version": "1.0.0", + "fab-hash": "014c8eb3", + "fab-filename": "flwrlabs.myawesomeapp.1-0-0.014c8eb3.fab" + } + +The JSON output for ``flwr run`` contains the following fields: + +- ``success``: A boolean indicating whether the command was successful. +- ``run-id``: The ID of the run. +- ``fab-id``: The ID of the Flower app. +- ``fab-name``: The name of the Flower app. +- ``fab-version``: The version of the Flower app. +- ``fab-hash``: The short hash of the Flower app. +- ``fab-filename``: The filename of the Flower app. + +If the command fails, the JSON output will contain two fields, ``success`` with the +value of ``false`` and ``error-message``. For example, if the command fails to find the +name of the federation on the SuperLink, the output will look like this: + +.. _json_error_output: + +.. code-block:: bash + + $ flwr run --format json + { + "success": false, + "error-message": "Loading project configuration... 
\nSuccess\n There is no `[missing]` federation declared in the `pyproject.toml`.\n The following federations were found:\n\nfed-existing-1\nfed-existing-2\n\n" + } + +``flwr ls`` JSON output +----------------------- + +The |flwr_ls|_ command lists all the runs in the current project. Similar to ``flwr +run``, if the app path argument is not passed to ``flwr ls``, the current working +directory is used as the Flower app directory. By default, the command lists the details +of all runs in a Flower federation in a tabular format: + +.. code-block:: bash + + $ flwr ls + Loading project configuration... + Success + 📄 Listing all runs... + ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓ + ┃ Run ID ┃ FAB ┃ Status ┃ Elapsed ┃ Created At ┃ Running At ┃ Finished At ┃ + ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩ + │ 185995311804 │ flwrlabs/my… │ finished:co… │ 00:00:55 │ 2024-12-16 │ 2024-12-16 │ 2024-12-16 │ + │ 1441032 │ (v1.0.0) │ │ │ 11:12:33Z │ 11:12:33Z │ 11:13:29Z │ + ├──────────────┼──────────────┼──────────────┼──────────┼──────────────┼──────────────┼─────────────┤ + │ 142007406570 │ flwrlabs/my… │ running │ 00:00:05 │ 2024-12-16 │ 2024-12-16 │ N/A │ + │ 11601420 │ (v1.0.0) │ │ │ 12:18:39Z │ 12:18:39Z │ │ + └──────────────┴──────────────┴──────────────┴──────────┴──────────────┴──────────────┴─────────────┘ + +To get the output in JSON format, simply pass the ``--format json`` flag: + +.. 
code-block:: bash + + $ flwr ls --format json + { + "success": true, + "runs": [ + { + "run-id": 1859953118041441032, + "fab-id": "flwrlabs/myawesomeapp1", + "fab-name": "myawesomeapp1", + "fab-version": "1.0.0", + "fab-hash": "014c8eb3", + "status": "finished:completed", + "elapsed": "00:00:55", + "created-at": "2024-12-16 11:12:33Z", + "running-at": "2024-12-16 11:12:33Z", + "finished-at": "2024-12-16 11:13:29Z" + }, + { + "run-id": 14200740657011601420, + "fab-id": "flwrlabs/myawesomeapp2", + "fab-name": "myawesomeapp2", + "fab-version": "1.0.0", + "fab-hash": "014c8eb3", + "status": "running", + "elapsed": "00:00:09", + "created-at": "2024-12-16 12:18:39Z", + "running-at": "2024-12-16 12:18:39Z", + "finished-at": "N/A" + } + ] + } + +The JSON output for ``flwr ls`` contains similar fields as ``flwr run`` with the +addition of the ``status``, ``elapsed``, ``created-at``, ``running-at``, and +``finished-at`` fields. The ``runs`` key contains a list of dictionaries, each +representing a run. The additional fields are: + +- ``status``: The status of the run, either pending, starting, running, or finished. +- ``elapsed``: The time elapsed since the run started, formatted as ``HH:MM:SS``. +- ``created-at``: The time the run was created. +- ``running-at``: The time the run started running. +- ``finished-at``: The time the run finished. + +All timestamps adhere to ISO 8601, UTC and are formatted as ``YYYY-MM-DD HH:MM:SSZ``. + +You can also use the ``--run-id`` flag to list the details for one run. In this case, +the JSON output will have the same structure as above with only one entry in the +``runs`` key. For more details of this command, see the |flwr_ls|_ documentation. If the +command fails, the JSON output will return two fields, ``success`` and +``error-message``, as shown in :ref:`the above example <json_error_output>`. Note that +the content of the error message will be different depending on the error that occurred. 
+ +``flwr stop`` JSON output +------------------------- + +The |flwr_stop|_ command stops a running Flower app for a provided run ID. Similar to +``flwr run``, if the app path argument is not passed to ``flwr stop``, the current +working directory is used as the Flower app directory. By default, the command prints +the status of the stop process as follows: + +.. code-block:: bash + + $ flwr stop 1859953118041441032 + Loading project configuration... + Success + ✋ Stopping run ID 1859953118041441032... + ✅ Run 1859953118041441032 successfully stopped. + +To get the output in JSON format, simply pass the ``--format json`` flag: + +.. code-block:: bash + + $ flwr stop 1859953118041441032 --format json + { + "success": true, + "run-id": 1859953118041441032 + } + +If the command fails, the JSON output will contain two fields ``success`` with the value +of ``false`` and ``error-message``, as shown in :ref:`the above example +`. Note that the content of the error message will be different +depending on the error that occurred. diff --git a/framework/docs/source/index.rst b/framework/docs/source/index.rst index ddb123acd27b..cad980388373 100644 --- a/framework/docs/source/index.rst +++ b/framework/docs/source/index.rst @@ -100,6 +100,7 @@ Problem-oriented how-to guides show step-by-step how to achieve a specific goal. 
how-to-authenticate-supernodes how-to-implement-fedbn how-to-run-flower-on-azure + how-to-use-cli-json-output docker/index how-to-upgrade-to-flower-1.0 how-to-upgrade-to-flower-1.13 From 470ba4a58537c70932582df0e7e1edb1d6d2b523 Mon Sep 17 00:00:00 2001 From: Javier Date: Tue, 17 Dec 2024 14:54:49 +0000 Subject: [PATCH 11/15] feat(framework) Push to the `LinkState` the `Context` of a `ServerApp` run in simulation (#4721) Co-authored-by: Chong Shen Ng --- src/py/flwr/simulation/app.py | 5 ++-- src/py/flwr/simulation/run_simulation.py | 33 +++++++++++++++++++----- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/src/py/flwr/simulation/app.py b/src/py/flwr/simulation/app.py index ebcb275b6052..4aab3d75156f 100644 --- a/src/py/flwr/simulation/app.py +++ b/src/py/flwr/simulation/app.py @@ -48,6 +48,7 @@ from flwr.common.serde import ( configs_record_from_proto, context_from_proto, + context_to_proto, fab_from_proto, run_from_proto, run_status_to_proto, @@ -202,7 +203,7 @@ def run_simulation_process( # pylint: disable=R0914, disable=W0212, disable=R09 enable_tf_gpu_growth: bool = fed_opt.get("enable_tf_gpu_growth", False) # Launch the simulation - _run_simulation( + updated_context = _run_simulation( server_app_attr=server_app_attr, client_app_attr=client_app_attr, num_supernodes=num_supernodes, @@ -217,7 +218,7 @@ def run_simulation_process( # pylint: disable=R0914, disable=W0212, disable=R09 ) # Send resulting context - context_proto = None # context_to_proto(updated_context) + context_proto = context_to_proto(updated_context) out_req = PushSimulationOutputsRequest( run_id=run.run_id, context=context_proto ) diff --git a/src/py/flwr/simulation/run_simulation.py b/src/py/flwr/simulation/run_simulation.py index ae567b6f130e..cc8844405b0b 100644 --- a/src/py/flwr/simulation/run_simulation.py +++ b/src/py/flwr/simulation/run_simulation.py @@ -24,6 +24,7 @@ import traceback from logging import DEBUG, ERROR, INFO, WARNING from pathlib import Path +from queue 
import Empty, Queue from typing import Any, Optional from flwr.cli.config_utils import load_and_validate @@ -126,7 +127,7 @@ def run_simulation_from_cli() -> None: run = Run.create_empty(run_id) run.override_config = override_config - _run_simulation( + _ = _run_simulation( server_app_attr=server_app_attr, client_app_attr=client_app_attr, num_supernodes=args.num_supernodes, @@ -206,7 +207,7 @@ def run_simulation( "\n\tflwr.simulation.run_simulationt(...)", ) - _run_simulation( + _ = _run_simulation( num_supernodes=num_supernodes, client_app=client_app, server_app=server_app, @@ -229,6 +230,7 @@ def run_serverapp_th( has_exception: threading.Event, enable_tf_gpu_growth: bool, run_id: int, + ctx_queue: "Queue[Context]", ) -> threading.Thread: """Run SeverApp in a thread.""" @@ -241,6 +243,7 @@ def server_th_with_start_checks( _server_app_run_config: UserConfig, _server_app_attr: Optional[str], _server_app: Optional[ServerApp], + _ctx_queue: "Queue[Context]", ) -> None: """Run SeverApp, after check if GPU memory growth has to be set. 
@@ -261,13 +264,14 @@ def server_th_with_start_checks( ) # Run ServerApp - _run( + updated_context = _run( driver=_driver, context=context, server_app_dir=_server_app_dir, server_app_attr=_server_app_attr, loaded_server_app=_server_app, ) + _ctx_queue.put(updated_context) except Exception as ex: # pylint: disable=broad-exception-caught log(ERROR, "ServerApp thread raised an exception: %s", ex) log(ERROR, traceback.format_exc()) @@ -291,6 +295,7 @@ def server_th_with_start_checks( server_app_run_config, server_app_attr, server_app, + ctx_queue, ), ) serverapp_th.start() @@ -313,7 +318,7 @@ def _main_loop( server_app: Optional[ServerApp] = None, server_app_attr: Optional[str] = None, server_app_run_config: Optional[UserConfig] = None, -) -> None: +) -> Context: """Start ServerApp on a separate thread, then launch Simulation Engine.""" # Initialize StateFactory state_factory = LinkStateFactory(":flwr-in-memory-state:") @@ -323,6 +328,13 @@ def _main_loop( server_app_thread_has_exception = threading.Event() serverapp_th = None success = True + updated_context = Context( + run_id=run.run_id, + node_id=0, + node_config=UserConfig(), + state=RecordSet(), + run_config=UserConfig(), + ) try: # Register run log(DEBUG, "Pre-registering run with id %s", run.run_id) @@ -337,6 +349,7 @@ def _main_loop( # Initialize Driver driver = InMemoryDriver(state_factory=state_factory) driver.set_run(run_id=run.run_id) + output_context_queue: "Queue[Context]" = Queue() # Get and run ServerApp thread serverapp_th = run_serverapp_th( @@ -349,6 +362,7 @@ def _main_loop( has_exception=server_app_thread_has_exception, enable_tf_gpu_growth=enable_tf_gpu_growth, run_id=run.run_id, + ctx_queue=output_context_queue, ) # Start Simulation Engine @@ -366,6 +380,11 @@ def _main_loop( flwr_dir=flwr_dir, ) + updated_context = output_context_queue.get(timeout=3) + + except Empty: + log(DEBUG, "Queue timeout. No context received.") + except Exception as ex: log(ERROR, "An exception occurred !! 
%s", ex) log(ERROR, traceback.format_exc()) @@ -382,6 +401,7 @@ def _main_loop( raise RuntimeError("Exception in ServerApp thread") log(DEBUG, "Stopping Simulation Engine now.") + return updated_context # pylint: disable=too-many-arguments,too-many-locals,too-many-positional-arguments @@ -401,7 +421,7 @@ def _run_simulation( enable_tf_gpu_growth: bool = False, verbose_logging: bool = False, is_app: bool = False, -) -> None: +) -> Context: """Launch the Simulation Engine.""" if backend_config is None: backend_config = {} @@ -480,7 +500,8 @@ def _run_simulation( # Set logger propagation to False to prevent duplicated log output in Colab. logger = set_logger_propagation(logger, False) - _main_loop(*args) + updated_context = _main_loop(*args) + return updated_context def _parse_args_run_simulation() -> argparse.ArgumentParser: From e9c36531f5a9a21a1cfb0be82dda53a0890ee60f Mon Sep 17 00:00:00 2001 From: Javier Date: Tue, 17 Dec 2024 15:11:34 +0000 Subject: [PATCH 12/15] refactor(framework) Introduce run status checks in `SimulationIo` (#4729) --- .../simulation/simulation_servicer_test.py | 209 ++++++++++++++++++ .../simulation/simulationio_servicer.py | 13 ++ 2 files changed, 222 insertions(+) create mode 100644 src/py/flwr/server/superlink/simulation/simulation_servicer_test.py diff --git a/src/py/flwr/server/superlink/simulation/simulation_servicer_test.py b/src/py/flwr/server/superlink/simulation/simulation_servicer_test.py new file mode 100644 index 000000000000..fa0302458fb2 --- /dev/null +++ b/src/py/flwr/server/superlink/simulation/simulation_servicer_test.py @@ -0,0 +1,209 @@ +# Copyright 2024 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""SimulationIoServicer tests.""" + + +import tempfile +import unittest + +import grpc +from parameterized import parameterized + +from flwr.common import ConfigsRecord, Context +from flwr.common.constant import SIMULATIONIO_API_DEFAULT_SERVER_ADDRESS, Status +from flwr.common.serde import context_to_proto, run_status_to_proto +from flwr.common.serde_test import RecordMaker +from flwr.common.typing import RunStatus +from flwr.proto.run_pb2 import ( # pylint: disable=E0611 + UpdateRunStatusRequest, + UpdateRunStatusResponse, +) +from flwr.proto.simulationio_pb2 import ( # pylint: disable=E0611 + PushSimulationOutputsRequest, + PushSimulationOutputsResponse, +) +from flwr.server.superlink.ffs.ffs_factory import FfsFactory +from flwr.server.superlink.linkstate.linkstate_factory import LinkStateFactory +from flwr.server.superlink.simulation.simulationio_grpc import run_simulationio_api_grpc +from flwr.server.superlink.utils import _STATUS_TO_MSG + + +class TestSimulationIoServicer(unittest.TestCase): # pylint: disable=R0902 + """SimulationIoServicer tests for allowed RunStatuses.""" + + def setUp(self) -> None: + """Initialize mock stub and server interceptor.""" + # Create a temporary directory + self.temp_dir = tempfile.TemporaryDirectory() # pylint: disable=R1732 + self.addCleanup(self.temp_dir.cleanup) # Ensures cleanup after test + + state_factory = LinkStateFactory(":flwr-in-memory-state:") + self.state = state_factory.state() + ffs_factory = FfsFactory(self.temp_dir.name) 
+ self.ffs = ffs_factory.ffs() + + self.status_to_msg = _STATUS_TO_MSG + + self._server: grpc.Server = run_simulationio_api_grpc( + SIMULATIONIO_API_DEFAULT_SERVER_ADDRESS, + state_factory, + ffs_factory, + None, + ) + + self._channel = grpc.insecure_channel("localhost:9096") + self._push_simulation_outputs = self._channel.unary_unary( + "/flwr.proto.SimulationIo/PushSimulationOutputs", + request_serializer=PushSimulationOutputsRequest.SerializeToString, + response_deserializer=PushSimulationOutputsResponse.FromString, + ) + self._update_run_status = self._channel.unary_unary( + "/flwr.proto.SimulationIo/UpdateRunStatus", + request_serializer=UpdateRunStatusRequest.SerializeToString, + response_deserializer=UpdateRunStatusResponse.FromString, + ) + + def tearDown(self) -> None: + """Clean up grpc server.""" + self._server.stop(None) + + def _transition_run_status(self, run_id: int, num_transitions: int) -> None: + if num_transitions > 0: + _ = self.state.update_run_status(run_id, RunStatus(Status.STARTING, "", "")) + if num_transitions > 1: + _ = self.state.update_run_status(run_id, RunStatus(Status.RUNNING, "", "")) + if num_transitions > 2: + _ = self.state.update_run_status(run_id, RunStatus(Status.FINISHED, "", "")) + + def test_push_simulation_outputs_successful_if_running(self) -> None: + """Test `PushSimulationOutputs` success.""" + # Prepare + run_id = self.state.create_run("", "", "", {}, ConfigsRecord()) + + maker = RecordMaker() + context = Context( + run_id=run_id, + node_id=0, + node_config=maker.user_config(), + state=maker.recordset(1, 1, 1), + run_config=maker.user_config(), + ) + + # Transition status to running. PushTaskRes is only allowed in running status. 
+ self._transition_run_status(run_id, 2) + request = PushSimulationOutputsRequest( + run_id=run_id, context=context_to_proto(context) + ) + + # Execute + response, call = self._push_simulation_outputs.with_call(request=request) + + # Assert + assert isinstance(response, PushSimulationOutputsResponse) + assert grpc.StatusCode.OK == call.code() + + def _assert_push_simulation_outputs_not_allowed( + self, run_id: int, context: Context + ) -> None: + """Assert `PushSimulationOutputs` not allowed.""" + run_status = self.state.get_run_status({run_id})[run_id] + request = PushSimulationOutputsRequest( + run_id=run_id, context=context_to_proto(context) + ) + + with self.assertRaises(grpc.RpcError) as e: + self._push_simulation_outputs.with_call(request=request) + assert e.exception.code() == grpc.StatusCode.PERMISSION_DENIED + assert e.exception.details() == self.status_to_msg[run_status.status] + + @parameterized.expand( + [ + (0,), # Test not successful if RunStatus is pending. + (1,), # Test not successful if RunStatus is starting. + (3,), # Test not successful if RunStatus is finished. + ] + ) # type: ignore + def test_push_simulation_outputs_not_successful_if_not_running( + self, num_transitions: int + ) -> None: + """Test `PushSimulationOutputs` not successful if RunStatus is not running.""" + # Prepare + run_id = self.state.create_run("", "", "", {}, ConfigsRecord()) + + maker = RecordMaker() + context = Context( + run_id=run_id, + node_id=0, + node_config=maker.user_config(), + state=maker.recordset(1, 1, 1), + run_config=maker.user_config(), + ) + + self._transition_run_status(run_id, num_transitions) + + # Execute & Assert + self._assert_push_simulation_outputs_not_allowed(run_id, context) + + @parameterized.expand( + [ + (0,), # Test successful if RunStatus is pending. + (1,), # Test successful if RunStatus is starting. + (2,), # Test successful if RunStatus is running. 
+ ] + ) # type: ignore + def test_update_run_status_successful_if_not_finished( + self, num_transitions: int + ) -> None: + """Test `UpdateRunStatus` success.""" + # Prepare + run_id = self.state.create_run("", "", "", {}, ConfigsRecord()) + _ = self.state.get_run_status({run_id})[run_id] + next_run_status = RunStatus(Status.STARTING, "", "") + + if num_transitions > 0: + _ = self.state.update_run_status(run_id, RunStatus(Status.STARTING, "", "")) + next_run_status = RunStatus(Status.RUNNING, "", "") + if num_transitions > 1: + _ = self.state.update_run_status(run_id, RunStatus(Status.RUNNING, "", "")) + next_run_status = RunStatus(Status.FINISHED, "", "") + + request = UpdateRunStatusRequest( + run_id=run_id, run_status=run_status_to_proto(next_run_status) + ) + + # Execute + response, call = self._update_run_status.with_call(request=request) + + # Assert + assert isinstance(response, UpdateRunStatusResponse) + assert grpc.StatusCode.OK == call.code() + + def test_update_run_status_not_successful_if_finished(self) -> None: + """Test `UpdateRunStatus` not successful.""" + # Prepare + run_id = self.state.create_run("", "", "", {}, ConfigsRecord()) + _ = self.state.get_run_status({run_id})[run_id] + _ = self.state.update_run_status(run_id, RunStatus(Status.FINISHED, "", "")) + run_status = self.state.get_run_status({run_id})[run_id] + next_run_status = RunStatus(Status.FINISHED, "", "") + + request = UpdateRunStatusRequest( + run_id=run_id, run_status=run_status_to_proto(next_run_status) + ) + + with self.assertRaises(grpc.RpcError) as e: + self._update_run_status.with_call(request=request) + assert e.exception.code() == grpc.StatusCode.PERMISSION_DENIED + assert e.exception.details() == self.status_to_msg[run_status.status] diff --git a/src/py/flwr/server/superlink/simulation/simulationio_servicer.py b/src/py/flwr/server/superlink/simulation/simulationio_servicer.py index 9104d9cde875..bfaec9953ec5 100644 --- 
a/src/py/flwr/server/superlink/simulation/simulationio_servicer.py +++ b/src/py/flwr/server/superlink/simulation/simulationio_servicer.py @@ -54,6 +54,7 @@ ) from flwr.server.superlink.ffs.ffs_factory import FfsFactory from flwr.server.superlink.linkstate import LinkStateFactory +from flwr.server.superlink.utils import abort_if class SimulationIoServicer(simulationio_pb2_grpc.SimulationIoServicer): @@ -110,6 +111,15 @@ def PushSimulationOutputs( """Push Simulation process outputs.""" log(DEBUG, "SimultionIoServicer.PushSimulationOutputs") state = self.state_factory.state() + + # Abort if the run is not running + abort_if( + request.run_id, + [Status.PENDING, Status.STARTING, Status.FINISHED], + state, + context, + ) + state.set_serverapp_context(request.run_id, context_from_proto(request.context)) return PushSimulationOutputsResponse() @@ -120,6 +130,9 @@ def UpdateRunStatus( log(DEBUG, "SimultionIoServicer.UpdateRunStatus") state = self.state_factory.state() + # Abort if the run is finished + abort_if(request.run_id, [Status.FINISHED], state, context) + # Update the run status state.update_run_status( run_id=request.run_id, new_status=run_status_from_proto(request.run_status) From 69988749eee91787cae4f3269353f2b396af57d1 Mon Sep 17 00:00:00 2001 From: Heng Pan Date: Wed, 18 Dec 2024 00:21:51 +0900 Subject: [PATCH 13/15] feat(framework) Add prompt for `flwr login` when authentication fails (#4719) --- src/py/flwr/cli/log.py | 14 ++++++++------ src/py/flwr/cli/ls.py | 8 +++++--- src/py/flwr/cli/run/run.py | 9 +++++++-- src/py/flwr/cli/stop.py | 5 +++-- src/py/flwr/cli/utils.py | 23 +++++++++++++++++++++++ 5 files changed, 46 insertions(+), 13 deletions(-) diff --git a/src/py/flwr/cli/log.py b/src/py/flwr/cli/log.py index 93fe3c387974..ef9fa2da2e65 100644 --- a/src/py/flwr/cli/log.py +++ b/src/py/flwr/cli/log.py @@ -34,7 +34,7 @@ from flwr.proto.exec_pb2 import StreamLogsRequest # pylint: disable=E0611 from flwr.proto.exec_pb2_grpc import ExecStub -from .utils import 
init_channel, try_obtain_cli_auth_plugin +from .utils import init_channel, try_obtain_cli_auth_plugin, unauthenticated_exc_handler def start_stream( @@ -88,8 +88,9 @@ def stream_logs( latest_timestamp = 0.0 res = None try: - for res in stub.StreamLogs(req, timeout=duration): - print(res.log_output, end="") + with unauthenticated_exc_handler(): + for res in stub.StreamLogs(req, timeout=duration): + print(res.log_output, end="") except grpc.RpcError as e: # pylint: disable=E1101 if e.code() != grpc.StatusCode.DEADLINE_EXCEEDED: @@ -109,9 +110,10 @@ def print_logs(run_id: int, channel: grpc.Channel, timeout: int) -> None: try: while True: try: - # Enforce timeout for graceful exit - for res in stub.StreamLogs(req, timeout=timeout): - print(res.log_output) + with unauthenticated_exc_handler(): + # Enforce timeout for graceful exit + for res in stub.StreamLogs(req, timeout=timeout): + print(res.log_output) except grpc.RpcError as e: # pylint: disable=E1101 if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED: diff --git a/src/py/flwr/cli/ls.py b/src/py/flwr/cli/ls.py index e845583d61ac..9c30a158952f 100644 --- a/src/py/flwr/cli/ls.py +++ b/src/py/flwr/cli/ls.py @@ -43,7 +43,7 @@ ) from flwr.proto.exec_pb2_grpc import ExecStub -from .utils import init_channel, try_obtain_cli_auth_plugin +from .utils import init_channel, try_obtain_cli_auth_plugin, unauthenticated_exc_handler _RunListType = tuple[int, str, str, str, str, str, str, str, str] @@ -295,7 +295,8 @@ def _list_runs( output_format: str = CliOutputFormat.DEFAULT, ) -> None: """List all runs.""" - res: ListRunsResponse = stub.ListRuns(ListRunsRequest()) + with unauthenticated_exc_handler(): + res: ListRunsResponse = stub.ListRuns(ListRunsRequest()) run_dict = {run_id: run_from_proto(proto) for run_id, proto in res.run_dict.items()} formatted_runs = _format_runs(run_dict, res.now) @@ -311,7 +312,8 @@ def _display_one_run( output_format: str = CliOutputFormat.DEFAULT, ) -> None: """Display information about a specific 
run.""" - res: ListRunsResponse = stub.ListRuns(ListRunsRequest(run_id=run_id)) + with unauthenticated_exc_handler(): + res: ListRunsResponse = stub.ListRuns(ListRunsRequest(run_id=run_id)) if not res.run_dict: raise ValueError(f"Run ID {run_id} not found") diff --git a/src/py/flwr/cli/run/run.py b/src/py/flwr/cli/run/run.py index 19db2ade39dc..9436f5879c80 100644 --- a/src/py/flwr/cli/run/run.py +++ b/src/py/flwr/cli/run/run.py @@ -48,7 +48,11 @@ from flwr.proto.exec_pb2_grpc import ExecStub from ..log import start_stream -from ..utils import init_channel, try_obtain_cli_auth_plugin +from ..utils import ( + init_channel, + try_obtain_cli_auth_plugin, + unauthenticated_exc_handler, +) CONN_REFRESH_PERIOD = 60 # Connection refresh period for log streaming (seconds) @@ -166,7 +170,8 @@ def _run_with_exec_api( override_config=user_config_to_proto(parse_config_args(config_overrides)), federation_options=configs_record_to_proto(c_record), ) - res = stub.StartRun(req) + with unauthenticated_exc_handler(): + res = stub.StartRun(req) if res.HasField("run_id"): typer.secho(f"🎊 Successfully started run {res.run_id}", fg=typer.colors.GREEN) diff --git a/src/py/flwr/cli/stop.py b/src/py/flwr/cli/stop.py index 4989d216f28f..2a96e337bf80 100644 --- a/src/py/flwr/cli/stop.py +++ b/src/py/flwr/cli/stop.py @@ -34,7 +34,7 @@ from flwr.proto.exec_pb2 import StopRunRequest, StopRunResponse # pylint: disable=E0611 from flwr.proto.exec_pb2_grpc import ExecStub -from .utils import init_channel, try_obtain_cli_auth_plugin +from .utils import init_channel, try_obtain_cli_auth_plugin, unauthenticated_exc_handler def stop( # pylint: disable=R0914 @@ -113,7 +113,8 @@ def stop( # pylint: disable=R0914 def _stop_run(stub: ExecStub, run_id: int, output_format: str) -> None: """Stop a run.""" - response: StopRunResponse = stub.StopRun(request=StopRunRequest(run_id=run_id)) + with unauthenticated_exc_handler(): + response: StopRunResponse = stub.StopRun(request=StopRunRequest(run_id=run_id)) if 
response.success: typer.secho(f"✅ Run {run_id} successfully stopped.", fg=typer.colors.GREEN) if output_format == CliOutputFormat.JSON: diff --git a/src/py/flwr/cli/utils.py b/src/py/flwr/cli/utils.py index 7a8040756c5e..8cb89255ed40 100644 --- a/src/py/flwr/cli/utils.py +++ b/src/py/flwr/cli/utils.py @@ -18,6 +18,8 @@ import hashlib import json import re +from collections.abc import Iterator +from contextlib import contextmanager from logging import DEBUG from pathlib import Path from typing import Any, Callable, Optional, cast @@ -231,3 +233,24 @@ def on_channel_state_change(channel_connectivity: str) -> None: ) channel.subscribe(on_channel_state_change) return channel + + +@contextmanager +def unauthenticated_exc_handler() -> Iterator[None]: + """Context manager to handle gRPC UNAUTHENTICATED errors. + + It catches grpc.RpcError exceptions with UNAUTHENTICATED status, informs the user, + and exits the application. All other exceptions will be allowed to escape. + """ + try: + yield + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.UNAUTHENTICATED: + raise + typer.secho( + "❌ Authentication failed. 
Please run `flwr login`" + " to authenticate and try again.", + fg=typer.colors.RED, + bold=True, + ) + raise typer.Exit(code=1) from None From 7f3acafee9574d16e2962715093ff677774e4327 Mon Sep 17 00:00:00 2001 From: Chong Shen Ng Date: Tue, 17 Dec 2024 15:32:42 +0000 Subject: [PATCH 14/15] refactor(framework) Add request validation to ServerAppIoServicer (#4720) --- .../flwr/server/superlink/driver/serverappio_servicer.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/py/flwr/server/superlink/driver/serverappio_servicer.py b/src/py/flwr/server/superlink/driver/serverappio_servicer.py index f52129a2ba11..f4183763cae5 100644 --- a/src/py/flwr/server/superlink/driver/serverappio_servicer.py +++ b/src/py/flwr/server/superlink/driver/serverappio_servicer.py @@ -159,6 +159,9 @@ def PushTaskIns( for task_ins in request.task_ins_list: validation_errors = validate_task_ins_or_res(task_ins) _raise_if(bool(validation_errors), ", ".join(validation_errors)) + _raise_if( + request.run_id != task_ins.run_id, "`task_ins` has mismatched `run_id`" + ) # Store each TaskIns task_ids: list[Optional[UUID]] = [] @@ -193,6 +196,12 @@ def PullTaskRes( # Read from state task_res_list: list[TaskRes] = state.get_task_res(task_ids=task_ids) + # Validate request + for task_res in task_res_list: + _raise_if( + request.run_id != task_res.run_id, "`task_res` has mismatched `run_id`" + ) + # Delete the TaskIns/TaskRes pairs if TaskRes is found task_ins_ids_to_delete = { UUID(task_res.task.ancestry[0]) for task_res in task_res_list From 0c838d7bf8078a85572efa82b57ddace847d20b6 Mon Sep 17 00:00:00 2001 From: Adam Narozniak <51029327+adam-narozniak@users.noreply.github.com> Date: Tue, 17 Dec 2024 19:16:14 +0100 Subject: [PATCH 15/15] fix(datasets) Update outdated docs references (#4732) Co-authored-by: Taner Topal --- datasets/README.md | 2 +- datasets/dev/format.sh | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datasets/README.md b/datasets/README.md index 
0d35d2e31b6a..f3091ab7532b 100644 --- a/datasets/README.md +++ b/datasets/README.md @@ -51,7 +51,7 @@ Create **custom partitioning schemes** or choose from the **implemented [partiti * Exponential partitioning `ExponentialPartitioner(num_partitions)` * more to come in the future releases (contributions are welcome).

- Comparison of partitioning schemes. + Comparison of partitioning schemes.
Comparison of Partitioning Schemes on CIFAR10

diff --git a/datasets/dev/format.sh b/datasets/dev/format.sh index b7dca9accabf..94c70444d735 100755 --- a/datasets/dev/format.sh +++ b/datasets/dev/format.sh @@ -28,7 +28,7 @@ echo "Formatting done: Python" # Notebooks echo "Formatting started: Notebooks" -python -m black --ipynb -q doc/source/*.ipynb +python -m black --ipynb -q docs/source/*.ipynb KEYS="metadata.celltoolbar metadata.language_info metadata.toc metadata.notify_time metadata.varInspector metadata.accelerator metadata.vscode cell.metadata.id cell.metadata.heading_collapsed cell.metadata.hidden cell.metadata.code_folding cell.metadata.tags cell.metadata.init_cell cell.metadata.vscode cell.metadata.pycharm" -python -m nbstripout --keep-output doc/source/*.ipynb --extra-keys "$KEYS" +python -m nbstripout --keep-output docs/source/*.ipynb --extra-keys "$KEYS" echo "Formatting done: Notebooks"