- Title: "Clonos: Consistent Causal Recovery for Highly-Available Streaming Dataflows"
- Authors: Pedro F. Silvestre, Marios Fragkoulis, Diomidis Spinellis, Asterios Katsifodimos
- Paper ID: rdm517
- DOI: 10.1145/3448016.3457320
- ACM Digital Library: https://doi.org/10.1145/3448016.3457320
- Reproducibility Repository: https://github.com/PSilvestre/ClonosReproducibility
This PDF is also the README.md of the above Reproducibility Repository. It may be more readable (e.g. syntax highlighting) on GitHub.
This is a difficult paper to reproduce due to the sheer number of dependencies and distributed components required (stream processor, HDFS, Kafka, ZooKeeper, data stream generators, and more). To ease this, we execute our experiments in a virtualized environment: Docker or Kubernetes.
- Local testing: much simpler, but less faithful to our original setup. Uses docker-compose on a large scale-up machine.
- Remote testing: requires a Kubernetes cluster with certain specifications. We include scripts to provision such a cluster, but the cluster password must be requested from the authors.
As requested, the scripts should handle everything:
- Downloading the source code of the systems
- Compiling the systems
- Producing Docker images of the systems
- Provisioning a cluster to execute experiments on
- Installing Kubernetes on said cluster
- Deploying the infrastructure components on Kubernetes (HDFS, Kafka, Zookeeper, Flink and Clonos)
- Executing the experiments and collecting results
- Generating Figures
- Recompiling the paper
To speed up this process we also provide pre-built Docker images, which are identical to the images produced by building from source.
We provide the requested README format below.
Repositories:
- Clonos Source Code - The source code of the Clonos System. Branch 'flink1.7' contains the version of Flink we tested against.
Programming Languages:
- Main System: Java
- Workloads: Java (BEAM NEXMark), Scala (Synthetic)
- Testing scripts: Bash (orchestration), Python (Figures, latency and throughput measurers)
Additional Programming Language info: Java8
Compiler Info: openjdk version "1.8.0_292"
Packages/Libraries Needed: To build the system, Maven will download all dependencies automatically. Dependencies (see below for a breakdown): bash >= 4, java8, python3 (with pip and virtualenv), gradle > 4, make, pdflatex, bibtex, git, maven 3.2.5, docker, docker-compose, kubectl, helm
Breakdown (a minimal availability check is sketched after this list):
- General: bash >= 4, java8, python3 (with pip and virtualenv), gradle > 4, make, pdflatex, bibtex
- If building containers from source (DEFAULT): git, maven 3.2.5
- If local experiments (DEFAULT): docker, docker-compose
- If remote experiments (-r): kubectl, helm
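As a convenience, the following minimal sketch (ours, not part of the repository) checks that the required tools are on the PATH; trim the list to the scenario you plan to run:
# Sketch: report any missing dependency.
for tool in bash java python3 pip3 gradle make pdflatex bibtex git mvn docker docker-compose kubectl helm; do
  command -v "$tool" >/dev/null || echo "missing: $tool"
done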
Data generators Repository:
- BEAM w/ ClonosRunner - Used in the NEXMark experiments. Branches clonos-runner and clonos-runner-failure-experiments are used in the overhead and failure experiments, respectively.
- Synthetic workload - Contains the synthetic workload source code and our custom measuring (throughput and latency) scripts.
Our experiments are executed on two layers of virtualization on top of bare metal, listed here from top to bottom:
- Docker containers (managed by Kubernetes)
- HPCCloud virtual machines
- SurfSara cluster bare-metal nodes
We will work from bottom to top, describing our deployments for each layer.
At the bare-metal layer, we execute on SurfSara gold_6130 nodes, with the following specifications:
- C1) Processor: Intel® Xeon® Gold 6130 Processor
- C2) Caches: L1: 16 x 32 KB 8-way set-associative instruction caches + 16 x 32 KB 8-way set-associative data caches; L2: 16 x 1 MB 16-way set-associative caches; L3: 22 MB non-inclusive shared cache
- C3) Memory: 96 GB, UPI at 10.4 GT/s
- C4) Secondary Storage: 3.2 TB local SSD disk
- C5) Network: 10 Gbit/s Ethernet
Additional hardware information about the SurfSara cluster may be found via the links in the GitHub version of this README.
On top of this hardware we request the following virtual machines:
- 1 Coordinator Node (hosts the Kubernetes coordinator)
  - 8 vCPU
  - 16 GB memory
  - 50 GB disk
- 6 Follower Nodes (Kubernetes followers)
  - 40 vCPU
  - 60 GB memory
  - 100 GB disk
- 2 Generator Nodes (data generators for the synthetic failure experiments)
  - 4 vCPU
  - 4 GB memory
  - 5 GB disk
On top of the VMs we set up Kubernetes and launch a number of components. Note that 500m CPU denotes half a CPU core:
- Zookeeper: 3 nodes, 500m CPU, 512Mi memory, 5Gi Persistent Volume
- Kafka: 5 nodes, 2000m CPU, 4000Mi memory, 50Gi Persistent Volume
- HDFS:
  - 1 NameNode: 4000m CPU, 4000Mi memory, 50Gi Persistent Volume
  - 3 DataNodes: 3000m CPU, 8000Mi memory, 50Gi Persistent Volume
- Flink:
  - 1 JobManager: 8000m CPU, 8192Mi memory
  - 150 TaskManagers: 2000m CPU, 2000Mi memory, 5Gi Persistent Volume
The ./kubernetes/charts directory contains the deployment manifests, which are a complete source-of-truth.
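As a rough post-deployment check, something along these lines (ours, not part of the provided scripts; the pod-name patterns are assumptions) can confirm that the expected components are running:
# Hypothetical sanity check against the deployment described above.
kubectl get pods -o wide | grep -iE 'zookeeper|kafka|namenode|datanode|jobmanager|taskmanager'
kubectl get pvc    # persistent volume claims backing the components listed above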
Before starting, create a Docker account and execute docker login. This is required for building and pushing Docker images. It will also be used as the identity for image pulls performed by the cluster.
The main script is 0_workflow.sh. By default it executes experiments locally, using newly built Docker images.
It receives a number of parameters which can change its behaviour:
Flag | Parameter | Description |
---|---|---|
-f | - | Run [f]ailure experiments only. |
-p | - | Use [p]re-built images of Flink and Clonos; skips building Docker images from the artifact source. |
-r | - | Run experiments [r]emotely on Kubernetes. ~/.kube/config needs to be set up (it is set up by -s). |
-g | semi-colon;separated;list;of;user@IP | Use the provided hosts as data-[g]enerators for synthetic tests. Requires password-less SSH. |
-s | password | Provision a cluster for experiments from [S]urfSara. The password must be requested from the authors. Exits after provisioning. |
-d | - | Scale [d]own experiments (e.g. parallelism) so they can be run on fewer resources. Edit experimental_parameters.sh for finer control. |
-c | - | Confirm you have read and completed the pre-flight [c]hecks. |
-h | - | Show [h]elp. |
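For illustration, a hypothetical invocation combining several of these flags could look as follows; the user@IP values are placeholders:
# Hypothetical example: scaled-down failure experiments with pre-built images and two external data generators.
./0_workflow.sh -c -p -f -d -g "ubuntu@10.0.0.11;ubuntu@10.0.0.12"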
If performing remote experiments, email the authors to request the password for the delta account on the SurfSara cluster. Alternatively, we can provision the cluster for you on demand. Pedro may be reached at pmf[lastName]@gmail.com. If necessary, we will be available to guide you through the reproduction of the experiments.
We will now walk through a series of scenarios showing how the script may be used. First, we show how a machine on AWS can be configured with all the necessary dependencies.
First, provision a machine with the following sample specifications:
- Instance type: m5a.8xlarge
- Storage: EBS gp2 100GB
- AMI: Ubuntu Server 18.04 LTS (HVM), SSD Volume Type - ami
When the machine is ready, log into it and perform the following commands:
sudo apt update
sudo apt upgrade
sudo apt-get install python3-venv python3-pip openjdk-8-jdk-headless gradle make docker.io jq texlive-latex-base gnupg2 pass
sudo apt remove openjdk-11-jre-headless
sudo usermod -aG docker ${USER}
sudo curl -L https://github.com/docker/compose/releases/download/v2.2.3/docker-compose-`uname -s`-`uname -m` -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
wget https://archive.apache.org/dist/maven/maven-3/3.2.5/binaries/apache-maven-3.2.5-bin.tar.gz
sudo tar xvf apache-maven-3.2.5-bin.tar.gz -C /opt/
ADD "...:/opt/apache-maven-3.2.5/bin/" >> /etc/environment
source /etc/environment
# Log out and log back in
At this point all software dependencies are installed and experiments can be reproduced locally as explained below.
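Before proceeding, it may be worth verifying the toolchain with a quick check (ours, not part of the repository scripts):
java -version             # should report openjdk version 1.8.0_*
mvn -version              # should report Apache Maven 3.2.5
docker-compose version    # confirms the docker-compose binary is executable
docker ps                 # succeeds only once the docker group membership is active (after re-login)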
git clone https://github.com/PSilvestre/ClonosReproducibility
cd ./ClonosReproducibility
# -p specifies prebuilt, omission of -r assumes local.
./0_workflow.sh -p -c -d
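While the local run is in progress, standard Docker commands can be used to keep an eye on the deployed containers, for example:
docker ps                  # list the running infrastructure and job containers
docker stats --no-stream   # one-off snapshot of per-container CPU and memory usage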
docker login
git clone https://github.com/PSilvestre/ClonosReproducibility
cd ./ClonosReproducibility
# omission of -p assumes build from source, omission of -r assumes local.
./0_workflow.sh -c -d
docker login
git clone https://github.com/PSilvestre/ClonosReproducibility
cd ./ClonosReproducibility
./0_workflow.sh -c -p -s <password>
Once finished the script will print a message similar to this, which you should follow:
Done. You can now ssh into the machine at ubuntu@$IP.
You can launch experiments by doing the following:
1. ssh ubuntu@$IP
2. cd ClonosProvisioning
3. git pull # Ensure latest version
4. nohup ./0_workflow.sh -c -p -r -g "$DATA_GENERATOR_IPS" & #use nohup to prevent hangups
5. tail -f nohup.out
To save on cluster resources, we first build the Docker images, then provision the cluster, and only then execute the experiments.
docker login
git clone https://github.com/PSilvestre/ClonosReproducibility
cd ./ClonosReproducibility
# This will build the Docker images, push them to Docker Hub and print their names. Record these names.
# The variable must be in the script's environment, hence no '&&' between the assignment and the command.
BUILD_DOCKER_IMAGES_FROM_SRC=1 ./1_build_artifacts.sh
# Once finished, provision the cluster.
./0_workflow.sh -c -p -s <password>
Once finished the script will print a message similar to this:
Done. You can now ssh into the machine at ubuntu@$IP.
You can launch experiments by doing the following:
1. ssh ubuntu@$IP
2. cd ClonosProvisioning
3. git pull # Ensure latest version
4. nohup ./0_workflow.sh -c -p -r -g "$DATA_GENERATOR_IPS" &
5. tail -f nohup.out
Follow these instructions, but before executing the 0_workflow.sh script, edit the variables FLINK_IMG and CLONOS_IMG at the top of 0_workflow.sh to point at your Docker image versions, as sketched below.
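For example, the edited variables could look roughly like this; the image names are placeholders, so substitute the names printed by 1_build_artifacts.sh:
# Placeholder values at the top of 0_workflow.sh; use the names recorded earlier.
FLINK_IMG="<your-dockerhub-user>/flink:<tag-printed-by-build>"
CLONOS_IMG="<your-dockerhub-user>/clonos:<tag-printed-by-build>"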
You can also avoid SSH'ing into the remote host and instead launch the experiments from your local machine. However, this is likely to yield worse results and take even longer to execute.
docker login
git clone https://github.com/PSilvestre/ClonosReproducibility
cd ./ClonosReproducibility
# Provision the cluster
./0_workflow.sh -c -p -s <password>
# Execute experiments. This will first build the images, then use kubectl to manage them.
# This will waste some cluster time, which is undesirable.
./0_workflow.sh -c -r
Several parameters can be easily changed for all experiments. Experiments are specified in 2_run_experiments.sh as a configuration string such as this:
jobstr="$system;$query;$par;$D_CI;$throughput;$D_DSD_CLONOS_FAILURE;$D_PTI_CLONOS;$kill_depth"
The first parameter selects the system, the second specifies the NEXMark query, the third the parallelism, and so on.
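Purely as an illustration (the concrete values below are placeholders, not the parameters used in the paper), such a configuration string might be instantiated as:
# Hypothetical example: NEXMark query 3 on Clonos, parallelism 5, kill depth 1;
# the remaining fields keep the defaults defined in experimental_parameters.sh.
jobstr="clonos;3;5;$D_CI;$throughput;$D_DSD_CLONOS_FAILURE;$D_PTI_CLONOS;1"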
We decided to provide the synthetic workload as pre-built jars to ease automation efforts. These are 'synthetic_workload_clonos.jar' and 'synthetic_workload_flink.jar'. The same could not be achieved for the BEAM project, so the dependency on Gradle remains.
We thank the reproducibility reviewers for their efforts, and hope that you appreciate ours in attempting to tame the complexity of these experiments. Such large-scale experiments can be flaky, and certain experiments may need to be rerun or their results stitched together. The authors are prepared to support the reproducibility reviewers in their work. Again, Pedro may be reached at pmf[lastName]@gmail.com.