From 8b95d489c0742ce6f8e66ea0bb9e4f8565c1ac46 Mon Sep 17 00:00:00 2001 From: Hongxin Liang Date: Wed, 27 Sep 2023 23:21:01 +0200 Subject: [PATCH 1/9] Support remote in dynamic Signed-off-by: Hongxin Liang --- .../flyte/jflyte/utils/IdentifierRewrite.java | 21 +++-- .../flyte/jflyte/ExecuteDynamicWorkflow.java | 79 +++---------------- 2 files changed, 20 insertions(+), 80 deletions(-) diff --git a/jflyte-utils/src/main/java/org/flyte/jflyte/utils/IdentifierRewrite.java b/jflyte-utils/src/main/java/org/flyte/jflyte/utils/IdentifierRewrite.java index 1ae75a7ec..4be296a71 100644 --- a/jflyte-utils/src/main/java/org/flyte/jflyte/utils/IdentifierRewrite.java +++ b/jflyte-utils/src/main/java/org/flyte/jflyte/utils/IdentifierRewrite.java @@ -33,7 +33,7 @@ /** Overrides project, domain and version for nodes in {@link WorkflowTemplate}. */ @AutoValue -abstract class IdentifierRewrite { +public abstract class IdentifierRewrite { abstract String domain(); @@ -47,12 +47,11 @@ WorkflowTemplate apply(WorkflowTemplate template) { return visitor().visitWorkflowTemplate(template); } - @VisibleForTesting - Visitor visitor() { + public Visitor visitor() { return new Visitor(); } - class Visitor extends WorkflowNodeVisitor { + public class Visitor extends WorkflowNodeVisitor { @Override protected PartialTaskIdentifier visitTaskIdentifier(PartialTaskIdentifier value) { return apply(value); @@ -201,20 +200,20 @@ private static T coalesce(T value1, Supplier value2) { return value1 != null ? value1 : value2.get(); } - static Builder builder() { + public static Builder builder() { return new AutoValue_IdentifierRewrite.Builder(); } @AutoValue.Builder - abstract static class Builder { - abstract Builder domain(String domain); + public abstract static class Builder { + public abstract Builder domain(String domain); - abstract Builder project(String project); + public abstract Builder project(String project); - abstract Builder version(String version); + public abstract Builder version(String version); - abstract Builder adminClient(FlyteAdminClient adminClient); + public abstract Builder adminClient(FlyteAdminClient adminClient); - abstract IdentifierRewrite build(); + public abstract IdentifierRewrite build(); } } diff --git a/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java b/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java index 96db3a1d7..6367fce41 100644 --- a/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java +++ b/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java @@ -41,9 +41,6 @@ import org.flyte.api.v1.DynamicWorkflowTaskRegistrar; import org.flyte.api.v1.Literal; import org.flyte.api.v1.Node; -import org.flyte.api.v1.PartialLaunchPlanIdentifier; -import org.flyte.api.v1.PartialTaskIdentifier; -import org.flyte.api.v1.PartialWorkflowIdentifier; import org.flyte.api.v1.RunnableTask; import org.flyte.api.v1.RunnableTaskRegistrar; import org.flyte.api.v1.Struct; @@ -57,6 +54,7 @@ import org.flyte.jflyte.utils.Config; import org.flyte.jflyte.utils.ExecutionConfig; import org.flyte.jflyte.utils.FileSystemLoader; +import org.flyte.jflyte.utils.IdentifierRewrite; import org.flyte.jflyte.utils.JFlyteCustom; import org.flyte.jflyte.utils.PackageLoader; import org.flyte.jflyte.utils.ProjectClosure; @@ -203,10 +201,16 @@ static DynamicJobSpec rewrite( Map taskTemplates, Map workflowTemplates) { - DynamicWorkflowIdentifierRewrite rewrite = new DynamicWorkflowIdentifierRewrite(config); + WorkflowNodeVisitor workflowNodeVisitor = + IdentifierRewrite.builder() + .domain(config.domain()) + .project(config.project()) + .version(config.version()) + .build() + .visitor(); List rewrittenNodes = - spec.nodes().stream().map(rewrite::visitNode).collect(toUnmodifiableList()); + spec.nodes().stream().map(workflowNodeVisitor::visitNode).collect(toUnmodifiableList()); Map usedSubWorkflows = ProjectClosure.collectSubWorkflows(rewrittenNodes, workflowTemplates); @@ -218,7 +222,7 @@ static DynamicJobSpec rewrite( // and workflows Map rewrittenUsedSubWorkflows = - mapValues(usedSubWorkflows, rewrite::visitWorkflowTemplate); + mapValues(usedSubWorkflows, workflowNodeVisitor::visitWorkflowTemplate); return spec.toBuilder() .nodes(rewrittenNodes) @@ -235,69 +239,6 @@ static DynamicJobSpec rewrite( .build(); } - static class DynamicWorkflowIdentifierRewrite extends WorkflowNodeVisitor { - private final ExecutionConfig config; - - DynamicWorkflowIdentifierRewrite(ExecutionConfig config) { - this.config = config; - } - - @Override - protected PartialTaskIdentifier visitTaskIdentifier(PartialTaskIdentifier value) { - if (value.project() == null && value.domain() == null && value.version() == null) { - return PartialTaskIdentifier.builder() - .name(value.name()) - .project(config.project()) - .domain(config.domain()) - .version(config.version()) - .build(); - } - - throw new IllegalArgumentException( - "Dynamic workflow tasks don't support remote tasks: " + value); - } - - @Override - protected PartialWorkflowIdentifier visitWorkflowIdentifier(PartialWorkflowIdentifier value) { - if (value.project() == null && value.domain() == null && value.version() == null) { - return PartialWorkflowIdentifier.builder() - .name(value.name()) - .project(config.project()) - .domain(config.domain()) - .version(config.version()) - .build(); - } - - // in these cases all referenced workflows are sub-workflows, and we can't include - // templates for tasks used in them - - throw new IllegalArgumentException( - "Dynamic workflow tasks don't support remote workflows: " + value); - } - - @Override - protected PartialLaunchPlanIdentifier visitLaunchPlanIdentifier( - PartialLaunchPlanIdentifier value) { - if (value.project() == null && value.domain() == null && value.version() == null) { - return PartialLaunchPlanIdentifier.builder() - .name(value.name()) - .project(config.project()) - .domain(config.domain()) - .version(config.version()) - .build(); - } - - // we don't need to fetch anything, so we can use this reference, because - // for launch plans we don't need to include task and workflow templates into closure - if (value.project() != null && value.domain() != null && value.version() != null) { - return value; - } - - throw new IllegalArgumentException( - "Dynamic workflow tasks don't support remote launch plans: " + value); - } - } - private static DynamicWorkflowTask getDynamicWorkflowTask(String name) { // be careful not to pass extra Map env = getEnv(); From fc22187c468f4cd6c49409d20c3f223e613bebd7 Mon Sep 17 00:00:00 2001 From: Hongxin Liang Date: Wed, 27 Sep 2023 23:58:16 +0200 Subject: [PATCH 2/9] Create flyteadmin client Signed-off-by: Hongxin Liang --- .../java/org/flyte/jflyte/utils/ExecutionConfig.java | 10 ++++++++++ .../java/org/flyte/jflyte/ExecuteDynamicWorkflow.java | 3 +++ 2 files changed, 13 insertions(+) diff --git a/jflyte-utils/src/main/java/org/flyte/jflyte/utils/ExecutionConfig.java b/jflyte-utils/src/main/java/org/flyte/jflyte/utils/ExecutionConfig.java index 7855eb696..63a6300fe 100644 --- a/jflyte-utils/src/main/java/org/flyte/jflyte/utils/ExecutionConfig.java +++ b/jflyte-utils/src/main/java/org/flyte/jflyte/utils/ExecutionConfig.java @@ -31,12 +31,18 @@ public abstract class ExecutionConfig { public abstract String version(); + public abstract String platformUrl(); + + public abstract boolean platformInsecure(); + public static ExecutionConfig load() { return ExecutionConfig.builder() .project(getenv("FLYTE_INTERNAL_PROJECT")) .domain(getenv("FLYTE_INTERNAL_DOMAIN")) .version(getenv("FLYTE_INTERNAL_VERSION")) .image(getenv("FLYTE_INTERNAL_IMAGE")) + .platformUrl(getenv("FLYTE_PLATFORM_URL")) + .platformInsecure(Boolean.parseBoolean(getenv("FLYTE_PLATFORM_INSECURE"))) .build(); } @@ -55,6 +61,10 @@ public abstract static class Builder { public abstract Builder image(String image); + public abstract Builder platformUrl(String platformUrl); + + public abstract Builder platformInsecure(boolean platformInsecure); + public abstract ExecutionConfig build(); } } diff --git a/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java b/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java index 6367fce41..64bbe7ec0 100644 --- a/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java +++ b/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java @@ -54,6 +54,7 @@ import org.flyte.jflyte.utils.Config; import org.flyte.jflyte.utils.ExecutionConfig; import org.flyte.jflyte.utils.FileSystemLoader; +import org.flyte.jflyte.utils.FlyteAdminClient; import org.flyte.jflyte.utils.IdentifierRewrite; import org.flyte.jflyte.utils.JFlyteCustom; import org.flyte.jflyte.utils.PackageLoader; @@ -206,6 +207,8 @@ static DynamicJobSpec rewrite( .domain(config.domain()) .project(config.project()) .version(config.version()) + .adminClient( + FlyteAdminClient.create(config.platformUrl(), config.platformInsecure(), null)) .build() .visitor(); From ba3abc43159bd25e03c15751bef84b653768c214 Mon Sep 17 00:00:00 2001 From: Hongxin Liang Date: Thu, 28 Sep 2023 08:46:49 +0200 Subject: [PATCH 3/9] Use Config Signed-off-by: Hongxin Liang --- .../test/java/org/flyte/utils/JFlyteContainer.java | 2 -- .../java/org/flyte/jflyte/utils/ExecutionConfig.java | 11 +---------- .../java/org/flyte/jflyte/ExecuteDynamicWorkflow.java | 11 ++++++----- 3 files changed, 7 insertions(+), 17 deletions(-) diff --git a/integration-tests/src/test/java/org/flyte/utils/JFlyteContainer.java b/integration-tests/src/test/java/org/flyte/utils/JFlyteContainer.java index 376da5e2f..703d37d5e 100644 --- a/integration-tests/src/test/java/org/flyte/utils/JFlyteContainer.java +++ b/integration-tests/src/test/java/org/flyte/utils/JFlyteContainer.java @@ -30,8 +30,6 @@ class JFlyteContainer extends GenericContainer { static final String IMAGE_NAME; static final Map envVars = ImmutableMap.builder() - .put("FLYTE_PLATFORM_URL", "flyte:30081") - .put("FLYTE_PLATFORM_INSECURE", "True") .put("FLYTE_AWS_ENDPOINT", "http://flyte:30084") .put("FLYTE_AWS_ACCESS_KEY_ID", "minio") .put("FLYTE_AWS_SECRET_ACCESS_KEY", "miniostorage") diff --git a/jflyte-utils/src/main/java/org/flyte/jflyte/utils/ExecutionConfig.java b/jflyte-utils/src/main/java/org/flyte/jflyte/utils/ExecutionConfig.java index 63a6300fe..34d4a5072 100644 --- a/jflyte-utils/src/main/java/org/flyte/jflyte/utils/ExecutionConfig.java +++ b/jflyte-utils/src/main/java/org/flyte/jflyte/utils/ExecutionConfig.java @@ -19,6 +19,7 @@ import static java.lang.System.getenv; import com.google.auto.value.AutoValue; +import javax.annotation.Nullable; /** Configuration values available only during task execution. */ @AutoValue @@ -31,18 +32,12 @@ public abstract class ExecutionConfig { public abstract String version(); - public abstract String platformUrl(); - - public abstract boolean platformInsecure(); - public static ExecutionConfig load() { return ExecutionConfig.builder() .project(getenv("FLYTE_INTERNAL_PROJECT")) .domain(getenv("FLYTE_INTERNAL_DOMAIN")) .version(getenv("FLYTE_INTERNAL_VERSION")) .image(getenv("FLYTE_INTERNAL_IMAGE")) - .platformUrl(getenv("FLYTE_PLATFORM_URL")) - .platformInsecure(Boolean.parseBoolean(getenv("FLYTE_PLATFORM_INSECURE"))) .build(); } @@ -61,10 +56,6 @@ public abstract static class Builder { public abstract Builder image(String image); - public abstract Builder platformUrl(String platformUrl); - - public abstract Builder platformInsecure(boolean platformInsecure); - public abstract ExecutionConfig build(); } } diff --git a/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java b/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java index 64bbe7ec0..4973754e6 100644 --- a/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java +++ b/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java @@ -176,7 +176,7 @@ private void execute() { }); DynamicJobSpec rewrittenFutures = - rewrite(executionConfig, futures, taskTemplates, workflowTemplates); + rewrite(config, executionConfig, futures, taskTemplates, workflowTemplates); if (rewrittenFutures.nodes().isEmpty()) { Map outputs = getLiteralMap(rewrittenFutures.outputs()); @@ -197,16 +197,17 @@ private void execute() { } static DynamicJobSpec rewrite( - ExecutionConfig config, + Config config, + ExecutionConfig executionConfig, DynamicJobSpec spec, Map taskTemplates, Map workflowTemplates) { WorkflowNodeVisitor workflowNodeVisitor = IdentifierRewrite.builder() - .domain(config.domain()) - .project(config.project()) - .version(config.version()) + .domain(executionConfig.domain()) + .project(executionConfig.project()) + .version(executionConfig.version()) .adminClient( FlyteAdminClient.create(config.platformUrl(), config.platformInsecure(), null)) .build() From 04fea71fdb53425a30647cd38559de45d7dc7f40 Mon Sep 17 00:00:00 2001 From: Hongxin Liang Date: Thu, 28 Sep 2023 08:49:11 +0200 Subject: [PATCH 4/9] Fix import Signed-off-by: Hongxin Liang --- .../src/main/java/org/flyte/jflyte/utils/ExecutionConfig.java | 1 - 1 file changed, 1 deletion(-) diff --git a/jflyte-utils/src/main/java/org/flyte/jflyte/utils/ExecutionConfig.java b/jflyte-utils/src/main/java/org/flyte/jflyte/utils/ExecutionConfig.java index 34d4a5072..7855eb696 100644 --- a/jflyte-utils/src/main/java/org/flyte/jflyte/utils/ExecutionConfig.java +++ b/jflyte-utils/src/main/java/org/flyte/jflyte/utils/ExecutionConfig.java @@ -19,7 +19,6 @@ import static java.lang.System.getenv; import com.google.auto.value.AutoValue; -import javax.annotation.Nullable; /** Configuration values available only during task execution. */ @AutoValue From 4f52b362444071d77f6706b56d4013906f5befcf Mon Sep 17 00:00:00 2001 From: Hongxin Liang Date: Thu, 28 Sep 2023 11:14:15 +0200 Subject: [PATCH 5/9] Integration test it Signed-off-by: Hongxin Liang --- .../DynamicFibonacciWorkflowTask.java | 20 +++++++++++++- .../org/flyte/examples/FlyteEnvironment.java | 27 +++++++++++++++++++ .../org/flyte/utils/FlyteSandboxClient.java | 5 ++-- .../flyte/utils/FlyteSandboxContainer.java | 2 +- .../java/org/flyte/utils/JFlyteContainer.java | 2 ++ jflyte/pom.xml | 2 +- 6 files changed, 53 insertions(+), 5 deletions(-) create mode 100644 flytekit-examples/src/main/java/org/flyte/examples/FlyteEnvironment.java diff --git a/flytekit-examples/src/main/java/org/flyte/examples/DynamicFibonacciWorkflowTask.java b/flytekit-examples/src/main/java/org/flyte/examples/DynamicFibonacciWorkflowTask.java index 930ff44da..9a6b98acc 100644 --- a/flytekit-examples/src/main/java/org/flyte/examples/DynamicFibonacciWorkflowTask.java +++ b/flytekit-examples/src/main/java/org/flyte/examples/DynamicFibonacciWorkflowTask.java @@ -16,6 +16,9 @@ */ package org.flyte.examples; +import static org.flyte.examples.FlyteEnvironment.DOMAIN; +import static org.flyte.examples.FlyteEnvironment.PROJECT; + import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; import com.google.errorprone.annotations.Var; @@ -23,6 +26,9 @@ import org.flyte.flytekit.SdkBindingData; import org.flyte.flytekit.SdkBindingDataFactory; import org.flyte.flytekit.SdkDynamicWorkflowTask; +import org.flyte.flytekit.SdkNode; +import org.flyte.flytekit.SdkRemoteTask; +import org.flyte.flytekit.SdkTypes; import org.flyte.flytekit.SdkWorkflowBuilder; import org.flyte.flytekit.jackson.JacksonSdkType; @@ -59,11 +65,23 @@ public Output run(SdkWorkflowBuilder builder, Input input) { } else if (input.n().get() == 0) { return Output.create(SdkBindingDataFactory.of(0)); } else { + SdkNode hello = + builder.apply( + "hello", + SdkRemoteTask.create( + DOMAIN, + PROJECT, + HelloWorldTask.class.getName(), + SdkTypes.nulls(), + SdkTypes.nulls())); @Var SdkBindingData prev = SdkBindingDataFactory.of(0); @Var SdkBindingData value = SdkBindingDataFactory.of(1); for (int i = 2; i <= input.n().get(); i++) { SdkBindingData next = - builder.apply("fib-" + i, new SumTask(), SumInput.create(value, prev)).getOutputs(); + builder + .apply( + "fib-" + i, new SumTask().withUpstreamNode(hello), SumInput.create(value, prev)) + .getOutputs(); prev = value; value = next; } diff --git a/flytekit-examples/src/main/java/org/flyte/examples/FlyteEnvironment.java b/flytekit-examples/src/main/java/org/flyte/examples/FlyteEnvironment.java new file mode 100644 index 000000000..67fa76916 --- /dev/null +++ b/flytekit-examples/src/main/java/org/flyte/examples/FlyteEnvironment.java @@ -0,0 +1,27 @@ +/* + * Copyright 2021 Flyte Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.flyte.examples; + +public final class FlyteEnvironment { + + public static final String DOMAIN = "development"; + public static final String PROJECT = "flytesnacks"; + + private FlyteEnvironment() { + throw new UnsupportedOperationException(); + } +} diff --git a/integration-tests/src/test/java/org/flyte/utils/FlyteSandboxClient.java b/integration-tests/src/test/java/org/flyte/utils/FlyteSandboxClient.java index 025a280c7..f024918c9 100644 --- a/integration-tests/src/test/java/org/flyte/utils/FlyteSandboxClient.java +++ b/integration-tests/src/test/java/org/flyte/utils/FlyteSandboxClient.java @@ -16,6 +16,9 @@ */ package org.flyte.utils; +import static org.flyte.examples.FlyteEnvironment.DOMAIN; +import static org.flyte.examples.FlyteEnvironment.PROJECT; + import flyteidl.admin.ExecutionOuterClass; import flyteidl.core.Execution; import flyteidl.core.IdentifierOuterClass; @@ -28,8 +31,6 @@ import org.rnorth.ducttape.unreliables.Unreliables; public class FlyteSandboxClient { - private static final String DOMAIN = "development"; - private static final String PROJECT = "flytesnacks"; private static final int EXECUTION_TIMEOUT_SECONDS = 300; private final String version; diff --git a/integration-tests/src/test/java/org/flyte/utils/FlyteSandboxContainer.java b/integration-tests/src/test/java/org/flyte/utils/FlyteSandboxContainer.java index 7a4a78b60..d4c930320 100644 --- a/integration-tests/src/test/java/org/flyte/utils/FlyteSandboxContainer.java +++ b/integration-tests/src/test/java/org/flyte/utils/FlyteSandboxContainer.java @@ -33,7 +33,7 @@ public class FlyteSandboxContainer extends GenericContainer { - public static final String IMAGE_NAME = "ghcr.io/flyteorg/flyte-sandbox:v1.1.0"; + public static final String IMAGE_NAME = "ghcr.io/flyteorg/flyte-sandbox:v1.9.1"; public static final FlyteSandboxContainer INSTANCE = new FlyteSandboxContainer() diff --git a/integration-tests/src/test/java/org/flyte/utils/JFlyteContainer.java b/integration-tests/src/test/java/org/flyte/utils/JFlyteContainer.java index 703d37d5e..376da5e2f 100644 --- a/integration-tests/src/test/java/org/flyte/utils/JFlyteContainer.java +++ b/integration-tests/src/test/java/org/flyte/utils/JFlyteContainer.java @@ -30,6 +30,8 @@ class JFlyteContainer extends GenericContainer { static final String IMAGE_NAME; static final Map envVars = ImmutableMap.builder() + .put("FLYTE_PLATFORM_URL", "flyte:30081") + .put("FLYTE_PLATFORM_INSECURE", "True") .put("FLYTE_AWS_ENDPOINT", "http://flyte:30084") .put("FLYTE_AWS_ACCESS_KEY_ID", "minio") .put("FLYTE_AWS_SECRET_ACCESS_KEY", "miniostorage") diff --git a/jflyte/pom.xml b/jflyte/pom.xml index 4ca905964..2204c66e9 100644 --- a/jflyte/pom.xml +++ b/jflyte/pom.xml @@ -199,7 +199,7 @@ /jflyte/modules ${docker.image}:${docker.tag} - flyte:30081 + flyteadmin.flyte.svc.cluster.local:81 True From eb0af741515e9d5b1f342d9be91f2e1c12ea7ed7 Mon Sep 17 00:00:00 2001 From: Hongxin Liang Date: Thu, 28 Sep 2023 11:45:34 +0200 Subject: [PATCH 6/9] 2023 Signed-off-by: Hongxin Liang --- .../src/main/java/org/flyte/examples/FlyteEnvironment.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit-examples/src/main/java/org/flyte/examples/FlyteEnvironment.java b/flytekit-examples/src/main/java/org/flyte/examples/FlyteEnvironment.java index 67fa76916..25ebe4182 100644 --- a/flytekit-examples/src/main/java/org/flyte/examples/FlyteEnvironment.java +++ b/flytekit-examples/src/main/java/org/flyte/examples/FlyteEnvironment.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Flyte Authors + * Copyright 2023 Flyte Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From 848dc42d72955a6774fe178794f049df516ce936 Mon Sep 17 00:00:00 2001 From: Hongxin Liang Date: Thu, 28 Sep 2023 14:44:18 +0200 Subject: [PATCH 7/9] Support testing dynamic workflow task Signed-off-by: Hongxin Liang --- .../java/org/flyte/examples/UberWorkflow.java | 10 +++++- .../java/org/flyte/examples/WorkflowTest.java | 11 +++++-- .../flytekit/testing/SdkTestingExecutor.java | 32 ++++++++++++++++--- pom.xml | 12 +++---- 4 files changed, 52 insertions(+), 13 deletions(-) diff --git a/flytekit-examples/src/main/java/org/flyte/examples/UberWorkflow.java b/flytekit-examples/src/main/java/org/flyte/examples/UberWorkflow.java index 737a01d5a..951f5a7e8 100644 --- a/flytekit-examples/src/main/java/org/flyte/examples/UberWorkflow.java +++ b/flytekit-examples/src/main/java/org/flyte/examples/UberWorkflow.java @@ -68,6 +68,14 @@ public SumWorkflow.Output expand(SdkWorkflowBuilder builder, Input input) { .result(); SdkBindingData abcd = builder.apply("post-sum", new SumTask(), SumTask.SumInput.create(abc, d)).getOutputs(); - return SumWorkflow.Output.create(abcd); + SdkBindingData result = + builder + .apply( + "fibonacci", + new DynamicFibonacciWorkflowTask(), + DynamicFibonacciWorkflowTask.Input.create(abcd)) + .getOutputs() + .output(); + return SumWorkflow.Output.create(result); } } diff --git a/flytekit-examples/src/test/java/org/flyte/examples/WorkflowTest.java b/flytekit-examples/src/test/java/org/flyte/examples/WorkflowTest.java index b79a68ef9..d298d60cd 100644 --- a/flytekit-examples/src/test/java/org/flyte/examples/WorkflowTest.java +++ b/flytekit-examples/src/test/java/org/flyte/examples/WorkflowTest.java @@ -54,9 +54,13 @@ public void testMockTasks() { new SumTask(), SumTask.SumInput.create(SdkBindingDataFactory.of(0L), SdkBindingDataFactory.of(4L)), SdkBindingDataFactory.of(42L)) + .withTaskOutput( + new DynamicFibonacciWorkflowTask(), + DynamicFibonacciWorkflowTask.Input.create(SdkBindingDataFactory.of(42L)), + DynamicFibonacciWorkflowTask.Output.create(SdkBindingDataFactory.of(123L))) .execute(); - assertEquals(42L, result.getIntegerOutput("result")); + assertEquals(123L, result.getIntegerOutput("result")); } @Test @@ -87,9 +91,12 @@ public void testMockSubWorkflow() { new SumTask(), SumInput.create(SdkBindingDataFactory.of(10L), SdkBindingDataFactory.of(4L)), SdkBindingDataFactory.of(15L)) + .withTask( + new DynamicFibonacciWorkflowTask(), + input -> DynamicFibonacciWorkflowTask.Output.create(SdkBindingDataFactory.of(42L))) .execute(); - assertEquals(15L, result.getIntegerOutput("result")); + assertEquals(42L, result.getIntegerOutput("result")); } @Test diff --git a/flytekit-testing/src/main/java/org/flyte/flytekit/testing/SdkTestingExecutor.java b/flytekit-testing/src/main/java/org/flyte/flytekit/testing/SdkTestingExecutor.java index b187ff77f..1f1996fac 100644 --- a/flytekit-testing/src/main/java/org/flyte/flytekit/testing/SdkTestingExecutor.java +++ b/flytekit-testing/src/main/java/org/flyte/flytekit/testing/SdkTestingExecutor.java @@ -42,9 +42,11 @@ import org.flyte.api.v1.WorkflowNode; import org.flyte.api.v1.WorkflowNode.Reference; import org.flyte.api.v1.WorkflowTemplate; +import org.flyte.flytekit.SdkDynamicWorkflowTask; import org.flyte.flytekit.SdkRemoteLaunchPlan; import org.flyte.flytekit.SdkRemoteTask; import org.flyte.flytekit.SdkRunnableTask; +import org.flyte.flytekit.SdkTransform; import org.flyte.flytekit.SdkType; import org.flyte.flytekit.SdkWorkflow; import org.flyte.localengine.ExecutionContext; @@ -321,8 +323,13 @@ public SdkTestingExecutor withFixedInputs(SdkType type, T value) { public SdkTestingExecutor withTaskOutput( SdkRunnableTask task, InputT input, OutputT output) { + return withTaskOutput0(task, input, output); + } + + public SdkTestingExecutor withTaskOutput( + SdkRemoteTask task, InputT input, OutputT output) { TestingRunnableTask fixedTask = - getFixedTaskOrDefault(task.getName(), task.getInputType(), task.getOutputType()); + getFixedTaskOrDefault(task.name(), task.inputs(), task.outputs()); return toBuilder() .putFixedTask(task.getName(), fixedTask.withFixedOutput(input, output)) @@ -330,11 +337,18 @@ public SdkTestingExecutor withTaskOutput( } public SdkTestingExecutor withTaskOutput( - SdkRemoteTask task, InputT input, OutputT output) { + SdkDynamicWorkflowTask task, InputT input, OutputT output) { + return withTaskOutput0(task, input, output); + } + + private SdkTestingExecutor withTaskOutput0( + SdkTransform task, InputT input, OutputT output) { TestingRunnableTask fixedTask = - getFixedTaskOrDefault(task.name(), task.inputs(), task.outputs()); + getFixedTaskOrDefault(task.getName(), task.getInputType(), task.getOutputType()); - return toBuilder().putFixedTask(task.name(), fixedTask.withFixedOutput(input, output)).build(); + return toBuilder() + .putFixedTask(task.getName(), fixedTask.withFixedOutput(input, output)) + .build(); } public SdkTestingExecutor withLaunchPlanOutput( @@ -361,6 +375,16 @@ public SdkTestingExecutor withLaunchPlan( public SdkTestingExecutor withTask( SdkRunnableTask task, Function runFn) { + return withTask0(task, runFn); + } + + public SdkTestingExecutor withTask( + SdkDynamicWorkflowTask task, Function runFn) { + return withTask0(task, runFn); + } + + private SdkTestingExecutor withTask0( + SdkTransform task, Function runFn) { TestingRunnableTask fixedTask = getFixedTaskOrDefault(task.getName(), task.getInputType(), task.getOutputType()); diff --git a/pom.xml b/pom.xml index e2b083c19..aaaba5d72 100644 --- a/pom.xml +++ b/pom.xml @@ -50,12 +50,12 @@ flytekit-api flytekit-jackson flytekit-java - flytekit-scala_2.12 - flytekit-scala_2.13 - flytekit-scala-tests + + + flytekit-testing flytekit-examples - flytekit-examples-scala + flytekit-local-engine flyteidl-protos jflyte-api @@ -104,8 +104,8 @@ 5.6.2 - - -Xep:AutoValueImmutableFields:OFF -Xep:Var:ERROR + + 11 ${maven.compiler.release} From fa0176b241a34f8e56833ff3e06be973fd4430b2 Mon Sep 17 00:00:00 2001 From: Hongxin Liang Date: Thu, 28 Sep 2023 14:48:58 +0200 Subject: [PATCH 8/9] Maybe a better diff Signed-off-by: Hongxin Liang --- .../org/flyte/flytekit/testing/SdkTestingExecutor.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/flytekit-testing/src/main/java/org/flyte/flytekit/testing/SdkTestingExecutor.java b/flytekit-testing/src/main/java/org/flyte/flytekit/testing/SdkTestingExecutor.java index 1f1996fac..bcc1aba65 100644 --- a/flytekit-testing/src/main/java/org/flyte/flytekit/testing/SdkTestingExecutor.java +++ b/flytekit-testing/src/main/java/org/flyte/flytekit/testing/SdkTestingExecutor.java @@ -326,6 +326,11 @@ public SdkTestingExecutor withTaskOutput( return withTaskOutput0(task, input, output); } + public SdkTestingExecutor withTaskOutput( + SdkDynamicWorkflowTask task, InputT input, OutputT output) { + return withTaskOutput0(task, input, output); + } + public SdkTestingExecutor withTaskOutput( SdkRemoteTask task, InputT input, OutputT output) { TestingRunnableTask fixedTask = @@ -336,11 +341,6 @@ public SdkTestingExecutor withTaskOutput( .build(); } - public SdkTestingExecutor withTaskOutput( - SdkDynamicWorkflowTask task, InputT input, OutputT output) { - return withTaskOutput0(task, input, output); - } - private SdkTestingExecutor withTaskOutput0( SdkTransform task, InputT input, OutputT output) { TestingRunnableTask fixedTask = From d87ab1bfa43a762c8d15284eafb0492f1c9ce3e5 Mon Sep 17 00:00:00 2001 From: Hongxin Liang Date: Thu, 28 Sep 2023 14:55:39 +0200 Subject: [PATCH 9/9] Restore pom.xml Signed-off-by: Hongxin Liang --- pom.xml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pom.xml b/pom.xml index aaaba5d72..e2b083c19 100644 --- a/pom.xml +++ b/pom.xml @@ -50,12 +50,12 @@ flytekit-api flytekit-jackson flytekit-java - - - + flytekit-scala_2.12 + flytekit-scala_2.13 + flytekit-scala-tests flytekit-testing flytekit-examples - + flytekit-examples-scala flytekit-local-engine flyteidl-protos jflyte-api @@ -104,8 +104,8 @@ 5.6.2 - - + + -Xep:AutoValueImmutableFields:OFF -Xep:Var:ERROR 11 ${maven.compiler.release}