Skip to content

Commit

Permalink
Add cache to ContainerTask
Browse files Browse the repository at this point in the history
By introducing a common super interface to RunnableTask and ContainerTask

Signed-off-by: Nelson Arapé <[email protected]>
  • Loading branch information
narape committed Jul 27, 2022
1 parent 84aca81 commit 9047492
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 137 deletions.
18 changes: 2 additions & 16 deletions flytekit-api/src/main/java/org/flyte/api/v1/ContainerTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,10 @@
*/
package org.flyte.api.v1;

import static java.util.Collections.emptyMap;

import java.util.List;

/** Building block for tasks that execute arbitrary containers. */
public interface ContainerTask {

/** Specifies task name. */
String getName();
public interface ContainerTask extends Task {

/** Specifies container image. */
String getImage();
Expand All @@ -38,22 +33,13 @@ public interface ContainerTask {
/** Specifies container environment variables. */
List<KeyValuePair> getEnv();

@Override
default String getType() {
return "raw-container";
}

TypedInterface getInterface();

/** Specifies container resource requests. */
default Resources getResources() {
return Resources.builder().build();
}

/** Specifies task retry policy. */
RetryStrategy getRetries();

/** Specifies custom container parameters. */
default Struct getCustom() {
return Struct.of(emptyMap());
}
}
22 changes: 3 additions & 19 deletions flytekit-api/src/main/java/org/flyte/api/v1/RunnableTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@
import java.util.Map;

/** Building block for tasks that execute Java code. */
public interface RunnableTask {

String getName();
public interface RunnableTask extends Task {

@Override
default String getType() {
// FIXME default only for backwards-compatibility, remove in 0.3.x
return "java-task";
}

@Override
default Struct getCustom() {
// FIXME default only for backwards-compatibility, remove in 0.3.x
return Struct.of(emptyMap());
Expand All @@ -40,21 +40,5 @@ default Resources getResources() {
return Resources.builder().build();
}

TypedInterface getInterface();

Map<String, Literal> run(Map<String, Literal> inputs);

RetryStrategy getRetries();

default boolean isCached() {
return false;
}

default String getCacheVersion() {
return null;
}

default boolean isCacheSerializable() {
return false;
}
}
61 changes: 61 additions & 0 deletions flytekit-api/src/main/java/org/flyte/api/v1/Task.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2021 Flyte Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.flyte.api.v1;

import static java.util.Collections.emptyMap;

/** Super interfaces for all tasks. */
public interface Task {

/** Specifies task name. */
String getName();

/** Specifies the task type identifier. */
String getType();

/** Specifies the task interface: inputs/outputs. */
TypedInterface getInterface();

/** Specifies custom data about the task. */
default Struct getCustom() {
return Struct.of(emptyMap());
}

/** Specifies task retry policy. */
RetryStrategy getRetries();

/**
* Indicates whether the system should attempt to lookup this task's output to avoid duplication
* of work.
*/
default boolean isCached() {
return false;
}

/** Indicates a logical version to apply to this task for the purpose of cache. */
default String getCacheVersion() {
return null;
}

/**
* Indicates whether the system should attempt to execute cached instances in serial to avoid
* duplicate work.
*/
default boolean isCacheSerializable() {
return false;
}
}
41 changes: 20 additions & 21 deletions jflyte/src/main/java/org/flyte/jflyte/ProjectClosure.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.flyte.api.v1.RunnableTask;
import org.flyte.api.v1.RunnableTaskRegistrar;
import org.flyte.api.v1.Struct;
import org.flyte.api.v1.Task;
import org.flyte.api.v1.TaskIdentifier;
import org.flyte.api.v1.TaskTemplate;
import org.flyte.api.v1.WorkflowIdentifier;
Expand Down Expand Up @@ -451,6 +452,25 @@ static TaskTemplate createTaskTemplateForRunnableTask(RunnableTask task, String
.resources(resources)
.build();

return createTaskTemplate(task, container);
}

@VisibleForTesting
static TaskTemplate createTaskTemplateForContainerTask(ContainerTask task) {
Resources resources = task.getResources();
Container container =
Container.builder()
.command(task.getCommand())
.args(task.getArgs())
.image(task.getImage())
.env(task.getEnv())
.resources(resources)
.build();

return createTaskTemplate(task, container);
}

private static TaskTemplate createTaskTemplate(Task task, Container container) {
TaskTemplate.Builder templateBuilder =
TaskTemplate.builder()
.container(container)
Expand All @@ -468,27 +488,6 @@ static TaskTemplate createTaskTemplateForRunnableTask(RunnableTask task, String
return templateBuilder.build();
}

@VisibleForTesting
static TaskTemplate createTaskTemplateForContainerTask(ContainerTask task) {
Resources resources = task.getResources();
Container container =
Container.builder()
.command(task.getCommand())
.args(task.getArgs())
.image(task.getImage())
.env(task.getEnv())
.resources(resources)
.build();

return TaskTemplate.builder()
.container(container)
.interface_(task.getInterface())
.retries(task.getRetries())
.type(task.getType())
.custom(task.getCustom())
.build();
}

private static Optional<KeyValuePair> javaToolOptionsEnv(Resources resources) {
Map<ResourceName, String> limits = resources.limits();
if (limits == null || !limits.containsKey(ResourceName.MEMORY)) {
Expand Down
Loading

0 comments on commit 9047492

Please sign in to comment.