Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Job Library, JobMonitor(and its test) and refactoring enode code #1850

Merged
merged 9 commits into from
Feb 11, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@
"authentication": "%%mlJobAuth%%",
"default-error-format": "json",
"error-handler": "/MarkLogic/rest-api/error-handler.xqy",
"url-rewriter": "/data-hub/4/tracing/tracing-rewriter.xml",
"url-rewriter": "/data-hub/5/tracing/tracing-rewriter.xml",
"rewrite-resolves-globally": true
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"content-database": "%%mlStagingDbName%%",
"authentication": "%%mlStagingAuth%%",
"default-error-format": "json",
"error-handler": "/data-hub/4/rest-api/error-handler.xqy",
"url-rewriter": "/data-hub/4/rest-api/rewriter.xml",
"error-handler": "/data-hub/5/rest-api/error-handler.xqy",
"url-rewriter": "/data-hub/5/rest-api/rewriter.xml",
"rewrite-resolves-globally": true
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
{
"server-name": "%%mlJobAppserverName%%",
"server-type": "http",
"root": "/",
"group-name": "%%GROUP%%",
"port": "%%mlJobPort%%",
"modules-database": "%%mlModulesDbName%%",
"content-database": "%%mlJobDbName%%",
"authentication": "%%mlJobAuth%%",
"default-error-format": "json",
"error-handler": "/MarkLogic/rest-api/error-handler.xqy",
"url-rewriter": "/data-hub/4/tracing/tracing-rewriter.xml",
"rewrite-resolves-globally": true
}
{
"server-name": "%%mlJobAppserverName%%",
"server-type": "http",
"root": "/",
"group-name": "%%GROUP%%",
"port": "%%mlJobPort%%",
"modules-database": "%%mlModulesDbName%%",
"content-database": "%%mlJobDbName%%",
"authentication": "%%mlJobAuth%%",
"default-error-format": "json",
"error-handler": "/MarkLogic/rest-api/error-handler.xqy",
"url-rewriter": "/data-hub/5/tracing/tracing-rewriter.xml",
"rewrite-resolves-globally": true
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
{
"server-name": "%%mlStagingAppserverName%%",
"server-type": "http",
"root": "/",
"group-name": "%%GROUP%%",
"port": "%%mlStagingPort%%",
"modules-database": "%%mlModulesDbName%%",
"content-database": "%%mlStagingDbName%%",
"authentication": "%%mlStagingAuth%%",
"default-error-format": "json",
"error-handler": "/data-hub/4/rest-api/error-handler.xqy",
"url-rewriter": "/data-hub/4/rest-api/rewriter.xml",
"rewrite-resolves-globally": true
}
{
"server-name": "%%mlStagingAppserverName%%",
"server-type": "http",
"root": "/",
"group-name": "%%GROUP%%",
"port": "%%mlStagingPort%%",
"modules-database": "%%mlModulesDbName%%",
"content-database": "%%mlStagingDbName%%",
"authentication": "%%mlStagingAuth%%",
"default-error-format": "json",
"error-handler": "/data-hub/5/rest-api/error-handler.xqy",
"url-rewriter": "/data-hub/5/rest-api/rewriter.xml",
"rewrite-resolves-globally": true
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
* This application configuration is an entry point to using the DHF from a set property
*/
@Configuration
@ComponentScan(basePackages = {"com.marklogic.hub.impl", "com.marklogic.hub.legacy.impl", "com.marklogic.hub.deploy.commands"})
@ComponentScan(basePackages = {"com.marklogic.hub.impl", "com.marklogic.hub.legacy.impl", "com.marklogic.hub.deploy.commands",
"com.marklogic.hub.job.impl"})
@EnableAutoConfiguration
public class ApplicationConfig {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.marklogic.hub.error.DataHubConfigurationException;
import com.marklogic.hub.error.DataHubProjectException;
import com.marklogic.hub.error.InvalidDBOperationError;
import com.marklogic.hub.job.impl.JobMonitorImpl;
import com.marklogic.hub.legacy.impl.LegacyFlowManagerImpl;
import com.marklogic.hub.processes.Process;
import com.marklogic.mgmt.DefaultManageConfigFactory;
Expand Down Expand Up @@ -84,6 +85,8 @@ public class HubConfigImpl implements HubConfig
DataHubImpl dataHub;
@Autowired
Versions versions;
@Autowired
JobMonitorImpl jobMonitor;


protected String host;
Expand Down Expand Up @@ -1906,6 +1909,7 @@ public void refreshProject(Properties properties, boolean loadGradleProperties)
flowManager.setupClient();
dataHub.wireClient();
versions.setupClient();
jobMonitor.setupClient();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,8 +519,8 @@ else if(path.toLowerCase().equals("job-server.json")
((ObjectNode) rootNode).put("error-handler", "/data-hub/5/rest-api/error-handler.xqy");
}
else {
logger.info("Setting \"url-rewriter\" to \"/data-hub/4/tracing/tracing-rewriter.xml\"");
((ObjectNode) rootNode).put("url-rewriter", "/data-hub/4/tracing/tracing-rewriter.xml");
logger.info("Setting \"url-rewriter\" to \"/data-hub/5/tracing/tracing-rewriter.xml\"");
((ObjectNode) rootNode).put("url-rewriter", "/data-hub/5/tracing/tracing-rewriter.xml");
}
String serverFile = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(rootNode);
logger.info("Writing "+ f.toFile().getAbsolutePath() +" to "
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.marklogic.hub.job;

import java.util.List;
import java.util.Map;
/**
* Monitor job / batch status
*/
public interface JobMonitor {

/**
* Gets the current running jobs
* @return A map of the current running job ids and their status
*/
Map<String, String> getCurrentJobs();

/**
* Sets the flow to be used with the flow runner
* @param jobId the flow object to be used
* @return string denoting the status of the running job, ex: "running step 1"
*/

String getJobStatus(String jobId);

/**
* Sets the flow to be used with the flow runner
* @param batchId the id of the batch
* @return string denoting status of the batch
*
*/

String getBatchStatus(String jobId, String batchId);

/**
* Sets the flow to be used with the flow runner
* @param jobId the id of the batch
* @param step the step of the job
* @return Map containing batch id and status of the batch
*
*/

Map<String,String> getStepBatchStatus(String jobId, String step);


/**
* Returns the response (uris or error msgs) of the batch
* @param jobId the id of the job
* @param batchId the id of the batch
* @return the flow runner object
*/

List<String> getBatchResponse(String jobId, String batchId);

/**
* The next step to be executed
* @param jobId the id of the job
* @return string denoting the next step of the job
*/

String getNextStep(String jobId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.marklogic.hub.job;

public enum JobStatus {
STARTED("started"),
FINISHED("finished"),
FINISHED_WITH_ERRORS("finished_with_errors"),
RUNNING("running"),
FAILED("failed"),
CANCELED("canceled");

private String type;
JobStatus(String type) {
this.type = type;
}

public static JobStatus getJobStatus(String status) {
for (JobStatus jobStatus : JobStatus.values()) {
if (jobStatus.toString().equals(status)) {
return jobStatus;
}
}
return null;
}

public String toString() {
return this.type;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/*
* Copyright 2012-2019 MarkLogic Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.marklogic.hub.job.impl;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.marklogic.client.DatabaseClient;
import com.marklogic.client.extensions.ResourceManager;
import com.marklogic.client.extensions.ResourceServices;
import com.marklogic.client.io.JacksonHandle;
import com.marklogic.client.util.RequestParameters;
import com.marklogic.hub.HubConfig;
import com.marklogic.hub.job.JobMonitor;
import com.marklogic.hub.job.JobStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Component
public class JobMonitorImpl extends ResourceManager implements JobMonitor {

private DatabaseClient client;
@Autowired
private HubConfig hubConfig;
private Jobs jobs;
private Batches batches;

public JobMonitorImpl() {
super();
}

public void setupClient() {
this.client = hubConfig.newJobDbClient();
jobs = new Jobs(client);
batches = new Batches(client);
}

//obtain the currently running jobs on the cluster
public Map<String, String> getCurrentJobs() {
JsonNode runningJobs = jobs.getJobs(null, JobStatus.RUNNING);
Map<String, String> jobs = new HashMap<>();
if (runningJobs.isArray()) {
for (final JsonNode objNode : runningJobs) {
JsonNode jobdoc = objNode.get("job");
jobs.put(jobdoc.get("jobId").textValue(), jobdoc.get("flow").textValue());
}
}
return jobs;
}

//obtain the status of any jobID
public String getJobStatus(String jobId) {
JsonNode job = jobs.getJobs(jobId, null);
String status = null;
if(job.get("job") != null){
status = job.get("job").get("jobStatus").textValue();
}
return status;
}


//status of all batches in a step within a jobID
public Map<String,String> getStepBatchStatus(String jobId, String step) {
JsonNode batch = batches.getBatches(jobId, step, null);
Map<String,String> status = new HashMap<>();
if (batch.isArray()) {
for (final JsonNode objNode : batch) {
status.put(objNode.get("batch").get("batchId").textValue(), objNode.get("batch").get("batchStatus").textValue());
}
}
return status;
}

//status of a single batch
public String getBatchStatus(String jobId, String batchId) {
JsonNode batch = batches.getBatches(jobId, null, batchId);
String status = null;
if (batch.get("batch") != null) {
status = batch.get("batch").get("batchStatus").textValue();
}
return status;
}

//response of a single batch
public List<String> getBatchResponse(String jobId, String batchId) {
JsonNode batch = batches.getBatches(jobId, null, batchId);
ObjectMapper mapper = new ObjectMapper();
ObjectReader reader = mapper.readerFor(new TypeReference<List<String>>() {});
if(batch.get("batch") != null) {
try {
return reader.readValue(batch.get("batch").get("uris"));
} catch (IOException e) {
throw new RuntimeException(e);
}

}
return null;
}

//TODO Implement this after flow artifact can be created
public String getNextStep(String jobId) {
return null;
}

public class Jobs extends ResourceManager {
private static final String NAME = "ml:jobs";

private RequestParameters params;

public Jobs(DatabaseClient client) {
super();
client.init(NAME, this);
}

private JsonNode getJobs(String jobId, JobStatus status) {
params = new RequestParameters();
if(jobId != null) {
params.add("jobid", jobId);
}
if(status != null) {
params.add("status", status.toString());
}

ResourceServices.ServiceResultIterator resultItr = this.getServices().get(params);
if (resultItr == null || ! resultItr.hasNext()) {
throw new RuntimeException("Unable to get job document");
}
ResourceServices.ServiceResult res = resultItr.next();
return res.getContent(new JacksonHandle()).get();
}

}

public class Batches extends ResourceManager {
private static final String NAME = "ml:batches";

private RequestParameters params ;

public Batches(DatabaseClient client) {
super();
client.init(NAME, this);
}

private JsonNode getBatches(String jobId, String step, String batchId) {
params = new RequestParameters();
if(jobId == null) {
throw new RuntimeException("Cannot get batches without jobId");
}
params.add("jobid", jobId);
if(batchId != null) {
params.add("batchid", batchId);
}
if(step != null) {
params.add("step", step);
}

ResourceServices.ServiceResultIterator resultItr = this.getServices().get(params);
if (resultItr == null || ! resultItr.hasNext()) {
throw new RuntimeException("Unable to get batch document");
}
ResourceServices.ServiceResult res = resultItr.next();
return res.getContent(new JacksonHandle()).get();
}

}
}
Loading