Export jobs #718

Merged 13 commits on Feb 8, 2018
@@ -60,7 +60,7 @@ public FlowManager(HubConfig hubConfig) {
this.stagingClient = hubConfig.newStagingClient();
this.finalClient = hubConfig.newFinalClient();
this.jobClient = hubConfig.newJobDbClient();
-this.jobManager = new JobManager(this.jobClient);
+this.jobManager = new JobManager(this.jobClient, this.hubConfig.newTraceDbClient());
this.dataMovementManager = this.stagingClient.newDataMovementManager();
this.stagingClient.init(NAME, this);
}
@@ -153,7 +153,7 @@ public void awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException
@Override
public JobTicket run() {
String jobId = UUID.randomUUID().toString();
-JobManager jobManager = new JobManager(hubConfig.newJobDbClient());
+JobManager jobManager = new JobManager(hubConfig.newJobDbClient(), hubConfig.newTraceDbClient());

Job job = Job.withFlow(flow)
.withJobId(jobId);
@@ -20,23 +20,37 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.marklogic.client.DatabaseClient;
import com.marklogic.client.DatabaseClientFactory;
import com.marklogic.client.Transaction;
import com.marklogic.client.datamovement.*;
import com.marklogic.client.document.DocumentWriteSet;
import com.marklogic.client.document.JSONDocumentManager;
import com.marklogic.client.ext.datamovement.consumer.WriteToZipConsumer;
import com.marklogic.client.extensions.ResourceManager;
import com.marklogic.client.extensions.ResourceServices;
import com.marklogic.client.io.DocumentMetadataHandle;
import com.marklogic.client.io.Format;
import com.marklogic.client.io.JacksonDatabindHandle;
import com.marklogic.client.io.StringHandle;
import com.marklogic.client.query.*;
import com.marklogic.client.util.RequestParameters;

import javax.xml.namespace.QName;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Scanner;
import java.util.TimeZone;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

public class JobManager {

private DatabaseClient traceClient;
private DatabaseClient jobClient;
private JSONDocumentManager docMgr;
private JobDeleteResource jobDeleteRunner = null;

@@ -62,7 +76,9 @@ public class JobManager {
.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
.setDateFormat(simpleDateFormat8601);

-public JobManager(DatabaseClient jobClient) {
+public JobManager(DatabaseClient jobClient, DatabaseClient traceClient) {
+    this.jobClient = jobClient;
+    this.traceClient = traceClient;
this.docMgr = jobClient.newJSONDocumentManager();
this.jobDeleteRunner = new JobDeleteResource(jobClient);
}
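// Note: the second DatabaseClient targets the trace database; the new
// exportJobs/importJobs methods below read and write both the job and
// trace databases through these two clients.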
@@ -85,6 +101,141 @@ public JobDeleteResponse deleteJobs(String jobIds) {
return this.jobDeleteRunner.deleteJobs(jobIds);
}

/**
* Export Job documents and their associated Trace documents to a zip file.
*
* @param exportFilePath specifies where the zip file will be written
* @param jobIds a comma-separated list of jobIds; if null, all will be exported
*/
public void exportJobs(Path exportFilePath, String jobIds) {
File zipFile = exportFilePath.toFile();
WriteToZipConsumer zipConsumer = new WriteToZipConsumer(zipFile);

String[] jobsArray = jobIds != null ? jobIds.split(",") : null;

QueryManager qm = jobClient.newQueryManager();

// Build a query that will match everything
StringQueryDefinition emptyQuery = qm.newStringDefinition();
emptyQuery.setCriteria("");

// Get the job(s) document(s)
StructuredQueryBuilder sqb = qm.newStructuredQueryBuilder();
DataMovementManager dmm = jobClient.newDataMovementManager();
QueryBatcher batcher = null;
StructuredQueryDefinition query = null;
if (jobsArray == null) {
batcher = dmm.newQueryBatcher(emptyQuery);
}
else {
batcher = dmm.newQueryBatcher(sqb.value(sqb.jsonProperty("jobId"), jobsArray));
}
batcher.onUrisReady(new ExportListener().onDocumentReady(zipConsumer));
JobTicket jobTicket = dmm.startJob(batcher);

batcher.awaitCompletion();
dmm.stopJob(batcher);
dmm.release();

JobReport report = dmm.getJobReport(jobTicket);
long jobCount = report.getSuccessEventsCount();

if (jobCount > 0) {

// Get the traces that go with the job(s)
dmm = this.traceClient.newDataMovementManager();
if (jobsArray == null) {
batcher = dmm.newQueryBatcher(emptyQuery);
}
else {
batcher = dmm.newQueryBatcher(sqb.value(sqb.element(new QName("jobId")), jobsArray));
}
batcher.onUrisReady(new ExportListener().onDocumentReady(zipConsumer));
dmm.startJob(batcher);

batcher.awaitCompletion();
dmm.stopJob(batcher);
dmm.release();

zipConsumer.close();
}
else {
// there were no jobs, so don't produce an empty zip file
zipConsumer.close();
zipFile.delete();
}

}
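// Illustrative usage sketch (not part of this diff; hubConfig and the
// zip path are assumptions):
//
//   JobManager mgr = new JobManager(hubConfig.newJobDbClient(),
//                                   hubConfig.newTraceDbClient());
//   mgr.exportJobs(Paths.get("jobexport.zip"), null);      // all jobs + traces
//   mgr.exportJobs(Paths.get("jobexport.zip"), "123,456"); // selected jobIds only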

/**
* Import Job documents and their associated Trace documents from a zip file.
*
* @param importFilePath specifies where the zip file exists
*/
public void importJobs(Path importFilePath) throws IOException {
ZipFile importZip = new ZipFile(importFilePath.toFile());
Enumeration<? extends ZipEntry> entries = importZip.entries();

DataMovementManager dmm = jobClient.newDataMovementManager();
WriteBatcher writer = dmm
.newWriteBatcher()
.withJobName("Load jobs")
.withBatchSize(50);
JobTicket ticket = dmm.startJob(writer);

// Add each Job entry to the writer; set aside the Trace entries.
ArrayList<ZipEntry> traceEntries = new ArrayList<ZipEntry>();
DocumentMetadataHandle jobMetadata = new DocumentMetadataHandle().withCollections("job");
while (entries.hasMoreElements()) {
ZipEntry entry = entries.nextElement();

if (entry.getName().startsWith("/jobs/")) {
// Delimiter = \A, which is the beginning of the input
Scanner s = new Scanner(importZip.getInputStream(entry)).useDelimiter("\\A");
String entryText = s.hasNext() ? s.next() : "";

writer.add(
entry.getName(),
jobMetadata,
new StringHandle(entryText).withFormat(Format.JSON)
);
}
else {
traceEntries.add(entry);
}
}

writer.flushAndWait();
dmm.stopJob(ticket);
dmm.release();

if (traceEntries.size() > 0) {
dmm = this.traceClient.newDataMovementManager();
writer = dmm
.newWriteBatcher()
.withJobName("Load traces");
ticket = dmm.startJob(writer);

DocumentMetadataHandle traceMetadata = new DocumentMetadataHandle().withCollections("trace");

for (ZipEntry entry: traceEntries) {
// Delimiter = \A, which is the beginning of the input
Scanner s = new Scanner(importZip.getInputStream(entry)).useDelimiter("\\A");
String entryText = s.hasNext() ? s.next() : "";

writer.add(
entry.getName(),
traceMetadata,
new StringHandle(entryText)
.withFormat(entry.getName().endsWith(".json") ? Format.JSON : Format.XML)
);
}
writer.flushAndWait();
dmm.stopJob(ticket);
dmm.release();
}
}
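// Illustrative round trip (same assumptions as the export sketch above):
// importJobs re-loads a zip written by exportJobs, restoring the documents
// to the "job" and "trace" collections.
//
//   mgr.importJobs(Paths.get("jobexport.zip")); // throws IOException on a bad zip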

public class JobDeleteResource extends ResourceManager {
private static final String DELETE_SERVICE = "delete-jobs";

@@ -50,7 +50,7 @@ public MlcpRunner(String mlcpPath, String mainClass, HubConfig hubConfig, Flow f

this.withHubconfig(hubConfig);

-this.jobManager = new JobManager(hubConfig.newJobDbClient());
+this.jobManager = new JobManager(hubConfig.newJobDbClient(), hubConfig.newTraceDbClient());
this.flowStatusListener = statusListener;
this.flow = flow;
this.mlcpOptions = mlcpOptions;
@@ -295,16 +295,18 @@ declare %private function trace:write-error-trace(
declare option xdmp:mapping "false";

declare variable $trace external;
+declare variable $extension external;

xdmp:document-insert(
-"/" || $trace/*:trace/*:traceId,
+"/" || $trace/*:trace/*:traceId || $extension,
$trace,
xdmp:default-permissions(),
("trace", $trace/*:trace/*:type)
)
',
map:new((
-map:entry("trace", $trace)
+map:entry("trace", $trace),
+map:entry("extension", if (rfc:is-json()) then ".json" else ".xml")
)),
map:new((
map:entry("database", xdmp:database($config:TRACE-DATABASE)),
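Note: appending $extension (".json" or ".xml", chosen via rfc:is-json()) to the trace URI is what lets importJobs above infer the right format on re-load, since it keys Format.JSON vs Format.XML off the zip entry name's suffix.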