diff --git a/.gitignore b/.gitignore index bf957ff..02fd5a0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,4 @@ # Created by .ignore support plugin (hsz.mobi) -.gitignore .idea/ camel-data-provider.iml target/ diff --git a/pom.xml b/pom.xml index 53ed51e..72e5b32 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 org.apache.camel camel-data-provider - 1.0.0 + 1.0.2 jar diff --git a/src/main/java/org/apache/camel/component/dataprovider/DataProviderConstants.java b/src/main/java/org/apache/camel/component/dataprovider/DataProviderConstants.java new file mode 100644 index 0000000..1ee0e8a --- /dev/null +++ b/src/main/java/org/apache/camel/component/dataprovider/DataProviderConstants.java @@ -0,0 +1,29 @@ +package org.apache.camel.component.dataprovider; + +/** + * Some constant around dataprovider component. + * + * @author Christian Ribeaud + */ +public final class DataProviderConstants { + + private DataProviderConstants() { + // Can NOT be instantiated + } + + /** + * Constant to specify whether current {@link org.apache.camel.Exchange} is part of the last batch. + *

+ * Value stored should be a boolean or associated object. + *

+ */ + public final static String LAST_BATCH = DataProviderConstants.class.getName() + ".LastBatch"; + + /** + * Constant to specify whether current {@link org.apache.camel.Exchange} is the last one. + *

+ * Value stored should be a boolean or associated object. + *

+ */ + public final static String LAST_EXCHANGE = DataProviderConstants.class.getName() + ".LastExchange"; +} diff --git a/src/main/java/org/apache/camel/component/dataprovider/DataProviderConsumer.java b/src/main/java/org/apache/camel/component/dataprovider/DataProviderConsumer.java index 1be56f2..0de4dfc 100644 --- a/src/main/java/org/apache/camel/component/dataprovider/DataProviderConsumer.java +++ b/src/main/java/org/apache/camel/component/dataprovider/DataProviderConsumer.java @@ -43,15 +43,18 @@ protected void doStart() throws Exception { @Override public int processBatch(Queue exchanges) throws Exception { assert exchanges != null : "Unspecified exchanges"; - final int total = exchanges.size(); - for (int index = 0; index < total && isBatchAllowed(); index++) { + final int batchSize = exchanges.size(); + for (int index = 0; index < batchSize && isBatchAllowed(); index++) { Exchange exchange = (Exchange) exchanges.poll(); // Add current index and total as properties exchange.setProperty(Exchange.BATCH_INDEX, index); - exchange.setProperty(Exchange.BATCH_SIZE, total); - exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1); + exchange.setProperty(Exchange.BATCH_SIZE, batchSize); + exchange.setProperty(Exchange.BATCH_COMPLETE, index == batchSize - 1); + // We are handling the last exchange if the last batch is complete + exchange.setProperty(DataProviderConstants.LAST_EXCHANGE, exchange.getProperty(Exchange.BATCH_COMPLETE, Boolean.class) + && exchange.getProperty(DataProviderConstants.LAST_BATCH, Boolean.class)); // Update pending number of exchanges - pendingExchanges = total - index - 1; + pendingExchanges = batchSize - index - 1; // Process the current exchange getProcessor().process(exchange); Exception exception = exchange.getException(); @@ -62,11 +65,12 @@ public int processBatch(Queue exchanges) throws Exception { exception); } } - return total; + return batchSize; } @Override protected int poll() throws Exception { + // Process current range DataProviderEndpoint endpoint = getDataProviderEndoint(); IDataProvider dataProvider = endpoint.getDataProvider(); final Range range = this.rangeReference.get(); @@ -81,9 +85,11 @@ protected int poll() throws Exception { Queue exchanges = new LinkedList<>(); for (Object item : dataProvider.partition(range)) { Exchange exchange = endpoint.createExchange(); + exchange.setProperty(DataProviderConstants.LAST_BATCH, range.upperEndpoint() == size); exchange.getIn().setBody(item); exchanges.add(exchange); } + // Prepare next range Range nextRange = createNextRange(range.upperEndpoint(), size); LogUtils.debug(LOG, () -> String.format("Next range will be '%s'.", nextRange)); this.rangeReference.set(nextRange); diff --git a/src/test/java/org/apache/camel/component/dataprovider/DataProviderComponentTest.java b/src/test/java/org/apache/camel/component/dataprovider/DataProviderComponentTest.java index b0fe564..804d64d 100644 --- a/src/test/java/org/apache/camel/component/dataprovider/DataProviderComponentTest.java +++ b/src/test/java/org/apache/camel/component/dataprovider/DataProviderComponentTest.java @@ -1,11 +1,17 @@ package org.apache.camel.component.dataprovider; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.impl.JndiRegistry; import org.apache.camel.testng.CamelTestSupport; import org.testng.annotations.Test; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.IntStream; + /** * Test cases for corresponding class {@link DataProviderComponent}. * @@ -16,17 +22,62 @@ public class DataProviderComponentTest extends CamelTestSupport { @Test public void testDataProvider() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedMinimumMessageCount(1); - assertMockEndpointsSatisfied(); + // It will wait until it reaches the expected count + mock.expectedMessageCount(100); + mock.setRetainFirst(1); + mock.setRetainLast(1); + mock.assertIsSatisfied(); + List exchanges = mock.getExchanges(); + assertEquals(exchanges.size(), 2); + // Last + Exchange lastExchange = exchanges.get(1); + assertNotNull(lastExchange); + assertEquals(lastExchange.getProperty(DataProviderConstants.LAST_EXCHANGE), true); + assertEquals(lastExchange.getProperty(DataProviderConstants.LAST_BATCH), true); + assertEquals(lastExchange.getProperty(Exchange.BATCH_COMPLETE), true); + assertEquals(lastExchange.getProperty(Exchange.BATCH_SIZE), 20); + assertEquals(lastExchange.getProperty(Exchange.BATCH_INDEX), 19); + // First + Exchange firstExchange = exchanges.get(0); + assertNotNull(firstExchange); + assertEquals(firstExchange.getProperty(DataProviderConstants.LAST_EXCHANGE), false); + assertEquals(firstExchange.getProperty(DataProviderConstants.LAST_BATCH), false); + assertEquals(firstExchange.getProperty(Exchange.BATCH_COMPLETE), false); + assertEquals(firstExchange.getProperty(Exchange.BATCH_SIZE), 20); + assertEquals(firstExchange.getProperty(Exchange.BATCH_INDEX), 0); + } + + @Test + public void testDataProviderNoMoreThan100() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + // It will wait until it reaches the expected count + mock.expectedMessageCount(101); + mock.assertIsNotSatisfied(); } @Override protected JndiRegistry createRegistry() throws Exception { JndiRegistry registry = super.createRegistry(); - registry.bind("foo", new StaticDataProvider<>("Hello", "Choubidou")); + List strings = new ArrayList<>(100); + IntStream.range(0, 100).forEach(i -> strings.add(createRandomString())); + registry.bind("foo", new StaticDataProvider<>(strings)); return registry; } + @Override + protected void doPostSetup() throws Exception { + CamelContext context = context(); + DataProviderComponent dataProviderComponent = new DataProviderComponent(context); + String componentName = "dataprovider"; + if (context.hasComponent(componentName) == null) { + context.addComponent(componentName, dataProviderComponent); + } + } + + private static String createRandomString() { + return Long.toHexString(Double.doubleToLongBits(Math.random())); + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() {