diff --git a/benchmarks/000.microbenchmarks/010.sleep/input.py b/benchmarks/000.microbenchmarks/010.sleep/input.py index eb2d5a62..25502a8c 100644 --- a/benchmarks/000.microbenchmarks/010.sleep/input.py +++ b/benchmarks/000.microbenchmarks/010.sleep/input.py @@ -8,5 +8,5 @@ def buckets_count(): return (0, 0) -def generate_input(data_dir, size, input_buckets, output_buckets, upload_func): +def generate_input(data_dir, size, benchmarks_bucket, input_buckets, output_buckets, upload_func): return { 'sleep': size_generators[size] } diff --git a/benchmarks/000.microbenchmarks/020.network-benchmark/input.py b/benchmarks/000.microbenchmarks/020.network-benchmark/input.py index 1519eece..36fc02fd 100644 --- a/benchmarks/000.microbenchmarks/020.network-benchmark/input.py +++ b/benchmarks/000.microbenchmarks/020.network-benchmark/input.py @@ -3,5 +3,5 @@ def buckets_count(): return (0, 1) -def generate_input(data_dir, size, input_buckets, output_buckets, upload_func): +def generate_input(data_dir, size, benchmarks_bucket, input_buckets, output_buckets, upload_func): return {'output-bucket': output_buckets[0]} diff --git a/benchmarks/000.microbenchmarks/030.clock-synchronization/input.py b/benchmarks/000.microbenchmarks/030.clock-synchronization/input.py index 1519eece..36fc02fd 100644 --- a/benchmarks/000.microbenchmarks/030.clock-synchronization/input.py +++ b/benchmarks/000.microbenchmarks/030.clock-synchronization/input.py @@ -3,5 +3,5 @@ def buckets_count(): return (0, 1) -def generate_input(data_dir, size, input_buckets, output_buckets, upload_func): +def generate_input(data_dir, size, benchmarks_bucket, input_buckets, output_buckets, upload_func): return {'output-bucket': output_buckets[0]} diff --git a/benchmarks/000.microbenchmarks/040.server-reply/input.py b/benchmarks/000.microbenchmarks/040.server-reply/input.py index eb2d5a62..25502a8c 100644 --- a/benchmarks/000.microbenchmarks/040.server-reply/input.py +++ b/benchmarks/000.microbenchmarks/040.server-reply/input.py @@ -8,5 +8,5 @@ def buckets_count(): return (0, 0) -def generate_input(data_dir, size, input_buckets, output_buckets, upload_func): +def generate_input(data_dir, size, benchmarks_bucket, input_buckets, output_buckets, upload_func): return { 'sleep': size_generators[size] } diff --git a/benchmarks/wrappers/knative/nodejs/index.js b/benchmarks/wrappers/knative/nodejs/index.js new file mode 100644 index 00000000..8c8a5b90 --- /dev/null +++ b/benchmarks/wrappers/knative/nodejs/index.js @@ -0,0 +1,60 @@ +const { + CloudEvent, + HTTP +} = require('cloudevents'); +const path = require('path'); +const fs = require('fs'); + +async function handle(context, event) { + + const requestId = context.headers['x-request-id'] || context.headers['X-Request-ID']; + + + // Ensure event data is parsed correctly + const eventData = event ? 
event : context.body;
+    context.log.info(`Received event: ${JSON.stringify(eventData)}`);
+
+    const func = require('./function/function.js');
+    const begin = Date.now() / 1000;
+    const start = process.hrtime();
+
+    try {
+        // Call the handler function with the event data
+        const ret = await func.handler(eventData);
+        const elapsed = process.hrtime(start);
+        const end = Date.now() / 1000;
+        const micro = elapsed[1] / 1e3 + elapsed[0] * 1e6;
+
+        let is_cold = false;
+        const fname = path.join('/tmp', 'cold_run');
+        if (!fs.existsSync(fname)) {
+            is_cold = true;
+            fs.closeSync(fs.openSync(fname, 'w'));
+        }
+
+        context.log.info(`Function result: ${JSON.stringify(ret)}`);
+
+        return {
+            begin: begin,
+            end: end,
+            compute_time: micro,
+            results_time: 0,
+            result: ret,
+            request_id: requestId,
+            is_cold: is_cold,
+        };
+    } catch (error) {
+        // Report elapsed time in microseconds, matching the success path.
+        const elapsed = process.hrtime(start);
+        const micro = elapsed[1] / 1e3 + elapsed[0] * 1e6;
+        context.log.error(`Error - invocation failed! Reason: ${error.message}`);
+        return {
+            begin: begin,
+            end: Date.now() / 1000,
+            compute_time: micro,
+            results_time: 0,
+            result: `Error - invocation failed! Reason: ${error.message}`,
+            request_id: requestId,
+            is_cold: false,
+        };
+    }
+}
+
+exports.handle = handle;
diff --git a/benchmarks/wrappers/knative/nodejs/storage.js b/benchmarks/wrappers/knative/nodejs/storage.js
new file mode 100644
index 00000000..e20332f0
--- /dev/null
+++ b/benchmarks/wrappers/knative/nodejs/storage.js
@@ -0,0 +1,60 @@
+const minio = require('minio'),
+    path = require('path'),
+    uuid = require('uuid'),
+    util = require('util'),
+    stream = require('stream'),
+    fs = require('fs');
+
+class minio_storage {
+
+    constructor() {
+        let address = process.env.MINIO_STORAGE_CONNECTION_URL;
+        let access_key = process.env.MINIO_STORAGE_ACCESS_KEY;
+        let secret_key = process.env.MINIO_STORAGE_SECRET_KEY;
+
+        this.client = new minio.Client({
+            endPoint: address.split(':')[0],
+            port: parseInt(address.split(':')[1], 10),
+            accessKey: access_key,
+            secretKey: secret_key,
+            useSSL: false
+        });
+    }
+
+    unique_name(file) {
+        let name = path.parse(file);
+        let uuid_name = uuid.v4().split('-')[0];
+        return path.join(name.dir, util.format('%s.%s%s', name.name, uuid_name, name.ext));
+    }
+
+    upload(bucket, file, filepath) {
+        let uniqueName = this.unique_name(file);
+        return [uniqueName, this.client.fPutObject(bucket, uniqueName, filepath)];
+    };
+
+    download(bucket, file, filepath) {
+        return this.client.fGetObject(bucket, file, filepath);
+    };
+
+    uploadStream(bucket, file) {
+        var write_stream = new stream.PassThrough();
+        let uniqueName = this.unique_name(file);
+        // A PassThrough stream has no known size; omit it and let MinIO
+        // handle the upload as a stream.
+        let promise = this.client.putObject(bucket, uniqueName, write_stream);
+        return [write_stream, promise, uniqueName];
+    };
+
+    downloadStream(bucket, file) {
+        return this.client.getObject(bucket, file);
+    };
+
+    static get_instance() {
+        if (!this.instance) {
+            this.instance = new minio_storage();
+        }
+        return this.instance;
+    }
+
+};
+exports.storage = minio_storage;
\ No newline at end of file
diff --git a/benchmarks/wrappers/knative/python/func.py b/benchmarks/wrappers/knative/python/func.py
new file mode 100644
index 00000000..a8ceb6c5
--- /dev/null
+++ b/benchmarks/wrappers/knative/python/func.py
@@ -0,0 +1,57 @@
+import logging
+import datetime
+import os
+from parliament import Context
+
+
+def main(context: Context):
+    logging.getLogger().setLevel(logging.INFO)
+    begin = datetime.datetime.now()  # Initialize begin outside the try block
+
+    event = context.request.json
logging.info(f"Received event: {event}") + + request_id = context.request.headers.get('X-Request-ID') + + try: + from function import function + + # Update the timestamp after extracting JSON data + begin = datetime.datetime.now() + # Pass the extracted JSON data to the handler function + ret = function.handler(event) + end = datetime.datetime.now() + logging.info("Function result: {}".format(ret)) + log_data = {"result": ret["result"]} + if "measurement" in ret: + log_data["measurement"] = ret["measurement"] + results_time = (end - begin) / datetime.timedelta(microseconds=1) + + is_cold = False + fname = "cold_run" + if not os.path.exists(fname): + is_cold = True + open(fname, "a").close() + + return { + "request_id": request_id, + "begin": begin.strftime("%s.%f"), + "end": end.strftime("%s.%f"), + "results_time": results_time, + "is_cold": is_cold, + "result": log_data, + } + + except Exception as e: + end = datetime.datetime.now() + results_time = (end - begin) / datetime.timedelta(microseconds=1) + logging.error(f"Error - invocation failed! Reason: {e}") + return { + "request_id": request_id, + "begin": begin.strftime("%s.%f"), + "end": end.strftime("%s.%f"), + "results_time": results_time, + "result": f"Error - invocation failed! Reason: {e}", + } diff --git a/benchmarks/wrappers/knative/python/storage.py b/benchmarks/wrappers/knative/python/storage.py new file mode 100644 index 00000000..a982afc8 --- /dev/null +++ b/benchmarks/wrappers/knative/python/storage.py @@ -0,0 +1,77 @@ +import os +import uuid +import json +import minio +import logging + + +class storage: + instance = None + client = None + + def __init__(self): + try: + """ + Minio does not allow another way of configuring timeout for connection. + The rest of configuration is copied from source code of Minio. 
+ """ + import urllib3 + from datetime import timedelta + + timeout = timedelta(seconds=1).seconds + + mgr = urllib3.PoolManager( + timeout=urllib3.util.Timeout(connect=timeout, read=timeout), + maxsize=10, + retries=urllib3.Retry( + total=5, backoff_factor=0.2, status_forcelist=[500, 502, 503, 504] + ), + ) + self.client = minio.Minio( + os.getenv("MINIO_STORAGE_CONNECTION_URL"), + access_key=os.getenv("MINIO_STORAGE_ACCESS_KEY"), + secret_key=os.getenv("MINIO_STORAGE_SECRET_KEY"), + secure=False, + http_client=mgr, + ) + except Exception as e: + logging.info(e) + raise e + + @staticmethod + def unique_name(name): + name, extension = os.path.splitext(name) + return "{name}.{random}{extension}".format( + name=name, extension=extension, random=str(uuid.uuid4()).split("-")[0] + ) + + def upload(self, bucket, file, filepath): + key_name = storage.unique_name(file) + self.client.fput_object(bucket, key_name, filepath) + return key_name + + def download(self, bucket, file, filepath): + self.client.fget_object(bucket, file, filepath) + + def download_directory(self, bucket, prefix, path): + objects = self.client.list_objects(bucket, prefix, recursive=True) + for obj in objects: + file_name = obj.object_name + self.download(bucket, file_name, os.path.join(path, file_name)) + + def upload_stream(self, bucket, file, bytes_data): + key_name = storage.unique_name(file) + self.client.put_object( + bucket, key_name, bytes_data, bytes_data.getbuffer().nbytes + ) + return key_name + + def download_stream(self, bucket, file): + data = self.client.get_object(bucket, file) + return data.read() + + @staticmethod + def get_instance(): + if storage.instance is None: + storage.instance = storage() + return storage.instance diff --git a/config/example.json b/config/example.json index dc4da9ad..ea28d57b 100644 --- a/config/example.json +++ b/config/example.json @@ -87,6 +87,26 @@ "output_buckets": [], "type": "minio" } + }, + "knative": { + "shutdownStorage": false, + "removeCluster": false, + "knativeExec": "func", + "docker_registry": { + "registry": "", + "username": "", + "password": "" + }, + "storage": { + "address": "", + "mapped_port": "", + "access_key": "", + "secret_key": "", + "instance_id": "", + "input_buckets": [], + "output_buckets": [], + "type": "minio" + } } } } diff --git a/config/systems.json b/config/systems.json index bb21dcd9..a3d3a2e4 100644 --- a/config/systems.json +++ b/config/systems.json @@ -234,5 +234,47 @@ } } } + }, + "knative": { + "languages": { + "python": { + "base_images": { + "3.8": "registry.access.redhat.com/ubi8/python-38", + "3.9": "registry.access.redhat.com/ubi8/python-39", + "3.10": "registry.access.redhat.com/ubi8/python-310" + }, + "images": [], + "username": "docker_user", + "deployment": { + "files": [ + "func.py", + "storage.py" + ], + "packages": { + "parliament-functions": "0.1.0", + "minio": "5.0.10" + } + } + }, + "nodejs": { + "base_images": { + "16": "registry.access.redhat.com/ubi8/nodejs-16", + "14": "registry.access.redhat.com/ubi8/nodejs-14", + "12": "registry.access.redhat.com/ubi8/nodejs-12" + }, + "images": [], + "username": "docker_user", + "deployment": { + "files": [ + "index.js", + "storage.js" + ], + "packages": { + "faas-js-runtime": "^2.2.2", + "minio": "7.0.16" + } + } + } + } } } diff --git a/docs/platforms.md b/docs/platforms.md index 27738b6e..94349ac0 100644 --- a/docs/platforms.md +++ b/docs/platforms.md @@ -1,22 +1,21 @@ - SeBS supports three commercial serverless platforms: AWS Lambda, Azure Functions, and Google Cloud Functions. 
Furthermore, we support the open source FaaS system OpenWhisk.
The file `config/example.json` contains all parameters that users can change
to customize the deployment.
Some of these parameters, such as cloud credentials or storage instance address,
are required.
In the following subsections, we discuss the mandatory and optional customization
points for each platform.

> [!WARNING]
> On many platforms, credentials can be provided as environment variables or through the SeBS configuration. SeBS will not store your credentials in the cache. When saving results, SeBS stores user benchmark and experiment configuration for documentation and reproducibility, except for credentials that are erased. If you provide the credentials through JSON input configuration, do not commit nor publish these files anywhere.

### Cloud Account Identifiers

SeBS ensures that all locally cached cloud resources are valid by storing a unique identifier associated with each cloud account. Furthermore, we store this identifier in experiment results to easily match results with the cloud account or subscription that was used to obtain them. We use non-sensitive identifiers such as account IDs on AWS, subscription IDs on Azure, and Google Cloud project IDs.

If you have JSON result files, such as `experiment.json` from a benchmark run or '/*.json' from an experiment, you can remove all identifying information by removing the JSON object `.config.deployment.credentials`. This can be achieved easily with the CLI tool `jq`:

```
jq 'del(.config.deployment.credentials)' | sponge
```

@@ -31,7 +30,7 @@
Additionally, the account must have `AmazonAPIGatewayAdministrator` permission to
automatically set up an AWS HTTP trigger.
You can provide a [role](https://docs.aws.amazon.com/lambda/latest/dg/lambda-intro-execution-role.html) with permissions to access AWS Lambda and S3; otherwise, one will be created automatically.
To use a user-defined Lambda role, set the name in the config JSON - see an example in `config/example.json`.

You can pass the credentials either using the default AWS-specific environment variables:

@@ -61,7 +60,7 @@ or in the JSON input configuration:
}
```

Azure provides a free tier for 12 months.
You need to create an account and add a [service principal](https://docs.microsoft.com/en-us/azure/active-directory/develop/howto-create-service-principal-portal)
to enable non-interactive login through CLI.
Since this process has [an easy, one-step CLI solution](https://docs.microsoft.com/en-us/cli/azure/ad/sp?view=azure-cli-latest#az-ad-sp-create-for-rbac),
we added a small tool **tools/create_azure_credentials** that uses the interactive
web-browser authentication to log in to the Azure CLI and create a service principal.

@@ -105,10 +104,10 @@ or in the JSON input configuration:
}
```

> [!WARNING]
> The tool assumes there is only one subscription active on the account. If you want to bind the newly created service principal to a specific subscription, or the created credentials do not work with SeBS and you see errors such as "No subscriptions found for X", then you must specify a subscription when creating the service principal. Check your subscription ID in the Azure portal, and use the CLI option `tools/create_azure_credentials.py --subscription `.
> [!WARNING]
> When you log in for the first time on a device, Microsoft might require authenticating your login with Multi-Factor Authentication (MFA). In this case, we will return an error such as: "The following tenants require Multi-Factor Authentication (MFA). Use 'az login --tenant TENANT_ID' to explicitly login to a tenant.". Then, you can pass the tenant ID by using the `--tenant ` flag.

### Resources

@@ -121,8 +120,8 @@
The Google Cloud Free Tier gives free resources. It has two parts:

-- A 12-month free trial with $300 credit to use with any Google Cloud services.
-- Always Free, which provides limited access to many common Google Cloud resources, free of charge.
+* A 12-month free trial with $300 credit to use with any Google Cloud services.
+* Always Free, which provides limited access to many common Google Cloud resources, free of charge.

You need to create an account and add a [service account](https://cloud.google.com/iam/docs/service-accounts) to permit operating on storage and functions.
From the cloud console, download the cloud credentials saved as a JSON file.

@@ -155,18 +154,18 @@
SeBS expects users to deploy and configure an OpenWhisk instance.
Below, you will find example instructions for deploying an OpenWhisk instance.
The configuration parameters of OpenWhisk for SeBS can be found
in `config/example.json` under the key `['deployment']['openwhisk']`.
In the subsections below, we discuss the meaning and use of each parameter.
To correctly deploy SeBS functions to OpenWhisk, following the subsections
on *Toolchain* and *Docker* configuration is particularly important.

> [!WARNING]
> Some benchmarks might require larger memory allocations, e.g., 2048 MB. Not all OpenWhisk deployments support this out-of-the-box.
> The deployment section below shows an example of changing the default function memory limit from 512 MB to a higher value.

### Deployment

In `tools/openwhisk_preparation.py`, we include scripts that help install
[kind (Kubernetes in Docker)](https://kind.sigs.k8s.io/) and deploy OpenWhisk on a `kind` cluster.
Alternatively, you can deploy to an existing cluster by [using official deployment instructions](https://github.com/apache/openwhisk-deploy-kube/blob/master/docs/k8s-kind.md):

@@ -177,24 +176,25 @@ helm install owdev ./helm/openwhisk -n openwhisk --create-namespace -f deploy/ki
kubectl get pods -n openwhisk --watch
```

To change the maximum memory allocation per function, edit the `max` value under `memory` in file `helm/openwhisk/values.yaml`.
To run all benchmarks, we recommend a limit of at least "2048m".

### Toolchain

We use OpenWhisk's CLI tool [wsk](https://github.com/apache/openwhisk-cli)
to manage the deployment of functions to OpenWhisk.
-Please install `wsk`and configure it to point to your OpenWhisk installation.
+Please install `wsk` and configure it to point to your OpenWhisk installation.
By default, SeBS assumes that `wsk` is available in the `PATH`.
To override this, set the configuration option `wskExec` to the location
of your `wsk` executable.
If you are using a local deployment of OpenWhisk with a self-signed
certificate, you can skip certificate validation with the `wsk` flag `--insecure`.
To enable this option, set `wskBypassSecurity` to `true`.
At the moment, all functions are deployed as [*web actions*](https://github.com/apache/openwhisk/blob/master/docs/webactions.md)
that do not require credentials to invoke functions.
Furthermore, SeBS can be configured to remove the `kind`
cluster after finishing experiments automatically.
The boolean option `removeCluster` helps to automate the experiments
that should be conducted on fresh instances of the system.

@@ -216,10 +216,10 @@ Therefore, all SeBS benchmark functions are available on the Docker Hub.

When adding new functions and extending existing functions with new languages
and new language versions, Docker images must be placed in the registry.
However, pushing the image to the default `spcleth/serverless-benchmarks`
repository on Docker Hub requires permissions.
To use a different Docker Hub repository, change the key
-`['general']['docker_repository']` in `config/systems.json`.
-
+`['general']['docker_repository']` in `config/systems.json`.
Alternatively, OpenWhisk users can configure the FaaS platform to use a custom
and private Docker registry and push new images there.

@@ -240,7 +240,7 @@ Docker authorization on invoker nodes. [See the OpenWhisk issue for details](htt

### Code Deployment

SeBS builds and deploys a new code package when constructing the local cache,
when the function's contents have changed, and when the user requests a forced rebuild.
In OpenWhisk, this setup is changed - SeBS will first attempt to verify
if the image exists already in the registry and skip building the Docker

@@ -252,7 +252,7 @@ avoid failing invocations in OpenWhisk.
For performance reasons, this check is performed only once when initializing the local cache
for the first time.
When the function code is updated,
SeBS will build the image and push it to the registry.
Currently, the only available option of checking image existence in the registry is pulling the image.
@@ -263,7 +263,7 @@ To use that feature in SeBS, set the `experimentalManifest` flag to true.

### Storage

To provide persistent object storage in OpenWhisk, users must first deploy an instance
of [`Minio`](https://github.com/minio/minio) storage.
The storage instance is deployed as a Docker container, and it can be retained
across many experiments.
OpenWhisk functions must be able to reach the storage instance.

@@ -277,7 +277,7 @@ Use the following command to deploy the storage instance locally and map the hos
```

The output will look similar to the one below.
As we can see, the storage container is running on the default Docker bridge network with address `172.17.0.2` and uses port `9000`.
From the host network, port `9011` is mapped to the container's port `9000` to allow external parties - such as OpenWhisk functions - to reach the storage.

```
@@ -293,13 +293,13 @@ From the host network, port `9011` is mapped to the container's port `9000` to a
}
```

-The storage configuration found in `out_storage.json` needs to be provided to
+The storage configuration can be found in `out_storage.json` and needs to be provided to
SeBS via the SeBS configuration; however, the address in `out_storage.json` is likely incorrect. By
default, it is an address in the local bridge network not accessible to most of
the Kubernetes cluster. It should be replaced with an external address of the
machine and the mapped port. You can typically find an externally accessible address via `ip addr`.

For example, for an external address `10.10.1.15` (a LAN-local address on CloudLab) and mapped port `9011`, set the SeBS configuration as follows:

```
jq --argfile file1 out_storage.json '.deployment.openwhisk.storage = $file1 | .deployment.openwhisk.storage.address = "10.10.1.15:9011"' config/example.json > config/openwhisk.json
```

@@ -330,3 +330,84 @@ will automatically detect an existing Minio instance.
Reusing the Minio instance helps run experiments faster and smoothly since SeBS
does not have to re-upload function's data on each experiment.

+## Knative
+
+SeBS expects users to deploy and configure a Knative instance. Below, you will find example instructions for deploying a Knative-enabled cluster. The configuration parameters of Knative for SeBS can be found in `config/example.json` under the key `['deployment']['knative']`.
+
+### Pre-Requisites
+
+We expect a few tools to be installed before proceeding further. Please install the following beforehand:
+* `kubectl`
+* `kind`
+* `helm`
+* `func` (the [Knative functions CLI](https://knative.dev/docs/functions/install-func/#installing-the-func-cli), used to interact with Knative functions directly from the command line; SeBS also uses it to `build` and `deploy` the benchmarks as Knative functions)
+* `minio`
+
+### Deployment
+
+In `tools/`, we include a script that installs a fully configured Knative `kind` cluster (Kubernetes in Docker).
+You can run it using the following command:
+
+```sh
+./tools/knative_setup.sh
+```
+
+After running the script, export the path of the Kubernetes configuration YAML file as `KUBECONFIG`. The script creates this file automatically, and it can be found in the `tools/bin/` folder. You can export it like this:
+
+```sh
+export KUBECONFIG=path/to/bin/folder/kubeconfig.yaml
+```
+
+This step is necessary to interact with the Knative-enabled kind cluster configured by the script above.
+
+> We use Knative's CLI tool [func](https://knative.dev/docs/functions/install-func/#installing-the-func-cli) to manage the deployment of functions as Knative services. Please install `func` and configure it to point to your Knative installation.
+
+### Storage Configuration
+
+SeBS requires a storage provider to store the functions' data. We use Minio as the storage provider.
+
+Spin up the Minio storage by running the following command:
+
+```bash
+./sebs.py storage start minio --port 9011 --output-json out_storage.json
+```
+
+The output will look similar to the one below.
+As we can see, the storage container is running on the default Docker bridge network with address `172.17.0.2` and uses port `9000`.
+From the host network, port `9011` is mapped to the container's port `9000` to allow external parties - such as Knative functions - to reach the storage.
+
+```
+{
+  "address": "172.17.0.2:9000",
+  "mapped_port": 9011,
+  "access_key": "XXX",
+  "secret_key": "XXX",
+  "instance_id": "XXX",
+  "input_buckets": [],
+  "output_buckets": [],
+  "type": "minio"
+}
+```
+
+The storage configuration found in `out_storage.json` needs to be provided to
+SeBS via the SeBS configuration; however, the address in `out_storage.json` is likely incorrect. By
+default, it is an address in the local bridge network not accessible to most of
+the Kubernetes cluster. It should be replaced with an external address of the
+machine and the mapped port. You can typically find an externally accessible address via `ip addr`.
+
+After you find the IP address, update it in the storage configuration.
+For example, for an external address `192.168.29.130` (an example LAN address) and mapped port `9011`, set the SeBS configuration as follows:
+
+```sh
+jq --argfile storage_data out_storage.json '.deployment.knative.storage = $storage_data | .deployment.knative.storage.address = "192.168.29.130:9011"' config/example.json > config/knative.json
+```
+
+Now you can test a benchmark using the configuration and example below.
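+
+For reference, the generated `config/knative.json` should then contain a storage object like the sketch below; the structure follows `config/example.json` from this change, and the address, keys, and instance ID are placeholders that will differ in your setup:
+
+```
+"knative": {
+  "storage": {
+    "address": "192.168.29.130:9011",
+    "mapped_port": 9011,
+    "access_key": "XXX",
+    "secret_key": "XXX",
+    "instance_id": "XXX",
+    "input_buckets": [],
+    "output_buckets": [],
+    "type": "minio"
+  }
+}
+```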
+ +### Benchmark Deployment Example + +The following example shows how to deploy a benchmark to the Knative-enabled cluster: + +```sh +./sebs.py benchmark invoke 311.compression test --config config/knative.json --deployment knative --language python --language-version 3.9 +``` diff --git a/install.py b/install.py index 57f047d2..48a4bfd0 100755 --- a/install.py +++ b/install.py @@ -7,7 +7,7 @@ parser = argparse.ArgumentParser(description="Install SeBS and dependencies.") parser.add_argument('--venv', metavar='DIR', type=str, default="python-venv", help='destination of local Python virtual environment') parser.add_argument('--python-path', metavar='DIR', type=str, default="python3", help='Path to local Python installation.') -for deployment in ["aws", "azure", "gcp", "openwhisk"]: +for deployment in ["aws", "azure", "gcp", "openwhisk", "knative"]: parser.add_argument(f"--{deployment}", action="store_const", const=True, default=True, dest=deployment) parser.add_argument(f"--no-{deployment}", action="store_const", const=False, default=True, dest=deployment) for deployment in ["local"]: @@ -63,6 +63,13 @@ def execute(cmd, cwd=None): execute(f'echo "export SEBS_WITH_OPENWHISK={flag}" >> {env_dir}/bin/activate') execute(f'echo "unset SEBS_WITH_OPENWHISK" >> {env_dir}/bin/deactivate') +if args.knative: + print("Install Python dependencies for Knative") + execute(". {}/bin/activate && pip3 install -r requirements.knative.txt".format(env_dir)) +flag = "TRUE" if args.knative else "FALSE" +execute(f'echo "export SEBS_WITH_KNATIVE={flag}" >> {env_dir}/bin/activate') +execute(f'echo "unset SEBS_WITH_KNATIVE" >> {env_dir}/bin/deactivate') + if args.local: print("Install Python dependencies for local") execute(". {}/bin/activate && pip3 install -r requirements.local.txt".format(env_dir)) @@ -99,4 +106,3 @@ def execute(cmd, cwd=None): execute("python3 setup.py build") execute("python3 pypapi/papi_build.py") os.chdir(cur_dir) - diff --git a/requirements.knative.txt b/requirements.knative.txt new file mode 100644 index 00000000..dfb2338a --- /dev/null +++ b/requirements.knative.txt @@ -0,0 +1,2 @@ +parliament-functions==0.1.0 +flask \ No newline at end of file diff --git a/sebs.py b/sebs.py index ff7f7769..b95e51d6 100755 --- a/sebs.py +++ b/sebs.py @@ -88,7 +88,7 @@ def common_params(func): @click.option( "--deployment", default=None, - type=click.Choice(["azure", "aws", "gcp", "local", "openwhisk"]), + type=click.Choice(["azure", "aws", "gcp", "local", "openwhisk", "knative"]), help="Cloud deployment to use.", ) @click.option( diff --git a/sebs/benchmark.py b/sebs/benchmark.py index 90eed6ae..19abcb67 100644 --- a/sebs/benchmark.py +++ b/sebs/benchmark.py @@ -296,7 +296,7 @@ def add_deployment_package_python(self, output_dir): if len(packages): with open(os.path.join(output_dir, "requirements.txt"), "a") as out: for package in packages: - out.write(package) + out.write(package + "\n") def add_deployment_package_nodejs(self, output_dir): # modify package.json diff --git a/sebs/config.py b/sebs/config.py index cfafbf00..5b31baf7 100644 --- a/sebs/config.py +++ b/sebs/config.py @@ -68,3 +68,4 @@ def benchmark_image_tag( def username(self, deployment_name: str, language_name: str) -> str: return self._system_config[deployment_name]["languages"][language_name]["username"] + \ No newline at end of file diff --git a/sebs/faas/config.py b/sebs/faas/config.py index 19c7d3ab..889a0102 100644 --- a/sebs/faas/config.py +++ b/sebs/faas/config.py @@ -204,6 +204,10 @@ def deserialize(config: dict, cache: Cache, 
handlers: LoggingHandlers) -> Config from sebs.openwhisk.config import OpenWhiskConfig implementations["openwhisk"] = OpenWhiskConfig.deserialize + if has_platform("knative"): + from sebs.knative.config import KnativeConfig + + implementations["knative"] = KnativeConfig.deserialize func = implementations.get(name) assert func, "Unknown config type!" return func(config[name] if name in config else config, cache, handlers) diff --git a/sebs/knative/__init__.py b/sebs/knative/__init__.py new file mode 100644 index 00000000..1ed9b12f --- /dev/null +++ b/sebs/knative/__init__.py @@ -0,0 +1,2 @@ +from .knative import Knative # noqa +from .config import KnativeConfig # noqa diff --git a/sebs/knative/config.py b/sebs/knative/config.py new file mode 100644 index 00000000..17374912 --- /dev/null +++ b/sebs/knative/config.py @@ -0,0 +1,231 @@ +from sebs.cache import Cache +from sebs.faas.config import Resources, Config, Credentials +from sebs.utils import LoggingHandlers +from sebs.storage.config import MinioConfig + +from typing import cast, Optional + + +class KnativeCredentials(Credentials): + def __init__(self, config: dict): + super().__init__() + self._docker_username = config.get("docker_username") + self._docker_password = config.get("docker_password") + + @staticmethod + def deserialize( + config: dict, cache: Cache, handlers: LoggingHandlers + ) -> "KnativeCredentials": + cached_config = cache.get_config("knative") + if cached_config and "credentials" in cached_config: + return KnativeCredentials(cached_config["credentials"]) + else: + return KnativeCredentials(config) + + def serialize(self) -> dict: + return { + "docker_username": self._docker_username, + "docker_password": self._docker_password, + } + + +class KnativeResources(Resources): + def __init__( + self, + registry: Optional[str] = None, + username: Optional[str] = None, + password: Optional[str] = None, + registry_updated: bool = False, + ): + super().__init__(name="knative") + self._docker_registry = registry if registry != "" else None + self._docker_username = username if username != "" else None + self._docker_password = password if password != "" else None + self._registry_updated = registry_updated + self._storage: Optional[MinioConfig] = None + self._storage_updated = False + + @staticmethod + def typename() -> str: + return "Knative.Resources" + + @property + def docker_registry(self) -> Optional[str]: + return self._docker_registry + + @property + def docker_username(self) -> Optional[str]: + return self._docker_username + + @property + def docker_password(self) -> Optional[str]: + return self._docker_password + + @property + def storage_config(self) -> Optional[MinioConfig]: + return self._storage + + @property + def storage_updated(self) -> bool: + return self._storage_updated + + @property + def registry_updated(self) -> bool: + return self._registry_updated + + @staticmethod + def initialize(res: Resources, dct: dict): + ret = cast(KnativeResources, res) + ret._docker_registry = dct.get("registry") + ret._docker_username = dct.get("username") + ret._docker_password = dct.get("password") + + @staticmethod + def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Resources: + cached_config = cache.get_config("knative") + ret = KnativeResources() + if cached_config: + super(KnativeResources, KnativeResources).initialize( + ret, cached_config["resources"] + ) + + if "docker_registry" in config: + KnativeResources.initialize(ret, config["docker_registry"]) + ret.logging.info("Using user-provided Docker 
registry for Knative.") + ret.logging_handlers = handlers + + if not ( + cached_config + and "resources" in cached_config + and "docker" in cached_config["resources"] + and cached_config["resources"]["docker"] == config["docker_registry"] + ): + ret._registry_updated = True + + elif ( + cached_config + and "resources" in cached_config + and "docker" in cached_config["resources"] + ): + KnativeResources.initialize(ret, cached_config["resources"]["docker"]) + ret.logging_handlers = handlers + ret.logging.info("Using cached Docker registry for Knative") + else: + ret = KnativeResources() + ret.logging.info("Using default Docker registry for Knative.") + ret.logging_handlers = handlers + ret._registry_updated = True + + if "storage" in config: + ret._storage = MinioConfig.deserialize(config["storage"]) + ret.logging.info( + "Using user-provided configuration of storage for Knative." + ) + + if not ( + cached_config + and "resources" in cached_config + and "storage" in cached_config["resources"] + and cached_config["resources"]["storage"] == config["storage"] + ): + ret.logging.info( + "User-provided configuration is different from cached storage, " + "we will update existing Knative function." + ) + ret._storage_updated = True + + elif ( + cached_config + and "resources" in cached_config + and "storage" in cached_config["resources"] + ): + ret._storage = MinioConfig.deserialize( + cached_config["resources"]["storage"] + ) + ret.logging.info("Using cached configuration of storage for Knative.") + + return ret + + def update_cache(self, cache: Cache): + super().update_cache(cache) + cache.update_config( + val=self.docker_registry, + keys=["knative", "resources", "docker", "registry"], + ) + cache.update_config( + val=self.docker_username, + keys=["knative", "resources", "docker", "username"], + ) + cache.update_config( + val=self.docker_password, + keys=["knative", "resources", "docker", "password"], + ) + if self._storage: + self._storage.update_cache(["knative", "resources", "storage"], cache) + + def serialize(self) -> dict: + out: dict = { + **super().serialize(), + "docker_registry": self.docker_registry, + "docker_username": self.docker_username, + "docker_password": self.docker_password, + } + if self._storage: + out = {**out, "storage": self._storage.serialize()} + return out + + +class KnativeConfig(Config): + name: str + cache: Cache + + def __init__(self, config: dict, cache: Cache): + super().__init__(name="knative") + self._resources = KnativeResources() + self._credentials = KnativeCredentials(config) + self.knative_exec = config["knativeExec"] + self.shutdownStorage = config["shutdownStorage"] + self.removeCluster = config["removeCluster"] + self.cache = cache + + @property + def resources(self) -> KnativeResources: + return self._resources + + @property + def credentials(self) -> KnativeCredentials: + return self._credentials + + @staticmethod + def initialize(cfg: Config, dct: dict): + pass + + def serialize(self) -> dict: + return { + "name": "knative", + "shutdownStorage": self.shutdownStorage, + "removeCluster": self.removeCluster, + "knativeExec": self.knative_exec, + "resources": self._resources.serialize(), + "credentials": self._credentials.serialize(), + } + + @staticmethod + def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Config: + cached_config = cache.get_config("knative") + resources = cast( + KnativeResources, KnativeResources.deserialize(config, cache, handlers) + ) + + res = KnativeConfig(config, cache) + res.logging_handlers = handlers + 
res._resources = resources + res._credentials = KnativeCredentials.deserialize(config, cache, handlers) + return res + + def update_cache(self, cache: Cache): + cache.update_config(val=self.knative_exec, keys=["knative", "knativeExec"]) + self.resources.update_cache(cache) + cache.update_config( + val=self.credentials.serialize(), keys=["knative", "credentials"] + ) diff --git a/sebs/knative/function.py b/sebs/knative/function.py new file mode 100644 index 00000000..d98a0ff6 --- /dev/null +++ b/sebs/knative/function.py @@ -0,0 +1,76 @@ +from __future__ import annotations + +from typing import cast, Optional +from dataclasses import dataclass + +from sebs.benchmark import Benchmark +from sebs.faas.function import Function, FunctionConfig, Runtime +from sebs.storage.config import MinioConfig + + +@dataclass +class KnativeFunctionConfig(FunctionConfig): + docker_image: str = "" + namespace: str = "default" + storage: Optional[MinioConfig] = None + url: str = "" + + @staticmethod + def deserialize(data: dict) -> KnativeFunctionConfig: + keys = list(KnativeFunctionConfig.__dataclass_fields__.keys()) + data = {k: v for k, v in data.items() if k in keys} + data["runtime"] = Runtime.deserialize(data["runtime"]) + data["storage"] = MinioConfig.deserialize(data["storage"]) + return KnativeFunctionConfig(**data) + + def serialize(self) -> dict: + return self.__dict__ + + @staticmethod + def from_benchmark(benchmark: Benchmark) -> KnativeFunctionConfig: + return super(KnativeFunctionConfig, KnativeFunctionConfig)._from_benchmark( + benchmark, KnativeFunctionConfig + ) + + +class KnativeFunction(Function): + def __init__( + self, + name: str, + benchmark: str, + code_package_hash: str, + cfg: KnativeFunctionConfig, + ): + super().__init__(benchmark, name, code_package_hash, cfg) + + @property + def config(self) -> KnativeFunctionConfig: + return cast(KnativeFunctionConfig, self._cfg) + + @staticmethod + def typename() -> str: + return "Knative.Function" + + def serialize(self) -> dict: + return {**super().serialize(), "config": self._cfg.serialize()} + + @staticmethod + def deserialize(cached_config: dict) -> KnativeFunction: + from sebs.faas.function import Trigger + from sebs.knative.triggers import LibraryTrigger, HTTPTrigger + + cfg = KnativeFunctionConfig.deserialize(cached_config["config"]) + ret = KnativeFunction( + cached_config["name"], + cached_config["benchmark"], + cached_config["hash"], + cfg, + ) + for trigger in cached_config["triggers"]: + trigger_type = cast( + Trigger, + {"Library": LibraryTrigger, "HTTP": HTTPTrigger}.get(trigger["type"]), + ) + assert trigger_type, "Unknown trigger type {}".format(trigger["type"]) + ret.add_trigger(trigger_type.deserialize(trigger)) + return ret diff --git a/sebs/knative/knative.py b/sebs/knative/knative.py new file mode 100644 index 00000000..1c732050 --- /dev/null +++ b/sebs/knative/knative.py @@ -0,0 +1,637 @@ +from datetime import datetime +import json +import os +import re +import shutil +import subprocess +import yaml +from sebs import benchmark +from sebs.faas.system import System +from sebs.faas.function import ExecutionResult, Function, Trigger +from sebs.faas.storage import PersistentStorage +from sebs.benchmark import Benchmark +from sebs.config import SeBSConfig +from sebs.cache import Cache +from sebs.utils import LoggingHandlers, execute +from sebs.knative.storage import KnativeMinio +from sebs.knative.triggers import LibraryTrigger, HTTPTrigger +from typing import Dict, Tuple, Type, List, Optional +import docker +from .function import 
KnativeFunction, KnativeFunctionConfig
+from typing import cast
+from .config import KnativeConfig
+
+
+class Knative(System):
+    _config: KnativeConfig
+
+    def __init__(
+        self,
+        system_config: SeBSConfig,
+        config: KnativeConfig,
+        cache_client: Cache,
+        docker_client: docker.client,
+        logger_handlers: LoggingHandlers,
+    ):
+        super().__init__(system_config, cache_client, docker_client)
+        self._config = config
+        self._logging_handlers = logger_handlers
+
+        if self.config.resources.docker_username:
+            if self.config.resources.docker_registry:
+                docker_client.login(
+                    username=self.config.resources.docker_username,
+                    password=self.config.resources.docker_password,
+                    registry=self.config.resources.docker_registry,
+                )
+            else:
+                docker_client.login(
+                    username=self.config.resources.docker_username,
+                    password=self.config.resources.docker_password,
+                )
+
+    def initialize(
+        self, config: Dict[str, str] = {}, resource_prefix: Optional[str] = None
+    ):
+        self.initialize_resources(select_prefix=resource_prefix)
+
+    @property
+    def config(self) -> KnativeConfig:
+        return self._config
+
+    def get_storage(self, replace_existing: bool = False) -> PersistentStorage:
+        if not hasattr(self, "storage"):
+
+            if not self.config.resources.storage_config:
+                raise RuntimeError(
+                    "Knative is missing the configuration of pre-allocated storage!"
+                )
+            self.storage = KnativeMinio.deserialize(
+                self.config.resources.storage_config,
+                self.cache_client,
+                self.config.resources,
+            )
+            self.storage.logging_handlers = self.logging_handlers
+        else:
+            self.storage.replace_existing = replace_existing
+        return self.storage
+
+    def shutdown(self) -> None:
+        if hasattr(self, "storage") and self.config.shutdownStorage:
+            self.storage.stop()
+        if self.config.removeCluster:
+            from tools.knative_setup import delete_cluster
+
+            delete_cluster()
+        super().shutdown()
+
+    def sanitize_benchmark_name_for_knative(self, name: str) -> str:
+        # Replace invalid characters with hyphens
+        sanitized_name = re.sub(r"[^a-z0-9\-]+", "-", name.lower())
+        # Ensure it starts with a letter
+        sanitized_name_starts_with_alphabet = re.sub(r"^[^a-z]+", "", sanitized_name)
+        # Ensure it ends with an alphanumeric character
+        sanitized_benchmark_name = re.sub(
+            r"[^a-z0-9]+$", "", sanitized_name_starts_with_alphabet
+        )
+        return sanitized_benchmark_name
+
+    @staticmethod
+    def name() -> str:
+        return "knative"
+
+    @staticmethod
+    def typename():
+        return "Knative"
+
+    @staticmethod
+    def function_type() -> Type[Function]:
+        return KnativeFunction
+
+    def get_knative_func_cmd(self) -> List[str]:
+        cmd = [self.config.knative_exec]
+        return cmd
+
+    def find_image(self, repository_name, image_tag) -> bool:
+
+        # KnativeConfig does not define 'experimentalManifest', so read it
+        # defensively and fall back to the pull-based check.
+        if getattr(self.config, "experimentalManifest", False):
+            try:
+                # This requires enabling experimental Docker features
+                # Furthermore, it's not yet supported in the Python library
+                execute(f"docker manifest inspect {repository_name}:{image_tag}")
+                return True
+            except RuntimeError:
+                return False
+        else:
+            try:
+                # default version requires pulling for an image
+                self.docker_client.images.pull(
+                    repository=repository_name, tag=image_tag
+                )
+                return True
+            except docker.errors.NotFound:
+                return False
+
+    def update_func_yaml_with_resources(self, directory: str, memory: int):
+        yaml_path = os.path.join(directory, "func.yaml")
+
+        with open(yaml_path, "r") as yaml_file:
+            func_yaml_content = yaml.safe_load(yaml_file)
+
+        if "run" in func_yaml_content:
+            if "options" not in func_yaml_content:
+                func_yaml_content["options"] = {}
+            if "resources" not in func_yaml_content["options"]:
+                func_yaml_content["options"]["resources"] = {}
+            # SeBS memory sizes are in MB; Kubernetes parses a bare integer as
+            # bytes, so emit an explicit quantity (assumed MiB here).
+            func_yaml_content["options"]["resources"]["requests"] = {
+                "memory": f"{memory}Mi"
+            }
+
+        with open(yaml_path, "w") as yaml_file:
+            yaml.dump(func_yaml_content, yaml_file, default_flow_style=False)
+
+    def build_base_image(
+        self,
+        directory: str,
+        language_name: str,
+        language_version: str,
+        benchmark: str,
+        is_cached: bool,
+    ) -> bool:
+        """
+        Build the base image for the function using the 'func build' command.
+
+        Args:
+        - directory: Directory where the function code resides.
+        - language_name: Name of the programming language (e.g., Python).
+        - language_version: Version of the programming language.
+        - benchmark: Identifier for the benchmark or function.
+        - is_cached: Flag indicating if the code is cached.
+
+        Returns:
+        - Boolean indicating if the image was built.
+        """
+
+        # Define the registry name
+        registry_name = self.config.resources.docker_registry
+        repository_name = self.system_config.docker_repository()
+        image_tag = self.system_config.benchmark_image_tag(
+            self.name(), benchmark, language_name, language_version
+        )
+
+        if registry_name:
+            repository_name = f"{registry_name}/{repository_name}"
+        else:
+            registry_name = "Docker Hub"
+
+        if is_cached and self.find_image(repository_name, image_tag):
+            self.logging.info(
+                f"Skipping building Docker package for {benchmark}, using "
+                f"Docker image {repository_name}:{image_tag} from registry: "
+                f"{registry_name}."
+            )
+            return False
+        else:
+            self.logging.info(
+                f"Image {repository_name}:{image_tag} doesn't exist in the registry, "
+                f"building Docker package for {benchmark}."
+            )
+
+        # Fetch the base image for the specified language and version
+        base_images = self.system_config.benchmark_base_images(
+            self.name(), language_name
+        )
+        builder_image = base_images.get(language_version)
+
+        # Construct the build command; use the configured 'func' executable,
+        # and pass a full image reference to '--image' rather than a bare tag.
+        build_command = [
+            *self.get_knative_func_cmd(),
+            "build",
+            "--builder",
+            "s2i",
+            "--builder-image",
+            builder_image,
+            "--registry",
+            repository_name,
+            "--path",
+            directory,
+            "--image",
+            f"{repository_name}:{image_tag}",
+        ]
+
+        self.logging.info(f"Running build command: {' '.join(build_command)}")
+
+        try:
+            subprocess.run(
+                build_command,
+                capture_output=True,
+                check=True,
+            )
+        except subprocess.CalledProcessError as e:
+            self.logging.error(f"Error building the function: {e.stderr.decode()}")
+            raise RuntimeError(e) from e
+
+        self.logging.info(
+            f"Successfully built function image {repository_name}:{image_tag} "
+            f"to registry: {registry_name}."
+        )
+        return True
+
+    def package_code(
+        self,
+        directory: str,
+        language_name: str,
+        language_version: str,
+        benchmark: str,
+        is_cached: bool,
+    ) -> Tuple[str, int]:
+        """
+        Package code for Knative platform by building a Docker image.
+
+        Args:
+        - directory: Directory where the function code resides.
+        - language_name: Name of the programming language (e.g., Python).
+        - language_version: Version of the programming language.
+        - benchmark: Identifier for the benchmark or function.
+        - is_cached: Flag indicating if the code is cached.
+
+        Returns:
+        - Tuple containing the path of the packaged code directory and its size.
+ """ + + CONFIG_FILES = { + "python": [ + "func.py", + "func.yaml", + "Procfile", + "requirements.txt", + "app.sh", + ], + "nodejs": ["index.js", "func.yaml", "package.json"], + } + + # Sanitize the benchmark name for the image tag + sanitized_benchmark_name = self.sanitize_benchmark_name_for_knative(benchmark) + # Generate func.yaml + func_yaml_content = { + "specVersion": "0.36.0", + "name": sanitized_benchmark_name, + "runtime": "node" if language_name == "nodejs" else language_name, + "created": datetime.now().astimezone().isoformat(), + "run": { + "envs": [ + { + "name": "MINIO_STORAGE_CONNECTION_URL", + "value": self.config._resources.storage_config.address, + }, + { + "name": "MINIO_STORAGE_ACCESS_KEY", + "value": self.config._resources.storage_config.access_key, + }, + { + "name": "MINIO_STORAGE_SECRET_KEY", + "value": self.config._resources.storage_config.secret_key, + }, + ] + }, + } + + yaml_out = os.path.join(directory, "func.yaml") + with open(yaml_out, "w") as yaml_file: + yaml.dump(func_yaml_content, yaml_file, default_flow_style=False) + + # Create Procfile for Python runtime + if language_name == "python": + procfile_content = "web: python3 -m parliament ." + procfile_out = os.path.join(directory, "Procfile") + with open(procfile_out, "w") as procfile_file: + procfile_file.write(procfile_content) + + # Create an empty __init__.py file + init_file_out = os.path.join(directory, "__init__.py") + open(init_file_out, "a").close() + + # Determine the correct requirements.txt file + requirements_file = ( + f"requirements.txt.{language_version}" + if language_version in ["3.9", "3.8", "3.7", "3.6"] + else "requirements.txt" + ) + requirements_src = os.path.join(directory, requirements_file) + requirements_dst = os.path.join(directory, "requirements.txt") + + if os.path.exists(requirements_src): + with open(requirements_src, "r") as src_file: + requirements_content = src_file.read() + with open(requirements_dst, "a") as dst_file: + dst_file.write(requirements_content) + # Create app.sh file for Python runtime + app_sh_content = """#!/bin/sh + exec python -m parliament "$(dirname "$0")" + """ + app_sh_out = os.path.join(directory, "app.sh") + with open(app_sh_out, "w") as app_sh_file: + app_sh_file.write(app_sh_content) + + # Make app.sh executable + os.chmod(app_sh_out, 0o755) + + # Modify package.json for Node.js runtime to add the faas-js-runtime. 
+ if language_name == "nodejs": + package_json_path = os.path.join(directory, "package.json") + if os.path.exists(package_json_path): + with open(package_json_path, "r+") as package_file: + package_data = json.load(package_file) + if "scripts" not in package_data: + package_data["scripts"] = {} + package_data["scripts"]["start"] = "faas-js-runtime ./index.js" + package_file.seek(0) + package_file.write(json.dumps(package_data, indent=2)) + package_file.truncate() + + package_config = CONFIG_FILES[language_name] + function_dir = os.path.join(directory, "function") + os.makedirs(function_dir) + # move all files to 'function' except func.py, func.yaml, Procfile + for file in os.listdir(directory): + if file not in package_config: + file = os.path.join(directory, file) + shutil.move(file, function_dir) + + self.build_base_image( + directory, language_name, language_version, benchmark, is_cached + ) + + code_size = Benchmark.directory_size(directory) + return directory, code_size + + def storage_arguments(self) -> List[str]: + storage = cast(KnativeMinio, self.get_storage()) + return [ + "-p", + "MINIO_STORAGE_SECRET_KEY", + storage.config.secret_key, + "-p", + "MINIO_STORAGE_ACCESS_KEY", + storage.config.access_key, + "-p", + "MINIO_STORAGE_CONNECTION_URL", + storage.config.address, + ] + + def create_function( + self, code_package: Benchmark, func_name: str + ) -> "KnativeFunction": + self.logging.info("Building Knative function") + try: + # Check if the function already exists + knative_func_command = subprocess.run( + [*self.get_knative_func_cmd(), "list"], + stderr=subprocess.DEVNULL, + stdout=subprocess.PIPE, + ) + sanitize_benchmark_name = self.sanitize_benchmark_name_for_knative( + code_package.benchmark + ) + + function_found = False + docker_image = "" + for line in knative_func_command.stdout.decode().split("\n"): + if line and sanitize_benchmark_name in line.split()[0]: + function_found = True + break + + function_cfg = KnativeFunctionConfig.from_benchmark(code_package) + function_cfg.storage = cast(KnativeMinio, self.get_storage()).config + + if function_found: + self.logging.info( + f"Benchmark function of {sanitize_benchmark_name} already exists." 
+                )
+                res = KnativeFunction(
+                    sanitize_benchmark_name,
+                    code_package.benchmark,
+                    code_package.hash,
+                    function_cfg,
+                )
+                self.logging.info(
+                    f"Retrieved existing Knative function {sanitize_benchmark_name}"
+                )
+                self.update_function(res, code_package)
+                return res
+
+            else:
+                try:
+                    self.logging.info(
+                        f"Deploying new Knative function {sanitize_benchmark_name}"
+                    )
+                    try:
+                        docker_image = self.system_config.benchmark_image_name(
+                            self.name(),
+                            code_package.benchmark,
+                            code_package.language_name,
+                            code_package.language_version,
+                        )
+                        # Deploy the function with the configured 'func' executable
+                        result = subprocess.run(
+                            [
+                                *self.get_knative_func_cmd(),
+                                "deploy",
+                                "--path",
+                                code_package.code_location,
+                            ],
+                            capture_output=True,
+                            check=True,
+                        )
+                        # Log the standard output
+                        self.logging.info("Deployment succeeded:")
+                        self.logging.info(
+                            result.stdout.decode()
+                        )  # Print the captured output
+                    except subprocess.CalledProcessError as e:
+                        # Log the standard error and abort - the describe step
+                        # below must not run against a failed deployment.
+                        self.logging.error("Deployment failed:")
+                        self.logging.error(e.stderr.decode())
+                        raise
+
+                    # Retrieve the function URL
+                    describe_command = [
+                        *self.get_knative_func_cmd(),
+                        "describe",
+                        sanitize_benchmark_name,
+                        "-o",
+                        "url",
+                    ]
+                    result = subprocess.run(
+                        describe_command,
+                        capture_output=True,
+                        check=True,
+                    )
+                    function_url = result.stdout.decode().strip()
+                    self.logging.info("Function deployment URL fetched successfully.")
+
+                    function_cfg.url = function_url
+                    function_cfg.docker_image = docker_image
+
+                    # Create the function object
+                    res = KnativeFunction(
+                        sanitize_benchmark_name,
+                        code_package.benchmark,
+                        code_package.hash,
+                        function_cfg,
+                    )
+
+                    # Add a trigger invoking the function through the CLI
+                    trigger = LibraryTrigger(
+                        sanitize_benchmark_name, self.get_knative_func_cmd()
+                    )
+                    trigger.logging_handlers = self.logging_handlers
+                    res.add_trigger(trigger)
+
+                    return res
+
+                except subprocess.CalledProcessError as e:
+                    self.logging.error(
+                        f"Error deploying Knative function {sanitize_benchmark_name}."
+                    )
+                    self.logging.error(f"Output: {e.stderr.decode('utf-8')}")
+                    raise RuntimeError(e) from e
+
+        except FileNotFoundError:
+            self.logging.error(
+                "Could not retrieve Knative functions - is path to func correct?"
+            )
+            raise RuntimeError("Failed to access func binary")
+
+    def update_function(self, function: Function, code_package: Benchmark):
+        self.logging.info(f"Updating an existing Knative function {function.name}.")
+        function = cast(KnativeFunction, function)
+        docker_image = self.system_config.benchmark_image_name(
+            self.name(),
+            code_package.benchmark,
+            code_package.language_name,
+            code_package.language_version,
+        )
+
+        # Update func.yaml with resources before re-deployment
+        self.update_func_yaml_with_resources(
+            code_package.code_location, code_package.benchmark_config.memory
+        )
+
+        try:
+            subprocess.run(
+                [
+                    *self.get_knative_func_cmd(),
+                    "deploy",
+                    "--path",
+                    code_package.code_location,
+                ],
+                capture_output=True,
+                check=True,
+            )
+            function.config.docker_image = docker_image
+
+        except FileNotFoundError as e:
+            self.logging.error(
+                "Could not update Knative function - is the 'func' CLI installed and configured correctly?"
+            )
+            raise RuntimeError(e) from e
+        except subprocess.CalledProcessError as e:
+            self.logging.error(f"Unknown error when running function update: {e}!")
+            self.logging.error(
+                "Ensure the SeBS cache is cleared if there are issues with Knative!"
+    def update_function_configuration(
+        self, function: Function, code_package: Benchmark
+    ):
+        self.logging.info(
+            f"Update configuration of an existing Knative function {function.name}."
+        )
+
+        self.update_func_yaml_with_resources(
+            code_package.code_location, code_package.benchmark_config.memory
+        )
+
+        try:
+            subprocess.run(
+                [
+                    *self.get_knative_func_cmd(),
+                    "deploy",
+                    "--path",
+                    code_package.code_location,
+                    "--push=false",
+                ],
+                capture_output=True,
+                check=True,
+            )
+
+        except FileNotFoundError as e:
+            self.logging.error(
+                "Could not update Knative function - is the path to func correct?"
+            )
+            raise RuntimeError(e) from e
+        except subprocess.CalledProcessError as e:
+            self.logging.error(f"Unknown error when running function update: {e}!")
+            self.logging.error(
+                "Make sure to remove the SeBS cache after restarting Knative!"
+            )
+            self.logging.error(f"Output: {e.stderr.decode('utf-8')}")
+            raise RuntimeError(e) from e
+
+    def create_trigger(
+        self, function: Function, trigger_type: Trigger.TriggerType
+    ) -> Trigger:
+        if trigger_type == Trigger.TriggerType.LIBRARY:
+            return function.triggers(Trigger.TriggerType.LIBRARY)[0]
+        elif trigger_type == Trigger.TriggerType.HTTP:
+            try:
+                response = subprocess.run(
+                    [
+                        *self.get_knative_func_cmd(),
+                        "describe",
+                        function.name,
+                        "--output",
+                        "url",
+                    ],
+                    capture_output=True,
+                    check=True,
+                )
+            except FileNotFoundError as e:
+                self.logging.error(
+                    "Could not retrieve Knative function configuration "
+                    "- is the 'func' CLI installed and configured correctly?"
+                )
+                raise RuntimeError(e) from e
+            url = response.stdout.decode("utf-8").strip()
+            trigger = HTTPTrigger(function.name, url)
+            trigger.logging_handlers = self.logging_handlers
+            function.add_trigger(trigger)
+            self.cache_client.update_function(function)
+            return trigger
+        else:
+            raise RuntimeError("Not supported!")
+
+    def cached_function(self, function: Function):
+        for trigger in function.triggers(Trigger.TriggerType.LIBRARY):
+            trigger.logging_handlers = self.logging_handlers
+        for trigger in function.triggers(Trigger.TriggerType.HTTP):
+            trigger.logging_handlers = self.logging_handlers
+
+    def default_function_name(self, code_package: Benchmark) -> str:
+        return (
+            f"{code_package.benchmark}-{code_package.language_name}-"
+            f"{code_package.language_version}"
+        )
+
+    def enforce_cold_start(self, functions: List[Function], code_package: Benchmark):
+        raise NotImplementedError()
+
+    def download_metrics(
+        self,
+        function_name: str,
+        start_time: int,
+        end_time: int,
+        requests: Dict[str, ExecutionResult],
+        metrics: dict,
+    ):
+        raise NotImplementedError()
diff --git a/sebs/knative/storage.py b/sebs/knative/storage.py
new file mode 100644
index 00000000..d96e12ff
--- /dev/null
+++ b/sebs/knative/storage.py
@@ -0,0 +1,28 @@
+import docker
+from sebs.faas.config import Resources
+from sebs.storage import minio
+from sebs.storage.config import MinioConfig
+from sebs.cache import Cache
+
+
+class KnativeMinio(minio.Minio):
+    @staticmethod
+    def deployment_name() -> str:
+        return "knative"
+
+    def __init__(
+        self,
+        docker_client: docker.client,
+        cache_client: Cache,
+        res: Resources,
+        replace_existing: bool,
+    ):
+        super().__init__(docker_client, cache_client, res, replace_existing)
+
+    @staticmethod
+    def deserialize(
+        cached_config: MinioConfig, cache_client: Cache, resources: Resources
+    ) -> "KnativeMinio":
+        return super(KnativeMinio, KnativeMinio)._deserialize(
+            cached_config, cache_client, resources, KnativeMinio
+        )
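+
+
+# Minimal usage sketch (assumes a Minio config entry exists in the SeBS cache):
+#
+#   storage = KnativeMinio.deserialize(cached_cfg, cache_client, resources)
+#
+# The instance behaves like the base sebs.storage.minio.Minio client and is
+# configured entirely through the deserialized MinioConfig.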
diff --git a/sebs/knative/triggers.py b/sebs/knative/triggers.py
new file mode 100644
index 00000000..b7a78e53
--- /dev/null
+++ b/sebs/knative/triggers.py
@@ -0,0 +1,106 @@
+import concurrent.futures
+import datetime
+import json
+import subprocess
+from typing import List, Optional
+
+from sebs.faas.function import ExecutionResult, Trigger
+
+
+class LibraryTrigger(Trigger):
+    def __init__(self, fname: str, func_cmd: Optional[List[str]] = None):
+        super().__init__()
+        self.fname = fname
+        self._func_cmd: Optional[List[str]] = None
+        if func_cmd:
+            self._func_cmd = [*func_cmd]
+
+    @staticmethod
+    def trigger_type() -> "Trigger.TriggerType":
+        return Trigger.TriggerType.LIBRARY
+
+    @property
+    def func_cmd(self) -> List[str]:
+        assert self._func_cmd
+        return self._func_cmd
+
+    @func_cmd.setter
+    def func_cmd(self, func_cmd: List[str]):
+        self._func_cmd = func_cmd
+
+    def sync_invoke(self, payload: dict) -> ExecutionResult:
+        command = self.func_cmd + [
+            "invoke",
+            self.fname,
+            "--data",
+            json.dumps(payload),
+        ]
+        error = None
+        begin = datetime.datetime.now()
+        try:
+            response = subprocess.run(
+                command,
+                capture_output=True,
+                check=True,
+            )
+            end = datetime.datetime.now()
+            parsed_response = response.stdout.decode("utf-8")
+        except (subprocess.CalledProcessError, FileNotFoundError) as e:
+            end = datetime.datetime.now()
+            error = e
+
+        knative_result = ExecutionResult.from_times(begin, end)
+        if error is not None:
+            self.logging.error(f"Invocation of {self.fname} failed!")
+            knative_result.stats.failure = True
+            return knative_result
+
+        return_content = json.loads(parsed_response)
+        knative_result.parse_benchmark_output(return_content)
+        return knative_result
+
+    def async_invoke(self, payload: dict) -> concurrent.futures.Future:
+        pool = concurrent.futures.ThreadPoolExecutor()
+        fut = pool.submit(self.sync_invoke, payload)
+        return fut
+
+    def serialize(self) -> dict:
+        return {"type": "Library", "name": self.fname}
+
+    @staticmethod
+    def deserialize(obj: dict) -> Trigger:
+        return LibraryTrigger(obj["name"])
+
+    @staticmethod
+    def typename() -> str:
+        return "Knative.LibraryTrigger"
+
+
+class HTTPTrigger(Trigger):
+    def __init__(self, fname: str, url: str):
+        super().__init__()
+        self.fname = fname
+        self.url = url
+
+    @staticmethod
+    def typename() -> str:
+        return "Knative.HTTPTrigger"
+
+    @staticmethod
+    def trigger_type() -> Trigger.TriggerType:
+        return Trigger.TriggerType.HTTP
+
+    def sync_invoke(self, payload: dict) -> ExecutionResult:
+        self.logging.debug(f"Invoke function {self.url}")
+        return self._http_invoke(payload, self.url, False)
+
+    def async_invoke(self, payload: dict) -> concurrent.futures.Future:
+        pool = concurrent.futures.ThreadPoolExecutor()
+        fut = pool.submit(self.sync_invoke, payload)
+        return fut
+
+    def serialize(self) -> dict:
+        return {"type": "HTTP", "fname": self.fname, "url": self.url}
+
+    @staticmethod
+    def deserialize(obj: dict) -> Trigger:
+        return HTTPTrigger(obj["fname"], obj["url"])
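+
+
+# Usage sketch (hypothetical function name; assumes the `func` CLI is on PATH):
+#
+#   trigger = LibraryTrigger("sleep-python-3-8", ["func"])
+#   result = trigger.sync_invoke({"sleep": 1})
+#   print(result.stats.failure)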
diff --git a/sebs/regression.py b/sebs/regression.py
index 3084bc88..03e396fe 100644
--- a/sebs/regression.py
+++ b/sebs/regression.py
@@ -289,6 +289,44 @@ def get_deployment(self, benchmark_name):
         return deployment_client
 
 
+class KnativeTestSequencePython(
+    unittest.TestCase,
+    metaclass=TestSequenceMeta,
+    benchmarks=benchmarks_python,
+    deployment_name="knative",
+    triggers=[Trigger.TriggerType.HTTP],
+):
+    def get_deployment(self, benchmark_name):
+        deployment_name = "knative"
+        assert cloud_config
+        deployment_client = self.client.get_deployment(
+            cloud_config,
+            logging_filename=f"regression_{deployment_name}_{benchmark_name}.log",
+        )
+        with KnativeTestSequencePython.lock:
+            deployment_client.initialize(resource_prefix="regression")
+        return deployment_client
+
+
+class KnativeTestSequenceNodejs(
+    unittest.TestCase,
+    metaclass=TestSequenceMeta,
+    benchmarks=benchmarks_nodejs,
+    deployment_name="knative",
+    triggers=[Trigger.TriggerType.HTTP],
+):
+    def get_deployment(self, benchmark_name):
+        deployment_name = "knative"
+        assert cloud_config
+        deployment_client = self.client.get_deployment(
+            cloud_config,
+            logging_filename=f"regression_{deployment_name}_{benchmark_name}.log",
+        )
+        with KnativeTestSequenceNodejs.lock:
+            deployment_client.initialize(resource_prefix="regression")
+        return deployment_client
+
+
 # https://stackoverflow.com/questions/22484805/a-simple-working-example-for-testtools-concurrentstreamtestsuite
 class TracingStreamResult(testtools.StreamResult):
     all_correct: bool
@@ -370,6 +408,16 @@ def regression_suite(
         suite.addTest(
             unittest.defaultTestLoader.loadTestsFromTestCase(OpenWhiskTestSequenceNodejs)
         )
+    if "knative" in providers:
+        assert "knative" in cloud_config
+        if language == "python":
+            suite.addTest(
+                unittest.defaultTestLoader.loadTestsFromTestCase(KnativeTestSequencePython)
+            )
+        elif language == "nodejs":
+            suite.addTest(
+                unittest.defaultTestLoader.loadTestsFromTestCase(KnativeTestSequenceNodejs)
+            )
     tests = []
     # mypy is confused here
diff --git a/sebs/sebs.py b/sebs/sebs.py
index 58bc07a9..a989c6d6 100644
--- a/sebs/sebs.py
+++ b/sebs/sebs.py
@@ -104,7 +104,12 @@ def get_deployment(
             from sebs.openwhisk import OpenWhisk
 
             implementations["openwhisk"] = OpenWhisk
-
+
+        if has_platform("knative"):
+            from sebs.knative import Knative
+
+            implementations["knative"] = Knative
+
         if name not in implementations:
             raise RuntimeError("Deployment {name} not supported!".format(name=name))
diff --git a/sebs/types.py b/sebs/types.py
index 2f26117e..aafbac84 100644
--- a/sebs/types.py
+++ b/sebs/types.py
@@ -7,6 +7,7 @@ class Platforms(str, Enum):
     GCP = "gcp"
     LOCAL = "local"
     OPENWHISK = "openwhisk"
+    KNATIVE = "knative"
 
 
 class Storage(str, Enum):
diff --git a/sebs/utils.py b/sebs/utils.py
index 3df8ffc9..46ad74c1 100644
--- a/sebs/utils.py
+++ b/sebs/utils.py
@@ -6,6 +6,7 @@
 import uuid
 import click
 import datetime
+import re
 
 from typing import List, Optional
 
@@ -86,8 +87,6 @@ def configure_logging():
         for logger in loggers:
             if name.startswith(logger):
                 logging.getLogger(name).setLevel(logging.ERROR)
-
-
 # def configure_logging(verbose: bool = False, output_dir: Optional[str] = None):
 #     logging_format = "%(asctime)s,%(msecs)d %(levelname)s %(name)s: %(message)s"
 #     logging_date_format = "%H:%M:%S"
diff --git a/tools/knative_setup.sh b/tools/knative_setup.sh
new file mode 100755
index 00000000..84cfce84
--- /dev/null
+++ b/tools/knative_setup.sh
@@ -0,0 +1,395 @@
+#!/usr/bin/env bash
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#
+# Allocate a Kind cluster with Knative Serving and Eventing, Contour
+# networking, a Metallb load balancer, and a local container registry.
+#
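+# Usage (sketch):
+#
+#    ./tools/knative_setup.sh
+#
+# The required binaries (kubectl, kind, helm, jq) are resolved by
+# find_executable below: install them system-wide or place them in a bin/
+# directory next to this script.
+#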
+
+init() {
+  find_executables
+  populate_environment
+  define_colors
+}
+
+find_executables() {
+  KUBECTL=$(find_executable "kubectl")
+  KIND=$(find_executable "kind")
+  HELM=$(find_executable "helm")
+  JQ=$(find_executable "jq")
+}
+
+populate_environment() {
+  export ARCH="${ARCH:-amd64}"
+  export CONTAINER_ENGINE=${CONTAINER_ENGINE:-docker}
+  export KUBECONFIG="${KUBECONFIG:-$(dirname "$(realpath "$0")")/bin/kubeconfig.yaml}"
+  export TERM="${TERM:-dumb}"
+  echo "KUBECONFIG=${KUBECONFIG}"
+}
+
+define_colors() {
+  # tput exits 1 for TERM=dumb, which it does not support; fall back to a
+  # tput-friendly terminal type when TERM is undefined or "dumb".
+  local TERM="$TERM"
+  if [[ -z "$TERM" || "$TERM" == "dumb" ]]; then
+    TERM="xterm"
+  fi
+  # shellcheck disable=SC2155
+  red=$(tput bold)$(tput setaf 1)
+  # shellcheck disable=SC2155
+  green=$(tput bold)$(tput setaf 2)
+  # shellcheck disable=SC2155
+  blue=$(tput bold)$(tput setaf 4)
+  # shellcheck disable=SC2155
+  grey=$(tput bold)$(tput setaf 8)
+  # shellcheck disable=SC2155
+  yellow=$(tput bold)$(tput setaf 11)
+  # shellcheck disable=SC2155
+  reset=$(tput sgr0)
+}
+
+# find_executable prints the path to an executable by name.
+# An environment variable FUNC_TEST_$name takes precedence.
+# Next is an executable matching the name in bin/ next to this script
+# (the install location used by install-binaries.sh).
+# Finally, a matching executable from the current PATH is used.
+find_executable() {
+  local name="$1" # requested binary name
+  local path=""   # the path to output
+
+  # Use the environment variable if defined
+  local env
+  env=$(echo "FUNC_TEST_$name" | awk '{print toupper($0)}')
+  path="${!env:-}"
+  if [[ -x "$path" ]]; then
+    echo "$path"
+    return 0
+  fi
+
+  # Use the binary installed into bin/ if it exists.
+  path=$(dirname "$(realpath "$0")")"/bin/$name"
+  if [[ -x "$path" ]]; then
+    echo "$path"
+    return 0
+  fi
+
+  # Finally fall back to anything matching in the current PATH
+  path=$(command -v "$name")
+  if [[ -x "$path" ]]; then
+    echo "$path"
+    return 0
+  fi
+
+  echo "Error: ${name} not found." >&2
+  return 1
+}
+
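+# Example (illustrative): override the kubectl binary used by this script:
+#   FUNC_TEST_KUBECTL=/usr/local/bin/kubectl ./tools/knative_setup.sh
+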
+set -o errexit
+set -o nounset
+set -o pipefail
+
+set_versions() {
+  # Note: pin the Kubernetes node image per Kind release (the full digest is
+  # recommended): https://github.com/kubernetes-sigs/kind/releases
+  kind_node_version=v1.29.2@sha256:51a1434a5397193442f0be2a297b488b6c919ce8a3931be0ce822606ea5ca245
+  knative_serving_version="v$(get_latest_release_version "knative" "serving")"
+  knative_eventing_version="v$(get_latest_release_version "knative" "eventing")"
+  contour_version="v$(get_latest_release_version "knative-extensions" "net-contour")"
+}
+
+main() {
+  echo "${blue}Allocating${reset}"
+
+  set_versions
+  kubernetes
+  loadbalancer
+
+  echo "${blue}Beginning Cluster Configuration${reset}"
+  echo "Tasks will be executed in parallel. Logs will be prefixed:"
+  echo "svr: Serving, DNS and Networking"
+  echo "evt: Eventing and Namespace"
+  echo "reg: Local Registry"
+  echo ""
+
+  (
+    set -o pipefail
+    (serving && dns && networking) 2>&1 | sed -e 's/^/svr /'
+  ) &
+  (
+    set -o pipefail
+    (eventing && namespace) 2>&1 | sed -e 's/^/evt /'
+  ) &
+  (
+    set -o pipefail
+    registry 2>&1 | sed -e 's/^/reg /'
+  ) &
+
+  local job
+  for job in $(jobs -p); do
+    wait "$job"
+  done
+
+  next_steps
+
+  echo -e "\n${green}🎉 DONE${reset}\n"
+}
+
+# Retrieve the latest version from the given Knative repository's tags.
+# On the 'main' branch the latest released version is returned;
+# on a 'release-x.y' branch the latest patch version for 'x.y.*' is returned.
+# Similar to hack/library.sh get_latest_knative_yaml_source().
+function get_latest_release_version() {
+  local org_name="$1"
+  local repo_name="$2"
+  local major_minor=""
+  if is_release_branch; then
+    local branch_name
+    branch_name="$(current_branch)"
+    major_minor="${branch_name##release-}"
+  fi
+  local version
+  version="$(git ls-remote --tags --refs https://github.com/"${org_name}"/"${repo_name}".git |
+    grep "${major_minor}" |
+    cut -d '-' -f2 |
+    cut -d 'v' -f2 |
+    sort -Vr |
+    head -n 1)"
+  echo "${version}"
+}
+
+# Returns whether the current branch is a release branch.
+function is_release_branch() {
+  [[ $(current_branch) =~ ^release-[0-9\.]+$ ]]
+}
+
+# Returns the current branch.
+# Taken from knative/hack. The function covers Knative CI use cases and the local variant.
+function current_branch() {
+  local branch_name=""
+  # Get the branch name from Prow's env var, see https://github.com/kubernetes/test-infra/blob/master/prow/jobs.md.
+  # Otherwise, try getting the current branch from git.
+  ((${IS_PROW:-})) && branch_name="${PULL_BASE_REF:-}"
+  [[ -z "${branch_name}" ]] && branch_name="${GITHUB_BASE_REF:-}"
+  [[ -z "${branch_name}" ]] && branch_name="$(git rev-parse --abbrev-ref HEAD)"
+  echo "${branch_name}"
+}
+
+kubernetes() {
+  # The inline cluster configuration was truncated here; a minimal sketch,
+  # modeled on the upstream func allocate.sh this script is adapted from,
+  # pins the node image chosen in set_versions.
+  cat <<EOF | $KIND create cluster --name=func --kubeconfig="$KUBECONFIG" --wait=60s --config=-
+kind: Cluster
+apiVersion: kind.x-k8s.io/v1alpha4
+nodes:
+  - role: control-plane
+    image: kindest/node:${kind_node_version}
+EOF
+  echo "${green}✅ Kubernetes${reset}"
+}
+
+dns() {
+  echo "${blue}Configuring DNS${reset}"
+
+  # Retry patching the Knative domain configmap until the webhook is ready.
+  # NOTE: the head of this loop was truncated; the patch payload below is an
+  # assumption based on the surviving retry logic.
+  i=0; n=10
+  while ! $KUBECTL patch configmap/config-domain \
+    --namespace knative-serving \
+    --type merge \
+    --patch '{"data":{"127.0.0.1.sslip.io":""}}'
+  do
+    (( i += 1 ))
+    if ((i >= n)); then
+      echo "Unable to set knative domain"
+      exit 1
+    fi
+    echo 'Retrying...'
+    sleep 5
+  done
+  echo "${green}✅ DNS${reset}"
+}
+
+loadbalancer() {
+  echo "${blue}Installing Load Balancer (Metallb)${reset}"
+  $KUBECTL apply -f "https://raw.githubusercontent.com/metallb/metallb/v0.13.7/config/manifests/metallb-native.yaml"
+  sleep 5
+  $KUBECTL wait --namespace metallb-system \
+    --for=condition=ready pod \
+    --selector=app=metallb \
+    --timeout=300s
+
+  local kind_addr
+  kind_addr="$($CONTAINER_ENGINE container inspect func-control-plane | $JQ '.[0].NetworkSettings.Networks.kind.IPAddress' -r)"
+
+  echo "Setting up address pool."
+  $KUBECTL apply -f - <