Skip to content

Commit

Permalink
fix(core): trigger ids must be unique for a flow (#2022)
Browse files Browse the repository at this point in the history
closes #2021
  • Loading branch information
brian-mulier-p authored Sep 5, 2023
1 parent b5721e3 commit 516d5fb
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.tasks.flows.Dag;
import io.kestra.core.models.tasks.WorkerGroup;
import io.kestra.core.tasks.flows.Dag;
import io.kestra.core.tasks.flows.Switch;
import io.kestra.core.tasks.flows.WorkingDirectory;
import io.micronaut.context.annotation.Factory;
Expand All @@ -17,9 +17,13 @@

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;

@Factory
public class ValidationFactory {
Expand Down Expand Up @@ -183,27 +187,31 @@ ConstraintValidator<FlowValidation, Flow> flowValidation() {

List<String> violations = new ArrayList<>();

// task unique id
// tasks unique id
List<String> taskIds = value.allTasksWithChilds()
.stream()
.map(Task::getId)
.toList();
List<String> taskDuplicates = taskIds
.stream()
.distinct()
.filter(entry -> Collections.frequency(taskIds, entry) > 1)
.toList();
if (taskDuplicates.size() > 0) {
violations.add("Duplicate task id with name [" + String.join(", ", taskDuplicates) + "]");

List<String> duplicateIds = getDuplicates(taskIds);

if (!duplicateIds.isEmpty()) {
violations.add("Duplicate task id with name [" + String.join(", ", duplicateIds) + "]");
}

duplicateIds = getDuplicates(value.allTriggerIds());

if (!duplicateIds.isEmpty()) {
violations.add("Duplicate trigger id with name [" + String.join(", ", duplicateIds) + "]");
}

value.allTasksWithChilds()
.stream()
.forEach(
task -> {
if (task instanceof io.kestra.core.tasks.flows.Flow taskFlow
&& taskFlow.getFlowId().equals(value.getId())
&& taskFlow.getNamespace().equals(value.getNamespace())) {
&& taskFlow.getFlowId().equals(value.getId())
&& taskFlow.getNamespace().equals(value.getNamespace())) {
violations.add("Recursive call to flow [" + value.getId() + "]");
}
}
Expand Down Expand Up @@ -234,6 +242,13 @@ ConstraintValidator<FlowValidation, Flow> flowValidation() {
};
}

private static List<String> getDuplicates(List<String> taskIds) {
return taskIds.stream()
.distinct()
.filter(entry -> Collections.frequency(taskIds, entry) > 1)
.collect(Collectors.toList());
}

@Singleton
ConstraintValidator<Regex, String> patternValidator() {
return (value, annotationMetadata, context) -> {
Expand All @@ -243,7 +258,7 @@ ConstraintValidator<Regex, String> patternValidator() {

try {
Pattern.compile(value);
} catch(PatternSyntaxException e) {
} catch (PatternSyntaxException e) {
context.messageTemplate("invalid pattern [" + value + "]");
return false;
}
Expand Down
3 changes: 2 additions & 1 deletion core/src/test/java/io/kestra/core/models/flows/FlowTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ void duplicate() {
assertThat(validate.isPresent(), is(true));
assertThat(validate.get().getConstraintViolations().size(), is(1));

assertThat(validate.get().getMessage(), containsString("Duplicate task id with name [date]"));
assertThat(validate.get().getMessage(), containsString("Duplicate task id with name [date, listen]"));
assertThat(validate.get().getMessage(), containsString("Duplicate trigger id with name [trigger]"));
}

@Test
Expand Down
12 changes: 11 additions & 1 deletion core/src/test/resources/flows/invalids/duplicate.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@ namespace: io.kestra.tests

listeners:
- tasks:
- id: date
- id: listen
type: io.kestra.core.tasks.debugs.Return
format: "{{taskrun.startDate}}"

tasks:
- id: date
type: io.kestra.core.tasks.debugs.Return
format: "{{taskrun.startDate}}"
- id: listen
type: io.kestra.core.tasks.debugs.Return
format: "{{taskrun.startDate}}"

- id: seq
type: io.kestra.core.tasks.flows.Sequential
Expand All @@ -27,3 +30,10 @@ errors:
- id: date
type: io.kestra.core.tasks.debugs.Return
format: "{{taskrun.startDate}}"
triggers:
- id: trigger
type: io.kestra.core.models.triggers.types.Schedule
cron: '* * * * *'
- id: trigger
type: io.kestra.core.models.triggers.types.Webhook
key: t

0 comments on commit 516d5fb

Please sign in to comment.