Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): add worker group tag to the metrics #1727

Merged
merged 2 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions core/src/main/java/io/kestra/core/metrics/MetricRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class MetricRegistry {
public final static String TAG_NAMESPACE_ID = "namespace_id";
public final static String TAG_STATE = "state";
public final static String TAG_ATTEMPT_COUNT = "attempt_count";
public final static String TAG_WORKER_GROUP = "worker_group";

@Inject
private MeterRegistry meterRegistry;
Expand Down Expand Up @@ -128,17 +129,19 @@ private String metricName(String name) {
* We don't include current state since it will breakup the values per state and it's make no sense.
*
* @param workerTask the current WorkerTask
* @param workerGroup the worker group, optional
* @return tags to applied to metrics
*/
public String[] tags(WorkerTask workerTask, String... tags) {
return ArrayUtils.addAll(
public String[] tags(WorkerTask workerTask, String workerGroup, String... tags) {
var finalTags = ArrayUtils.addAll(
ArrayUtils.addAll(
this.tags(workerTask.getTask()),
tags
),
TAG_NAMESPACE_ID, workerTask.getTaskRun().getNamespace(),
TAG_FLOW_ID, workerTask.getTaskRun().getFlowId()
);
return workerGroup != null ? ArrayUtils.addAll(finalTags, TAG_WORKER_GROUP, workerGroup) : finalTags;
}

/**
Expand Down Expand Up @@ -182,6 +185,21 @@ public String[] tags(Execution execution) {
};
}

/**
* Return tags for current {@link TriggerContext}
*
* @param triggerContext the current TriggerContext
* @param workerGroup the worker group, optional
* @return tags to applied to metrics
*/
public String[] tags(TriggerContext triggerContext, String workerGroup) {
var finalTags = new String[]{
TAG_FLOW_ID, triggerContext.getFlowId(),
TAG_NAMESPACE_ID, triggerContext.getNamespace()
};
return workerGroup != null ? ArrayUtils.addAll(finalTags, TAG_WORKER_GROUP, workerGroup) : finalTags;
}

/**
* Return tags for current {@link TriggerContext}
*
Expand Down
28 changes: 16 additions & 12 deletions core/src/main/java/io/kestra/core/runners/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,10 @@ private void handleTask(WorkerTask workerTask) {

private void handleTrigger(WorkerTrigger workerTrigger) {
this.metricRegistry
.timer(MetricRegistry.METRIC_WORKER_EVALUATE_TRIGGER_DURATION, metricRegistry.tags(workerTrigger.getTriggerContext()))
.timer(MetricRegistry.METRIC_WORKER_EVALUATE_TRIGGER_DURATION, metricRegistry.tags(workerTrigger.getTriggerContext(), workerGroup))
.record(() -> {
this.evaluateTriggerRunningCount.computeIfAbsent(workerTrigger.getTriggerContext().uid(), s -> metricRegistry
.gauge(MetricRegistry.METRIC_WORKER_EVALUATE_TRIGGER_RUNNING_COUNT, new AtomicInteger(0), metricRegistry.tags(workerTrigger.getTriggerContext())));
.gauge(MetricRegistry.METRIC_WORKER_EVALUATE_TRIGGER_RUNNING_COUNT, new AtomicInteger(0), metricRegistry.tags(workerTrigger.getTriggerContext(), workerGroup)));
this.evaluateTriggerRunningCount.get(workerTrigger.getTriggerContext().uid()).addAndGet(1);

try {
Expand Down Expand Up @@ -248,12 +248,12 @@ private WorkerTask cleanUpTransient(WorkerTask workerTask) {

private WorkerTaskResult run(WorkerTask workerTask, Boolean cleanUp) throws QueueException {
metricRegistry
.counter(MetricRegistry.METRIC_WORKER_STARTED_COUNT, metricRegistry.tags(workerTask))
.counter(MetricRegistry.METRIC_WORKER_STARTED_COUNT, metricRegistry.tags(workerTask, workerGroup))
.increment();

if (workerTask.getTaskRun().getState().getCurrent() == State.Type.CREATED) {
metricRegistry
.timer(MetricRegistry.METRIC_WORKER_QUEUED_DURATION, metricRegistry.tags(workerTask))
.timer(MetricRegistry.METRIC_WORKER_QUEUED_DURATION, metricRegistry.tags(workerTask, workerGroup))
.record(Duration.between(
workerTask.getTaskRun().getState().getStartDate(), now()
));
Expand Down Expand Up @@ -315,7 +315,8 @@ private WorkerTaskResult run(WorkerTask workerTask, Boolean cleanUp) throws Queu
MetricRegistry.METRIC_WORKER_RETRYED_COUNT,
metricRegistry.tags(
current.get(),
MetricRegistry.TAG_ATTEMPT_COUNT, String.valueOf(e.getAttemptCount())
MetricRegistry.TAG_ATTEMPT_COUNT, String.valueOf(e.getAttemptCount()),
MetricRegistry.TAG_WORKER_GROUP, workerGroup
)
)
.increment();
Expand Down Expand Up @@ -380,11 +381,11 @@ private WorkerTaskResult run(WorkerTask workerTask, Boolean cleanUp) throws Queu

private void logTerminated(WorkerTask workerTask) {
metricRegistry
.counter(MetricRegistry.METRIC_WORKER_ENDED_COUNT, metricRegistry.tags(workerTask))
.counter(MetricRegistry.METRIC_WORKER_ENDED_COUNT, metricRegistry.tags(workerTask, workerGroup))
.increment();

metricRegistry
.timer(MetricRegistry.METRIC_WORKER_ENDED_DURATION, metricRegistry.tags(workerTask))
.timer(MetricRegistry.METRIC_WORKER_ENDED_DURATION, metricRegistry.tags(workerTask, workerGroup))
.record(workerTask.getTaskRun().getState().getDuration());

workerTask.logger().info(
Expand Down Expand Up @@ -445,7 +446,7 @@ private WorkerTask runAttempt(WorkerTask workerTask) {

metricRunningCount.incrementAndGet();

WorkerThread workerThread = new WorkerThread(logger, workerTask, task, runContext, metricRegistry);
WorkerThread workerThread = new WorkerThread(logger, workerTask, task, runContext, metricRegistry, workerGroup);

// emit attempts
this.workerTaskResultQueue.emit(new WorkerTaskResult(workerTask
Expand Down Expand Up @@ -516,7 +517,7 @@ private List<TaskRunAttempt> addAttempt(WorkerTask workerTask, TaskRunAttempt ta
}

public AtomicInteger getMetricRunningCount(WorkerTask workerTask) {
String[] tags = this.metricRegistry.tags(workerTask);
String[] tags = this.metricRegistry.tags(workerTask, workerGroup);
Arrays.sort(tags);

long index = Hashing
Expand All @@ -528,7 +529,7 @@ public AtomicInteger getMetricRunningCount(WorkerTask workerTask) {
.computeIfAbsent(index, l -> metricRegistry.gauge(
MetricRegistry.METRIC_WORKER_RUNNING_COUNT,
new AtomicInteger(0),
metricRegistry.tags(workerTask)
metricRegistry.tags(workerTask, workerGroup)
));
}

Expand Down Expand Up @@ -591,12 +592,13 @@ public static class WorkerThread extends Thread {
RunnableTask<?> task;
RunContext runContext;
MetricRegistry metricRegistry;
String workerGroup;

Output taskOutput;
io.kestra.core.models.flows.State.Type taskState;
boolean killed = false;

public WorkerThread(Logger logger, WorkerTask workerTask, RunnableTask<?> task, RunContext runContext, MetricRegistry metricRegistry) {
public WorkerThread(Logger logger, WorkerTask workerTask, RunnableTask<?> task, RunContext runContext, MetricRegistry metricRegistry, String workerGroup) {
super("WorkerThread");
this.setUncaughtExceptionHandler(this::exceptionHandler);

Expand All @@ -605,6 +607,7 @@ public WorkerThread(Logger logger, WorkerTask workerTask, RunnableTask<?> task,
this.task = task;
this.runContext = runContext;
this.metricRegistry = metricRegistry;
this.workerGroup = workerGroup;
}

@Override
Expand All @@ -623,7 +626,8 @@ public void run() {
MetricRegistry.METRIC_WORKER_TIMEOUT_COUNT,
metricRegistry.tags(
this.workerTask,
MetricRegistry.TAG_ATTEMPT_COUNT, String.valueOf(event.getAttemptCount())
MetricRegistry.TAG_ATTEMPT_COUNT, String.valueOf(event.getAttemptCount()),
MetricRegistry.TAG_WORKER_GROUP, workerGroup
)
)
.increment()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
Expand Down
6 changes: 1 addition & 5 deletions core/src/test/java/io/kestra/core/runners/RetryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ void retryFailed() throws TimeoutException {

assertThat(execution.getTaskRunList(), hasSize(2));
assertThat(execution.getTaskRunList().get(0).getAttempts(), hasSize(5));

// be sure attempts are available on the queue
// we cannot know the exact number of executions, but we should have at least 15 of them
assertThat(executions.size(), greaterThan(15));
assertThat(executions.get(8).getTaskRunList().get(0).getAttempts().size(), is(3));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
}
}