Skip to content

Action to join parallel execution #5088 #5104

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/hop-user-manual/modules/ROOT/nav.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ under the License.
*** xref:workflow/actions/getpop.adoc[Get mails (POP3/IMAP)]
*** xref:workflow/actions/http.adoc[HTTP]
*** xref:workflow/actions/eval.adoc[JavaScript]
*** xref:workflow/actions/join.adoc[Join]
*** xref:workflow/actions/mail.adoc[Mail]
*** xref:workflow/actions/mailvalidator.adoc[Mail validator]
*** xref:workflow/actions/movefiles.adoc[Move files]
Expand Down
29 changes: 29 additions & 0 deletions docs/hop-user-manual/modules/ROOT/pages/workflow/actions/join.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
////
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
////
:documentationPath: /workflow/actions/
:language: en_US
:description: The Join action waits for parallel branches.

= Join

== Description

The `Join` action waits for all parallel branches at the same level to complete before running the next action(s).

== Usage

By default, actions in a workflow run sequentially. To organize actions into separate branches and run those branches at the same time, you can execute parallel branches, and then join those branches later in your workflow.
1 change: 1 addition & 0 deletions engine/src/main/java/org/apache/hop/core/gui/IGc.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ enum EImage {
PARALLEL,
PARALLEL_DISABLED,
BUSY,
WAITING,
INJECT,
LOAD_BALANCE,
CHECKPOINT,
Expand Down
115 changes: 41 additions & 74 deletions engine/src/main/java/org/apache/hop/core/gui/SvgGc.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class SvgGc implements IGc {
private static SvgFile imageUnconditional;
private static SvgFile imageUnconditionalDisabled;
private static SvgFile imageBusy;
private static SvgFile imageWaiting;
private static SvgFile imageMissing;
private static SvgFile imageDeprecated;
private static SvgFile imageInject;
Expand Down Expand Up @@ -234,6 +235,7 @@ private void init() {
imageUnconditionalDisabled =
new SvgFile("ui/images/unconditional-disabled.svg", this.getClass().getClassLoader());
imageBusy = new SvgFile("ui/images/busy.svg", this.getClass().getClassLoader());
imageWaiting = new SvgFile("ui/images/waiting.svg", this.getClass().getClassLoader());
imageInject = new SvgFile("ui/images/inject.svg", this.getClass().getClassLoader());
imageMissing = new SvgFile("ui/images/missing.svg", this.getClass().getClassLoader());
imageDeprecated = new SvgFile("ui/images/deprecated.svg", this.getClass().getClassLoader());
Expand Down Expand Up @@ -594,80 +596,45 @@ public Point getArea() {
return area;
}

public static final SvgFile getNativeImage(EImage image) {
switch (image) {
case LOCK:
return imageLocked;
case FAILURE:
return imageFailure;
case EDIT:
return imageEdit;
case CONTEXT_MENU:
return imageContextMenu;
case TRUE:
return imageTrue;
case TRUE_DISABLED:
return imageTrueDisabled;
case FALSE:
return imageFalse;
case FALSE_DISABLED:
return imageFalseDisabled;
case ERROR:
return imageError;
case ERROR_DISABLED:
return imageErrorDisabled;
case INFO:
return imageInfo;
case INFO_DISABLED:
return imageInfoDisabled;
case TARGET:
return imageTarget;
case TARGET_DISABLED:
return imageTargetDisabled;
case INPUT:
return imageInput;
case OUTPUT:
return imageOutput;
case ARROW:
return imageArrow;
case COPY_ROWS:
return imageCopyRows;
case COPY_ROWS_DISABLED:
return imageCopyRowsDisabled;
case LOAD_BALANCE:
return imageLoadBalance;
case CHECKPOINT:
return imageCheckpoint;
case DB:
return imageDatabase;
case PARALLEL:
return imageParallel;
case PARALLEL_DISABLED:
return imageParallelDisabled;
case UNCONDITIONAL:
return imageUnconditional;
case UNCONDITIONAL_DISABLED:
return imageUnconditionalDisabled;
case BUSY:
return imageBusy;
case INJECT:
return imageInject;
case ARROW_DEFAULT:
return imageArrowDefault;
case ARROW_TRUE:
return imageArrowTrue;
case ARROW_FALSE:
return imageArrowFalse;
case ARROW_ERROR:
return imageArrowError;
case ARROW_DISABLED:
return imageArrowDisabled;
case DATA:
return imageData;
default:
break;
}
return null;
public SvgFile getNativeImage(EImage image) {
return switch (image) {
case LOCK -> imageLocked;
case FAILURE -> imageFailure;
case EDIT -> imageEdit;
case CONTEXT_MENU -> imageContextMenu;
case TRUE -> imageTrue;
case TRUE_DISABLED -> imageTrueDisabled;
case FALSE -> imageFalse;
case FALSE_DISABLED -> imageFalseDisabled;
case ERROR -> imageError;
case ERROR_DISABLED -> imageErrorDisabled;
case INFO -> imageInfo;
case INFO_DISABLED -> imageInfoDisabled;
case TARGET -> imageTarget;
case TARGET_DISABLED -> imageTargetDisabled;
case INPUT -> imageInput;
case OUTPUT -> imageOutput;
case ARROW -> imageArrow;
case COPY_ROWS -> imageCopyRows;
case COPY_ROWS_DISABLED -> imageCopyRowsDisabled;
case LOAD_BALANCE -> imageLoadBalance;
case CHECKPOINT -> imageCheckpoint;
case DB -> imageDatabase;
case PARALLEL -> imageParallel;
case PARALLEL_DISABLED -> imageParallelDisabled;
case UNCONDITIONAL -> imageUnconditional;
case UNCONDITIONAL_DISABLED -> imageUnconditionalDisabled;
case BUSY -> imageBusy;
case WAITING -> imageWaiting;
case INJECT -> imageInject;
case ARROW_DEFAULT -> imageArrowDefault;
case ARROW_TRUE -> imageArrowTrue;
case ARROW_FALSE -> imageArrowFalse;
case ARROW_ERROR -> imageArrowError;
case ARROW_DISABLED -> imageArrowDisabled;
case DATA -> imageData;
default -> null;
};
}

@Override
Expand Down
24 changes: 16 additions & 8 deletions engine/src/main/java/org/apache/hop/workflow/Workflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ private Result executeFromStart(
+ ")");
}

// Which entry is next?
// Which action is next?
IAction action = actionMeta.getAction();
action.getLogChannel().setLogLevel(logLevel);

Expand Down Expand Up @@ -821,11 +821,11 @@ private Result executeFromStart(
final ActionMeta nextAction = workflowMeta.findNextAction(actionMeta, i);

// See if we need to execute this...
final WorkflowHopMeta hi = workflowMeta.findWorkflowHop(actionMeta, nextAction);
final WorkflowHopMeta hopMeta = workflowMeta.findWorkflowHop(actionMeta, nextAction);

// The next comment...
final String nextComment;
if (hi.isUnconditional()) {
if (hopMeta.isUnconditional()) {
nextComment = BaseMessages.getString(PKG, "Workflow.Comment.FollowedUnconditional");
} else {
if (newResult.getResult()) {
Expand All @@ -840,9 +840,17 @@ private Result executeFromStart(
// If the start point was an evaluation and the link color is correct:
// green or red, execute the next action...
//
if (hi.isUnconditional()
|| (actionMeta.isEvaluation() && (hi.isEvaluation() == newResult.getResult()))) {
// Start this next transform!
if (hopMeta.isUnconditional()
|| (actionMeta.isEvaluation() && (hopMeta.isEvaluation() == newResult.getResult()))) {

// If the next action is a join, only execute once
if (nextAction.isJoin()) {
if (activeActions.contains(nextAction)) {
continue;
}
}

// Start this next action!
if (log.isBasic()) {
log.logBasic(
BaseMessages.getString(PKG, "Workflow.Log.StartingAction", nextAction.getName()));
Expand Down Expand Up @@ -1490,9 +1498,9 @@ public void setInteractive(boolean interactive) {
}

/**
* Gets the activeJobEntryPipelines.
* Gets the active actions.
*
* @return the activeJobEntryPipelines
* @return the active actions
*/
@Override
public Set<ActionMeta> getActiveActions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,8 @@ protected void drawAction(ActionMeta actionMeta) throws HopException {
gc.setForeground(EColor.BLUE);
int iconX = (x + iconSize) - (miniIconSize / 2) + 1;
int iconY = y - (miniIconSize / 2) - 1;
gc.drawImage(EImage.BUSY, iconX, iconY, magnification);

gc.drawImage(actionMeta.isJoin() ? EImage.WAITING : EImage.BUSY, iconX, iconY, magnification);
areaOwners.add(
new AreaOwner(
AreaType.ACTION_BUSY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,11 @@ public boolean isStart() {
return false;
}

@Override
public boolean isJoin() {
return false;
}

/**
* Checks if the action executes a workflow
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

/**
* This class describes the fact that a single Action can be used multiple times in the same
* Workflow. Therefore it contains a link to a Action, a position, a number, etc.
* Workflow. Therefore, it contains a link to an Action, a position, a number, etc.
*/
public class ActionMeta implements Cloneable, IGuiPosition, IChanged, IAttributes, IBaseMeta {
public static final String XML_TAG = "action";
Expand Down Expand Up @@ -302,6 +302,10 @@ public boolean isStart() {
return action.isStart();
}

public boolean isJoin() {
return action.isJoin();
}

public boolean isMissing() {
return action instanceof MissingAction;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,13 @@ void loadXml(Node actionNode, IHopMetadataProvider metadataProvider, IVariables
*/
boolean isStart();

/**
* Checks if the action is a join point of parallel execution
*
* @return true if joining point, false otherwise
*/
boolean isJoin();

/**
* This method is called when a action is duplicated in HopGui. It needs to return a deep copy of
* this action object. It is essential that the implementing class creates proper deep copies if
Expand Down
32 changes: 32 additions & 0 deletions plugins/actions/join/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.hop</groupId>
<artifactId>hop-plugins-actions</artifactId>
<version>2.13.0-SNAPSHOT</version>
</parent>

<artifactId>hop-action-join</artifactId>
<packaging>jar</packaging>
<name>Hop Plugins Actions Join</name>

</project>
50 changes: 50 additions & 0 deletions plugins/actions/join/src/assembly/assembly.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->

<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.2.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.2.0 http://maven.apache.org/xsd/assembly-2.2.0.xsd">
<id>hop-action-join</id>
<formats>
<format>zip</format>
</formats>
<baseDirectory>.</baseDirectory>
<files>
<file>
<source>${project.basedir}/src/main/resources/version.xml</source>
<outputDirectory>plugins/actions/join</outputDirectory>
<filtered>true</filtered>
</file>
</files>

<fileSets>
<fileSet>
<directory>${project.basedir}/src/main/samples</directory>
<outputDirectory>config/projects/samples/</outputDirectory>
</fileSet>
</fileSets>

<dependencySets>
<dependencySet>
<includes>
<include>org.apache.hop:hop-action-join:jar</include>
</includes>
<outputDirectory>plugins/actions/join</outputDirectory>
</dependencySet>
</dependencySets>
</assembly>
Loading