Skip to content

Commit

Permalink
fix(core,webserver): store labels as a list instead of a map
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Jul 13, 2023
1 parent 16ac515 commit 007142c
Show file tree
Hide file tree
Showing 20 changed files with 190 additions and 50 deletions.
3 changes: 3 additions & 0 deletions core/src/main/java/io/kestra/core/models/Label.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package io.kestra.core.models;

public record Label(String key, String value) {}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,13 @@
import ch.qos.logback.classic.spi.ThrowableProxy;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Streams;
import io.kestra.core.models.Label;
import io.kestra.core.serializers.ListOrMapOfLabelDeserializer;
import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
import lombok.Builder;
import lombok.Value;
import lombok.With;
Expand Down Expand Up @@ -53,7 +58,9 @@ public class Execution implements DeletedInterface {
Map<String, Object> inputs;

@With
Map<String, String> labels;
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
@JsonDeserialize(using = ListOrMapOfLabelDeserializer.class)
List<Label> labels;

@With
Map<String, Object> variables;
Expand Down
9 changes: 8 additions & 1 deletion core/src/main/java/io/kestra/core/models/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,22 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.introspect.AnnotatedMember;
import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.listeners.Listener;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.serializers.ListOrMapOfLabelDeserializer;
import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
import io.kestra.core.services.FlowService;
import io.kestra.core.validations.FlowValidation;
import io.micronaut.core.annotation.Introspected;
Expand Down Expand Up @@ -63,7 +68,9 @@ public boolean hasIgnoreMarker(final AnnotatedMember m) {

String description;

Map<String, String> labels;
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
@JsonDeserialize(using = ListOrMapOfLabelDeserializer.class)
List<Label> labels;

@Valid
List<Input<?>> inputs;
Expand Down
13 changes: 7 additions & 6 deletions core/src/main/java/io/kestra/core/runners/RunnerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.MissingRequiredInput;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.Input;
Expand Down Expand Up @@ -283,7 +284,7 @@ public Execution runOne(String namespace, String flowId, Integer revision, BiFun
return this.runOne(namespace, flowId, revision, inputs, duration, null);
}

public Execution runOne(String namespace, String flowId, Integer revision, BiFunction<Flow, Execution, Map<String, Object>> inputs, Duration duration, Map<String, String> labels) throws TimeoutException {
public Execution runOne(String namespace, String flowId, Integer revision, BiFunction<Flow, Execution, Map<String, Object>> inputs, Duration duration, List<Label> labels) throws TimeoutException {
return this.runOne(
flowRepository
.findById(namespace, flowId, revision != null ? Optional.of(revision) : Optional.empty())
Expand All @@ -301,7 +302,7 @@ public Execution runOne(Flow flow, BiFunction<Flow, Execution, Map<String, Objec
return this.runOne(flow, inputs, duration, null);
}

public Execution runOne(Flow flow, BiFunction<Flow, Execution, Map<String, Object>> inputs, Duration duration, Map<String, String> labels) throws TimeoutException {
public Execution runOne(Flow flow, BiFunction<Flow, Execution, Map<String, Object>> inputs, Duration duration, List<Label> labels) throws TimeoutException {
if (duration == null) {
duration = Duration.ofSeconds(15);
}
Expand Down Expand Up @@ -381,7 +382,7 @@ private Predicate<Execution> isTerminatedChildExecution(Execution parentExecutio
return e -> e.getParentId() != null && e.getParentId().equals(parentExecution.getId()) && conditionService.isTerminatedWithListeners(flow, e);
}

public Execution newExecution(Flow flow, BiFunction<Flow, Execution, Map<String, Object>> inputs, Map<String, String> labels) {
public Execution newExecution(Flow flow, BiFunction<Flow, Execution, Map<String, Object>> inputs, List<Label> labels) {
Execution execution = Execution.builder()
.id(IdUtils.create())
.namespace(flow.getNamespace())
Expand All @@ -394,12 +395,12 @@ public Execution newExecution(Flow flow, BiFunction<Flow, Execution, Map<String,
execution = execution.withInputs(inputs.apply(flow, execution));
}

Map<String, String> executionLabels = new HashMap<>();
List<Label> executionLabels = new ArrayList<>();
if (flow.getLabels() != null) {
executionLabels.putAll(flow.getLabels());
executionLabels.addAll(flow.getLabels());
}
if (labels != null) {
executionLabels.putAll(labels);
executionLabels.addAll(labels);
}
if (!executionLabels.isEmpty()) {
execution = execution.withLabels(executionLabels);
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/java/io/kestra/core/runners/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.google.common.hash.Hashing;
import io.kestra.core.exceptions.TimeoutExceededException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.Output;
Expand Down Expand Up @@ -200,8 +201,8 @@ private void handleTrigger(WorkerTrigger workerTrigger) {
var flowLabels = workerTrigger.getConditionContext().getFlow().getLabels();
if (flowLabels != null) {
evaluate = evaluate.map( execution -> {
Map<String, String> executionLabels = execution.getLabels() != null ? execution.getLabels() : new HashMap<>();
executionLabels.putAll(flowLabels);
List<Label> executionLabels = execution.getLabels() != null ? execution.getLabels() : new ArrayList<>();
executionLabels.addAll(flowLabels);
return execution.withLabels(executionLabels);
}
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.kestra.core.serializers;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.deser.ResolvableDeserializer;
import io.kestra.core.models.Label;
import io.kestra.core.utils.LabelUtils;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
* This deserializer is for historical purpose, labels was first a map but has been updated to a List of Label so
* this deserializer allows using both types.
*/
public class ListOrMapOfLabelDeserializer extends JsonDeserializer<List<Label>> implements ResolvableDeserializer {
@Override
public List<Label> deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
if (p.hasToken(JsonToken.VALUE_NULL)) {
return null;
}
else if (p.hasToken(JsonToken.START_ARRAY)) {
// deserialize as list
List<Map<String, String>> ret = ctxt.readValue(p, List.class);
return ret.stream().map(map -> new Label(map.get("key"), map.get("value"))).toList();
}
else if (p.hasToken(JsonToken.START_OBJECT)) {
// deserialize as map
Map<String, String> ret = ctxt.readValue(p, Map.class);
return LabelUtils.from(ret);
}
throw new IllegalArgumentException("Unable to deserialize value as it's neither an object neither an array");
}

@Override
public void resolve(DeserializationContext ctxt) throws JsonMappingException {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.kestra.core.serializers;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
* This serializer is for historical purpose, labels was first a map but has been updated to a List of Label so
* this serializer allows using both types.
*/
public class ListOrMapOfLabelSerializer extends JsonSerializer<Object> {
@Override
public void serialize(Object value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
if (value == null) {
gen.writeNull();
}
else if (value instanceof List) {
serializers.findValueSerializer(List.class).serialize(value, gen, serializers);
}
else if (value instanceof Map) {
serializers.findValueSerializer(Map.class).serialize(value, gen, serializers);
}
else {
throw new IllegalArgumentException("Unable to serialize value as it's neither a map nor a list");
}
}
}
7 changes: 5 additions & 2 deletions core/src/main/java/io/kestra/core/tasks/flows/Flow.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.core.tasks.flows;

import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
Expand All @@ -17,7 +18,9 @@

import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

Expand Down Expand Up @@ -118,10 +121,10 @@ public Execution createExecution(RunContext runContext, FlowExecutorInterface fl
}
}

Map<String, String> labels = new HashMap<>();
List<Label> labels = new ArrayList<>();
if (this.labels != null) {
for (Map.Entry<String, String> entry: this.labels.entrySet()) {
labels.put(entry.getKey(), runContext.render(entry.getValue()));
labels.add(new Label(entry.getKey(), runContext.render(entry.getValue())));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package io.kestra.core.models.executions;

import io.kestra.core.models.Label;
import io.kestra.core.utils.IdUtils;
import org.junit.jupiter.api.Test;
import io.kestra.core.models.flows.State;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -153,10 +155,10 @@ void originalId() {
@Test
void labels() {
final Execution execution = Execution.builder()
.labels(Map.of("test", "test-value"))
.labels(List.of(new Label("test", "test-value")))
.build();

assertThat(execution.getLabels().size(), is(1));
assertThat(execution.getLabels().get("test"), is("test-value"));
assertThat(execution.getLabels().get(0), is(new Label("test", "test-value")));
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.core.models.triggers.types;

import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.runners.RunContextFactory;
Expand All @@ -10,6 +11,7 @@
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

Expand All @@ -28,9 +30,9 @@ void success() {
.namespace("io.kestra.unittest")
.revision(1)
.labels(
Map.of(
"flow-label-1", "flow-label-1",
"flow-label-2", "flow-label-2")
List.of(
new Label("flow-label-1", "flow-label-1"),
new Label("flow-label-2", "flow-label-2"))
)
.tasks(Collections.singletonList(Return.builder()
.id("test")
Expand Down Expand Up @@ -58,7 +60,7 @@ void success() {

assertThat(evaluate.isPresent(), is(true));
assertThat(evaluate.get().getFlowId(), is("flow-with-flow-trigger"));
assertThat(evaluate.get().getLabels().get("flow-label-1"), is("flow-label-1"));
assertThat(evaluate.get().getLabels().get("flow-label-2"), is("flow-label-2"));
assertThat(evaluate.get().getLabels().get(0), is(new Label("flow-label-1", "flow-label-1")));
assertThat(evaluate.get().getLabels().get(1), is(new Label("flow-label-2", "flow-label-2")));;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.core.models.triggers.types;

import io.kestra.core.models.Label;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.conditions.types.DateTimeBetweenCondition;
import io.kestra.core.models.conditions.types.DayWeekInMonthCondition;
Expand Down Expand Up @@ -91,8 +92,8 @@ void success() throws Exception {
assertThat(dateFromVars(vars.get("date"), date), is(date));
assertThat(dateFromVars(vars.get("next"), date), is(date.plusMonths(1)));
assertThat(dateFromVars(vars.get("previous"), date), is(date.minusMonths(1)));
assertThat(evaluate.get().getLabels().get("flow-label-1"), is("flow-label-1"));
assertThat(evaluate.get().getLabels().get("flow-label-2"), is("flow-label-2"));
assertThat(evaluate.get().getLabels().get(0), is(new Label("flow-label-1", "flow-label-1")));
assertThat(evaluate.get().getLabels().get(1), is(new Label("flow-label-2", "flow-label-2")));
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -396,9 +397,9 @@ private ConditionContext conditionContext(AbstractTrigger trigger) {
.id(IdUtils.create())
.namespace("io.kestra.tests")
.labels(
Map.of(
"flow-label-1", "flow-label-1",
"flow-label-2", "flow-label-2")
List.of(
new Label("flow-label-1", "flow-label-1"),
new Label("flow-label-2", "flow-label-2"))
)
.build();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.core.schedulers;

import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.Label;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionTrigger;
Expand Down Expand Up @@ -62,9 +63,9 @@ protected static Flow createFlow(List<AbstractTrigger> triggers, List<TaskDefaul
))
.revision(1)
.labels(
Map.of(
"flow-label-1", "flow-label-1",
"flow-label-2", "flow-label-2")
List.of(
new Label("flow-label-1", "flow-label-1"),
new Label("flow-label-2", "flow-label-2"))
)
.triggers(triggers)
.tasks(Collections.singletonList(Return.builder()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.core.schedulers;

import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
Expand Down Expand Up @@ -93,8 +94,8 @@ void thread() throws Exception {

assertThat(last.get().getTrigger().getVariables().get("defaultInjected"), is("done"));
assertThat(last.get().getTrigger().getVariables().get("counter"), is(3));
assertThat(last.get().getLabels().get("flow-label-1"), is("flow-label-1"));
assertThat(last.get().getLabels().get("flow-label-2"), is("flow-label-2"));
assertThat(last.get().getLabels().get(0), is(new Label("flow-label-1", "flow-label-1")));
assertThat(last.get().getLabels().get(1), is(new Label("flow-label-2", "flow-label-2")));
AbstractSchedulerTest.COUNTER = 0;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ public static Condition findCondition(AbstractJdbcRepository<Execution> jdbcRepo

if (labels != null) {
labels.forEach((key, value) -> {
Field<String> field = DSL.field("JQ_STRING(\"value\", '.labels." + key + "')", String.class);
Field<String> keyField = DSL.field("JQ_STRING(\"value\", '.labels[].key')", String.class);
conditions.add(keyField.eq(key));

Field<String> valueField = DSL.field("JQ_STRING(\"value\", '.labels[].value')", String.class);
if (value == null) {
conditions.add(field.isNotNull());
conditions.add(valueField.isNotNull());
} else {
conditions.add(field.eq(value));
conditions.add(valueField.eq(value));
}
});
}
Expand Down
Loading

0 comments on commit 007142c

Please sign in to comment.