Skip to content

Commit

Permalink
fix(core): add worker group tag to the metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Jul 11, 2023
1 parent e7e200e commit 38beef4
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 15 deletions.
22 changes: 20 additions & 2 deletions core/src/main/java/io/kestra/core/metrics/MetricRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class MetricRegistry {
public final static String TAG_NAMESPACE_ID = "namespace_id";
public final static String TAG_STATE = "state";
public final static String TAG_ATTEMPT_COUNT = "attempt_count";
public final static String TAG_WORKER_GROUP = "worker_group";

@Inject
private MeterRegistry meterRegistry;
Expand Down Expand Up @@ -128,17 +129,19 @@ private String metricName(String name) {
* We don't include the current state since it would break up the values per state, which makes no sense.
*
* @param workerTask the current WorkerTask
* @param workerGroup the worker group, optional
* @return tags to apply to metrics
*/
public String[] tags(WorkerTask workerTask, String... tags) {
return ArrayUtils.addAll(
public String[] tags(WorkerTask workerTask, String workerGroup, String... tags) {
var finalTags = ArrayUtils.addAll(
ArrayUtils.addAll(
this.tags(workerTask.getTask()),
tags
),
TAG_NAMESPACE_ID, workerTask.getTaskRun().getNamespace(),
TAG_FLOW_ID, workerTask.getTaskRun().getFlowId()
);
return workerGroup != null ? ArrayUtils.addAll(finalTags, TAG_WORKER_GROUP, workerGroup) : finalTags;
}

/**
Expand Down Expand Up @@ -182,6 +185,21 @@ public String[] tags(Execution execution) {
};
}

/**
 * Build the metric tags describing a {@link TriggerContext}.
 * <p>
 * The flow id and namespace tags are always present; the worker group tag is
 * appended only when a worker group is supplied.
 *
 * @param triggerContext the current TriggerContext
 * @param workerGroup the worker group; when {@code null}, no worker group tag is added
 * @return tags to apply to metrics
 */
public String[] tags(TriggerContext triggerContext, String workerGroup) {
    String[] baseTags = {
        TAG_FLOW_ID, triggerContext.getFlowId(),
        TAG_NAMESPACE_ID, triggerContext.getNamespace()
    };

    // No worker group configured: return the base tags untouched.
    if (workerGroup == null) {
        return baseTags;
    }

    return ArrayUtils.addAll(baseTags, TAG_WORKER_GROUP, workerGroup);
}

/**
* Return tags for current {@link TriggerContext}
*
Expand Down
28 changes: 16 additions & 12 deletions core/src/main/java/io/kestra/core/runners/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,10 @@ private void handleTask(WorkerTask workerTask) {

private void handleTrigger(WorkerTrigger workerTrigger) {
this.metricRegistry
.timer(MetricRegistry.METRIC_WORKER_EVALUATE_TRIGGER_DURATION, metricRegistry.tags(workerTrigger.getTriggerContext()))
.timer(MetricRegistry.METRIC_WORKER_EVALUATE_TRIGGER_DURATION, metricRegistry.tags(workerTrigger.getTriggerContext(), workerGroup))
.record(() -> {
this.evaluateTriggerRunningCount.computeIfAbsent(workerTrigger.getTriggerContext().uid(), s -> metricRegistry
.gauge(MetricRegistry.METRIC_WORKER_EVALUATE_TRIGGER_RUNNING_COUNT, new AtomicInteger(0), metricRegistry.tags(workerTrigger.getTriggerContext())));
.gauge(MetricRegistry.METRIC_WORKER_EVALUATE_TRIGGER_RUNNING_COUNT, new AtomicInteger(0), metricRegistry.tags(workerTrigger.getTriggerContext(), workerGroup)));
this.evaluateTriggerRunningCount.get(workerTrigger.getTriggerContext().uid()).addAndGet(1);

try {
Expand Down Expand Up @@ -248,12 +248,12 @@ private WorkerTask cleanUpTransient(WorkerTask workerTask) {

private WorkerTaskResult run(WorkerTask workerTask, Boolean cleanUp) throws QueueException {
metricRegistry
.counter(MetricRegistry.METRIC_WORKER_STARTED_COUNT, metricRegistry.tags(workerTask))
.counter(MetricRegistry.METRIC_WORKER_STARTED_COUNT, metricRegistry.tags(workerTask, workerGroup))
.increment();

if (workerTask.getTaskRun().getState().getCurrent() == State.Type.CREATED) {
metricRegistry
.timer(MetricRegistry.METRIC_WORKER_QUEUED_DURATION, metricRegistry.tags(workerTask))
.timer(MetricRegistry.METRIC_WORKER_QUEUED_DURATION, metricRegistry.tags(workerTask, workerGroup))
.record(Duration.between(
workerTask.getTaskRun().getState().getStartDate(), now()
));
Expand Down Expand Up @@ -315,7 +315,8 @@ private WorkerTaskResult run(WorkerTask workerTask, Boolean cleanUp) throws Queu
MetricRegistry.METRIC_WORKER_RETRYED_COUNT,
metricRegistry.tags(
current.get(),
MetricRegistry.TAG_ATTEMPT_COUNT, String.valueOf(e.getAttemptCount())
MetricRegistry.TAG_ATTEMPT_COUNT, String.valueOf(e.getAttemptCount()),
MetricRegistry.TAG_WORKER_GROUP, workerGroup
)
)
.increment();
Expand Down Expand Up @@ -380,11 +381,11 @@ private WorkerTaskResult run(WorkerTask workerTask, Boolean cleanUp) throws Queu

private void logTerminated(WorkerTask workerTask) {
metricRegistry
.counter(MetricRegistry.METRIC_WORKER_ENDED_COUNT, metricRegistry.tags(workerTask))
.counter(MetricRegistry.METRIC_WORKER_ENDED_COUNT, metricRegistry.tags(workerTask, workerGroup))
.increment();

metricRegistry
.timer(MetricRegistry.METRIC_WORKER_ENDED_DURATION, metricRegistry.tags(workerTask))
.timer(MetricRegistry.METRIC_WORKER_ENDED_DURATION, metricRegistry.tags(workerTask, workerGroup))
.record(workerTask.getTaskRun().getState().getDuration());

workerTask.logger().info(
Expand Down Expand Up @@ -445,7 +446,7 @@ private WorkerTask runAttempt(WorkerTask workerTask) {

metricRunningCount.incrementAndGet();

WorkerThread workerThread = new WorkerThread(logger, workerTask, task, runContext, metricRegistry);
WorkerThread workerThread = new WorkerThread(logger, workerTask, task, runContext, metricRegistry, workerGroup);

// emit attempts
this.workerTaskResultQueue.emit(new WorkerTaskResult(workerTask
Expand Down Expand Up @@ -516,7 +517,7 @@ private List<TaskRunAttempt> addAttempt(WorkerTask workerTask, TaskRunAttempt ta
}

public AtomicInteger getMetricRunningCount(WorkerTask workerTask) {
String[] tags = this.metricRegistry.tags(workerTask);
String[] tags = this.metricRegistry.tags(workerTask, workerGroup);
Arrays.sort(tags);

long index = Hashing
Expand All @@ -528,7 +529,7 @@ public AtomicInteger getMetricRunningCount(WorkerTask workerTask) {
.computeIfAbsent(index, l -> metricRegistry.gauge(
MetricRegistry.METRIC_WORKER_RUNNING_COUNT,
new AtomicInteger(0),
metricRegistry.tags(workerTask)
metricRegistry.tags(workerTask, workerGroup)
));
}

Expand Down Expand Up @@ -591,12 +592,13 @@ public static class WorkerThread extends Thread {
RunnableTask<?> task;
RunContext runContext;
MetricRegistry metricRegistry;
String workerGroup;

Output taskOutput;
io.kestra.core.models.flows.State.Type taskState;
boolean killed = false;

public WorkerThread(Logger logger, WorkerTask workerTask, RunnableTask<?> task, RunContext runContext, MetricRegistry metricRegistry) {
public WorkerThread(Logger logger, WorkerTask workerTask, RunnableTask<?> task, RunContext runContext, MetricRegistry metricRegistry, String workerGroup) {
super("WorkerThread");
this.setUncaughtExceptionHandler(this::exceptionHandler);

Expand All @@ -605,6 +607,7 @@ public WorkerThread(Logger logger, WorkerTask workerTask, RunnableTask<?> task,
this.task = task;
this.runContext = runContext;
this.metricRegistry = metricRegistry;
this.workerGroup = workerGroup;
}

@Override
Expand All @@ -623,7 +626,8 @@ public void run() {
MetricRegistry.METRIC_WORKER_TIMEOUT_COUNT,
metricRegistry.tags(
this.workerTask,
MetricRegistry.TAG_ATTEMPT_COUNT, String.valueOf(event.getAttemptCount())
MetricRegistry.TAG_ATTEMPT_COUNT, String.valueOf(event.getAttemptCount()),
MetricRegistry.TAG_WORKER_GROUP, workerGroup
)
)
.increment()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
Expand Down

0 comments on commit 38beef4

Please sign in to comment.