Skip to content

Commit

Permalink
chore(version): update to version 'v0.10.1'.
Browse files Browse the repository at this point in the history
  • Loading branch information
Skraye committed Jul 18, 2023
2 parents bd93937 + d102e22 commit f673a19
Show file tree
Hide file tree
Showing 46 changed files with 406 additions and 157 deletions.
22 changes: 20 additions & 2 deletions core/src/main/java/io/kestra/core/metrics/MetricRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class MetricRegistry {
public final static String TAG_NAMESPACE_ID = "namespace_id";
public final static String TAG_STATE = "state";
public final static String TAG_ATTEMPT_COUNT = "attempt_count";
public final static String TAG_WORKER_GROUP = "worker_group";

@Inject
private MeterRegistry meterRegistry;
Expand Down Expand Up @@ -128,17 +129,19 @@ private String metricName(String name) {
* We don't include current state since it will breakup the values per state and it's make no sense.
*
* @param workerTask the current WorkerTask
* @param workerGroup the worker group, optional
* @return tags to applied to metrics
*/
public String[] tags(WorkerTask workerTask, String... tags) {
return ArrayUtils.addAll(
public String[] tags(WorkerTask workerTask, String workerGroup, String... tags) {
var finalTags = ArrayUtils.addAll(
ArrayUtils.addAll(
this.tags(workerTask.getTask()),
tags
),
TAG_NAMESPACE_ID, workerTask.getTaskRun().getNamespace(),
TAG_FLOW_ID, workerTask.getTaskRun().getFlowId()
);
return workerGroup != null ? ArrayUtils.addAll(finalTags, TAG_WORKER_GROUP, workerGroup) : finalTags;
}

/**
Expand Down Expand Up @@ -182,6 +185,21 @@ public String[] tags(Execution execution) {
};
}

/**
* Return tags for current {@link TriggerContext}
*
* @param triggerContext the current TriggerContext
* @param workerGroup the worker group, optional
* @return tags to applied to metrics
*/
public String[] tags(TriggerContext triggerContext, String workerGroup) {
var finalTags = new String[]{
TAG_FLOW_ID, triggerContext.getFlowId(),
TAG_NAMESPACE_ID, triggerContext.getNamespace()
};
return workerGroup != null ? ArrayUtils.addAll(finalTags, TAG_WORKER_GROUP, workerGroup) : finalTags;
}

/**
* Return tags for current {@link TriggerContext}
*
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/io/kestra/core/models/Label.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package io.kestra.core.models;

public record Label(String key, String value) {}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,13 @@
import ch.qos.logback.classic.spi.ThrowableProxy;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Streams;
import io.kestra.core.models.Label;
import io.kestra.core.serializers.ListOrMapOfLabelDeserializer;
import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
import lombok.Builder;
import lombok.Value;
import lombok.With;
Expand Down Expand Up @@ -53,7 +58,9 @@ public class Execution implements DeletedInterface {
Map<String, Object> inputs;

@With
Map<String, String> labels;
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
@JsonDeserialize(using = ListOrMapOfLabelDeserializer.class)
List<Label> labels;

@With
Map<String, Object> variables;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ private static String cleanupSource(String source) {
}

public boolean isUpdatable(Flow flow, String flowSource) {
return flow.equalsWithoutRevision(flow) &&
return this.equalsWithoutRevision(flow) &&
this.source.equals(cleanupSource(flowSource));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ default ZonedDateTime nextEvaluationDate(ConditionContext conditionContext, Opti
description = "The interval between 2 different test of schedule, this can avoid to overload the remote system " +
"with too many call. For most of trigger that depend on external system, a minimal interval must be " +
"at least PT30S.\n" +
"See [ISO_8601 Durations](https://en.wikipedia.org/wiki/ISO_8601#Durations) for more information of available interval value",
defaultValue = "PT1S"
"See [ISO_8601 Durations](https://en.wikipedia.org/wiki/ISO_8601#Durations) for more information of available interval value"
)
@PluginProperty
Duration getInterval();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.kestra.core.models.triggers.types;

import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.utils.LabelUtils;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
Expand Down Expand Up @@ -85,7 +87,7 @@ public Optional<Execution> evaluate(RunContext runContext, io.kestra.core.models
.namespace(flow.getNamespace())
.flowId(flow.getId())
.flowRevision(flow.getRevision())
.labels(flow.getLabels())
.labels(LabelUtils.from(flow.getLabels()))
.state(new State())
.trigger(ExecutionTrigger.of(
this,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.cronutils.model.time.ExecutionTime;
import com.cronutils.parser.CronParser;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
Expand All @@ -20,6 +21,7 @@
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.services.ConditionService;
import io.kestra.core.utils.LabelUtils;
import io.kestra.core.validations.CronExpression;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
Expand Down Expand Up @@ -273,7 +275,7 @@ public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerCo
.namespace(context.getNamespace())
.flowId(context.getFlowId())
.flowRevision(context.getFlowRevision())
.labels(conditionContext.getFlow().getLabels())
.labels(LabelUtils.from(conditionContext.getFlow().getLabels()))
.state(new State())
.trigger(executionTrigger)
// keep to avoid breaking compatibility
Expand Down
32 changes: 18 additions & 14 deletions core/src/main/java/io/kestra/core/runners/ExecutorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -221,23 +221,27 @@ private List<TaskRun> childNextsTaskRun(Executor executor, TaskRun parentTaskRun

if (parent instanceof FlowableTask<?> flowableParent) {

List<NextTaskRun> nexts = flowableParent.resolveNexts(
runContextFactory.of(
executor.getFlow(),
parent,
try {
List<NextTaskRun> nexts = flowableParent.resolveNexts(
runContextFactory.of(
executor.getFlow(),
parent,
executor.getExecution(),
parentTaskRun
),
executor.getExecution(),
parentTaskRun
),
executor.getExecution(),
parentTaskRun
);

if (nexts.size() > 0) {
return this.saveFlowableOutput(
nexts,
executor,
parentTaskRun
);

if (nexts.size() > 0) {
return this.saveFlowableOutput(
nexts,
executor,
parentTaskRun
);
}
} catch (Exception e) {
log.warn("Unable to resolve the next tasks to run", e);
}
}

Expand Down
14 changes: 8 additions & 6 deletions core/src/main/java/io/kestra/core/runners/RunnerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.MissingRequiredInput;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.Input;
Expand All @@ -15,6 +16,7 @@
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.LabelUtils;
import io.micronaut.http.multipart.StreamingFileUpload;
import io.reactivex.Flowable;
import io.reactivex.Single;
Expand Down Expand Up @@ -283,7 +285,7 @@ public Execution runOne(String namespace, String flowId, Integer revision, BiFun
return this.runOne(namespace, flowId, revision, inputs, duration, null);
}

public Execution runOne(String namespace, String flowId, Integer revision, BiFunction<Flow, Execution, Map<String, Object>> inputs, Duration duration, Map<String, String> labels) throws TimeoutException {
public Execution runOne(String namespace, String flowId, Integer revision, BiFunction<Flow, Execution, Map<String, Object>> inputs, Duration duration, List<Label> labels) throws TimeoutException {
return this.runOne(
flowRepository
.findById(namespace, flowId, revision != null ? Optional.of(revision) : Optional.empty())
Expand All @@ -301,7 +303,7 @@ public Execution runOne(Flow flow, BiFunction<Flow, Execution, Map<String, Objec
return this.runOne(flow, inputs, duration, null);
}

public Execution runOne(Flow flow, BiFunction<Flow, Execution, Map<String, Object>> inputs, Duration duration, Map<String, String> labels) throws TimeoutException {
public Execution runOne(Flow flow, BiFunction<Flow, Execution, Map<String, Object>> inputs, Duration duration, List<Label> labels) throws TimeoutException {
if (duration == null) {
duration = Duration.ofSeconds(15);
}
Expand Down Expand Up @@ -381,7 +383,7 @@ private Predicate<Execution> isTerminatedChildExecution(Execution parentExecutio
return e -> e.getParentId() != null && e.getParentId().equals(parentExecution.getId()) && conditionService.isTerminatedWithListeners(flow, e);
}

public Execution newExecution(Flow flow, BiFunction<Flow, Execution, Map<String, Object>> inputs, Map<String, String> labels) {
public Execution newExecution(Flow flow, BiFunction<Flow, Execution, Map<String, Object>> inputs, List<Label> labels) {
Execution execution = Execution.builder()
.id(IdUtils.create())
.namespace(flow.getNamespace())
Expand All @@ -394,12 +396,12 @@ public Execution newExecution(Flow flow, BiFunction<Flow, Execution, Map<String,
execution = execution.withInputs(inputs.apply(flow, execution));
}

Map<String, String> executionLabels = new HashMap<>();
List<Label> executionLabels = new ArrayList<>();
if (flow.getLabels() != null) {
executionLabels.putAll(flow.getLabels());
executionLabels.addAll(LabelUtils.from(flow.getLabels()));
}
if (labels != null) {
executionLabels.putAll(labels);
executionLabels.addAll(labels);
}
if (!executionLabels.isEmpty()) {
execution = execution.withLabels(executionLabels);
Expand Down
34 changes: 20 additions & 14 deletions core/src/main/java/io/kestra/core/runners/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.google.common.hash.Hashing;
import io.kestra.core.exceptions.TimeoutExceededException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.Output;
Expand All @@ -25,6 +26,7 @@
import io.kestra.core.tasks.flows.WorkingDirectory;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.core.utils.LabelUtils;
import io.micronaut.context.ApplicationContext;
import io.micronaut.core.annotation.Introspected;
import io.micronaut.inject.qualifiers.Qualifiers;
Expand Down Expand Up @@ -170,10 +172,10 @@ private void handleTask(WorkerTask workerTask) {

private void handleTrigger(WorkerTrigger workerTrigger) {
this.metricRegistry
.timer(MetricRegistry.METRIC_WORKER_EVALUATE_TRIGGER_DURATION, metricRegistry.tags(workerTrigger.getTriggerContext()))
.timer(MetricRegistry.METRIC_WORKER_EVALUATE_TRIGGER_DURATION, metricRegistry.tags(workerTrigger.getTriggerContext(), workerGroup))
.record(() -> {
this.evaluateTriggerRunningCount.computeIfAbsent(workerTrigger.getTriggerContext().uid(), s -> metricRegistry
.gauge(MetricRegistry.METRIC_WORKER_EVALUATE_TRIGGER_RUNNING_COUNT, new AtomicInteger(0), metricRegistry.tags(workerTrigger.getTriggerContext())));
.gauge(MetricRegistry.METRIC_WORKER_EVALUATE_TRIGGER_RUNNING_COUNT, new AtomicInteger(0), metricRegistry.tags(workerTrigger.getTriggerContext(), workerGroup)));
this.evaluateTriggerRunningCount.get(workerTrigger.getTriggerContext().uid()).addAndGet(1);

try {
Expand All @@ -200,8 +202,8 @@ private void handleTrigger(WorkerTrigger workerTrigger) {
var flowLabels = workerTrigger.getConditionContext().getFlow().getLabels();
if (flowLabels != null) {
evaluate = evaluate.map( execution -> {
Map<String, String> executionLabels = execution.getLabels() != null ? execution.getLabels() : new HashMap<>();
executionLabels.putAll(flowLabels);
List<Label> executionLabels = execution.getLabels() != null ? execution.getLabels() : new ArrayList<>();
executionLabels.addAll(LabelUtils.from(flowLabels));
return execution.withLabels(executionLabels);
}
);
Expand Down Expand Up @@ -248,12 +250,12 @@ private WorkerTask cleanUpTransient(WorkerTask workerTask) {

private WorkerTaskResult run(WorkerTask workerTask, Boolean cleanUp) throws QueueException {
metricRegistry
.counter(MetricRegistry.METRIC_WORKER_STARTED_COUNT, metricRegistry.tags(workerTask))
.counter(MetricRegistry.METRIC_WORKER_STARTED_COUNT, metricRegistry.tags(workerTask, workerGroup))
.increment();

if (workerTask.getTaskRun().getState().getCurrent() == State.Type.CREATED) {
metricRegistry
.timer(MetricRegistry.METRIC_WORKER_QUEUED_DURATION, metricRegistry.tags(workerTask))
.timer(MetricRegistry.METRIC_WORKER_QUEUED_DURATION, metricRegistry.tags(workerTask, workerGroup))
.record(Duration.between(
workerTask.getTaskRun().getState().getStartDate(), now()
));
Expand Down Expand Up @@ -315,7 +317,8 @@ private WorkerTaskResult run(WorkerTask workerTask, Boolean cleanUp) throws Queu
MetricRegistry.METRIC_WORKER_RETRYED_COUNT,
metricRegistry.tags(
current.get(),
MetricRegistry.TAG_ATTEMPT_COUNT, String.valueOf(e.getAttemptCount())
MetricRegistry.TAG_ATTEMPT_COUNT, String.valueOf(e.getAttemptCount()),
MetricRegistry.TAG_WORKER_GROUP, workerGroup
)
)
.increment();
Expand Down Expand Up @@ -380,11 +383,11 @@ private WorkerTaskResult run(WorkerTask workerTask, Boolean cleanUp) throws Queu

private void logTerminated(WorkerTask workerTask) {
metricRegistry
.counter(MetricRegistry.METRIC_WORKER_ENDED_COUNT, metricRegistry.tags(workerTask))
.counter(MetricRegistry.METRIC_WORKER_ENDED_COUNT, metricRegistry.tags(workerTask, workerGroup))
.increment();

metricRegistry
.timer(MetricRegistry.METRIC_WORKER_ENDED_DURATION, metricRegistry.tags(workerTask))
.timer(MetricRegistry.METRIC_WORKER_ENDED_DURATION, metricRegistry.tags(workerTask, workerGroup))
.record(workerTask.getTaskRun().getState().getDuration());

workerTask.logger().info(
Expand Down Expand Up @@ -445,7 +448,7 @@ private WorkerTask runAttempt(WorkerTask workerTask) {

metricRunningCount.incrementAndGet();

WorkerThread workerThread = new WorkerThread(logger, workerTask, task, runContext, metricRegistry);
WorkerThread workerThread = new WorkerThread(logger, workerTask, task, runContext, metricRegistry, workerGroup);

// emit attempts
this.workerTaskResultQueue.emit(new WorkerTaskResult(workerTask
Expand Down Expand Up @@ -516,7 +519,7 @@ private List<TaskRunAttempt> addAttempt(WorkerTask workerTask, TaskRunAttempt ta
}

public AtomicInteger getMetricRunningCount(WorkerTask workerTask) {
String[] tags = this.metricRegistry.tags(workerTask);
String[] tags = this.metricRegistry.tags(workerTask, workerGroup);
Arrays.sort(tags);

long index = Hashing
Expand All @@ -528,7 +531,7 @@ public AtomicInteger getMetricRunningCount(WorkerTask workerTask) {
.computeIfAbsent(index, l -> metricRegistry.gauge(
MetricRegistry.METRIC_WORKER_RUNNING_COUNT,
new AtomicInteger(0),
metricRegistry.tags(workerTask)
metricRegistry.tags(workerTask, workerGroup)
));
}

Expand Down Expand Up @@ -591,12 +594,13 @@ public static class WorkerThread extends Thread {
RunnableTask<?> task;
RunContext runContext;
MetricRegistry metricRegistry;
String workerGroup;

Output taskOutput;
io.kestra.core.models.flows.State.Type taskState;
boolean killed = false;

public WorkerThread(Logger logger, WorkerTask workerTask, RunnableTask<?> task, RunContext runContext, MetricRegistry metricRegistry) {
public WorkerThread(Logger logger, WorkerTask workerTask, RunnableTask<?> task, RunContext runContext, MetricRegistry metricRegistry, String workerGroup) {
super("WorkerThread");
this.setUncaughtExceptionHandler(this::exceptionHandler);

Expand All @@ -605,6 +609,7 @@ public WorkerThread(Logger logger, WorkerTask workerTask, RunnableTask<?> task,
this.task = task;
this.runContext = runContext;
this.metricRegistry = metricRegistry;
this.workerGroup = workerGroup;
}

@Override
Expand All @@ -623,7 +628,8 @@ public void run() {
MetricRegistry.METRIC_WORKER_TIMEOUT_COUNT,
metricRegistry.tags(
this.workerTask,
MetricRegistry.TAG_ATTEMPT_COUNT, String.valueOf(event.getAttemptCount())
MetricRegistry.TAG_ATTEMPT_COUNT, String.valueOf(event.getAttemptCount()),
MetricRegistry.TAG_WORKER_GROUP, workerGroup
)
)
.increment()
Expand Down
Loading

0 comments on commit f673a19

Please sign in to comment.