From 6a15aad2084941d2214a2736a19276e76d4a2fc3 Mon Sep 17 00:00:00 2001 From: ssambasu Date: Wed, 23 Jan 2019 16:47:52 -0800 Subject: [PATCH 1/3] Refactoring the 4.x FlowManager as LegacyFlowManager --- .../com/marklogic/hub/ApplicationConfig.java | 2 +- .../commands/LoadUserModulesCommand.java | 4 +- .../com/marklogic/hub/impl/DataHubImpl.java | 3 +- .../hub/legacy/LegacyFlowManager.java | 115 +++++++ .../legacy/impl/LegacyFlowManagerImpl.java | 292 ++++++++++++++++++ .../java/com/marklogic/hub/HubTestBase.java | 3 +- .../test/java/com/marklogic/hub/PiiE2E.java | 3 +- .../hub/collector/EmptyCollectorTest.java | 1 - .../com/marklogic/hub/job/JobManagerTest.java | 1 - .../hub/legacy/flow/FlowManagerTest.java | 6 +- .../hub_integration/EndToEndFlowTests.java | 5 +- .../marklogic/hub_integration/MappingE2E.java | 5 +- .../hub_integration/StreamCollectorTest.java | 5 +- .../com/marklogic/gradle/DataHubPlugin.groovy | 5 +- .../com/marklogic/gradle/task/HubTask.groovy | 3 +- .../marklogic/gradle/task/RunFlowTask.groovy | 4 +- .../service/FlowManagerService.java | 6 +- .../service/FlowManagerServiceTest.java | 5 +- 18 files changed, 437 insertions(+), 31 deletions(-) create mode 100644 marklogic-data-hub/src/main/java/com/marklogic/hub/legacy/LegacyFlowManager.java create mode 100644 marklogic-data-hub/src/main/java/com/marklogic/hub/legacy/impl/LegacyFlowManagerImpl.java diff --git a/marklogic-data-hub/src/main/java/com/marklogic/hub/ApplicationConfig.java b/marklogic-data-hub/src/main/java/com/marklogic/hub/ApplicationConfig.java index 0237b67652..ba348b119b 100644 --- a/marklogic-data-hub/src/main/java/com/marklogic/hub/ApplicationConfig.java +++ b/marklogic-data-hub/src/main/java/com/marklogic/hub/ApplicationConfig.java @@ -11,7 +11,7 @@ * This application configuration is an entry point to using the DHF from a set property */ @Configuration -@ComponentScan(basePackages = {"com.marklogic.hub.impl", "com.marklogic.hub.deploy.commands"}) +@ComponentScan(basePackages = {"com.marklogic.hub.impl", "com.marklogic.hub.legacy.impl", "com.marklogic.hub.deploy.commands"}) @EnableAutoConfiguration public class ApplicationConfig { diff --git a/marklogic-data-hub/src/main/java/com/marklogic/hub/deploy/commands/LoadUserModulesCommand.java b/marklogic-data-hub/src/main/java/com/marklogic/hub/deploy/commands/LoadUserModulesCommand.java index 1bb63dc6db..a0c4639a77 100644 --- a/marklogic-data-hub/src/main/java/com/marklogic/hub/deploy/commands/LoadUserModulesCommand.java +++ b/marklogic-data-hub/src/main/java/com/marklogic/hub/deploy/commands/LoadUserModulesCommand.java @@ -30,7 +30,7 @@ import com.marklogic.client.io.Format; import com.marklogic.client.io.StringHandle; import com.marklogic.hub.EntityManager; -import com.marklogic.hub.FlowManager; +import com.marklogic.hub.legacy.LegacyFlowManager; import com.marklogic.hub.HubConfig; import com.marklogic.hub.deploy.util.HubFileFilter; import com.marklogic.hub.error.LegacyFlowsException; @@ -63,7 +63,7 @@ public class LoadUserModulesCommand extends LoadModulesCommand { private EntityManager entityManager; @Autowired - private FlowManager flowManager; + private LegacyFlowManager flowManager; private DocumentPermissionsParser documentPermissionsParser = new DefaultDocumentPermissionsParser(); private ThreadPoolTaskExecutor threadPoolTaskExecutor; diff --git a/marklogic-data-hub/src/main/java/com/marklogic/hub/impl/DataHubImpl.java b/marklogic-data-hub/src/main/java/com/marklogic/hub/impl/DataHubImpl.java index 3a880be9fe..7078fb8b58 100644 --- a/marklogic-data-hub/src/main/java/com/marklogic/hub/impl/DataHubImpl.java +++ b/marklogic-data-hub/src/main/java/com/marklogic/hub/impl/DataHubImpl.java @@ -44,6 +44,7 @@ import com.marklogic.hub.deploy.commands.*; import com.marklogic.hub.deploy.util.HubDeployStatusListener; import com.marklogic.hub.error.*; +import com.marklogic.hub.legacy.impl.LegacyFlowManagerImpl; import com.marklogic.mgmt.ManageClient; import com.marklogic.mgmt.admin.AdminManager; import com.marklogic.mgmt.resource.appservers.ServerManager; @@ -100,7 +101,7 @@ public class DataHubImpl implements DataHub { private Versions versions; @Autowired - private FlowManagerImpl flowManager; + private LegacyFlowManagerImpl flowManager; private AdminManager _adminManager; diff --git a/marklogic-data-hub/src/main/java/com/marklogic/hub/legacy/LegacyFlowManager.java b/marklogic-data-hub/src/main/java/com/marklogic/hub/legacy/LegacyFlowManager.java new file mode 100644 index 0000000000..5a2d409e3c --- /dev/null +++ b/marklogic-data-hub/src/main/java/com/marklogic/hub/legacy/LegacyFlowManager.java @@ -0,0 +1,115 @@ +/* + * Copyright 2012-2019 MarkLogic Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.marklogic.hub.legacy; + +import com.marklogic.hub.legacy.flow.Flow; +import com.marklogic.hub.legacy.flow.FlowRunner; +import com.marklogic.hub.legacy.flow.FlowType; +import com.marklogic.hub.legacy.flow.impl.FlowImpl; +import org.w3c.dom.Element; + +import java.nio.file.Path; +import java.util.List; + +/** + * Manages existing flows and creates flow runners to execute flows. + */ +public interface LegacyFlowManager { + + /** + * Turns an XML document into a flow + * @param doc - the xml document representing a flow + * @return a Flow instance + */ + static Flow flowFromXml(Element doc) { + return FlowImpl.fromXml(doc); + } + + /** + * retrieves a list of all the flows on the local files systems + * @return a list of Flows + */ + List getLocalFlows(); + + /** + * retrieves a list of all the flows on the local files systems + * @param entityName - string name of the entity for the flow + * @return a list of Flows + */ + List getLocalFlowsForEntity(String entityName); + + /** + * retrieves a list of all the flows on the local files systems + * @param entityName - string name of the entity for the flow + * @param flowType - the FlowType enum, eg: ingest or harmonize + * @return a list of Flows + */ + List getLocalFlowsForEntity(String entityName, FlowType flowType); + + /** + * Obtains a flow from a property file + * @param propertiesFile - the Path to the property file + * @return - a flow object + */ + Flow getFlowFromProperties(Path propertiesFile); + + /** + * Retrieves a list of flows installed on the MarkLogic server + * @param entityName - the entity from which to fetch the flows + * @return - a list of flows for the given entity + */ + List getFlows(String entityName); + + /** + * Retrieves a named flow from a given entity + * + * @param entityName - the entity that the flow belongs to + * @param flowName - the name of the flow to get + * @return the flow + */ + Flow getFlow(String entityName, String flowName); + + /** + * Retrieves a named flow from a given entity + * + * @param entityName - the entity that the flow belongs to + * @param flowName - the name of the flow to get + * @param flowType - the type of flow (ingest/harmonize) + * @return the flow + */ + Flow getFlow(String entityName, String flowName, FlowType flowType); + + /** + * Updates the indexes in the database based on the project + * @return - a list of names for all the flows that are legacy + */ + + List getLegacyFlows(); + + /** + * Sets the version that the legacy flow is to be updated from + * @param fromVersion - string representation of DHF version + * @return a list of updated flow names that were updated + */ + List updateLegacyFlows(String fromVersion); + + /** + * Creates and returns a new FlowRunner object using the FlowManager's hubconfig + * @return FlowRunner object with current hubconfig already set + */ + FlowRunner newFlowRunner(); +} diff --git a/marklogic-data-hub/src/main/java/com/marklogic/hub/legacy/impl/LegacyFlowManagerImpl.java b/marklogic-data-hub/src/main/java/com/marklogic/hub/legacy/impl/LegacyFlowManagerImpl.java new file mode 100644 index 0000000000..bcdc5dbdb7 --- /dev/null +++ b/marklogic-data-hub/src/main/java/com/marklogic/hub/legacy/impl/LegacyFlowManagerImpl.java @@ -0,0 +1,292 @@ +/* + * Copyright 2012-2019 MarkLogic Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.marklogic.hub.legacy.impl; + +import com.marklogic.client.DatabaseClient; +import com.marklogic.client.extensions.ResourceManager; +import com.marklogic.client.extensions.ResourceServices.ServiceResult; +import com.marklogic.client.extensions.ResourceServices.ServiceResultIterator; +import com.marklogic.client.io.DOMHandle; +import com.marklogic.client.util.RequestParameters; +import com.marklogic.hub.legacy.LegacyFlowManager; +import com.marklogic.hub.HubConfig; +import com.marklogic.hub.collector.impl.CollectorImpl; +import com.marklogic.hub.legacy.flow.*; +import com.marklogic.hub.legacy.flow.impl.FlowRunnerImpl; +import com.marklogic.hub.main.impl.MainPluginImpl; +import com.marklogic.hub.scaffold.Scaffolding; +import org.apache.commons.io.FileUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.regex.Pattern; + +@Component +public class LegacyFlowManagerImpl extends ResourceManager implements LegacyFlowManager { + + private static final String NAME = "ml:flow"; + + private DatabaseClient stagingClient; + + + @Autowired + private HubConfig hubConfig; + + @Autowired + private Scaffolding scaffolding; + + public LegacyFlowManagerImpl() { + super(); + } + + + public void setupClient() { + this.stagingClient = hubConfig.newStagingClient(); + this.stagingClient.init(NAME, this); + } + + @Override public List getLocalFlows() { + List flows = new ArrayList<>(); + + Path entitiesDir = hubConfig.getHubEntitiesDir(); + File[] entities = entitiesDir.toFile().listFiles((pathname -> pathname.isDirectory())); + if (entities != null) { + for (File entity : entities) { + String entityName = entity.getName(); + flows.addAll(getLocalFlowsForEntity(entityName)); + } + } + return flows; + } + + @Override public List getLocalFlowsForEntity(String entityName) { + return getLocalFlowsForEntity(entityName, null); + } + + @Override public List getLocalFlowsForEntity(String entityName, FlowType flowType) { + + List flows = new ArrayList<>(); + Path entitiesDir = hubConfig.getHubEntitiesDir(); + Path entityDir = entitiesDir.resolve(entityName); + Path inputDir = entityDir.resolve("input"); + Path harmonizeDir = entityDir.resolve("harmonize"); + boolean getInputFlows = false; + boolean getHarmonizeFlows = false; + if (flowType == null) { + getInputFlows = getHarmonizeFlows = true; + } + else if (flowType.equals(FlowType.INPUT)) { + getInputFlows = true; + } + else if (flowType.equals(FlowType.HARMONIZE)) { + getHarmonizeFlows = true; + } + + if (getInputFlows) { + File[] inputFlows = inputDir.toFile().listFiles((pathname) -> pathname.isDirectory() && !pathname.getName().equals("REST")); + if (inputFlows != null) { + for (File inputFlow : inputFlows) { + Flow flow = getLocalFlow(entityName, inputFlow.toPath(), FlowType.INPUT); + if (flow != null) { + flows.add(flow); + } + } + } + } + + if (getHarmonizeFlows) { + File[] harmonizeFlows = harmonizeDir.toFile().listFiles((pathname) -> pathname.isDirectory() && !pathname.getName().equals("REST")); + if (harmonizeFlows != null) { + for (File harmonizeFlow : harmonizeFlows) { + Flow flow = getLocalFlow(entityName, harmonizeFlow.toPath(), FlowType.HARMONIZE); + if (flow != null) { + flows.add(flow); + } + + } + } + } + return flows; + } + + @Override public Flow getFlowFromProperties(Path propertiesFile) { + String quotedSeparator = Pattern.quote(File.separator); + /* Extract flowName and entityName from ..../plugins/entities// + * input|harmonize//flowName.properties + */ + String floweRegex = ".+" + "plugins" + quotedSeparator + "entities" + quotedSeparator + "(.+)"+ quotedSeparator + +"(input|harmonize)" + quotedSeparator + "(.+)" + quotedSeparator + ".+"; + FlowType flowType = propertiesFile.toString().replaceAll(floweRegex, "$2").equals("input") + ? FlowType.INPUT : FlowType.HARMONIZE; + + String entityName = propertiesFile.toString().replaceAll(floweRegex, "$1"); + return getLocalFlow(entityName, propertiesFile.getParent(), flowType); + } + + private Flow getLocalFlow(String entityName, Path flowDir, FlowType flowType) { + try { + String flowName = flowDir.getFileName().toString(); + File propertiesFile = flowDir.resolve(flowName + ".properties").toFile(); + if (propertiesFile.exists()) { + Properties properties = new Properties(); + FileInputStream fis = new FileInputStream(propertiesFile); + properties.load(fis); + + // trim trailing whitespaces for properties. + for (String key : properties.stringPropertyNames()){ + properties.put(key, properties.get(key).toString().trim()); + } + fis.close(); + + FlowBuilder flowBuilder = FlowBuilder.newFlow() + .withEntityName(entityName) + .withName(flowName) + .withType(flowType) + .withCodeFormat(CodeFormat.getCodeFormat((String) properties.get("codeFormat"))) + .withDataFormat(DataFormat.getDataFormat((String) properties.get("dataFormat"))) + .withMain(new MainPluginImpl((String) properties.get("mainModule"), CodeFormat.getCodeFormat((String) properties.get("mainCodeFormat")))); + + if (flowType.equals(FlowType.HARMONIZE)) { + flowBuilder.withCollector(new CollectorImpl((String) properties.get("collectorModule"), CodeFormat.getCodeFormat((String) properties.get("collectorCodeFormat")))); + } + + return flowBuilder.build(); + } + } + catch(Exception e) { + e.printStackTrace(); + } + return null; + } + + @Override public List getFlows(String entityName) { + RequestParameters params = new RequestParameters(); + params.add("entity-name", entityName); + ServiceResultIterator resultItr = this.getServices().get(params); + if (resultItr == null || ! resultItr.hasNext()) { + return null; + } + ServiceResult res = resultItr.next(); + DOMHandle handle = new DOMHandle(); + Document parent = res.getContent(handle).get(); + NodeList children = parent.getDocumentElement().getChildNodes(); + + ArrayList flows = null; + if (children.getLength() > 0) { + flows = new ArrayList<>(); + } + + Node node; + for (int i = 0; i < children.getLength(); i++) { + node = children.item(i); + if (node.getNodeType() == Node.ELEMENT_NODE) { + flows.add(LegacyFlowManager.flowFromXml((Element)children.item(i))); + } + } + return flows; + } + + @Override public Flow getFlow(String entityName, String flowName) { + return getFlow(entityName, flowName, null); + } + + @Override public Flow getFlow(String entityName, String flowName, FlowType flowType) { + RequestParameters params = new RequestParameters(); + params.add("entity-name", entityName); + params.add("flow-name", flowName); + if (flowType != null) { + params.add("flow-type", flowType.toString()); + } + ServiceResultIterator resultItr = this.getServices().get(params); + if (resultItr == null || ! resultItr.hasNext()) { + return null; + } + ServiceResult res = resultItr.next(); + DOMHandle handle = new DOMHandle(); + Document parent = res.getContent(handle).get(); + return LegacyFlowManager.flowFromXml(parent.getDocumentElement()); + } + + @Override public List getLegacyFlows() { + List oldFlows = new ArrayList<>(); + Path entitiesDir = hubConfig.getHubEntitiesDir(); + + File[] entityDirs = entitiesDir.toFile().listFiles(pathname -> pathname.isDirectory()); + if (entityDirs != null) { + for (File entityDir : entityDirs) { + Path inputDir = entityDir.toPath().resolve("input"); + Path harmonizeDir = entityDir.toPath().resolve("harmonize"); + + File[] inputFlows = inputDir.toFile().listFiles((pathname) -> pathname.isDirectory() && !pathname.getName().equals("REST")); + addLegacyFlowToList(oldFlows, entityDir, inputFlows); + + File[] harmonizeFlows = harmonizeDir.toFile().listFiles((pathname) -> pathname.isDirectory() && !pathname.getName().equals("REST")); + addLegacyFlowToList(oldFlows, entityDir, harmonizeFlows); + } + } + + return oldFlows; + } + + private void addLegacyFlowToList(List oldFlows, File entityDir, File[] flows) { + if (flows != null) { + for (File flow : flows) { + File[] mainFiles = flow.listFiles((dir, name) -> name.matches("main\\.(sjs|xqy)")); + File[] flowFiles = flow.listFiles((dir, name) -> name.matches(flow.getName() + "\\.xml")); + if (mainFiles.length < 1 && flowFiles.length == 1) { + oldFlows.add(entityDir.getName() + " => " + flow.getName()); + } else if (mainFiles.length == 1 && mainFiles[0].getName().contains(".sjs")) { + try { + String mainFile = FileUtils.readFileToString(mainFiles[0]); + if (mainFile.contains("dhf.xqy")) { + oldFlows.add(entityDir.getName() + " => " + flow.getName()); + } + } + catch(IOException e) {} + } + } + } + } + + @Override public List updateLegacyFlows(String fromVersion) { + + List updatedFlows = new ArrayList<>(); + File[] entityDirs = hubConfig.getHubEntitiesDir().toFile().listFiles(pathname -> pathname.isDirectory()); + if (entityDirs != null) { + for (File entityDir : entityDirs) { + updatedFlows.addAll(scaffolding.updateLegacyFlows(fromVersion, entityDir.getName())); + } + } + + return updatedFlows; + } + + @Override public FlowRunner newFlowRunner() { + return new FlowRunnerImpl(hubConfig); + } + +} diff --git a/marklogic-data-hub/src/test/java/com/marklogic/hub/HubTestBase.java b/marklogic-data-hub/src/test/java/com/marklogic/hub/HubTestBase.java index 9c18f082a4..3f6a4b74f3 100644 --- a/marklogic-data-hub/src/test/java/com/marklogic/hub/HubTestBase.java +++ b/marklogic-data-hub/src/test/java/com/marklogic/hub/HubTestBase.java @@ -44,6 +44,7 @@ import com.marklogic.hub.legacy.flow.DataFormat; import com.marklogic.hub.legacy.flow.FlowType; import com.marklogic.hub.impl.*; +import com.marklogic.hub.legacy.impl.LegacyFlowManagerImpl; import com.marklogic.hub.scaffold.Scaffolding; import com.marklogic.hub.util.ComboListener; import com.marklogic.mgmt.ManageClient; @@ -132,7 +133,7 @@ public class HubTestBase { protected MappingManager mappingManager; @Autowired - protected FlowManagerImpl fm; + protected LegacyFlowManagerImpl fm; // to speedup dev cycle, you can create a hub and set this to true. // for true setup/teardown, must be 'false' diff --git a/marklogic-data-hub/src/test/java/com/marklogic/hub/PiiE2E.java b/marklogic-data-hub/src/test/java/com/marklogic/hub/PiiE2E.java index de1597ac4d..82580aad05 100644 --- a/marklogic-data-hub/src/test/java/com/marklogic/hub/PiiE2E.java +++ b/marklogic-data-hub/src/test/java/com/marklogic/hub/PiiE2E.java @@ -21,6 +21,7 @@ import com.marklogic.client.document.ServerTransform; import com.marklogic.client.io.DocumentMetadataHandle; import com.marklogic.client.io.FileHandle; +import com.marklogic.hub.legacy.LegacyFlowManager; import com.marklogic.hub.legacy.flow.Flow; import com.marklogic.hub.legacy.flow.FlowRunner; import com.marklogic.hub.legacy.flow.FlowType; @@ -68,7 +69,7 @@ public class PiiE2E extends HubTestBase protected EntityManager entityManager; @Autowired - protected FlowManager flowManager; + protected LegacyFlowManager flowManager; @Autowired private Scaffolding scaffolding; diff --git a/marklogic-data-hub/src/test/java/com/marklogic/hub/collector/EmptyCollectorTest.java b/marklogic-data-hub/src/test/java/com/marklogic/hub/collector/EmptyCollectorTest.java index 749e37330d..9e2b77ce88 100644 --- a/marklogic-data-hub/src/test/java/com/marklogic/hub/collector/EmptyCollectorTest.java +++ b/marklogic-data-hub/src/test/java/com/marklogic/hub/collector/EmptyCollectorTest.java @@ -21,7 +21,6 @@ import com.marklogic.hub.HubConfig; import com.marklogic.hub.HubTestBase; import com.marklogic.hub.ApplicationConfig; -import com.marklogic.hub.flow.*; import com.marklogic.hub.legacy.flow.*; import org.custommonkey.xmlunit.XMLUnit; import org.junit.jupiter.api.BeforeEach; diff --git a/marklogic-data-hub/src/test/java/com/marklogic/hub/job/JobManagerTest.java b/marklogic-data-hub/src/test/java/com/marklogic/hub/job/JobManagerTest.java index 86142f5360..5a53e0d460 100644 --- a/marklogic-data-hub/src/test/java/com/marklogic/hub/job/JobManagerTest.java +++ b/marklogic-data-hub/src/test/java/com/marklogic/hub/job/JobManagerTest.java @@ -19,7 +19,6 @@ import com.marklogic.hub.HubConfig; import com.marklogic.hub.HubTestBase; import com.marklogic.hub.ApplicationConfig; -import com.marklogic.hub.flow.*; import com.marklogic.hub.legacy.flow.*; import org.custommonkey.xmlunit.XMLUnit; import org.junit.jupiter.api.AfterEach; diff --git a/marklogic-data-hub/src/test/java/com/marklogic/hub/legacy/flow/FlowManagerTest.java b/marklogic-data-hub/src/test/java/com/marklogic/hub/legacy/flow/FlowManagerTest.java index b1cbf11101..e6937593d6 100644 --- a/marklogic-data-hub/src/test/java/com/marklogic/hub/legacy/flow/FlowManagerTest.java +++ b/marklogic-data-hub/src/test/java/com/marklogic/hub/legacy/flow/FlowManagerTest.java @@ -18,7 +18,7 @@ import com.marklogic.bootstrap.Installer; import com.marklogic.client.io.DOMHandle; import com.marklogic.client.io.DocumentMetadataHandle; -import com.marklogic.hub.FlowManager; +import com.marklogic.hub.legacy.LegacyFlowManager; import com.marklogic.hub.HubConfig; import com.marklogic.hub.HubTestBase; import com.marklogic.hub.collector.Collector; @@ -55,7 +55,7 @@ public class FlowManagerTest extends HubTestBase { @Autowired - private FlowManager fm; + private LegacyFlowManager fm; @Autowired private Scaffolding scaffolding; @@ -146,7 +146,7 @@ private void addFinalDocs() { public void testFlowFromXml() { Document d = getXmlFromResource("flow-manager-test/simple-flow.xml"); - Flow flow = FlowManager.flowFromXml(d.getDocumentElement()); + Flow flow = LegacyFlowManager.flowFromXml(d.getDocumentElement()); assertEquals(flow.getName(), "my-test-flow"); assertEquals(flow.getCollector().getCodeFormat(), CodeFormat.XQUERY); assertEquals(flow.getCollector().getModule(), "/entities/test/harmonize/my-test-flow/collector.xqy"); diff --git a/marklogic-data-hub/src/test/java/com/marklogic/hub_integration/EndToEndFlowTests.java b/marklogic-data-hub/src/test/java/com/marklogic/hub_integration/EndToEndFlowTests.java index 328000a26a..6ac9723311 100644 --- a/marklogic-data-hub/src/test/java/com/marklogic/hub_integration/EndToEndFlowTests.java +++ b/marklogic-data-hub/src/test/java/com/marklogic/hub_integration/EndToEndFlowTests.java @@ -26,10 +26,9 @@ import com.marklogic.client.document.ServerTransform; import com.marklogic.client.io.*; import com.marklogic.hub.ApplicationConfig; -import com.marklogic.hub.FlowManager; +import com.marklogic.hub.legacy.LegacyFlowManager; import com.marklogic.hub.HubConfig; import com.marklogic.hub.HubTestBase; -import com.marklogic.hub.flow.*; import com.marklogic.hub.legacy.flow.*; import com.marklogic.hub.scaffold.Scaffolding; import com.marklogic.hub.util.FileUtil; @@ -113,7 +112,7 @@ public class EndToEndFlowTests extends HubTestBase { private static final int TEST_SIZE = 500; private static final int BATCH_SIZE = 10; @Autowired - private FlowManager flowManager; + private LegacyFlowManager flowManager; private DataMovementManager flowRunnerDataMovementManager; private boolean installDocsFinished = false; diff --git a/marklogic-data-hub/src/test/java/com/marklogic/hub_integration/MappingE2E.java b/marklogic-data-hub/src/test/java/com/marklogic/hub_integration/MappingE2E.java index 66ca2cd66d..70f45a28cf 100644 --- a/marklogic-data-hub/src/test/java/com/marklogic/hub_integration/MappingE2E.java +++ b/marklogic-data-hub/src/test/java/com/marklogic/hub_integration/MappingE2E.java @@ -29,11 +29,10 @@ import com.marklogic.client.io.JacksonHandle; import com.marklogic.client.io.StringHandle; import com.marklogic.hub.ApplicationConfig; -import com.marklogic.hub.FlowManager; +import com.marklogic.hub.legacy.LegacyFlowManager; import com.marklogic.hub.HubConfig; import com.marklogic.hub.HubTestBase; import com.marklogic.hub.MappingManager; -import com.marklogic.hub.flow.*; import com.marklogic.hub.legacy.flow.*; import com.marklogic.hub.mapping.Mapping; import com.marklogic.hub.scaffold.Scaffolding; @@ -68,7 +67,7 @@ public class MappingE2E extends HubTestBase { private static final int TEST_SIZE = 20; private static final int BATCH_SIZE = 10; @Autowired - private FlowManager flowManager; + private LegacyFlowManager flowManager; private DataMovementManager stagingDataMovementManager; private boolean installDocsFinished = false; private boolean installDocsFailed = false; diff --git a/marklogic-data-hub/src/test/java/com/marklogic/hub_integration/StreamCollectorTest.java b/marklogic-data-hub/src/test/java/com/marklogic/hub_integration/StreamCollectorTest.java index b2697e28c2..c7c04950b6 100644 --- a/marklogic-data-hub/src/test/java/com/marklogic/hub_integration/StreamCollectorTest.java +++ b/marklogic-data-hub/src/test/java/com/marklogic/hub_integration/StreamCollectorTest.java @@ -24,10 +24,9 @@ import com.marklogic.client.io.JacksonHandle; import com.marklogic.client.io.StringHandle; import com.marklogic.hub.ApplicationConfig; -import com.marklogic.hub.FlowManager; +import com.marklogic.hub.legacy.LegacyFlowManager; import com.marklogic.hub.HubConfig; import com.marklogic.hub.HubTestBase; -import com.marklogic.hub.flow.*; import com.marklogic.hub.legacy.flow.*; import com.marklogic.hub.scaffold.Scaffolding; import com.marklogic.hub.util.FileUtil; @@ -68,7 +67,7 @@ public class StreamCollectorTest extends HubTestBase { private String installDocError; @Autowired - private FlowManager fm; + private LegacyFlowManager fm; @Autowired private Scaffolding scaffolding; diff --git a/ml-data-hub-plugin/src/main/groovy/com/marklogic/gradle/DataHubPlugin.groovy b/ml-data-hub-plugin/src/main/groovy/com/marklogic/gradle/DataHubPlugin.groovy index 1f8546eb8c..9900754eb9 100644 --- a/ml-data-hub-plugin/src/main/groovy/com/marklogic/gradle/DataHubPlugin.groovy +++ b/ml-data-hub-plugin/src/main/groovy/com/marklogic/gradle/DataHubPlugin.groovy @@ -27,6 +27,7 @@ import com.marklogic.hub.deploy.commands.LoadHubModulesCommand import com.marklogic.hub.deploy.commands.LoadUserModulesCommand import com.marklogic.hub.deploy.commands.LoadUserArtifactsCommand import com.marklogic.hub.impl.* +import com.marklogic.hub.legacy.impl.LegacyFlowManagerImpl import org.gradle.api.GradleException import org.gradle.api.Plugin import org.gradle.api.Project @@ -46,7 +47,7 @@ class DataHubPlugin implements Plugin { private LoadUserModulesCommand loadUserModulesCommand private LoadUserArtifactsCommand loadUserArtifactsCommand private MappingManagerImpl mappingManager - private FlowManagerImpl flowManager + private LegacyFlowManagerImpl flowManager private EntityManagerImpl entityManager private GeneratePiiCommand generatePiiCommand @@ -162,7 +163,7 @@ class DataHubPlugin implements Plugin { loadUserModulesCommand = ctx.getBean(LoadUserModulesCommand.class) loadUserArtifactsCommand = ctx.getBean(LoadUserArtifactsCommand.class) mappingManager = ctx.getBean(MappingManagerImpl.class) - flowManager = ctx.getBean(FlowManagerImpl.class) + flowManager = ctx.getBean(LegacyFlowManagerImpl.class) entityManager = ctx.getBean(EntityManagerImpl.class) generatePiiCommand = ctx.getBean(GeneratePiiCommand.class) diff --git a/ml-data-hub-plugin/src/main/groovy/com/marklogic/gradle/task/HubTask.groovy b/ml-data-hub-plugin/src/main/groovy/com/marklogic/gradle/task/HubTask.groovy index c1ddffcc1c..d14c699e5b 100644 --- a/ml-data-hub-plugin/src/main/groovy/com/marklogic/gradle/task/HubTask.groovy +++ b/ml-data-hub-plugin/src/main/groovy/com/marklogic/gradle/task/HubTask.groovy @@ -27,6 +27,7 @@ import com.marklogic.hub.deploy.commands.LoadHubModulesCommand import com.marklogic.hub.deploy.commands.LoadUserArtifactsCommand import com.marklogic.hub.deploy.commands.LoadUserModulesCommand import com.marklogic.hub.job.JobManager +import com.marklogic.hub.legacy.LegacyFlowManager import com.marklogic.hub.scaffold.Scaffolding import org.gradle.api.DefaultTask import org.gradle.api.tasks.Internal @@ -94,7 +95,7 @@ abstract class HubTask extends DefaultTask { } @Internal - FlowManager getFlowManager() { + LegacyFlowManager getFlowManager() { getProject().property("flowManager") } diff --git a/ml-data-hub-plugin/src/main/groovy/com/marklogic/gradle/task/RunFlowTask.groovy b/ml-data-hub-plugin/src/main/groovy/com/marklogic/gradle/task/RunFlowTask.groovy index 854531f65c..a4608f1f4a 100644 --- a/ml-data-hub-plugin/src/main/groovy/com/marklogic/gradle/task/RunFlowTask.groovy +++ b/ml-data-hub-plugin/src/main/groovy/com/marklogic/gradle/task/RunFlowTask.groovy @@ -24,7 +24,7 @@ import com.marklogic.gradle.exception.EntityNameRequiredException import com.marklogic.gradle.exception.FlowNameRequiredException import com.marklogic.gradle.exception.FlowNotFoundException import com.marklogic.gradle.exception.HubNotInstalledException -import com.marklogic.hub.FlowManager +import com.marklogic.hub.legacy.LegacyFlowManager import com.marklogic.hub.legacy.flow.Flow import com.marklogic.hub.legacy.flow.FlowItemCompleteListener import com.marklogic.hub.legacy.flow.FlowItemFailureListener @@ -111,7 +111,7 @@ class RunFlowTask extends HubTask { throw new HubNotInstalledException() } - FlowManager fm = getFlowManager() + LegacyFlowManager fm = getFlowManager() Flow flow = fm.getFlow(entityName, flowName, FlowType.HARMONIZE) if (flow == null) { throw new FlowNotFoundException(entityName, flowName); diff --git a/quick-start/src/main/java/com/marklogic/quickstart/service/FlowManagerService.java b/quick-start/src/main/java/com/marklogic/quickstart/service/FlowManagerService.java index 061b676724..507e0ece02 100644 --- a/quick-start/src/main/java/com/marklogic/quickstart/service/FlowManagerService.java +++ b/quick-start/src/main/java/com/marklogic/quickstart/service/FlowManagerService.java @@ -19,7 +19,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.marklogic.client.datamovement.JobTicket; -import com.marklogic.hub.FlowManager; +import com.marklogic.hub.legacy.LegacyFlowManager; import com.marklogic.hub.legacy.flow.Flow; import com.marklogic.hub.legacy.flow.FlowRunner; import com.marklogic.hub.legacy.flow.FlowStatusListener; @@ -49,7 +49,7 @@ public class FlowManagerService { private static final String PROJECT_TMP_FOLDER = ".tmp"; @Autowired - private FlowManager flowManager; + private LegacyFlowManager flowManager; @Autowired private HubConfigImpl hubConfig; @@ -179,7 +179,7 @@ public void runMlcp(Flow flow, JsonNode json, FlowStatusListener statusListener) runner.start(); } - public FlowManager getFlowManager() { + public LegacyFlowManager getFlowManager() { return flowManager; } } diff --git a/quick-start/src/test/java/com/marklogic/quickstart/service/FlowManagerServiceTest.java b/quick-start/src/test/java/com/marklogic/quickstart/service/FlowManagerServiceTest.java index 3fdf12760a..41d03d4840 100644 --- a/quick-start/src/test/java/com/marklogic/quickstart/service/FlowManagerServiceTest.java +++ b/quick-start/src/test/java/com/marklogic/quickstart/service/FlowManagerServiceTest.java @@ -25,9 +25,8 @@ import com.marklogic.client.io.JacksonHandle; import com.marklogic.client.io.StringHandle; import com.marklogic.hub.ApplicationConfig; -import com.marklogic.hub.FlowManager; +import com.marklogic.hub.legacy.LegacyFlowManager; import com.marklogic.hub.HubConfig; -import com.marklogic.hub.flow.*; import com.marklogic.hub.legacy.flow.*; import com.marklogic.hub.scaffold.Scaffolding; import com.marklogic.hub.util.FileUtil; @@ -65,7 +64,7 @@ public class FlowManagerServiceTest extends AbstractServiceTest { FlowManagerService fm; @Autowired - FlowManager flowManager; + LegacyFlowManager flowManager; @Autowired Scaffolding scaffolding; From 6d6df1bfb517f200688b58411c5217035318b07f Mon Sep 17 00:00:00 2001 From: ssambasu Date: Wed, 23 Jan 2019 16:48:28 -0800 Subject: [PATCH 2/3] Creating 5.x FlowManager, its implementation and unit test --- .../java/com/marklogic/hub/FlowManager.java | 102 +++--- .../java/com/marklogic/hub/HubConfig.java | 7 + .../java/com/marklogic/hub/HubProject.java | 7 + .../marklogic/hub/impl/FlowManagerImpl.java | 341 ++++++------------ .../com/marklogic/hub/impl/HubConfigImpl.java | 6 +- .../marklogic/hub/impl/HubProjectImpl.java | 7 + .../marklogic/hub/flow/FlowManagerTest.java | 142 ++++++++ .../flow-manager-test/test-flow.flow.json | 14 + 8 files changed, 336 insertions(+), 290 deletions(-) create mode 100644 marklogic-data-hub/src/test/java/com/marklogic/hub/flow/FlowManagerTest.java create mode 100644 marklogic-data-hub/src/test/resources/flow-manager-test/test-flow.flow.json diff --git a/marklogic-data-hub/src/main/java/com/marklogic/hub/FlowManager.java b/marklogic-data-hub/src/main/java/com/marklogic/hub/FlowManager.java index 23ee70aa4b..7d41091f9f 100644 --- a/marklogic-data-hub/src/main/java/com/marklogic/hub/FlowManager.java +++ b/marklogic-data-hub/src/main/java/com/marklogic/hub/FlowManager.java @@ -16,100 +16,84 @@ package com.marklogic.hub; -import com.marklogic.hub.legacy.flow.Flow; -import com.marklogic.hub.legacy.flow.FlowRunner; -import com.marklogic.hub.legacy.flow.FlowType; -import com.marklogic.hub.legacy.flow.impl.FlowImpl; -import org.w3c.dom.Element; +import com.fasterxml.jackson.databind.JsonNode; +import com.marklogic.hub.error.DataHubProjectException; +import com.marklogic.hub.flow.Flow; -import java.nio.file.Path; import java.util.List; /** - * Manages existing flows and creates flow runners to execute flows. + * Manages CRUD operations for flows */ public interface FlowManager { /** - * Turns an XML document into a flow - * @param doc - the xml document representing a flow - * @return a Flow instance + * String value for the flow file extension */ - static Flow flowFromXml(Element doc) { - return FlowImpl.fromXml(doc); - } + String FLOW_FILE_EXTENSION = ".flow.json"; /** - * retrieves a list of all the flows on the local files systems - * @return a list of Flows + * Retrieves a named flow + * @param flowName - string name of the flow + * @return a flow object + * @throws DataHubProjectException if flow is not present or is not a valid flow */ - List getLocalFlows(); + Flow getFlow(String flowName); /** - * retrieves a list of all the flows on the local files systems - * @param entityName - string name of the entity for the flow - * @return a list of Flows + * Returns a flow based on the provided name as JSON string + * @param flowName - name of the flow + * @return string json representation of the flow object + * @throws DataHubProjectException if flow is not present or is not a valid flow */ - List getLocalFlowsForEntity(String entityName); + String getFlowAsJSON(String flowName); /** - * retrieves a list of all the flows on the local files systems - * @param entityName - string name of the entity for the flow - * @param flowType - the FlowType enum, eg: ingest or harmonize - * @return a list of Flows + * Retrieves a list of flows installed on the MarkLogic server + * @return - a list of all flows + * @throws DataHubProjectException if any of the flows is not valid */ - List getLocalFlowsForEntity(String entityName, FlowType flowType); + List getFlows(); /** - * Obtains a flow from a property file - * @param propertiesFile - the Path to the property file - * @return - a flow object + * Retrieves a list of names of flows installed on the MarkLogic server + * @return - a list of names of all flows */ - Flow getFlowFromProperties(Path propertiesFile); + List getFlowNames(); /** - * Retrieves a list of flows installed on the MarkLogic server - * @param entityName - the entity from which to fetch the flows - * @return - a list of flows for the given entity + * Creates a flow + * @param flowName - the name of the flow as a string */ - List getFlows(String entityName); + Flow createFlow(String flowName); /** - * Retrieves a named flow from a given entity - * - * @param entityName - the entity that the flow belongs to - * @param flowName - the name of the flow to get - * @return the flow + * Creates a flow from a given JSON string + * @param json - string representation of flow + * @return - a Flow object + * @throws DataHubProjectException - thrown if flow file cannot be found/read off disk */ - Flow getFlow(String entityName, String flowName); + Flow createFlowFromJSON(String json); /** - * Retrieves a named flow from a given entity - * - * @param entityName - the entity that the flow belongs to - * @param flowName - the name of the flow to get - * @param flowType - the type of flow (ingest/harmonize) - * @return the flow + * Creates a flow from a given JsonNode + * @param json - JsonNode + * @return - a Flow object + * @throws DataHubProjectException - thrown if flow file cannot be found/read off disk */ - Flow getFlow(String entityName, String flowName, FlowType flowType); + Flow createFlowFromJSON(JsonNode json); /** - * Updates the indexes in the database based on the project - * @return - a list of names for all the flows that are legacy + * Deletes a flow + * @param flowName - the name of the flow as a string */ - - List getLegacyFlows(); + void deleteFlow(String flowName); /** - * Sets the version that the legacy flow is to be updated from - * @param fromVersion - string representation of DHF version - * @return a list of updated flow names that were updated + * Saves a flow to disk + * @param flow - the flow object to be saved */ - List updateLegacyFlows(String fromVersion); + void saveFlow(Flow flow) ; + - /** - * Creates and returns a new FlowRunner object using the FlowManager's hubconfig - * @return FlowRunner object with current hubconfig already set - */ - FlowRunner newFlowRunner(); } diff --git a/marklogic-data-hub/src/main/java/com/marklogic/hub/HubConfig.java b/marklogic-data-hub/src/main/java/com/marklogic/hub/HubConfig.java index 2a241e1a36..649e06aeae 100644 --- a/marklogic-data-hub/src/main/java/com/marklogic/hub/HubConfig.java +++ b/marklogic-data-hub/src/main/java/com/marklogic/hub/HubConfig.java @@ -468,6 +468,13 @@ public interface HubConfig { */ Path getEntityDatabaseDir(); + /** + * Gets the path for the flows directory + * + * @return the path for the flows directory + */ + Path getFlowsDir(); + /** * Returns the current AppConfig object attached to the HubConfig * @return Returns current AppConfig object set for HubConfig diff --git a/marklogic-data-hub/src/main/java/com/marklogic/hub/HubProject.java b/marklogic-data-hub/src/main/java/com/marklogic/hub/HubProject.java index 6e6585d819..475aac1922 100644 --- a/marklogic-data-hub/src/main/java/com/marklogic/hub/HubProject.java +++ b/marklogic-data-hub/src/main/java/com/marklogic/hub/HubProject.java @@ -153,6 +153,13 @@ public interface HubProject { */ Path getEntityDatabaseDir(); + /** + * Gets the path for the flows directory + * + * @return the path for the flows directory + */ + Path getFlowsDir(); + /** * Gets the path for the hub staging modules * diff --git a/marklogic-data-hub/src/main/java/com/marklogic/hub/impl/FlowManagerImpl.java b/marklogic-data-hub/src/main/java/com/marklogic/hub/impl/FlowManagerImpl.java index 24e681e2f7..202f31c82f 100644 --- a/marklogic-data-hub/src/main/java/com/marklogic/hub/impl/FlowManagerImpl.java +++ b/marklogic-data-hub/src/main/java/com/marklogic/hub/impl/FlowManagerImpl.java @@ -13,280 +13,161 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.marklogic.hub.impl; -import com.marklogic.client.DatabaseClient; -import com.marklogic.client.extensions.ResourceManager; -import com.marklogic.client.extensions.ResourceServices.ServiceResult; -import com.marklogic.client.extensions.ResourceServices.ServiceResultIterator; -import com.marklogic.client.io.DOMHandle; -import com.marklogic.client.util.RequestParameters; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.marklogic.hub.FlowManager; import com.marklogic.hub.HubConfig; -import com.marklogic.hub.collector.impl.CollectorImpl; -import com.marklogic.hub.legacy.flow.*; -import com.marklogic.hub.legacy.flow.impl.FlowRunnerImpl; -import com.marklogic.hub.main.impl.MainPluginImpl; -import com.marklogic.hub.scaffold.Scaffolding; +import com.marklogic.hub.error.DataHubProjectException; +import com.marklogic.hub.flow.Flow; +import com.marklogic.hub.flow.FlowImpl; import org.apache.commons.io.FileUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import org.w3c.dom.Document; -import org.w3c.dom.Element; -import org.w3c.dom.Node; -import org.w3c.dom.NodeList; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; +import java.io.*; import java.nio.file.Path; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; -import java.util.Properties; -import java.util.regex.Pattern; +import java.util.stream.Collectors; @Component -public class FlowManagerImpl extends ResourceManager implements FlowManager { - - private static final String NAME = "ml:flow"; - - private DatabaseClient stagingClient; - +public class FlowManagerImpl implements FlowManager { @Autowired private HubConfig hubConfig; - @Autowired - private Scaffolding scaffolding; - - public FlowManagerImpl() { - super(); - } - - - public void setupClient() { - this.stagingClient = hubConfig.newStagingClient(); - this.stagingClient.init(NAME, this); - } - - @Override public List getLocalFlows() { - List flows = new ArrayList<>(); - - Path entitiesDir = hubConfig.getHubEntitiesDir(); - File[] entities = entitiesDir.toFile().listFiles((pathname -> pathname.isDirectory())); - if (entities != null) { - for (File entity : entities) { - String entityName = entity.getName(); - flows.addAll(getLocalFlowsForEntity(entityName)); - } + @Override + public Flow getFlow(String flowName) { + Path flowPath = Paths.get(hubConfig.getFlowsDir().toString(), flowName + FLOW_FILE_EXTENSION); + FileInputStream fileInputStream = null; + try { + fileInputStream = FileUtils.openInputStream(flowPath.toFile()); + } catch (IOException e) { + throw new DataHubProjectException("Unable to read flow: " + e.getMessage()); } - return flows; - } - - @Override public List getLocalFlowsForEntity(String entityName) { - return getLocalFlowsForEntity(entityName, null); - } - - @Override public List getLocalFlowsForEntity(String entityName, FlowType flowType) { - - List flows = new ArrayList<>(); - Path entitiesDir = hubConfig.getHubEntitiesDir(); - Path entityDir = entitiesDir.resolve(entityName); - Path inputDir = entityDir.resolve("input"); - Path harmonizeDir = entityDir.resolve("harmonize"); - boolean getInputFlows = false; - boolean getHarmonizeFlows = false; - if (flowType == null) { - getInputFlows = getHarmonizeFlows = true; + ObjectMapper objectMapper = new ObjectMapper(); + JsonNode node; + try { + node = objectMapper.readTree(fileInputStream); + } catch (IOException e) { + throw new DataHubProjectException("Unable to read flow: " + e.getMessage()); } - else if (flowType.equals(FlowType.INPUT)) { - getInputFlows = true; + Flow newFlow = createFlowFromJSON(node); + if(newFlow != null && newFlow.getName().length() > 0){ + return newFlow; } - else if (flowType.equals(FlowType.HARMONIZE)) { - getHarmonizeFlows = true; + else { + throw new DataHubProjectException(flowName +" is not a valid flow"); } - if (getInputFlows) { - File[] inputFlows = inputDir.toFile().listFiles((pathname) -> pathname.isDirectory() && !pathname.getName().equals("REST")); - if (inputFlows != null) { - for (File inputFlow : inputFlows) { - Flow flow = getLocalFlow(entityName, inputFlow.toPath(), FlowType.INPUT); - if (flow != null) { - flows.add(flow); - } - } - } - } + } - if (getHarmonizeFlows) { - File[] harmonizeFlows = harmonizeDir.toFile().listFiles((pathname) -> pathname.isDirectory() && !pathname.getName().equals("REST")); - if (harmonizeFlows != null) { - for (File harmonizeFlow : harmonizeFlows) { - Flow flow = getLocalFlow(entityName, harmonizeFlow.toPath(), FlowType.HARMONIZE); - if (flow != null) { - flows.add(flow); - } + @Override + public String getFlowAsJSON(String flowName) { + return getFlow(flowName).serialize(); + } - } - } + @Override + public List getFlows() { + List flowNames = getFlowNames(); + List flows = new ArrayList<>(); + for(String flow : flowNames){ + flows.add(getFlow(flow)); } return flows; } - @Override public Flow getFlowFromProperties(Path propertiesFile) { - String quotedSeparator = Pattern.quote(File.separator); - /* Extract flowName and entityName from ..../plugins/entities// - * input|harmonize//flowName.properties - */ - String floweRegex = ".+" + "plugins" + quotedSeparator + "entities" + quotedSeparator + "(.+)"+ quotedSeparator - +"(input|harmonize)" + quotedSeparator + "(.+)" + quotedSeparator + ".+"; - FlowType flowType = propertiesFile.toString().replaceAll(floweRegex, "$2").equals("input") - ? FlowType.INPUT : FlowType.HARMONIZE; - - String entityName = propertiesFile.toString().replaceAll(floweRegex, "$1"); - return getLocalFlow(entityName, propertiesFile.getParent(), flowType); + @Override + public List getFlowNames() { + // Get all the files with flow.json extension from flows dir + List files = (List) FileUtils.listFiles(hubConfig.getFlowsDir().toFile(), new String[] {"flow.json"} , false ); + List flowNames = files.stream().map(f ->{ + String fileName = f.getName(); + fileName = fileName.replaceAll("(.+)\\.flow\\.json" , "$1"); + return fileName; + }).collect(Collectors.toList()); + + return flowNames; } - private Flow getLocalFlow(String entityName, Path flowDir, FlowType flowType) { - try { - String flowName = flowDir.getFileName().toString(); - File propertiesFile = flowDir.resolve(flowName + ".properties").toFile(); - if (propertiesFile.exists()) { - Properties properties = new Properties(); - FileInputStream fis = new FileInputStream(propertiesFile); - properties.load(fis); - - // trim trailing whitespaces for properties. - for (String key : properties.stringPropertyNames()){ - properties.put(key, properties.get(key).toString().trim()); - } - fis.close(); - - FlowBuilder flowBuilder = FlowBuilder.newFlow() - .withEntityName(entityName) - .withName(flowName) - .withType(flowType) - .withCodeFormat(CodeFormat.getCodeFormat((String) properties.get("codeFormat"))) - .withDataFormat(DataFormat.getDataFormat((String) properties.get("dataFormat"))) - .withMain(new MainPluginImpl((String) properties.get("mainModule"), CodeFormat.getCodeFormat((String) properties.get("mainCodeFormat")))); - - if (flowType.equals(FlowType.HARMONIZE)) { - flowBuilder.withCollector(new CollectorImpl((String) properties.get("collectorModule"), CodeFormat.getCodeFormat((String) properties.get("collectorCodeFormat")))); - } - - return flowBuilder.build(); - } - } - catch(Exception e) { - e.printStackTrace(); - } - return null; + @Override + public Flow createFlow(String flowName) { + Flow flow = new FlowImpl(); + flow.setName(flowName); + return flow; } - @Override public List getFlows(String entityName) { - RequestParameters params = new RequestParameters(); - params.add("entity-name", entityName); - ServiceResultIterator resultItr = this.getServices().get(params); - if (resultItr == null || ! resultItr.hasNext()) { - return null; - } - ServiceResult res = resultItr.next(); - DOMHandle handle = new DOMHandle(); - Document parent = res.getContent(handle).get(); - NodeList children = parent.getDocumentElement().getChildNodes(); - - ArrayList flows = null; - if (children.getLength() > 0) { - flows = new ArrayList<>(); - } - - Node node; - for (int i = 0; i < children.getLength(); i++) { - node = children.item(i); - if (node.getNodeType() == Node.ELEMENT_NODE) { - flows.add(FlowManager.flowFromXml((Element)children.item(i))); - } - } - return flows; + @Override + public Flow createFlowFromJSON(String json) { + ObjectMapper mapper = new ObjectMapper(); + JsonNode node = null; + try { + node = mapper.readValue(json, JsonNode.class); + } catch (JsonParseException e) { + throw new DataHubProjectException("Unable to parse flow json string : "+ e.getMessage()); + } catch (JsonMappingException e1) { + throw new DataHubProjectException("Unable to parse flow json string : "+ e1.getMessage()); + } catch (IOException e2) { + throw new DataHubProjectException("Unable to parse flow json string : "+ e2.getMessage()); + } + return createFlowFromJSON(node); } - @Override public Flow getFlow(String entityName, String flowName) { - return getFlow(entityName, flowName, null); + @Override + public Flow createFlowFromJSON(JsonNode json) { + Flow flow = new FlowImpl(); + flow.deserialize(json); + return flow; } - @Override public Flow getFlow(String entityName, String flowName, FlowType flowType) { - RequestParameters params = new RequestParameters(); - params.add("entity-name", entityName); - params.add("flow-name", flowName); - if (flowType != null) { - params.add("flow-type", flowType.toString()); + @Override + public void deleteFlow(String flowName) { + File flowFile = Paths.get(hubConfig.getFlowsDir().toString(), flowName + FLOW_FILE_EXTENSION).toFile(); + if (flowFile.exists()) { + try { + FileUtils.forceDelete(flowFile); + } catch (IOException e){ + throw new DataHubProjectException("Could not delete flow "+ flowName); + } } - ServiceResultIterator resultItr = this.getServices().get(params); - if (resultItr == null || ! resultItr.hasNext()) { - return null; + else { + throw new DataHubProjectException("The specified flow doesn't exist."); } - ServiceResult res = resultItr.next(); - DOMHandle handle = new DOMHandle(); - Document parent = res.getContent(handle).get(); - return FlowManager.flowFromXml(parent.getDocumentElement()); - } - @Override public List getLegacyFlows() { - List oldFlows = new ArrayList<>(); - Path entitiesDir = hubConfig.getHubEntitiesDir(); - - File[] entityDirs = entitiesDir.toFile().listFiles(pathname -> pathname.isDirectory()); - if (entityDirs != null) { - for (File entityDir : entityDirs) { - Path inputDir = entityDir.toPath().resolve("input"); - Path harmonizeDir = entityDir.toPath().resolve("harmonize"); + } - File[] inputFlows = inputDir.toFile().listFiles((pathname) -> pathname.isDirectory() && !pathname.getName().equals("REST")); - addLegacyFlowToList(oldFlows, entityDir, inputFlows); + @Override + public void saveFlow(Flow flow) { + String flowString = flow.serialize(); + String flowFileName = flow.getName() + FLOW_FILE_EXTENSION; + File file = Paths.get(hubConfig.getFlowsDir().toString(), flowFileName).toFile(); + ObjectNode rootNode; + FileOutputStream fileOutputStream = null; + ObjectMapper objectMapper = new ObjectMapper(); - File[] harmonizeFlows = harmonizeDir.toFile().listFiles((pathname) -> pathname.isDirectory() && !pathname.getName().equals("REST")); - addLegacyFlowToList(oldFlows, entityDir, harmonizeFlows); - } + try { + rootNode = (ObjectNode)objectMapper.readTree(flowString); + objectMapper.enable(SerializationFeature.INDENT_OUTPUT); + fileOutputStream = new FileOutputStream(file); + fileOutputStream.write(objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(rootNode).getBytes()); + fileOutputStream.flush(); + fileOutputStream.close(); } - - return oldFlows; - } - - private void addLegacyFlowToList(List oldFlows, File entityDir, File[] flows) { - if (flows != null) { - for (File flow : flows) { - File[] mainFiles = flow.listFiles((dir, name) -> name.matches("main\\.(sjs|xqy)")); - File[] flowFiles = flow.listFiles((dir, name) -> name.matches(flow.getName() + "\\.xml")); - if (mainFiles.length < 1 && flowFiles.length == 1) { - oldFlows.add(entityDir.getName() + " => " + flow.getName()); - } else if (mainFiles.length == 1 && mainFiles[0].getName().contains(".sjs")) { - try { - String mainFile = FileUtils.readFileToString(mainFiles[0]); - if (mainFile.contains("dhf.xqy")) { - oldFlows.add(entityDir.getName() + " => " + flow.getName()); - } - } - catch(IOException e) {} - } - } + catch (JsonProcessingException e) { + throw new DataHubProjectException("Could not serialize flow."); } - } - - @Override public List updateLegacyFlows(String fromVersion) { - - List updatedFlows = new ArrayList<>(); - File[] entityDirs = hubConfig.getHubEntitiesDir().toFile().listFiles(pathname -> pathname.isDirectory()); - if (entityDirs != null) { - for (File entityDir : entityDirs) { - updatedFlows.addAll(scaffolding.updateLegacyFlows(fromVersion, entityDir.getName())); - } + catch (IOException e) { + throw new DataHubProjectException("Could not save flow to disk."); } - return updatedFlows; } - - @Override public FlowRunner newFlowRunner() { - return new FlowRunnerImpl(hubConfig); - } - } diff --git a/marklogic-data-hub/src/main/java/com/marklogic/hub/impl/HubConfigImpl.java b/marklogic-data-hub/src/main/java/com/marklogic/hub/impl/HubConfigImpl.java index 8c13220cfd..79752cb99d 100644 --- a/marklogic-data-hub/src/main/java/com/marklogic/hub/impl/HubConfigImpl.java +++ b/marklogic-data-hub/src/main/java/com/marklogic/hub/impl/HubConfigImpl.java @@ -32,6 +32,7 @@ import com.marklogic.hub.error.DataHubConfigurationException; import com.marklogic.hub.error.DataHubProjectException; import com.marklogic.hub.error.InvalidDBOperationError; +import com.marklogic.hub.legacy.impl.LegacyFlowManagerImpl; import com.marklogic.mgmt.DefaultManageConfigFactory; import com.marklogic.mgmt.ManageClient; import com.marklogic.mgmt.ManageConfig; @@ -77,7 +78,7 @@ public class HubConfigImpl implements HubConfig Properties projectProperties = null; @Autowired - FlowManagerImpl flowManager; + LegacyFlowManagerImpl flowManager; @Autowired DataHubImpl dataHub; @Autowired @@ -1574,6 +1575,9 @@ public DatabaseClient newModulesDbClient() { return hubProject.getEntityDatabaseDir(); } + @Override + public Path getFlowsDir() { return hubProject.getFlowsDir(); } + @JsonIgnore @Override public Path getUserServersDir() { return hubProject.getUserServersDir(); diff --git a/marklogic-data-hub/src/main/java/com/marklogic/hub/impl/HubProjectImpl.java b/marklogic-data-hub/src/main/java/com/marklogic/hub/impl/HubProjectImpl.java index bf7871b595..32438c52e0 100644 --- a/marklogic-data-hub/src/main/java/com/marklogic/hub/impl/HubProjectImpl.java +++ b/marklogic-data-hub/src/main/java/com/marklogic/hub/impl/HubProjectImpl.java @@ -137,6 +137,10 @@ public void createProject(String projectDirString) { return getEntityConfigDir().resolve("databases"); } + @Override public Path getFlowsDir() { + return this.projectDir.resolve("flows"); + } + @Override public Path getHubStagingModulesDir() { return this.projectDir.resolve(MODULES_DIR); } @@ -250,6 +254,9 @@ public void createProject(String projectDirString) { getHubSchemasDir().toFile().mkdirs(); getUserSchemasDir().toFile().mkdirs(); + //create flow dir + getFlowsDir().toFile().mkdirs(); + //create hub triggers Path hubTriggersDir = getHubTriggersDir(); hubTriggersDir.toFile().mkdirs(); diff --git a/marklogic-data-hub/src/test/java/com/marklogic/hub/flow/FlowManagerTest.java b/marklogic-data-hub/src/test/java/com/marklogic/hub/flow/FlowManagerTest.java new file mode 100644 index 0000000000..4aab9d9db2 --- /dev/null +++ b/marklogic-data-hub/src/test/java/com/marklogic/hub/flow/FlowManagerTest.java @@ -0,0 +1,142 @@ +/* + * Copyright 2012-2019 MarkLogic Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.marklogic.hub.flow; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.marklogic.bootstrap.Installer; +import com.marklogic.hub.ApplicationConfig; +import com.marklogic.hub.HubTestBase; +import com.marklogic.hub.error.DataHubProjectException; +import com.marklogic.hub.impl.FlowManagerImpl; +import org.apache.commons.io.FileUtils; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import java.io.IOException; +import java.util.List; + +@ExtendWith(SpringExtension.class) +@ContextConfiguration(classes = ApplicationConfig.class) +class FlowManagerTest extends HubTestBase { + + @Autowired + private FlowManagerImpl fm; + + private String flowString = "{\n" + + " \"name\": \"test-flow\",\n" + + " \"Description\": \"this is an example\",\n" + + " \"Identifier\": \"(some identifier or search)\",\n" + + " \"Steps\": [{\n" + + " \"0\": {\n" + + " \"type\": \"mapping\",\n" + + " \"name\": \"person-mapping1.json\",\n" + + " \"identifier\": \"null\",\n" + + " \"retryLimit\": 0,\n" + + " \"options\": {}\n" + + " }\n" + + " }]\n" + + "}\n"; + + @BeforeEach + void setUp() throws IOException { + basicSetup(); + getHubAdminConfig(); + FileUtils.copyFileToDirectory(getResourceFile("flow-manager-test/test-flow.flow.json"), adminHubConfig.getFlowsDir().toFile()); + } + + @AfterAll + static void runAfterAll() { + new Installer().deleteProjectDir(); + } + + @Test + void getFlow() { + Flow flow = fm.getFlow("test-flow"); + Assertions.assertNotNull(flow); + Assertions.assertEquals("test-flow", flow.getName()); + } + + @Test + void getFlowAsJSON() throws IOException { + String actual = fm.getFlowAsJSON("test-flow"); + assertJsonEqual(flowString, actual, true); + } + + @Test + void getFlows() { + List flows = fm.getFlows(); + Assertions.assertEquals(flows.size() , 1); + Assertions.assertEquals(flows.get(0).getName(), fm.getFlow("test-flow").getName()); + } + + @Test + void getFlowNames() { + List flows = fm.getFlowNames(); + Assertions.assertEquals(flows.size() , 1); + Assertions.assertEquals(flows.get(0) , "test-flow"); + } + + @Test + void createFlow() { + Flow flow = fm.createFlow("test-flow"); + Assertions.assertEquals("test-flow", flow.getName()); + } + + @Test + void createFlowFromJSON() { + fm.deleteFlow("test-flow"); + Flow flow = fm.createFlowFromJSON(flowString); + Assertions.assertEquals("test-flow", flow.getName()); + } + + @Test + void createFlowFromJSON1() throws IOException { + ObjectMapper mapper = new ObjectMapper(); + JsonNode node = mapper.readTree(flowString); + Flow flow = fm.createFlowFromJSON(node); + Assertions.assertEquals("test-flow", flow.getName()); + } + + @Test + void deleteFlow() { + fm.deleteFlow("test-flow"); + + try { + Flow flow = fm.getFlow("test-flow"); + Assertions.assertTrue(false); + } + catch(DataHubProjectException e){ + Assertions.assertTrue(e.getMessage().contains("Unable to read flow:")); + } + + } + + @Test + void saveFlow() throws IOException { + fm.deleteFlow("test-flow"); + ObjectMapper mapper = new ObjectMapper(); + JsonNode node = mapper.readTree(flowString); + Flow flow = fm.createFlowFromJSON(node); + fm.saveFlow(flow); + Assertions.assertEquals("test-flow", fm.getFlow("test-flow").getName()); + + } +} diff --git a/marklogic-data-hub/src/test/resources/flow-manager-test/test-flow.flow.json b/marklogic-data-hub/src/test/resources/flow-manager-test/test-flow.flow.json new file mode 100644 index 0000000000..6c1e40d9ba --- /dev/null +++ b/marklogic-data-hub/src/test/resources/flow-manager-test/test-flow.flow.json @@ -0,0 +1,14 @@ +{ + "name": "test-flow", + "Description": "this is an example", + "Identifier": "(some identifier or search)", + "Steps": [{ + "0": { + "type": "mapping", + "name": "person-mapping1.json", + "identifier": "null", + "retryLimit": 0, + "options": {} + } + }] +} From 55970be110232b76b6de02bcbff3ac836a13513b Mon Sep 17 00:00:00 2001 From: ssambasu Date: Wed, 23 Jan 2019 17:15:14 -0800 Subject: [PATCH 3/3] Fixing the interface docs --- .../java/com/marklogic/hub/FlowManager.java | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/marklogic-data-hub/src/main/java/com/marklogic/hub/FlowManager.java b/marklogic-data-hub/src/main/java/com/marklogic/hub/FlowManager.java index 7d41091f9f..c5e65295fc 100644 --- a/marklogic-data-hub/src/main/java/com/marklogic/hub/FlowManager.java +++ b/marklogic-data-hub/src/main/java/com/marklogic/hub/FlowManager.java @@ -34,9 +34,8 @@ public interface FlowManager { /** * Retrieves a named flow - * @param flowName - string name of the flow + * @param flowName - name of the flow * @return a flow object - * @throws DataHubProjectException if flow is not present or is not a valid flow */ Flow getFlow(String flowName); @@ -44,14 +43,12 @@ public interface FlowManager { * Returns a flow based on the provided name as JSON string * @param flowName - name of the flow * @return string json representation of the flow object - * @throws DataHubProjectException if flow is not present or is not a valid flow */ String getFlowAsJSON(String flowName); /** * Retrieves a list of flows installed on the MarkLogic server * @return - a list of all flows - * @throws DataHubProjectException if any of the flows is not valid */ List getFlows(); @@ -63,29 +60,27 @@ public interface FlowManager { /** * Creates a flow - * @param flowName - the name of the flow as a string + * @param flowName - name of the flow */ Flow createFlow(String flowName); /** * Creates a flow from a given JSON string - * @param json - string representation of flow + * @param json - string representation of the flow * @return - a Flow object - * @throws DataHubProjectException - thrown if flow file cannot be found/read off disk */ Flow createFlowFromJSON(String json); /** * Creates a flow from a given JsonNode - * @param json - JsonNode + * @param json - JsonNode representation of the flow * @return - a Flow object - * @throws DataHubProjectException - thrown if flow file cannot be found/read off disk */ Flow createFlowFromJSON(JsonNode json); /** * Deletes a flow - * @param flowName - the name of the flow as a string + * @param flowName - name of the flow */ void deleteFlow(String flowName); @@ -93,7 +88,7 @@ public interface FlowManager { * Saves a flow to disk * @param flow - the flow object to be saved */ - void saveFlow(Flow flow) ; + void saveFlow(Flow flow); }