-
Notifications
You must be signed in to change notification settings - Fork 124
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Job Library, JobMonitor(and its test) and refactoring enode code (#1850)
* Create JobMonitor class, tests * Create enode code for job and batch document CRUD * Running legacy flows in 4.x mode and new flows in 5.x * Remove unnecessary comments * Adding a new REST extension for 'batches' and renaming existing one to 'jobs', creating JobMonitor interface * Commenting the module count tests * Include the impl package in ApplicationConfig and remove setting 'jobMonitor' to null in resetProperties() in tests * Delete unnecessary transforms, extensions and modules from 4 and 5 dirs * Exporting class instead of instance
- Loading branch information
1 parent
000dbc8
commit 4c68766
Showing
200 changed files
with
1,164 additions
and
27,680 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
28 changes: 14 additions & 14 deletions
28
examples/disconnected-project/src/main/hub-internal-config/servers/job-server.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,14 +1,14 @@ | ||
{ | ||
"server-name": "%%mlJobAppserverName%%", | ||
"server-type": "http", | ||
"root": "/", | ||
"group-name": "%%GROUP%%", | ||
"port": "%%mlJobPort%%", | ||
"modules-database": "%%mlModulesDbName%%", | ||
"content-database": "%%mlJobDbName%%", | ||
"authentication": "%%mlJobAuth%%", | ||
"default-error-format": "json", | ||
"error-handler": "/MarkLogic/rest-api/error-handler.xqy", | ||
"url-rewriter": "/data-hub/4/tracing/tracing-rewriter.xml", | ||
"rewrite-resolves-globally": true | ||
} | ||
{ | ||
"server-name": "%%mlJobAppserverName%%", | ||
"server-type": "http", | ||
"root": "/", | ||
"group-name": "%%GROUP%%", | ||
"port": "%%mlJobPort%%", | ||
"modules-database": "%%mlModulesDbName%%", | ||
"content-database": "%%mlJobDbName%%", | ||
"authentication": "%%mlJobAuth%%", | ||
"default-error-format": "json", | ||
"error-handler": "/MarkLogic/rest-api/error-handler.xqy", | ||
"url-rewriter": "/data-hub/5/tracing/tracing-rewriter.xml", | ||
"rewrite-resolves-globally": true | ||
} |
28 changes: 14 additions & 14 deletions
28
examples/disconnected-project/src/main/hub-internal-config/servers/staging-server.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,14 +1,14 @@ | ||
{ | ||
"server-name": "%%mlStagingAppserverName%%", | ||
"server-type": "http", | ||
"root": "/", | ||
"group-name": "%%GROUP%%", | ||
"port": "%%mlStagingPort%%", | ||
"modules-database": "%%mlModulesDbName%%", | ||
"content-database": "%%mlStagingDbName%%", | ||
"authentication": "%%mlStagingAuth%%", | ||
"default-error-format": "json", | ||
"error-handler": "/data-hub/4/rest-api/error-handler.xqy", | ||
"url-rewriter": "/data-hub/4/rest-api/rewriter.xml", | ||
"rewrite-resolves-globally": true | ||
} | ||
{ | ||
"server-name": "%%mlStagingAppserverName%%", | ||
"server-type": "http", | ||
"root": "/", | ||
"group-name": "%%GROUP%%", | ||
"port": "%%mlStagingPort%%", | ||
"modules-database": "%%mlModulesDbName%%", | ||
"content-database": "%%mlStagingDbName%%", | ||
"authentication": "%%mlStagingAuth%%", | ||
"default-error-format": "json", | ||
"error-handler": "/data-hub/5/rest-api/error-handler.xqy", | ||
"url-rewriter": "/data-hub/5/rest-api/rewriter.xml", | ||
"rewrite-resolves-globally": true | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
60 changes: 60 additions & 0 deletions
60
marklogic-data-hub/src/main/java/com/marklogic/hub/job/JobMonitor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
package com.marklogic.hub.job; | ||
|
||
import java.util.List; | ||
import java.util.Map; | ||
/** | ||
* Monitor job / batch status | ||
*/ | ||
public interface JobMonitor { | ||
|
||
/** | ||
* Gets the current running jobs | ||
* @return A map of the current running job ids and their status | ||
*/ | ||
Map<String, String> getCurrentJobs(); | ||
|
||
/** | ||
* Sets the flow to be used with the flow runner | ||
* @param jobId the flow object to be used | ||
* @return string denoting the status of the running job, ex: "running step 1" | ||
*/ | ||
|
||
String getJobStatus(String jobId); | ||
|
||
/** | ||
* Sets the flow to be used with the flow runner | ||
* @param batchId the id of the batch | ||
* @return string denoting status of the batch | ||
* | ||
*/ | ||
|
||
String getBatchStatus(String jobId, String batchId); | ||
|
||
/** | ||
* Sets the flow to be used with the flow runner | ||
* @param jobId the id of the batch | ||
* @param step the step of the job | ||
* @return Map containing batch id and status of the batch | ||
* | ||
*/ | ||
|
||
Map<String,String> getStepBatchStatus(String jobId, String step); | ||
|
||
|
||
/** | ||
* Returns the response (uris or error msgs) of the batch | ||
* @param jobId the id of the job | ||
* @param batchId the id of the batch | ||
* @return the flow runner object | ||
*/ | ||
|
||
List<String> getBatchResponse(String jobId, String batchId); | ||
|
||
/** | ||
* The next step to be executed | ||
* @param jobId the id of the job | ||
* @return string denoting the next step of the job | ||
*/ | ||
|
||
String getNextStep(String jobId); | ||
} |
28 changes: 28 additions & 0 deletions
28
marklogic-data-hub/src/main/java/com/marklogic/hub/job/JobStatus.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
package com.marklogic.hub.job; | ||
|
||
public enum JobStatus { | ||
STARTED("started"), | ||
FINISHED("finished"), | ||
FINISHED_WITH_ERRORS("finished_with_errors"), | ||
RUNNING("running"), | ||
FAILED("failed"), | ||
CANCELED("canceled"); | ||
|
||
private String type; | ||
JobStatus(String type) { | ||
this.type = type; | ||
} | ||
|
||
public static JobStatus getJobStatus(String status) { | ||
for (JobStatus jobStatus : JobStatus.values()) { | ||
if (jobStatus.toString().equals(status)) { | ||
return jobStatus; | ||
} | ||
} | ||
return null; | ||
} | ||
|
||
public String toString() { | ||
return this.type; | ||
} | ||
} |
186 changes: 186 additions & 0 deletions
186
marklogic-data-hub/src/main/java/com/marklogic/hub/job/impl/JobMonitorImpl.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,186 @@ | ||
/* | ||
* Copyright 2012-2019 MarkLogic Corporation | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.marklogic.hub.job.impl; | ||
|
||
import com.fasterxml.jackson.core.type.TypeReference; | ||
import com.fasterxml.jackson.databind.JsonNode; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import com.fasterxml.jackson.databind.ObjectReader; | ||
import com.marklogic.client.DatabaseClient; | ||
import com.marklogic.client.extensions.ResourceManager; | ||
import com.marklogic.client.extensions.ResourceServices; | ||
import com.marklogic.client.io.JacksonHandle; | ||
import com.marklogic.client.util.RequestParameters; | ||
import com.marklogic.hub.HubConfig; | ||
import com.marklogic.hub.job.JobMonitor; | ||
import com.marklogic.hub.job.JobStatus; | ||
import org.springframework.beans.factory.annotation.Autowired; | ||
import org.springframework.stereotype.Component; | ||
|
||
import java.io.IOException; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
@Component | ||
public class JobMonitorImpl extends ResourceManager implements JobMonitor { | ||
|
||
private DatabaseClient client; | ||
@Autowired | ||
private HubConfig hubConfig; | ||
private Jobs jobs; | ||
private Batches batches; | ||
|
||
public JobMonitorImpl() { | ||
super(); | ||
} | ||
|
||
public void setupClient() { | ||
this.client = hubConfig.newJobDbClient(); | ||
jobs = new Jobs(client); | ||
batches = new Batches(client); | ||
} | ||
|
||
//obtain the currently running jobs on the cluster | ||
public Map<String, String> getCurrentJobs() { | ||
JsonNode runningJobs = jobs.getJobs(null, JobStatus.RUNNING); | ||
Map<String, String> jobs = new HashMap<>(); | ||
if (runningJobs.isArray()) { | ||
for (final JsonNode objNode : runningJobs) { | ||
JsonNode jobdoc = objNode.get("job"); | ||
jobs.put(jobdoc.get("jobId").textValue(), jobdoc.get("flow").textValue()); | ||
} | ||
} | ||
return jobs; | ||
} | ||
|
||
//obtain the status of any jobID | ||
public String getJobStatus(String jobId) { | ||
JsonNode job = jobs.getJobs(jobId, null); | ||
String status = null; | ||
if(job.get("job") != null){ | ||
status = job.get("job").get("jobStatus").textValue(); | ||
} | ||
return status; | ||
} | ||
|
||
|
||
//status of all batches in a step within a jobID | ||
public Map<String,String> getStepBatchStatus(String jobId, String step) { | ||
JsonNode batch = batches.getBatches(jobId, step, null); | ||
Map<String,String> status = new HashMap<>(); | ||
if (batch.isArray()) { | ||
for (final JsonNode objNode : batch) { | ||
status.put(objNode.get("batch").get("batchId").textValue(), objNode.get("batch").get("batchStatus").textValue()); | ||
} | ||
} | ||
return status; | ||
} | ||
|
||
//status of a single batch | ||
public String getBatchStatus(String jobId, String batchId) { | ||
JsonNode batch = batches.getBatches(jobId, null, batchId); | ||
String status = null; | ||
if (batch.get("batch") != null) { | ||
status = batch.get("batch").get("batchStatus").textValue(); | ||
} | ||
return status; | ||
} | ||
|
||
//response of a single batch | ||
public List<String> getBatchResponse(String jobId, String batchId) { | ||
JsonNode batch = batches.getBatches(jobId, null, batchId); | ||
ObjectMapper mapper = new ObjectMapper(); | ||
ObjectReader reader = mapper.readerFor(new TypeReference<List<String>>() {}); | ||
if(batch.get("batch") != null) { | ||
try { | ||
return reader.readValue(batch.get("batch").get("uris")); | ||
} catch (IOException e) { | ||
throw new RuntimeException(e); | ||
} | ||
|
||
} | ||
return null; | ||
} | ||
|
||
//TODO Implement this after flow artifact can be created | ||
public String getNextStep(String jobId) { | ||
return null; | ||
} | ||
|
||
public class Jobs extends ResourceManager { | ||
private static final String NAME = "ml:jobs"; | ||
|
||
private RequestParameters params; | ||
|
||
public Jobs(DatabaseClient client) { | ||
super(); | ||
client.init(NAME, this); | ||
} | ||
|
||
private JsonNode getJobs(String jobId, JobStatus status) { | ||
params = new RequestParameters(); | ||
if(jobId != null) { | ||
params.add("jobid", jobId); | ||
} | ||
if(status != null) { | ||
params.add("status", status.toString()); | ||
} | ||
|
||
ResourceServices.ServiceResultIterator resultItr = this.getServices().get(params); | ||
if (resultItr == null || ! resultItr.hasNext()) { | ||
throw new RuntimeException("Unable to get job document"); | ||
} | ||
ResourceServices.ServiceResult res = resultItr.next(); | ||
return res.getContent(new JacksonHandle()).get(); | ||
} | ||
|
||
} | ||
|
||
public class Batches extends ResourceManager { | ||
private static final String NAME = "ml:batches"; | ||
|
||
private RequestParameters params ; | ||
|
||
public Batches(DatabaseClient client) { | ||
super(); | ||
client.init(NAME, this); | ||
} | ||
|
||
private JsonNode getBatches(String jobId, String step, String batchId) { | ||
params = new RequestParameters(); | ||
if(jobId == null) { | ||
throw new RuntimeException("Cannot get batches without jobId"); | ||
} | ||
params.add("jobid", jobId); | ||
if(batchId != null) { | ||
params.add("batchid", batchId); | ||
} | ||
if(step != null) { | ||
params.add("step", step); | ||
} | ||
|
||
ResourceServices.ServiceResultIterator resultItr = this.getServices().get(params); | ||
if (resultItr == null || ! resultItr.hasNext()) { | ||
throw new RuntimeException("Unable to get batch document"); | ||
} | ||
ResourceServices.ServiceResult res = resultItr.next(); | ||
return res.getContent(new JacksonHandle()).get(); | ||
} | ||
|
||
} | ||
} |
Oops, something went wrong.