diff --git a/docs/lifl/lifl.md b/docs/lifl/lifl.md
index cefc6a42e..2e7264242 100644
--- a/docs/lifl/lifl.md
+++ b/docs/lifl/lifl.md
@@ -1,6 +1,6 @@
 # LIFL Instructions
 
-This document provides instructions on how to use LIFL in flame.
+This document provides instructions on how to use LIFL in flame. If you are installing on a mounted drive, refer to the following documentation: [LIFL on external drive](./lifl_ext.md)
 
 ## Prerequisites
 The target runtime environment of LIFL is Linux **only**. LIFL requires Linux kernel version >= 5.15. We have tested LIFL on Ubuntu 20.
@@ -21,11 +21,50 @@ cd third_party/spright_utility/scripts
 ```bash
 # Install deps for libbpf
 sudo apt update && sudo apt install -y flex bison build-essential dwarves libssl-dev \
-    libelf-dev pkg-config libconfig-dev clang gcc-multilib
+    libelf-dev pkg-config libconfig-dev clang gcc-multilib byobu htop
 
 # Execute the libbpf installation script
 cd third_party/spright_utility/scripts
 ./libbpf.sh
+```
+
+Finally, build the sockmap_manager:
+
+```bash
+cd flame/third_party/spright_utility/
+make all
+```
+
+### 3. Installing Go and GolangCI-Lint
+
+```bash
+# Download the Go tarball and extract it (here to /usr/local; make sure /usr/local/go/bin ends up on your PATH)
+golang_file=go1.22.3.linux-amd64.tar.gz
+curl -LO https://go.dev/dl/$golang_file && sudo tar -C /usr/local -xzf $golang_file
+# Download and run the GolangCI-Lint installation script
+curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sudo sh -s -- -b /usr/local/go/bin v1.49.0
+golangci-lint --version # Verify that GolangCI-Lint was installed
+```
+
+### Environment Setup
+
+We recommend setting up your environment with `conda`. From the directory where you cloned flame, run the following to set up and activate the flame environment:
+
+```bash
+cd flame
+make install # Install the flame control plane utilities (placed under ~/.flame/bin)
+
+# Create and activate the conda environment
+conda create -n flame python=3.9
+conda activate flame
+
+pip install google
+pip install tensorflow
+pip install torch
+pip install torchvision
+
+# Install the flame Python library
+cd lib/python
+make install
+
 ```
 
 ## Shared Memory Backend in LIFL
@@ -33,6 +72,10 @@
 The [shared memory backend](../../lib/python/flame/backend/shm.py) in LIFL uses eBPF's sockmap and SK_MSG to pass buffer references between aggregators. We introduce a "[sockmap_manager](../../third_party/spright_utility/src/sockmap_manager.c)" on each node to manage the registration of aggregator's socket to the in-kernel sockmap. You must run the `sockmap_manager` first.
 
 ```bash
+# Execute the metaserver
+cd ~/.flame/bin/
+sudo ./metaserver
+
 # Execute the sockmap_manager
 cd third_party/spright_utility/
 
@@ -85,13 +128,76 @@ Flame initially supports hierarchical aggregation with two levels: top level and
 Flame initially supports lazy aggregation only. LIFL adds additional support for having eager aggregation in Flame, which gives us more flexible timing on the aggregation process. The example to run eager aggregation is availble at [eager_hier_mnist](../../lib/python/examples/eager_hier_mnist/). The implementation of eager aggregation is available at [eager_syncfl](../../lib/python/flame/mode/horizontal/eager_syncfl/).
 
-## Problems when running LIFL
-1. When you run `sudo ./bin/sockmap_manager`, you receive
+## Running an Example
+
+We will run the [coord_hier_syncfl_mnist](../../lib/python/examples/coord_hier_syncfl_mnist/) example.
+
+Open six terminal windows: one each for the metaserver, the sockmap_manager, the coordinator, the top aggregator, the middle aggregator, and the trainer.
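+
+Optionally, `byobu` (installed together with the libbpf dependencies above) can keep all six windows in a single session; any terminal emulator or multiplexer works just as well:
+
+```bash
+byobu   # press F2 to open a new window, F3/F4 to move between windows
+```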
+
+In the first terminal run:
+
+```bash
+cd ~/.flame/bin/
+
+sudo ./metaserver
+```
+
+Open another terminal for the sockmap_manager
+
+```bash
+cd flame/third_party/spright_utility/
+
+sudo ./bin/sockmap_manager
+```
+
+We will use the rest of the terminals for the actual aggregation and training.
+
+Run the Coordinator
+
+```bash
+cd flame/lib/python/examples/coord_hier_syncfl_mnist/coordinator
+
+conda activate flame
+python main.py config.json
+```
+
+Run the Top Aggregator
+
+```bash
+cd flame/lib/python/examples/coord_hier_syncfl_mnist/top_aggregator
+
+conda activate flame
+python main.py configs/config_shm.json
+```
+
+Run the Middle Aggregator
+
+```bash
+cd flame/lib/python/examples/coord_hier_syncfl_mnist/middle_aggregator
+
+conda activate flame
+python main.py configs/config_shm_1.json
+```
+
+Run the Trainer
+
+```bash
+cd flame/lib/python/examples/coord_hier_syncfl_mnist/trainer
+
+conda activate flame
+python main.py config1.json
 ```
+
+## Problems when running LIFL
+
+### 1. When you run `sudo ./bin/sockmap_manager`, you receive
+
+```text
 ./bin/sockmap_manager: error while loading shared libraries: libbpf.so.0: cannot open shared object file: No such file or directory
 ```
 
 Solutions: This may happen when you use Ubuntu 22, which has the libbpf 0.5.0 pre-installed. You need to re-link the `/lib/x86_64-linux-gnu/libbpf.so.0` to `libbpf.so.0.6.0`
+
 ```bash
 # Assume you have executed the libbpf installation script
 cd third_party/spright_utility/scripts/libbpf/src
@@ -101,4 +207,21 @@ sudo cp libbpf.so.0.6.0 /lib/x86_64-linux-gnu/
 
 # Re-link libbpf.so.0
 sudo ln -sf /lib/x86_64-linux-gnu/libbpf.so.0.6.0 /lib/x86_64-linux-gnu/libbpf.so.0
-```
\ No newline at end of file
+```
+
+### 2. When running the trainer, you receive
+
+```text
+  File "miniconda3/envs/flame/lib/python3.9/site-packages/pip/_vendor/typing_extensions.py", line 3019, in _collect_type_vars
+    raise TypeError(f'Type parameter {t!r} without a default'
+TypeError: Type parameter +T without a default follows type parameter with a default
+```
+
+Solution: You should be able to solve this error by downgrading pip:
+
+```bash
+pip install pip==24.1.2
+
+conda activate flame
+python main.py config1.json   # then re-run the trainer
+```
diff --git a/docs/lifl/lifl_ext.md b/docs/lifl/lifl_ext.md
new file mode 100644
index 000000000..b9eafaad6
--- /dev/null
+++ b/docs/lifl/lifl_ext.md
@@ -0,0 +1,161 @@
+# Running LIFL on a Mounted Drive for Extra Storage
+
+This guide will walk you through configuring and running LIFL on a mounted drive, such as /mydata, for extra storage. You can name the mount point as needed; just replace /mydata with your desired mount point.
+
+## Prerequisites
+
+Before starting, ensure you have:
+
+- A Linux-based system
+- A mounted drive (e.g., /mydata)
+- Sufficient storage space for datasets, dependencies, and binaries
+
+> [!NOTE]
+> In this example, the mounted drive is named `/mydata`.
+
+### 1. Prepare the Mounted Drive
+
+First, update your system and install essential utilities.
+
+```bash
+sudo apt update && sudo apt install -y byobu htop
+```
+
+Change ownership of the mounted drive to the current user:
+
+```bash
+sudo chown -R $(id -u):$(id -g) /mydata
+cd /mydata
+export MYMOUNT=/mydata
+```
+
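+The `export MYMOUNT` above only applies to the current shell. If later steps are run from new terminals, you can optionally persist it alongside the PATH entries added in the next step:
+
+```bash
+# Optional: make MYMOUNT available in new shells as well
+echo "export MYMOUNT=/mydata" >> $HOME/.bashrc
+```
+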
+### 2. Install Golang on the Mounted Drive
+
+Download and extract Go 1.22.3:
+
+```bash
+golang_file=go1.22.3.linux-amd64.tar.gz
+curl -LO https://go.dev/dl/$golang_file && tar -C /mydata -xzf $golang_file
+```
+
+Update the system's PATH:
+
+```bash
+echo "PATH=\"/mydata/go/bin:\$PATH\"" >> $HOME/.bashrc
+echo "PATH=\"\$HOME/.flame/bin:\$PATH\"" >> $HOME/.bashrc
+source $HOME/.bashrc
+```
+
+### 3. Install GolangCI-Lint
+
+Install GolangCI-Lint to ensure code quality for Go projects:
+
+```bash
+curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b /mydata/go/bin v1.49.0
+golangci-lint --version
+```
+
+### 4. Install Miniconda on the Mounted Drive
+
+Download and install Miniconda on the mounted drive:
+
+```bash
+wget https://repo.anaconda.com/miniconda/Miniconda3-py39_23.3.1-0-Linux-x86_64.sh
+chmod +x Miniconda3-py39_23.3.1-0-Linux-x86_64.sh
+bash Miniconda3-py39_23.3.1-0-Linux-x86_64.sh -b -p /mydata/miniconda3
+source /mydata/miniconda3/bin/activate
+conda init bash
+source $HOME/.bashrc
+```
+
+### 5. Set up the Python Environment
+
+Create a Python environment for LIFL:
+
+```bash
+conda create -n flame python=3.9 -y
+conda activate flame
+```
+
+Install necessary Python libraries:
+
+```bash
+pip install google tensorflow torch torchvision
+```
+
+### 6. Clone and Set Up the Flame Repository
+
+Clone the Flame repository:
+
+```bash
+git clone https://github.com/cisco-open/flame.git
+```
+
+### 7. Upgrade Kernel (if needed)
+
+Note: If your kernel version is >= 5.15, you can skip this step. To check, use the following command: `uname -r`
+
+Navigate to the spright_utility scripts directory and execute the kernel upgrade script:
+
+```bash
+cd flame/third_party/spright_utility/scripts
+./upgrade_kernel.sh
+```
+
+### 8. Install libbpf for LIFL
+
+Install the dependencies for libbpf:
+
+```bash
+sudo apt update && sudo apt install -y flex bison build-essential dwarves \
+    libssl-dev libelf-dev pkg-config libconfig-dev clang gcc-multilib byobu htop
+```
+
+Then, install libbpf:
+
+```bash
+cd flame/third_party/spright_utility/scripts
+./libbpf.sh
+```
+
+Finally, build the sockmap_manager:
+
+```bash
+cd flame/third_party/spright_utility/
+make all
+```
+
+### 9. Set up a Local MQTT Broker
+
+LIFL uses an MQTT broker for communication during federated learning. To install and set up a local MQTT broker:
+
+```bash
+sudo apt update
+sudo apt install -y mosquitto
+sudo systemctl status mosquitto
+```
+
+You should see something like:
+
+```text
+mosquitto.service - Mosquitto MQTT v3.1/v3.1.1 Broker
+     Active: active (running)
+```
+
+To manage the Mosquitto service:
+
+```bash
+# Start Mosquitto
+sudo systemctl start mosquitto
+
+# Stop Mosquitto
+sudo systemctl stop mosquitto
+
+# Restart Mosquitto
+sudo systemctl restart mosquitto
+```
+
+A tutorial on how to run an example can be found in [lifl example](./lifl.md#running-an-example).
diff --git a/lib/python/flame/backend/shm.py b/lib/python/flame/backend/shm.py
index 29d5512bb..ed03682fe 100644
--- a/lib/python/flame/backend/shm.py
+++ b/lib/python/flame/backend/shm.py
@@ -185,32 +185,98 @@ async def _create_join_inner_task():
         _, success = run_async(_create_join_inner_task(), self._loop)
         if not success:
             raise SystemError("_create_join_inner_task failure")
-    
+
     def leave(self, channel) -> None:
-        """Leave a given channel.
-
-        TODO: notify the sockmap manager to remove the entry from eBPF map
-        """
         logger.info("Clean up shared memory buffers.")
+        # 1. 
Clean up per-end buffers for end in channel.all_ends(): - shm_buf = shared_memory.SharedMemory(name = end) - shm_buf.close() - if end == self._id: - shm_buf.unlink() + if shared_memory_exists(end): + try: + shm_buf = shared_memory.SharedMemory(name=end) + shm_buf.close() + if end == self._id: + try: + shm_buf.unlink() + unregister(end, "shared_memory") + except FileNotFoundError: + logger.debug(f"Shared memory {end} already unlinked by another process.") + except FileNotFoundError: + logger.debug(f"Shared memory segment {end} not found during cleanup.") + else: + logger.debug(f"Shared memory {end} does not exist, skipping.") + + # 2. Clean up our role-specific dictionary + my_shm_name = channel.name() + "-" + channel.my_role() + if shared_memory_exists(my_shm_name): + try: + my_shm_ends = SharedMemoryDict(name=my_shm_name, size=SHM_DICT_SIZE) + if self._id in my_shm_ends: + del my_shm_ends[self._id] + + if len(my_shm_ends) == 0: + my_shm_ends.shm.close() + try: + my_shm_ends.shm.unlink() + unregister(my_shm_name, "shared_memory") + except FileNotFoundError: + logger.debug(f"Shared memory dict {my_shm_name} already unlinked.") + else: + my_shm_ends.shm.close() + except FileNotFoundError: + logger.debug(f"No shared memory dict found for {my_shm_name}") + else: + logger.debug(f"{my_shm_name} does not exist, skipping.") + + # 3. Clean up all segments created in set_data() + # These have names like self._id + "-" + other + # Ensure we only unlink if this process created them. + for key in list(self._is_shm_buf_created.keys()): + # key is of the form self._id + "-" + other + if shared_memory_exists(key): + try: + shm_buf = shared_memory.SharedMemory(name=key) + shm_buf.close() + # As the creator (since we track them in _is_shm_buf_created), + # we can safely unlink here + try: + shm_buf.unlink() + unregister(key, "shared_memory") + except FileNotFoundError: + logger.debug(f"Shared memory {key} already unlinked by another process.") + except FileNotFoundError: + logger.debug(f"Shared memory segment {key} not found during cleanup.") + else: + logger.debug(f"Shared memory {key} does not exist, skipping.") + self._is_shm_buf_created.clear() + + logger.debug("channel leave completed gracefully") + + # def leave(self, channel) -> None: + # """Leave a given channel. + + # TODO: notify the sockmap manager to remove the entry from eBPF map + # """ + # logger.info("Clean up shared memory buffers.") - # NOTE: this method may recreate the shm dict. - shm_ends = SharedMemoryDict(name = channel.name() + "-" + channel.my_role(), size = SHM_DICT_SIZE) - del shm_ends[self._id] + # for end in channel.all_ends(): + # shm_buf = shared_memory.SharedMemory(name = end) + # shm_buf.close() + # if end == self._id: + # shm_buf.unlink() + + # # NOTE: this method may recreate the shm dict. + # shm_ends = SharedMemoryDict(name = channel.name() + "-" + channel.my_role(), size = SHM_DICT_SIZE) + # del shm_ends[self._id] - if len(shm_ends) == 0: - shm_ends.shm.close() - shm_ends.shm.unlink() - del shm_ends + # if len(shm_ends) == 0: + # shm_ends.shm.close() + # shm_ends.shm.unlink() + # del shm_ends - # NOTE: this method may recreate the shm dict. - other_ends = SharedMemoryDict(name = channel.name() + "-" + channel.other_role(), size = SHM_DICT_SIZE) - other_ends.shm.close() + # # NOTE: this method may recreate the shm dict. 
+ # other_ends = SharedMemoryDict(name = channel.name() + "-" + channel.other_role(), size = SHM_DICT_SIZE) + # other_ends.shm.close() def create_tx_task( self, channel_name: str, end_id: str, comm_type=CommType.UNICAST @@ -422,4 +488,4 @@ async def handle(self, msg: msg_pb2.Data, channel: Channel) -> None: # await self.set_cleanup_ready_async(msg.end_id) # return - await rxq.put((payload, timestamp)) \ No newline at end of file + await rxq.put((payload, timestamp))