Skip to content

Commit

Permalink
feat(core): add metric for worker trigger
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Mar 23, 2024
1 parent beee3c8 commit 5fae3dc
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 50 deletions.
52 changes: 48 additions & 4 deletions core/src/main/java/io/kestra/core/metrics/MetricRegistry.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package io.kestra.core.metrics;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.runners.SubflowExecutionResult;
import io.kestra.core.runners.WorkerTask;
import io.kestra.core.runners.WorkerTaskResult;
import io.kestra.core.runners.WorkerTrigger;
import io.kestra.core.schedulers.SchedulerExecutionWithTrigger;
import io.micrometer.core.instrument.*;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.binder.MeterBinder;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
Expand All @@ -25,8 +30,12 @@ public class MetricRegistry {
public final static String METRIC_WORKER_TIMEOUT_COUNT = "worker.timeout.count";
public final static String METRIC_WORKER_ENDED_COUNT = "worker.ended.count";
public final static String METRIC_WORKER_ENDED_DURATION = "worker.ended.duration";
// NOTE(review): the two "evaluate.trigger" names below appear next to the "trigger" names that follow, and
// Worker#handleTrigger shows both variants passed to the same timer(...) / gauge(...) calls.
// Presumably only one pair is current -- confirm before relying on either.
public final static String METRIC_WORKER_EVALUATE_TRIGGER_DURATION = "worker.evaluate.trigger.duration";
public final static String METRIC_WORKER_EVALUATE_TRIGGER_RUNNING_COUNT = "worker.evaluate.trigger.running.count";
// Timer name used around the trigger evaluation in Worker#handleTrigger.
public final static String METRIC_WORKER_TRIGGER_DURATION = "worker.trigger.duration";
// Gauge name registered per trigger context uid in Worker#handleTrigger; incremented before and decremented after the evaluation.
public final static String METRIC_WORKER_TRIGGER_RUNNING_COUNT = "worker.trigger.running.count";
// Counter incremented at the beginning of Worker#handleTrigger.
public final static String METRIC_WORKER_TRIGGER_STARTED_COUNT = "worker.trigger.started.count";
// Counter incremented at the end of Worker#handleTrigger.
public final static String METRIC_WORKER_TRIGGER_ENDED_COUNT = "worker.trigger.ended.count";
// Counter incremented by Worker#handleTriggerError.
public final static String METRIC_WORKER_TRIGGER_ERROR_COUNT = "worker.trigger.error.count";
// Counter incremented by Worker#publishTriggerExecution, whether or not the evaluation produced an execution.
public final static String METRIC_WORKER_TRIGGER_EXECUTION_COUNT = "worker.trigger.execution.count";

public final static String EXECUTOR_TASKRUN_NEXT_COUNT = "executor.taskrun.next.count";
public final static String EXECUTOR_TASKRUN_ENDED_COUNT = "executor.taskrun.ended.count";
Expand Down Expand Up @@ -57,6 +66,7 @@ public class MetricRegistry {
public final static String JDBC_QUERY_DURATION = "jdbc.query.duration";

// Tag keys used by the tags(...) helpers of this class; each key is followed by its value in the returned array.
public final static String TAG_TASK_TYPE = "task_type";
// Value comes from AbstractTrigger#getType() in tags(AbstractTrigger).
public final static String TAG_TRIGGER_TYPE = "trigger_type";
// Value comes from the trigger context's flow id in tags(WorkerTrigger, String, String...).
public final static String TAG_FLOW_ID = "flow_id";
// Value comes from the trigger context's namespace in tags(WorkerTrigger, String, String...).
public final static String TAG_NAMESPACE_ID = "namespace_id";
public final static String TAG_STATE = "state";
Expand Down Expand Up @@ -148,6 +158,28 @@ public String[] tags(WorkerTask workerTask, String workerGroup, String... tags)
return workerTask.getTaskRun().getTenantId() == null ? baseTags : ArrayUtils.addAll(baseTags, TAG_TENANT_ID, workerTask.getTaskRun().getTenantId());
}

/**
 * Build the metric tags for a {@link WorkerTrigger}.
 * <p>
 * The result is made of, in order: the tags of the underlying trigger (see {@link #tags(AbstractTrigger)}),
 * the extra {@code tags} supplied by the caller, the namespace and the flow id taken from the trigger context,
 * then the worker group and the tenant id when they are not {@code null}.
 *
 * @param workerTrigger the current WorkerTrigger
 * @param workerGroup the worker group, optional ({@code null} to omit the tag)
 * @param tags additional tag keys and values inserted right after the trigger tags
 * @return tags to apply to metrics
 */
public String[] tags(WorkerTrigger workerTrigger, String workerGroup, String... tags) {
    String[] result = ArrayUtils.addAll(this.tags(workerTrigger.getTrigger()), tags);

    var triggerContext = workerTrigger.getTriggerContext();
    result = ArrayUtils.addAll(
        result,
        TAG_NAMESPACE_ID, triggerContext.getNamespace(),
        TAG_FLOW_ID, triggerContext.getFlowId()
    );

    if (workerGroup != null) {
        result = ArrayUtils.addAll(result, TAG_WORKER_GROUP, workerGroup);
    }

    var tenantId = triggerContext.getTenantId();
    if (tenantId != null) {
        result = ArrayUtils.addAll(result, TAG_TENANT_ID, tenantId);
    }

    return result;
}


/**
* Return tags for current {@link WorkerTaskResult}
*
Expand Down Expand Up @@ -192,6 +224,18 @@ public String[] tags(Task task) {
};
}

/**
 * Build the metric tags describing an {@link AbstractTrigger}.
 *
 * @param trigger the current Trigger
 * @return tags to apply to metrics: {@link #TAG_TRIGGER_TYPE} followed by the trigger type
 */
public String[] tags(AbstractTrigger trigger) {
    String triggerType = trigger.getType();
    return new String[]{TAG_TRIGGER_TYPE, triggerType};
}

/**
* Return tags for current {@link Execution}
*
Expand Down
123 changes: 77 additions & 46 deletions core/src/main/java/io/kestra/core/runners/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -285,68 +285,99 @@ private void handleTask(WorkerTask workerTask) {
}
}


/**
 * Derive the run context used to evaluate a trigger on this worker.
 *
 * @param workerTrigger the trigger whose condition context carries the base run context
 * @return the result of calling {@code forWorker} on that run context with this worker's application context
 */
private RunContext initRunContextForTrigger(WorkerTrigger workerTrigger) {
    var baseRunContext = workerTrigger.getConditionContext().getRunContext();
    return baseRunContext.forWorker(this.applicationContext, workerTrigger);
}

/**
 * Publish the outcome of a trigger evaluation on the worker trigger result queue.
 * <p>
 * Steps, in order: increment the {@link MetricRegistry#METRIC_WORKER_TRIGGER_EXECUTION_COUNT} counter,
 * log the outcome at debug level, add the flow labels (when the flow has some) to the produced execution,
 * and emit a {@link WorkerTriggerResult} carrying the execution, the trigger context and the trigger.
 *
 * @param workerTrigger the trigger that was evaluated
 * @param evaluate the execution produced by the evaluation, or empty when the evaluation produced none
 */
private void publishTriggerExecution(WorkerTrigger workerTrigger, Optional<Execution> evaluate) {
    // Counted for every published evaluation, including the empty ones.
    metricRegistry
        .counter(MetricRegistry.METRIC_WORKER_TRIGGER_EXECUTION_COUNT, metricRegistry.tags(workerTrigger, workerGroup))
        .increment();

    if (log.isDebugEnabled()) {
        logService.logTrigger(
            workerTrigger.getTriggerContext(),
            log,
            Level.DEBUG,
            "[type: {}] {}",
            workerTrigger.getTrigger().getType(),
            evaluate.map(execution -> "New execution '" + execution.getId() + "'").orElse("Empty evaluation")
        );
    }

    Optional<Execution> published = evaluate;
    var flowLabels = workerTrigger.getConditionContext().getFlow().getLabels();
    if (flowLabels != null) {
        published = evaluate.map(execution -> {
            // Work on a copy: appending to the list returned by getLabels() would modify the source
            // execution in place and would throw if that list is unmodifiable.
            List<Label> executionLabels = execution.getLabels() != null
                ? new ArrayList<>(execution.getLabels())
                : new ArrayList<>();
            executionLabels.addAll(flowLabels);
            return execution.withLabels(executionLabels);
        });
    }

    this.workerTriggerResultQueue.emit(
        WorkerTriggerResult.builder()
            .execution(published)
            .triggerContext(workerTrigger.getTriggerContext())
            .trigger(workerTrigger.getTrigger())
            .build()
    );
}

/**
 * Report a failed trigger evaluation.
 * <p>
 * Increments the {@link MetricRegistry#METRIC_WORKER_TRIGGER_ERROR_COUNT} counter, logs the error through
 * {@code logError}, then emits a {@link WorkerTriggerResult} flagged {@code success(false)} for the trigger.
 *
 * @param workerTrigger the trigger whose evaluation failed
 * @param e the failure that was raised
 */
private void handleTriggerError(WorkerTrigger workerTrigger, Throwable e) {
    String[] metricTags = metricRegistry.tags(workerTrigger, workerGroup);
    metricRegistry.counter(MetricRegistry.METRIC_WORKER_TRIGGER_ERROR_COUNT, metricTags).increment();

    logError(workerTrigger, e);

    var failedResult = WorkerTriggerResult.builder()
        .success(false)
        .triggerContext(workerTrigger.getTriggerContext())
        .trigger(workerTrigger.getTrigger())
        .build();
    this.workerTriggerResultQueue.emit(failedResult);
}

/**
 * Processes a {@link WorkerTrigger}: counts the start, times the evaluation, tracks a per-trigger running
 * gauge, evaluates the trigger, publishes the result or the error, and counts the end.
 * <p>
 * NOTE(review): in this view the body mixes lines from before and after a refactoring (two consecutive
 * {@code .timer(...)} calls, two {@code .gauge(...)} calls, two declarations of {@code evaluate}, inline
 * result emission next to calls to the extracted helpers), so it does not read as one compilable method.
 * The comments below describe the individual statements only.
 *
 * @param workerTrigger the trigger to process, together with its trigger context and condition context
 */
private void handleTrigger(WorkerTrigger workerTrigger) {
// Count every trigger handed to this method, before any evaluation happens.
metricRegistry
.counter(MetricRegistry.METRIC_WORKER_TRIGGER_STARTED_COUNT, metricRegistry.tags(workerTrigger, workerGroup))
.increment();

// Everything inside record(...) is timed. Note that the timer and gauge tags are built from the trigger
// context, while the counters use the tags built from the WorkerTrigger.
this.metricRegistry
.timer(MetricRegistry.METRIC_WORKER_EVALUATE_TRIGGER_DURATION, metricRegistry.tags(workerTrigger.getTriggerContext(), workerGroup))
.timer(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION, metricRegistry.tags(workerTrigger.getTriggerContext(), workerGroup))
.record(() -> {
// Register the running gauge once per trigger context uid, then mark this evaluation as running.
this.evaluateTriggerRunningCount.computeIfAbsent(workerTrigger.getTriggerContext().uid(), s -> metricRegistry
.gauge(MetricRegistry.METRIC_WORKER_EVALUATE_TRIGGER_RUNNING_COUNT, new AtomicInteger(0), metricRegistry.tags(workerTrigger.getTriggerContext(), workerGroup)));
.gauge(MetricRegistry.METRIC_WORKER_TRIGGER_RUNNING_COUNT, new AtomicInteger(0), metricRegistry.tags(workerTrigger.getTriggerContext(), workerGroup)));
this.evaluateTriggerRunningCount.get(workerTrigger.getTriggerContext().uid()).addAndGet(1);

try {
// NOTE(review): unconditional cast variant; an instanceof-guarded variant of the same step appears further down.
PollingTriggerInterface pollingTrigger = (PollingTriggerInterface) workerTrigger.getTrigger();
RunContext runContext = workerTrigger.getConditionContext()
.getRunContext()
.forWorker(this.applicationContext, workerTrigger);
Optional<Execution> evaluate = pollingTrigger.evaluate(
workerTrigger.getConditionContext().withRunContext(runContext),
workerTrigger.getTriggerContext()
);

if (log.isDebugEnabled()) {
logService.logTrigger(
workerTrigger.getTriggerContext(),
log,
Level.DEBUG,
"[type: {}] {}",
workerTrigger.getTrigger().getType(),
evaluate.map(execution -> "New execution '" + execution.getId() + "'").orElse("Empty evaluation")
);
}
// Only triggers implementing PollingTriggerInterface are evaluated in this variant.
if (workerTrigger.getTrigger() instanceof PollingTriggerInterface pollingTrigger) {
RunContext runContext = this.initRunContextForTrigger(workerTrigger);

var flowLabels = workerTrigger.getConditionContext().getFlow().getLabels();
if (flowLabels != null) {
evaluate = evaluate.map( execution -> {
List<Label> executionLabels = execution.getLabels() != null ? execution.getLabels() : new ArrayList<>();
executionLabels.addAll(flowLabels);
return execution.withLabels(executionLabels);
}
// Evaluate with a condition context that carries the worker-specific run context.
Optional<Execution> evaluate = pollingTrigger.evaluate(
workerTrigger.getConditionContext().withRunContext(runContext),
workerTrigger.getTriggerContext()
);
}

workerTrigger.getConditionContext().getRunContext().cleanup();

this.workerTriggerResultQueue.emit(
WorkerTriggerResult.builder()
.execution(evaluate)
.triggerContext(workerTrigger.getTriggerContext())
.trigger(workerTrigger.getTrigger())
.build()
);
// Delegates metric, debug log, label merge and result emission to publishTriggerExecution.
this.publishTriggerExecution(workerTrigger, evaluate);
}
} catch (Exception e) {
logError(workerTrigger, e);
this.workerTriggerResultQueue.emit(
WorkerTriggerResult.builder()
.success(false)
.triggerContext(workerTrigger.getTriggerContext())
.trigger(workerTrigger.getTrigger())
.build()
);
// Delegates error metric, error log and failed-result emission to handleTriggerError.
this.handleTriggerError(workerTrigger, e);
} finally {
// Runs cleanup() on the run context of the original condition context, whether or not the evaluation succeeded.
workerTrigger.getConditionContext().getRunContext().cleanup();
}

// Evaluation finished (successfully or not): decrement the running gauge.
this.evaluateTriggerRunningCount.get(workerTrigger.getTriggerContext().uid()).addAndGet(-1);
}
);

// Count the end of the handling, after the timed section has returned.
metricRegistry
.counter(MetricRegistry.METRIC_WORKER_TRIGGER_ENDED_COUNT, metricRegistry.tags(workerTrigger, workerGroup))
.increment();
}

private static ZonedDateTime now() {
Expand Down

0 comments on commit 5fae3dc

Please sign in to comment.