Skip to content
This repository has been archived by the owner on Nov 27, 2023. It is now read-only.

fix: Aggregate EIP also contains steps #825

Merged
merged 1 commit into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ For more information and all configuration properties, see [Quarkus HTTP Referen
### Steps catalog resources

**Actual versions of resources:**
* Kaoto camel components: **94ef243f574ad42b85dafe59b3d36858c47fcd38**
* Kaoto camel components: **3ee2af43623923a5c5e09df6f3f70657e1ccd09f**
* Kaoto view definitions: **94aae37dee4356d51ac34bfb757eb43a85ad2c0a**
* Camel-connectors: **3.21.0**
* Camel-connectors: **3.21.0**
Expand Down
Binary file modified api/src/main/resources/camel-component-metadata.zip
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ spec:
simple: "${header.StockSymbol}"
aggregation-strategy: myAggregatorStrategy
completion-size: 2
steps:
- log:
id: log-2fdd
message: "${body}"
- load-balance:
weighted:
distribution-ratio: "2,1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,9 +513,6 @@ private FlowStep processStep(final Step step, final boolean to) {
flowStep = getCamelConnector(step, to);
} else if ("EIP".equalsIgnoreCase(step.getKind())) {
switch (step.getName()) {
case "aggregate":
flowStep = new AggregateFlowStep(step);
break;
case "bean":
flowStep = new BeanFlowStep(step);
break;
Expand Down Expand Up @@ -624,6 +621,9 @@ private FlowStep processStep(final Step step, final boolean to) {
}
} else if ("EIP-BRANCH".equalsIgnoreCase(step.getKind())) {
switch (step.getName()) {
case "aggregate":
flowStep = new AggregateFlowStep(step, this);
break;
case "circuit-breaker":
flowStep = new CircuitBreakerFlowStep(step, this);
break;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
package io.kaoto.backend.camel.model.deployment.kamelet.step;

import com.fasterxml.jackson.annotation.JsonAlias;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import io.kaoto.backend.api.metadata.catalog.StepCatalog;
import io.kaoto.backend.camel.KamelHelper;
import io.kaoto.backend.camel.KamelPopulator;
import io.kaoto.backend.camel.model.deployment.kamelet.FlowStep;
import io.kaoto.backend.camel.model.deployment.kamelet.expression.Expression;
import io.kaoto.backend.camel.service.step.parser.kamelet.KameletStepParserService;
import io.kaoto.backend.model.parameter.Parameter;
import io.kaoto.backend.model.step.Step;

import java.util.List;
import java.util.Map;


Expand Down Expand Up @@ -72,97 +78,136 @@ public class Aggregate extends EIPStep {
public static final String COMPLETE_ALL_ON_STOP1 = "complete-all-on-stop";
public static final String DESCRIPTION_LABEL = KamelHelper.DESCRIPTION;

@JsonProperty("correlation-expression")
@JsonProperty(CORRELATION_EXPRESSION)
@JsonAlias(CORRELATION_EXPRESSION1)
private Expression correlationExpression;

@JsonProperty("completion-predicate")
@JsonProperty(COMPLETION_PREDICATE)
@JsonAlias(COMPLETION_PREDICATE1)
private String completionPredicate;

@JsonProperty("completion-timeout-expression")
@JsonProperty(COMPLETION_TIMEOUT_EXPRESSION)
@JsonAlias(COMPLETION_TIMEOUT_EXPRESSION1)
private Expression completionTimeoutExpression;

@JsonProperty("completion-size-expression")
@JsonProperty(COMPLETION_SIZE_EXPRESSION)
@JsonAlias(COMPLETION_SIZE_EXPRESSION1)
private String completionSizeExpression;

@JsonProperty("optimistic-lock-retry-policy")
@JsonProperty(OPTIMISTIC_LOCK_RETRY_POLICY)
@JsonAlias(OPTIMISTIC_LOCK_RETRY_POLICY1)
private String optimisticLockRetryPolicy;

@JsonProperty("parallel-processing")
@JsonProperty(PARALLEL_PROCESSING)
@JsonAlias(PARALLEL_PROCESSING1)
private Boolean parallelProcessing;

@JsonProperty("optimistic-locking")
@JsonProperty(OPTIMISTIC_LOCKING)
@JsonAlias(OPTIMISTIC_LOCKING1)
private String optimisticLocking;

@JsonProperty("executor-service")
@JsonProperty(EXECUTOR_SERVICE)
@JsonAlias(EXECUTOR_SERVICE1)
private String executorService;

@JsonProperty("timeout-checker-executor-service")
@JsonProperty(TIMEOUT_CHECKER_EXECUTOR_SERVICE)
@JsonAlias(TIMEOUT_CHECKER_EXECUTOR_SERVICE1)
private String timeoutCheckerExecutorService;

@JsonProperty("aggregate-controller")
@JsonProperty(AGGREGATE_CONTROLLER)
@JsonAlias(AGGREGATE_CONTROLLER1)
private String aggregateController;

@JsonProperty("aggregation-repository")
@JsonProperty(AGGREGATION_REPOSITORY)
@JsonAlias(AGGREGATION_REPOSITORY1)
private String aggregationRepository;

@JsonProperty("aggregation-strategy")
@JsonProperty(AGGREGATION_STRATEGY)
@JsonAlias(AGGREGATION_STRATEGY1)
private String aggregationStrategy;

@JsonProperty("aggregation-strategy-method-name")
@JsonProperty(AGGREGATION_STRATEGY_METHOD_NAME)
@JsonAlias(AGGREGATION_STRATEGY_METHOD_NAME1)
private String aggregationStrategyMethodName;

@JsonProperty("aggregation-strategy-method-allow-null")
@JsonProperty(AGGREGATION_STRATEGY_METHOD_ALLOW_NULL)
@JsonAlias(AGGREGATION_STRATEGY_METHOD_ALLOW_NULL1)
private Boolean aggregationStrategyMethodAllowNull;

@JsonProperty("completion-size")
@JsonProperty(COMPLETION_SIZE)
@JsonAlias(COMPLETION_SIZE1)
private Integer completionSize;

@JsonProperty("completion-interval")
@JsonProperty(COMPLETION_INTERVAL)
@JsonAlias(COMPLETION_INTERVAL1)
private String completionInterval;

@JsonProperty("completion-timeout")
@JsonProperty(COMPLETION_TIMEOUT)
@JsonAlias(COMPLETION_TIMEOUT1)
private String completionTimeout;

@JsonProperty("completion-timeout-checker-interval")
@JsonProperty(COMPLETION_TIMEOUT_CHECKER_INTERVAL)
@JsonAlias(COMPLETION_TIMEOUT_CHECKER_INTERVAL1)
private String completionTimeoutCheckerInterval;

@JsonProperty("completion-from-batch-consumer")
@JsonProperty(COMPLETION_FROM_BATCH_CONSUMER)
@JsonAlias(COMPLETION_FROM_BATCH_CONSUMER1)
private Boolean completionFromBatchConsumer;

@JsonProperty("completion-on-new-correlation-group")
@JsonProperty(COMPLETION_ON_NEW_CORRELATION_GROUP)
@JsonAlias(COMPLETION_ON_NEW_CORRELATION_GROUP1)
private Boolean completionOnNewCorrelationGroup;

@JsonProperty("eager-check-completion")
@JsonProperty(EAGER_CHECK_COMPLETION)
@JsonAlias(EAGER_CHECK_COMPLETION1)
private Boolean eagerCheckCompletion;

@JsonProperty("ignore-invalid-correlation-keys")
@JsonProperty(IGNORE_INVALID_CORRELATION_KEYS)
@JsonAlias(IGNORE_INVALID_CORRELATION_KEYS1)
private Boolean ignoreInvalidCorrelationKeys;

@JsonProperty("close-correlation-key-on-completion")
@JsonProperty(CLOSE_CORRELATION_KEY_ON_COMPLETION)
@JsonAlias(CLOSE_CORRELATION_KEY_ON_COMPLETION1)
private Integer closeCorrelationKeyOnCompletion;

@JsonProperty("discard-on-completion-timeout")
@JsonProperty(DISCARD_ON_COMPLETION_TIMEOUT)
@JsonAlias(DISCARD_ON_COMPLETION_TIMEOUT1)
private Boolean discardOnCompletionTimeout;

@JsonProperty("discard-on-aggregation-failure")
@JsonProperty(DISCARD_ON_AGGREGATION_FAILURE)
@JsonAlias(DISCARD_ON_AGGREGATION_FAILURE1)
private Boolean discardOnAggregationFailure;

@JsonProperty("force-completion-on-stop")
@JsonProperty(FORCE_COMPLETION_ON_STOP)
@JsonAlias(FORCE_COMPLETION_ON_STOP1)
private Boolean forceCompletionOnStop;

@JsonProperty("complete-all-on-stop")
@JsonProperty(COMPLETE_ALL_ON_STOP)
@JsonAlias(COMPLETE_ALL_ON_STOP1)
private Boolean completeAllOnStop;

@JsonProperty(KamelHelper.DESCRIPTION)
private String description;

@JsonProperty(KamelHelper.STEPS)
private List<FlowStep> steps;

public Aggregate() {
}

public Aggregate(Step step) {
public Aggregate(Step step, final KamelPopulator kameletPopulator) {
super(step);

if (step.getBranches() != null && !step.getBranches().isEmpty()) {
setSteps(kameletPopulator.processSteps(step.getBranches().get(0)));
}
}

@Override
protected void processBranches(final Step step, final StepCatalog catalog,
final KameletStepParserService kameletStepParserService) {
step.setBranches(List.of(createBranch(KamelHelper.STEPS, this.getSteps(), kameletStepParserService)));
}

public Map<String, Object> getRepresenterProperties() {
Map<String, Object> properties = super.getDefaultRepresenterProperties();
Expand Down Expand Up @@ -250,6 +295,9 @@ public Map<String, Object> getRepresenterProperties() {
if (this.description != null) {
properties.put(DESCRIPTION_LABEL, this.description);
}
if (this.getSteps() != null) {
properties.put(KamelHelper.STEPS, this.getSteps());
}

return properties;
}
Expand Down Expand Up @@ -495,6 +543,8 @@ protected void assignProperty(final Parameter parameter) {
}
}



public Expression getCorrelationExpression() {
return correlationExpression;
}
Expand Down Expand Up @@ -718,4 +768,12 @@ public String getDescription() {
public void setDescription(final String description) {
this.description = description;
}

public List<FlowStep> getSteps() {
return steps;
}

public void setSteps(List<FlowStep> steps) {
this.steps = steps;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import io.kaoto.backend.api.metadata.catalog.StepCatalog;
import io.kaoto.backend.camel.KamelPopulator;
import io.kaoto.backend.camel.model.deployment.kamelet.FlowStep;
import io.kaoto.backend.camel.service.step.parser.kamelet.KameletStepParserService;
import io.kaoto.backend.model.step.Step;
Expand All @@ -32,8 +33,8 @@ public class AggregateFlowStep implements FlowStep {
public AggregateFlowStep() {
}

public AggregateFlowStep(Step step) {
setAggregate(new Aggregate(step));
public AggregateFlowStep(final Step step, final KamelPopulator kameletPopulator) {
setAggregate(new Aggregate(step, kameletPopulator));
}

@Override
Expand Down
Binary file modified camel-support/src/test/resources/camel-component-metadata.zip
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ spec:
simple: "${header.StockSymbol}"
aggregation-strategy: myAggregatorStrategy
completion-size: 2
steps:
- log:
id: log-2fdd
message: "${body}"
- service-call:
name: sc
static-service-discovery:
Expand Down
Empty file modified update-resources.sh
100644 → 100755
Empty file.