Skip to content
This repository was archived by the owner on Jul 12, 2023. It is now read-only.
Open
51 changes: 49 additions & 2 deletions styx-cli-unshaded/src/main/java/com/spotify/styx/cli/CliMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -576,18 +576,28 @@ private void backfillList() throws ExecutionException, InterruptedException {
private void resourceCreate() throws ExecutionException, InterruptedException {
final String id = namespace.getString(parser.resourceCreateId.getDest());
final int concurrency = namespace.getInt(parser.resourceCreateConcurrency.getDest());
final String requestsMemory = namespace.getString(parser.resourceCreateRequestsMemory.getDest());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be a Double and we default to G to reduce error proneness

final Double requestsCpu = namespace.getDouble(parser.resourceCreateRequestsCpu.getDest());
final String limitsMemory = namespace.getString(parser.resourceCreateLimitsMemory.getDest());
final Double limitsCpu = namespace.getDouble(parser.resourceCreateLimitsCpu.getDest());

final Resource resource =
styxClient.resourceCreate(id, concurrency).toCompletableFuture().get();
styxClient.resourceCreate(id, concurrency, requestsMemory, requestsCpu,
limitsMemory, limitsCpu).toCompletableFuture().get();
cliOutput.printResources(Collections.singletonList(resource));
}

private void resourceEdit() throws ExecutionException, InterruptedException {
final String id = namespace.getString(parser.resourceEditId.getDest());
final Integer concurrency = namespace.getInt(parser.resourceEditConcurrency.getDest());
final String requestsMemory = namespace.getString(parser.resourceEditRequestsMemory.getDest());
final Double requestsCpu = namespace.getDouble(parser.resourceEditRequestsCpu.getDest());
final String limitsMemory = namespace.getString(parser.resourceEditLimitsMemory.getDest());
final Double limitsCpu = namespace.getDouble(parser.resourceEditLimitsCpu.getDest());

if (concurrency != null) {
Resource resource = styxClient.resourceEdit(id, concurrency).toCompletableFuture().get();
Resource resource = styxClient.resourceEdit(id, concurrency,
requestsMemory, requestsCpu, limitsMemory, limitsCpu).toCompletableFuture().get();
cliOutput.printResources(Collections.singletonList(resource));
}
}
Expand Down Expand Up @@ -793,6 +803,22 @@ private static class StyxCliParser extends ParserBase {
resourceEdit.addArgument("--concurrency")
.help("set the concurrency value for the resource")
.type(Integer.class);
final Argument resourceEditRequestsMemory =
resourceEdit.addArgument("--requestsMemory")
.help("set the requests memory value for the resource")
.type(String.class);
final Argument resourceEditRequestsCpu =
resourceEdit.addArgument("--requestsCpu")
.help("set the requests cpu value for the resource")
.type(Double.class);
final Argument resourceEditLimitsMemory =
resourceEdit.addArgument("--limitsMemory")
.help("set the limits memory value for the resource")
.type(String.class);
final Argument resourceEditLimitsCpu =
resourceEdit.addArgument("--limitsCpu")
.help("set the limits cpu value for the resource")
.type(Double.class);

final Subparser resourceList = ResourceCommand.LIST.parser(resourceParser, cliContext);

Expand All @@ -803,6 +829,27 @@ private static class StyxCliParser extends ParserBase {
resourceCreate.addArgument("concurrency")
.help("The concurrency of this resource")
.type(Integer.class);
final Argument resourceCreateRequestsMemory =
resourceCreate.addArgument("--requestsMemory")
.help("The requests memory for this resource")
.type(String.class)
.required(false)/*
.setDefault(Optional.of("1Gi"))*/;
final Argument resourceCreateRequestsCpu =
resourceCreate.addArgument("--requestsCpu")
.help("The requests cpu for this resource")
.type(Double.class)
.required(false);
final Argument resourceCreateLimitsMemory =
resourceCreate.addArgument("--limitsMemory")
.help("The limits memory for this resource")
.type(String.class)
.required(false);
final Argument resourceCreateLimitsCpu =
resourceCreate.addArgument("--limitsCpu")
.help("The limits cpu for this resource")
.type(Double.class)
.required(false);

final Subparsers workflowParser =
Command.WORKFLOW.parser(subCommands, cliContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.spotify.styx.client.StyxClient;
import com.spotify.styx.model.Backfill;
import com.spotify.styx.model.BackfillInput;
import com.spotify.styx.model.Resource;
import com.spotify.styx.model.Schedule;
import com.spotify.styx.model.TriggerParameters;
import com.spotify.styx.model.Workflow;
Expand Down Expand Up @@ -1019,6 +1020,34 @@ public void testDebugOptionIsGlobal(final String argLine) {
assertThat(Try.run(() -> CliMain.run(cliContext, argLine.split(" "))).isSuccess(), is(true));
}

@Test
public void testCreateResource() {
final String resourceId = "test-comp-id";
final int concurrency = 2;

when(client.resourceCreate(resourceId, concurrency, null, null, null, null))
.thenReturn(CompletableFuture.completedFuture(Resource.create(resourceId, concurrency)));

CliMain.run(cliContext, "resource", "create", resourceId, Integer.toString(concurrency));
verify(client).resourceCreate(resourceId, concurrency, null, null, null, null);
}

@Test
public void testCreateResourceWithMemoryAndCPU() {
final String resourceId = "test-comp-id";
final int concurrency = 2;

when(client.resourceCreate(resourceId, concurrency, "1Gi", 1D, "2Gi", 2D))
.thenReturn(CompletableFuture.completedFuture(Resource.create(resourceId, concurrency)));

CliMain.run(cliContext, "resource", "create", resourceId, Integer.toString(concurrency),
"--requestsMemory", "1Gi",
"--requestsCpu", "1",
"--limitsMemory", "2Gi",
"--limitsCpu", "2");
verify(client).resourceCreate(resourceId, concurrency, "1Gi", 1D, "2Gi", 2D);
}

@Test
@Parameters({
"--host https://foo.bar workflow ls",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,14 +298,18 @@ public CompletionStage<Void> retryWorkflowInstance(String componentId,
}

@Override
public CompletionStage<Resource> resourceCreate(String resourceId, int concurrency) {
public CompletionStage<Resource> resourceCreate(String resourceId, int concurrency,
String requestsMemory, Double requestsCpu,
String limitsMemory, Double limitsCpu) {
final Resource resource = Resource.create(resourceId, concurrency);
return execute(forUri(urlBuilder("resources"), "POST", resource),
Resource.class);
}

@Override
public CompletionStage<Resource> resourceEdit(String resourceId, int concurrency) {
public CompletionStage<Resource> resourceEdit(String resourceId, int concurrency,
String requestsMemory, Double requestsCpu,
String limitsMemory, Double limitsCpu) {
final Resource resource = Resource.create(resourceId, concurrency);
return execute(forUri(urlBuilder("resources", resourceId), "PUT", resource),
Resource.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ interface StyxResourceClient extends AutoCloseable {
* @return The created {@link Resource}
*/
CompletionStage<Resource> resourceCreate(String resourceId,
int concurrency);
int concurrency,
String requestsMemory, Double requestsCpu,
String limitsMemory, Double limitsCpu);

/**
* Edit an existing {@link Resource}
Expand All @@ -47,7 +49,9 @@ CompletionStage<Resource> resourceCreate(String resourceId,
* @return The updated{@link Resource}
*/
CompletionStage<Resource> resourceEdit(String resourceId,
int concurrency);
int concurrency,
String requestsMemory, Double requestsCpu,
String limitsMemory, Double limitsCpu);

/**
* Get an existing {@link Resource}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@
import com.spotify.styx.model.BackfillInput;
import com.spotify.styx.model.EditableBackfillInput;
import com.spotify.styx.model.Event;
import com.spotify.styx.model.LimitsResource;
import com.spotify.styx.model.LimitsResourceBuilder;
import com.spotify.styx.model.RequestsResource;
import com.spotify.styx.model.RequestsResourceBuilder;
import com.spotify.styx.model.Resource;
import com.spotify.styx.model.Schedule;
import com.spotify.styx.model.TriggerParameters;
Expand Down Expand Up @@ -97,6 +101,9 @@
@RunWith(JUnitParamsRunner.class)
public class StyxOkHttpClientTest {

private static final LimitsResource LIMITS_RESOURCE = new LimitsResourceBuilder().cpu("1").memory("1Gi").build();
private static final RequestsResource REQUESTS_RESOURCE = new RequestsResourceBuilder().cpu("1").memory("1Gi").build();

private static final WorkflowConfiguration WORKFLOW_CONFIGURATION_1 = WorkflowConfiguration.builder()
.id("bar-wf_1")
.dockerImage("busybox")
Expand All @@ -109,6 +116,8 @@ public class StyxOkHttpClientTest {
.dockerImage("busybox")
.dockerArgs(Arrays.asList("echo", "hello world"))
.schedule(Schedule.DAYS)
.limits(LIMITS_RESOURCE)
.requests(REQUESTS_RESOURCE)
.build();

private static final Workflow WORKFLOW_1 = Workflow.create("f[ ]o-cmp", WORKFLOW_CONFIGURATION_1);
Expand Down Expand Up @@ -320,7 +329,8 @@ public void shouldEditResource() throws Exception {
when(client.send(any(Request.class)))
.thenReturn(CompletableFuture.completedFuture(response(HTTP_OK, resource)));
final CompletableFuture<Resource> r =
styx.resourceEdit("resource", 3).toCompletableFuture();
styx.resourceEdit("resource", 3,
"1Gi", 1D, "2Gi", 2D).toCompletableFuture();
verify(client, timeout(30_000)).send(requestCaptor.capture());
assertThat(r.isDone(), is(true));
final Request request = requestCaptor.getValue();
Expand All @@ -338,7 +348,7 @@ public void shouldCreateResource() throws Exception {
.thenReturn(CompletableFuture.completedFuture(response(HTTP_OK,
resource)));
final CompletableFuture<Resource> r =
styx.resourceCreate("resource", 3).toCompletableFuture();
styx.resourceCreate("resource", 3, "1Gi", 1D, "2Gi", 2D).toCompletableFuture();
verify(client, timeout(30_000)).send(requestCaptor.capture());
assertThat(r.isDone(), is(true));
final Request request = requestCaptor.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public interface ExecutionDescription {
Optional<String> retryCondition();
Optional<FlyteExecConf> flyteExecConf();
Optional<String> flyteExecutionId();
Optional<RequestsResource> requests();
Optional<LimitsResource> limits();

static ExecutionDescriptionBuilder builder() {
return new ExecutionDescriptionBuilder();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*-
* -\-\-
* Spotify Styx Common
* --
* Copyright (C) 2016 - 2022 Spotify AB
* --
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* -/-/-
*/

package com.spotify.styx.model;

import io.norberg.automatter.AutoMatter;
import java.util.Optional;

@AutoMatter
public interface LimitsResource {

Optional<String> memory();

Optional<String> cpu();

static LimitsResourceBuilder builder() {
return new LimitsResourceBuilder();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*-
* -\-\-
* Spotify Styx Common
* --
* Copyright (C) 2016 - 2022 Spotify AB
* --
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* -/-/-
*/

package com.spotify.styx.model;

import io.norberg.automatter.AutoMatter;
import java.util.Optional;

@AutoMatter
public interface RequestsResource {

Optional<String> memory();

Optional<String> cpu();

static RequestsResourceBuilder builder() {
return new RequestsResourceBuilder();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public interface WorkflowConfiguration {

Optional<List<String>> dockerArgs();

Optional<RequestsResource> requests();

Optional<LimitsResource> limits();

/**
* Toggles behavior to reliably report exit status from the Docker container, via
* https://kubernetes.io/docs/tasks/debug-application-cluster/determine-reason-pod-failure/#writing-and-reading-a-termination-message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ interface RunSpec {

Optional<String> memLimit();

Optional<String> cpuRequest();

Optional<String> cpuLimit();

Map<String, String> env();

static RunSpecBuilder builder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,9 @@ static Pod createPod(WorkflowInstance workflowInstance,

final ResourceRequirementsBuilder resourceRequirements = new ResourceRequirementsBuilder();
runSpec.memRequest().ifPresent(s -> resourceRequirements.addToRequests("memory", new Quantity(s)));
runSpec.cpuRequest().ifPresent(s -> resourceRequirements.addToRequests("cpu", new Quantity(s)));
runSpec.memLimit().ifPresent(s -> resourceRequirements.addToLimits("memory", new Quantity(s)));
runSpec.cpuLimit().ifPresent(s -> resourceRequirements.addToLimits("cpu", new Quantity(s)));

final ContainerBuilder mainContainerBuilder = new ContainerBuilder()
.withName(MAIN_CONTAINER_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import com.spotify.styx.docker.InvalidExecutionException;
import com.spotify.styx.model.Event;
import com.spotify.styx.model.ExecutionDescription;
import com.spotify.styx.model.LimitsResource;
import com.spotify.styx.model.RequestsResource;
import com.spotify.styx.state.EventRouter;
import com.spotify.styx.state.OutputHandler;
import com.spotify.styx.state.RunState;
Expand Down Expand Up @@ -114,6 +116,14 @@ private RunSpec createRunSpec(RunState state) {
.serviceAccount(executionDescription.serviceAccount())
.trigger(state.data().trigger())
.commitSha(state.data().executionDescription().flatMap(ExecutionDescription::commitSha))
.memRequest(state.data().executionDescription().flatMap(ExecutionDescription::requests)
.flatMap(RequestsResource::memory))
.memLimit(state.data().executionDescription().flatMap(ExecutionDescription::limits)
.flatMap(LimitsResource::memory))
.cpuRequest(state.data().executionDescription().flatMap(ExecutionDescription::requests)
.flatMap(RequestsResource::cpu))
.cpuLimit(state.data().executionDescription().flatMap(ExecutionDescription::limits)
.flatMap(LimitsResource::cpu))
.env(executionDescription.env())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import com.spotify.styx.model.Event;
import com.spotify.styx.model.ExecutionDescription;
import com.spotify.styx.model.FlyteExecConf;
import com.spotify.styx.model.LimitsResource;
import com.spotify.styx.model.RequestsResource;
import com.spotify.styx.model.WorkflowInstance;
import com.spotify.styx.state.EventRouter;
import com.spotify.styx.state.OutputHandler;
Expand Down Expand Up @@ -143,6 +145,8 @@ private ExecutionDescription getExecDescription(WorkflowInstance workflowInstanc
final Optional<String> dockerImage = workflow.configuration().dockerImage();
final List<String> dockerArgs = workflow.configuration().dockerArgs()
.orElse(Collections.emptyList());
final Optional<RequestsResource> requests = workflow.configuration().requests();
final Optional<LimitsResource> limits = workflow.configuration().limits();
final List<String> command = argsReplace(dockerArgs, workflowInstance.parameter());
final Optional<FlyteExecConf> flyteExecConf = workflow.configuration().flyteExecConf();

Expand All @@ -162,6 +166,8 @@ private ExecutionDescription getExecDescription(WorkflowInstance workflowInstanc
.retryCondition(workflow.configuration().retryCondition())
.flyteExecConf(flyteExecConf)
.flyteExecutionId(Optional.of(flyteExecutionId))
.requests(requests)
.limits(limits)
.build();
}

Expand Down
Loading