Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

control-service: refactor monitoring logic to improve encapsulation #391

Merged
merged 7 commits into from
Oct 18, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import com.vmware.taurus.service.credentials.JobCredentialsService;
import com.vmware.taurus.service.deploy.DeploymentService;
import com.vmware.taurus.service.model.DataJob;
import com.vmware.taurus.service.monitoring.DataJobInfoMonitor;
import com.vmware.taurus.service.monitoring.DataJobMetrics;
import com.vmware.taurus.service.webhook.WebHookRequestBody;
import com.vmware.taurus.service.webhook.WebHookRequestBodyProvider;
import com.vmware.taurus.service.webhook.WebHookResult;
Expand Down Expand Up @@ -47,7 +47,7 @@ public class JobsService {
private final WebHookRequestBodyProvider webHookRequestBodyProvider;
private final PostCreateWebHookProvider postCreateWebHookProvider;
private final PostDeleteWebHookProvider postDeleteWebHookProvider;
private final DataJobInfoMonitor dataJobInfoMonitor;
private final DataJobMetrics dataJobMetrics;


public JobOperationResult deleteJob(String name) {
Expand All @@ -59,13 +59,11 @@ public JobOperationResult deleteJob(String name) {

WebHookRequestBody requestBody = webHookRequestBodyProvider.constructPostDeleteBody(jobsRepository.findById(name).get());
Optional<WebHookResult> resultHolder = postDeleteWebHookProvider.invokeWebHook(requestBody);
if(isInvocationSuccessful(resultHolder)) {
dataJobInfoMonitor.removeDataJobInfo(() -> {
credentialsService.deleteJobCredentials(name);
deploymentService.deleteDeployment(name);
jobsRepository.deleteById(name);
return name;
});
if (isInvocationSuccessful(resultHolder)) {
credentialsService.deleteJobCredentials(name);
deploymentService.deleteDeployment(name);
jobsRepository.deleteById(name);
dataJobMetrics.clearGauges(name);

return JobOperationResult.builder()
.completed(true)
Expand Down Expand Up @@ -101,12 +99,12 @@ public JobOperationResult createJob(DataJob jobInfo) {
Optional<WebHookResult> resultHolder = postCreateWebHookProvider.invokeWebHook(requestBody);
if(isInvocationSuccessful(resultHolder)) {
// Save the data job and update the job info metrics
dataJobInfoMonitor.updateDataJobInfo(() -> {
if (jobInfo.getJobConfig().isGenerateKeytab()) {
credentialsService.createJobCredentials(jobInfo.getName());
}
return jobsRepository.save(jobInfo);
});
if (jobInfo.getJobConfig().isGenerateKeytab()) {
credentialsService.createJobCredentials(jobInfo.getName());
}
var dataJob = jobsRepository.save(jobInfo);
dataJobMetrics.updateInfoGauge(dataJob);
dataJobMetrics.updateNotificationDelayGauge(dataJob);

return JobOperationResult.builder()
.completed(true)
Expand Down Expand Up @@ -140,8 +138,10 @@ private boolean isInvocationSuccessful(Optional<WebHookResult> resultHolder) {
* @return if the job existed
*/
public boolean updateJob(DataJob jobInfo) {
return dataJobInfoMonitor.updateDataJobInfo(() ->
jobsRepository.existsById(jobInfo.getName()) ? jobsRepository.save(jobInfo) : null);
var dataJob = jobsRepository.existsById(jobInfo.getName()) ? jobsRepository.save(jobInfo) : null;
dataJobMetrics.updateInfoGauge(dataJob);
dataJobMetrics.updateNotificationDelayGauge(dataJob);
return dataJob != null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,204 +6,43 @@
package com.vmware.taurus.service.monitoring;

import com.vmware.taurus.service.model.DataJob;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

import static com.vmware.taurus.service.Utilities.join;

@Slf4j
@Component
public class DataJobInfoMonitor {

public static final String TAURUS_DATAJOB_INFO_METRIC_NAME = "taurus.datajob.info";
public static final String TAURUS_DATAJOB_NOTIFICATION_DELAY_METRIC_NAME = "taurus.datajob.notification.delay";
public static final Integer GAUGE_METRIC_VALUE = 1;
public static final int DEFAULT_NOTIFICATION_DELAY_PERIOD_MINUTES = 240;

private final MeterRegistry meterRegistry;

private final Map<String, Gauge> infoGauges = new ConcurrentHashMap<>();
private final Map<String, Gauge> delayGauges = new ConcurrentHashMap<>();
private final ReentrantLock lock = new ReentrantLock(true);
private final Map<String, Integer> currentDelays = new ConcurrentHashMap<>();
private final DataJobMetrics dataJobMetrics;

@Autowired
public DataJobInfoMonitor(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
public DataJobInfoMonitor(DataJobMetrics dataJobMetrics) {
this.dataJobMetrics = dataJobMetrics;
}

/**
* Creates a gauge to expose information about the specified data job.
* If a gauge already exists for the job, it is updated if necessary.
* <p>
* This method is synchronized.
*
* @param dataJobSupplier A supplier of the data job for which to create or update a gauge.
* @return true if the supplier produced a job; otherwise, false.
* @param dataJob The data job for which to create or update a gauge.
*/
public boolean updateDataJobInfo(final Supplier<DataJob> dataJobSupplier) {
return updateDataJobsInfo(() -> {
final var dataJob = Objects.requireNonNullElse(dataJobSupplier, () -> null).get();
return dataJob == null ? Collections.emptyList() : Collections.singletonList(dataJob);
});
}

/**
* Creates a gauge to expose information about each of the specified data jobs.
* If a gauge already exists for any of the jobs, it is updated if necessary.
* <p>
* This method is synchronized.
*
* @param dataJobsSupplier A supplier of the data jobs for which to create or update gauges.
* @return true if the supplier produced at least one job; otherwise, false.
*/
public boolean updateDataJobsInfo(final Supplier<Iterable<DataJob>> dataJobsSupplier) {
lock.lock();
try {
var dataJobs = Objects.requireNonNullElse(dataJobsSupplier, Collections::emptyList).get().iterator();
if (!dataJobs.hasNext()) {
return false;
}

dataJobs.forEachRemaining(dataJob -> {
if (dataJob == null) {
log.warn("The data job is null");
return;
}

var dataJobName = dataJob.getName();
if (dataJob.getJobConfig() == null) {
log.debug("The data job {} does not have configuration", dataJobName);
return;
}

var gauge = infoGauges.getOrDefault(dataJobName, null);
var newTags = createGaugeTags(dataJob);
if (isChanged(gauge, newTags)) {
log.info("The configuration of data job {} has changed", dataJobName);
removeInfoGauge(dataJobName);
}

if (!infoGauges.containsKey(dataJobName)) {
gauge = createInfoGauge(newTags);
infoGauges.put(dataJobName, gauge);
log.info("The info gauge for data job {} was created", dataJobName);
}

if (!delayGauges.containsKey(dataJobName)) {
gauge = createNotificationDelayGauge(dataJobName);
delayGauges.put(dataJobName, gauge);
log.info("The notification delay gauge for data job {} was created", dataJobName);
}
currentDelays.put(dataJobName,
Optional.ofNullable(dataJob.getJobConfig().getNotificationDelayPeriodMinutes()).orElse(DEFAULT_NOTIFICATION_DELAY_PERIOD_MINUTES));
});
public void updateDataJobInfo(final DataJob dataJob) {
Objects.requireNonNull(dataJob);

return true;
} finally {
lock.unlock();
}
dataJobMetrics.updateInfoGauge(dataJob);
dataJobMetrics.updateNotificationDelayGauge(dataJob);
}

/**
* Removes the gauges and cleans up the state associated with the specified data job.
* <p>
* This method is synchronized.
* Creates a gauge to expose information about each of the specified data jobs.
*
* @param dataJobNameSupplier A supplier of the name of the data job whose gauge is to be removed.
* @param dataJobs The data jobs for which to create or update gauges.
*/
public void removeDataJobInfo(final Supplier<String> dataJobNameSupplier) {
lock.lock();
try {
final var jobName = Objects.requireNonNullElse(dataJobNameSupplier, () -> null).get();
removeInfoGauge(jobName);
removeNotificationDelayGauge(jobName);
} finally {
lock.unlock();
}
}

private void removeInfoGauge(final String dataJobName) {
if (StringUtils.isNotBlank(dataJobName)) {
var gauge = infoGauges.getOrDefault(dataJobName, null);
if (gauge != null) {
meterRegistry.remove(gauge);
infoGauges.remove(dataJobName);
log.info("The info gauge for data job {} was removed", dataJobName);
} else {
log.info("The info gauge for data job {} cannot be removed: gauge not found", dataJobName);
}
} else {
log.warn("The info gauge cannot be removed: data job name is empty");
}
}

private void removeNotificationDelayGauge(final String dataJobName) {
if (StringUtils.isNotBlank(dataJobName)) {
var gauge = delayGauges.getOrDefault(dataJobName, null);
if (gauge != null) {
meterRegistry.remove(gauge);
delayGauges.remove(dataJobName);
currentDelays.remove(dataJobName);
log.info("The notification delay gauge for data job {} was removed", dataJobName);
} else {
log.info("The notification delay gauge for data job {} cannot be removed: gauge not found", dataJobName);
}
} else {
log.warn("The notification delay gauge cannot be removed: data job name is empty");
}
}

private boolean isChanged(final Gauge gauge, final Tags newTags) {
if (gauge == null) {
return false;
}
public void updateDataJobsInfo(final Iterable<DataJob> dataJobs) {
Objects.requireNonNull(dataJobs);

var existingTags = gauge.getId().getTags();
return !newTags.stream().allMatch(existingTags::contains);
}

private Gauge createInfoGauge(final Tags tags) {
return Gauge.builder(TAURUS_DATAJOB_INFO_METRIC_NAME, GAUGE_METRIC_VALUE, value -> value)
.tags(tags)
.description("Info about data jobs")
.register(meterRegistry);
}

private Gauge createNotificationDelayGauge(final String dataJobName) {
return Gauge.builder(TAURUS_DATAJOB_NOTIFICATION_DELAY_METRIC_NAME, currentDelays,
map -> map.getOrDefault(dataJobName, 0))
.tags(Tags.of("data_job", dataJobName))
.description("The time (in minutes) a job execution is allowed to be delayed from its schedule before an alert is triggered")
.register(meterRegistry);
}

private Tags createGaugeTags(final DataJob dataJob) {
Objects.requireNonNull(dataJob);
var jobConfig = dataJob.getJobConfig();
boolean enableExecutionNotifications = jobConfig.getEnableExecutionNotifications() == null ||
jobConfig.getEnableExecutionNotifications();
return Tags.of(
"data_job", dataJob.getName(),
"team", StringUtils.defaultString(jobConfig.getTeam()),
"email_notified_on_success", enableExecutionNotifications ?
join(jobConfig.getNotifiedOnJobSuccess()) : "",
"email_notified_on_user_error", enableExecutionNotifications ?
join(jobConfig.getNotifiedOnJobFailureUserError()) : "",
"email_notified_on_platform_error", enableExecutionNotifications ?
join(jobConfig.getNotifiedOnJobFailurePlatformError()) : "");
dataJobs.forEach(this::updateDataJobInfo);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
public class DataJobInfoMonitorSync {

private final DataJobInfoMonitor dataJobInfoMonitor;

private final JobsRepository jobsRepository;

@Autowired
Expand All @@ -29,8 +28,6 @@ public DataJobInfoMonitorSync(DataJobInfoMonitor dataJobInfoMonitor, JobsReposit
fixedDelayString = "${datajobs.monitoring.sync.interval}",
initialDelayString = "${datajobs.monitoring.sync.initial.delay}")
public void updateDataJobInfo() {
if (!dataJobInfoMonitor.updateDataJobsInfo(jobsRepository::findAll)) {
log.debug("There are no data jobs");
}
dataJobInfoMonitor.updateDataJobsInfo(jobsRepository.findAll());
}
}
Loading