Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

control-service: code expected to run in transaction now runs in transaction #2117

Merged
merged 10 commits into from
May 25, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ public Optional<com.vmware.taurus.service.model.DataJobExecution> updateJobExecu
.lastDeployedDate(jobExecution.getDeployedDate())
.lastDeployedBy(jobExecution.getDeployedBy())
.build();
return Optional.of(jobExecutionRepository.save(dataJobExecution));
return Optional.of(jobExecutionRepository.saveAndFlush(dataJobExecution));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,15 @@
import com.vmware.taurus.service.diag.methodintercept.Measurable;
import com.vmware.taurus.service.execution.JobExecutionResultManager;
import com.vmware.taurus.service.execution.JobExecutionService;
import com.vmware.taurus.service.kubernetes.DataJobsKubernetesService;
import com.vmware.taurus.service.model.DataJob;
import com.vmware.taurus.service.model.ExecutionResult;
import com.vmware.taurus.service.model.JobLabel;
import com.vmware.taurus.service.threads.ThreadPoolConf;
import io.kubernetes.client.openapi.ApiException;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.transaction.Transactional;
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
Expand All @@ -39,13 +30,7 @@
@Component
public class DataJobMonitor {

private static final long ONE_MINUTE_MILLIS = TimeUnit.MINUTES.toMillis(1);

private final Map<String, String> labelsToWatch =
Collections.singletonMap(JobLabel.TYPE.getValue(), "DataJob");

private final JobsRepository jobsRepository;
private final DataJobsKubernetesService dataJobsKubernetesService;
private final JobsService jobsService;
private final JobExecutionService jobExecutionService;
private final DataJobMetrics dataJobMetrics;
Expand All @@ -56,70 +41,15 @@ public class DataJobMonitor {
@Autowired
public DataJobMonitor(
JobsRepository jobsRepository,
DataJobsKubernetesService dataJobsKubernetesService,
JobsService jobsService,
JobExecutionService jobExecutionService,
DataJobMetrics dataJobMetrics) {
this.dataJobsKubernetesService = dataJobsKubernetesService;
this.jobsRepository = jobsRepository;
this.jobsService = jobsService;
this.jobExecutionService = jobExecutionService;
this.dataJobMetrics = dataJobMetrics;
}

/**
* This method is annotated with {@link SchedulerLock} to prevent it from being executed
* simultaneously by more than one instance of the service in a multi-node deployment. This aims
* to reduce the number of rps to the Kubernetes API as well as to avoid errors due to concurrent
* database writes.
*
* <p>The flow is as follows:
*
* <ol>
* <li>At any given point only one of the nodes will acquire the lock and execute the method.
* <li>A lock will be held for no longer than 10 minutes (as configured in {@link
* ThreadPoolConf}), which should be enough for a watch to complete (it currently has 5
* minutes timeout).
* <li>The other nodes will skip their schedules until after this node completes.
* <li>When a termination status of a job is updated by the node holding the lock, the other
* nodes will be eventually consistent within 5 seconds (by default) due to the continuous
* updates done here: {@link DataJobMonitorSync#updateDataJobStatus}.
* <li>Subsequently, when one of the other nodes acquires the lock, it will detect all changes
* since its own last run (see {@code lastWatchTime}) and rewrite them. We can potentially
* improve on this by sharing the lastWatchTime amongst the nodes.
* </ol>
*
* @see <a href="https://github.com/lukas-krecan/ShedLock">ShedLock</a>
*/
@Scheduled(
fixedDelayString = "${datajobs.status.watch.interval:1000}",
initialDelayString = "${datajobs.status.watch.initial.delay:10000}")
@SchedulerLock(name = "watchJobs_schedulerLock")
public void watchJobs() {
dataJobMetrics.incrementWatchTaskInvocations();
try {
dataJobsKubernetesService.watchJobs(
labelsToWatch,
s -> {
log.info(
"Termination message of Data Job {} with execution {}: {}",
s.getJobName(),
s.getExecutionId(),
s.getMainContainerTerminationMessage());
recordJobExecutionStatus(s);
},
runningJobExecutionIds -> {
jobExecutionService.syncJobExecutionStatuses(runningJobExecutionIds);
},
lastWatchTime);
// Move the lastWatchTime one minute into the past to account for events that
// could have happened after the watch has completed until now
lastWatchTime = Instant.now().minusMillis(ONE_MINUTE_MILLIS).toEpochMilli();
} catch (IOException | ApiException e) {
log.info("Failed to watch jobs. Error was: {}", e.getMessage());
}
}

/**
* Creates gauges that expose configuration information and termination status for the specified
* data jobs. If the gauges already exist for a particular data job, they are updated if
Expand Down Expand Up @@ -185,7 +115,7 @@ void updateDataJobInfoGauges(final DataJob dataJob) {
*/
@Measurable(includeArg = 0, argName = "execution_status")
@Transactional
void recordJobExecutionStatus(KubernetesService.JobExecution jobStatus) {
public void recordJobExecutionStatus(KubernetesService.JobExecution jobStatus) {
log.debug("Storing Data Job execution status: {}", jobStatus);
String dataJobName = jobStatus.getJobName();
ExecutionResult executionResult = JobExecutionResultManager.getResult(jobStatus);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright 2021-2023 VMware, Inc.
* SPDX-License-Identifier: Apache-2.0
*/

package com.vmware.taurus.service.monitoring;

import com.vmware.taurus.service.execution.JobExecutionService;
import com.vmware.taurus.service.kubernetes.DataJobsKubernetesService;
import com.vmware.taurus.service.model.JobLabel;
import com.vmware.taurus.service.threads.ThreadPoolConf;
import io.kubernetes.client.openapi.ApiException;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
public class DataJobMonitorCron {

private static final long ONE_MINUTE_MILLIS = TimeUnit.MINUTES.toMillis(1);

private final Map<String, String> labelsToWatch =
Collections.singletonMap(JobLabel.TYPE.getValue(), "DataJob");

private final DataJobsKubernetesService dataJobsKubernetesService;
private final JobExecutionService jobExecutionService;
private final DataJobMetrics dataJobMetrics;
private final DataJobMonitor dataJobMonitor;

private long lastWatchTime =
Instant.now().minusMillis(TimeUnit.MINUTES.toMillis(30)).toEpochMilli();

@Autowired
public DataJobMonitorCron(
DataJobsKubernetesService dataJobsKubernetesService,
JobExecutionService jobExecutionService,
DataJobMetrics dataJobMetrics,
DataJobMonitor dataJobMonitor) {
this.dataJobsKubernetesService = dataJobsKubernetesService;
this.jobExecutionService = jobExecutionService;
this.dataJobMetrics = dataJobMetrics;
this.dataJobMonitor = dataJobMonitor;
}

/**
* This method is annotated with {@link SchedulerLock} to prevent it from being executed
* simultaneously by more than one instance of the service in a multi-node deployment. This aims
* to reduce the number of rps to the Kubernetes API as well as to avoid errors due to concurrent
* database writes.
*
* <p>The flow is as follows:
*
* <ol>
* <li>At any given point only one of the nodes will acquire the lock and execute the method.
* <li>A lock will be held for no longer than 10 minutes (as configured in {@link
* ThreadPoolConf}), which should be enough for a watch to complete (it currently has 5
* minutes timeout).
* <li>The other nodes will skip their schedules until after this node completes.
* <li>When a termination status of a job is updated by the node holding the lock, the other
* nodes will be eventually consistent within 5 seconds (by default) due to the continuous
* updates done here: {@link DataJobMonitorSync#updateDataJobStatus}.
* <li>Subsequently, when one of the other nodes acquires the lock, it will detect all changes
* since its own last run (see {@code lastWatchTime}) and rewrite them. We can potentially
* improve on this by sharing the lastWatchTime amongst the nodes.
* </ol>
*
* @see <a href="https://github.com/lukas-krecan/ShedLock">ShedLock</a>
*/
@Scheduled(
fixedDelayString = "${datajobs.status.watch.interval:1000}",
initialDelayString = "${datajobs.status.watch.initial.delay:10000}")
@SchedulerLock(name = "watchJobs_schedulerLock")
public void watchJobs() {
dataJobMetrics.incrementWatchTaskInvocations();
try {
dataJobsKubernetesService.watchJobs(
labelsToWatch,
s -> {
log.info(
"Termination message of Data Job {} with execution {}: {}",
s.getJobName(),
s.getExecutionId(),
s.getMainContainerTerminationMessage());
dataJobMonitor.recordJobExecutionStatus(s);
},
jobExecutionService::syncJobExecutionStatuses,
lastWatchTime);
// Move the lastWatchTime one minute into the past to account for events that
// could have happened after the watch has completed until now
lastWatchTime = Instant.now().minusMillis(ONE_MINUTE_MILLIS).toEpochMilli();
} catch (IOException | ApiException e) {
log.info("Failed to watch jobs. Error was: {}", e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class DataJobMonitorTest {

@MockBean private DataJobsKubernetesService dataJobsKubernetesService;

@Autowired private DataJobMonitorCron dataJobMonitorCron;
@Autowired private DataJobMonitor dataJobMonitor;

@Test
Expand Down Expand Up @@ -199,7 +200,7 @@ public void testWatchJobs() throws IOException, ApiException {
.watchJobs(anyMap(), any(), any(), anyLong());
jobStatuses.forEach(s -> jobsRepository.save(new DataJob(s.getJobName(), new JobConfig())));

dataJobMonitor.watchJobs();
dataJobMonitorCron.watchJobs();

var gauges =
meterRegistry.find(DataJobMetrics.TAURUS_DATAJOB_TERMINATION_STATUS_METRIC_NAME).gauges();
Expand Down Expand Up @@ -234,7 +235,7 @@ public void testWatchJobsWithEmptyJobName() throws IOException, ApiException {
.when(dataJobsKubernetesService)
.watchJobs(anyMap(), any(), any(), anyLong());

dataJobMonitor.watchJobs();
dataJobMonitorCron.watchJobs();

var gauges =
meterRegistry.find(DataJobMetrics.TAURUS_DATAJOB_TERMINATION_STATUS_METRIC_NAME).gauges();
Expand All @@ -254,7 +255,7 @@ public void testWatchJobsWithMissingJob() throws IOException, ApiException {
.when(dataJobsKubernetesService)
.watchJobs(anyMap(), any(), any(), anyLong());

dataJobMonitor.watchJobs();
dataJobMonitorCron.watchJobs();

var gauges =
meterRegistry.find(DataJobMetrics.TAURUS_DATAJOB_TERMINATION_STATUS_METRIC_NAME).gauges();
Expand Down Expand Up @@ -286,7 +287,7 @@ public void testWatchJobsWithTheSameStatus() throws IOException, ApiException {
getTerminationStatus(s),
s.getExecutionId())));

dataJobMonitor.watchJobs();
dataJobMonitorCron.watchJobs();

var gauges =
meterRegistry.find(DataJobMetrics.TAURUS_DATAJOB_TERMINATION_STATUS_METRIC_NAME).gauges();
Expand Down Expand Up @@ -322,7 +323,7 @@ void testWatchJobsWithoutTerminationMessage() throws IOException, ApiException {
getTerminationStatus(s),
null)));

dataJobMonitor.watchJobs();
dataJobMonitorCron.watchJobs();

var gauges =
meterRegistry.find(DataJobMetrics.TAURUS_DATAJOB_TERMINATION_STATUS_METRIC_NAME).gauges();
Expand Down Expand Up @@ -355,7 +356,7 @@ public void testWatchJobsWhenExceptionIsThrown() throws IOException, ApiExceptio
.when(dataJobsKubernetesService)
.watchJobs(anyMap(), any(), any(), anyLong());

Assertions.assertDoesNotThrow(() -> dataJobMonitor.watchJobs());
Assertions.assertDoesNotThrow(() -> dataJobMonitorCron.watchJobs());
}

@Test
Expand Down