Skip to content

Commit

Permalink
NIFI-14132 Flow Analysis Rule to check flowfile expiration configuration
Browse files Browse the repository at this point in the history
Signed-off-by: Pierre Villard <[email protected]>

This closes #9664.
  • Loading branch information
mosermw authored and pvillard31 committed Feb 1, 2025
1 parent 4224f03 commit 2ca59d7
Show file tree
Hide file tree
Showing 12 changed files with 555 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
<excludes combine.children="append">
<exclude>src/test/resources/RestrictBackpressureSettings/RestrictBackpressureSettings_noViolation.json</exclude>
<exclude>src/test/resources/RestrictBackpressureSettings/RestrictBackpressureSettings.json</exclude>
<exclude>src/test/resources/RestrictFlowFileExpiration/RestrictFlowFileExpiration_noViolation.json</exclude>
<exclude>src/test/resources/RestrictFlowFileExpiration/RestrictFlowFileExpiration.json</exclude>
</excludes>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@
import org.apache.nifi.flowanalysis.FlowAnalysisRuleContext;
import org.apache.nifi.processor.util.StandardValidators;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;

Expand All @@ -44,17 +42,13 @@ public class DisallowComponentType extends AbstractFlowAnalysisRule {
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();

private final static List<PropertyDescriptor> propertyDescriptors;

static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.add(COMPONENT_TYPE);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
}
private final static List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
COMPONENT_TYPE
);

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
return PROPERTY_DESCRIPTORS;
}

@Override
Expand All @@ -63,8 +57,7 @@ public Collection<ComponentAnalysisResult> analyzeComponent(VersionedComponent c

String componentType = context.getProperty(COMPONENT_TYPE).getValue();

if (component instanceof VersionedExtensionComponent) {
VersionedExtensionComponent versionedExtensionComponent = (VersionedExtensionComponent) component;
if (component instanceof VersionedExtensionComponent versionedExtensionComponent) {

String encounteredComponentType = versionedExtensionComponent.getType();
String encounteredSimpleComponentType = encounteredComponentType.substring(encounteredComponentType.lastIndexOf(".") + 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,19 @@
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flow.VersionedComponent;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.flowanalysis.AbstractFlowAnalysisRule;
import org.apache.nifi.flowanalysis.FlowAnalysisRuleContext;
import org.apache.nifi.flowanalysis.GroupAnalysisResult;
import org.apache.nifi.flowanalysis.rules.util.ConnectionViolation;
import org.apache.nifi.flowanalysis.rules.util.FlowAnalysisRuleUtils;
import org.apache.nifi.flowanalysis.rules.util.ViolationType;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.util.StandardValidators;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Tags({"connection", "backpressure"})
@CapabilityDescription("This rule will generate a violation if backpressure settings of a connection exceed configured thresholds. "
Expand Down Expand Up @@ -125,117 +120,81 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {

@Override
public Collection<GroupAnalysisResult> analyzeProcessGroup(VersionedProcessGroup pg, FlowAnalysisRuleContext context) {
final Collection<GroupAnalysisResult> results = new HashSet<>();
final Collection<ConnectionViolation> violations = new ArrayList<>();

final long minCount = context.getProperty(COUNT_MIN).asLong();
final long maxCount = context.getProperty(COUNT_MAX).asLong();
final double minSize = context.getProperty(SIZE_MIN).asDataSize(DataUnit.B);
final double maxSize = context.getProperty(SIZE_MAX).asDataSize(DataUnit.B);

// Map of all id/components to generate more human readable violations
final Map<String, VersionedComponent> idComponent = Stream.of(
pg.getFunnels().stream(),
pg.getProcessors().stream(),
pg.getInputPorts().stream(),
pg.getOutputPorts().stream())
.flatMap(c -> c)
.collect(Collectors.toMap(c -> c.getIdentifier(), Function.identity()));

pg.getConnections().stream().forEach(
pg.getConnections().forEach(
connection -> {
if (connection.getBackPressureObjectThreshold() < minCount) {
results.add(buildViolation(connection,
idComponent.get(connection.getSource().getId()),
idComponent.get(connection.getDestination().getId()),
violations.add(new ConnectionViolation(connection,
BackpressureViolationType.BP_COUNT_THRESHOLD_BELOW_LIMIT,
getViolationMessage(BackpressureViolationType.BP_COUNT_THRESHOLD_BELOW_LIMIT, connection.getBackPressureObjectThreshold().toString(), Long.toString(minCount))));
this.getClass().getSimpleName(),
connection.getBackPressureObjectThreshold().toString(),
context.getProperty(COUNT_MIN).getValue()));
}
if (connection.getBackPressureObjectThreshold() > maxCount) {
results.add(buildViolation(connection,
idComponent.get(connection.getSource().getId()),
idComponent.get(connection.getDestination().getId()),
violations.add(new ConnectionViolation(connection,
BackpressureViolationType.BP_COUNT_THRESHOLD_ABOVE_LIMIT,
getViolationMessage(BackpressureViolationType.BP_COUNT_THRESHOLD_ABOVE_LIMIT, connection.getBackPressureObjectThreshold().toString(), Long.toString(maxCount))));
this.getClass().getSimpleName(),
connection.getBackPressureObjectThreshold().toString(),
context.getProperty(COUNT_MAX).getValue()));
}
final double sizeThreshold = DataUnit.parseDataSize(connection.getBackPressureDataSizeThreshold(), DataUnit.B);
if (Double.compare(sizeThreshold, minSize) < 0) {
results.add(buildViolation(connection,
idComponent.get(connection.getSource().getId()),
idComponent.get(connection.getDestination().getId()),
violations.add(new ConnectionViolation(connection,
BackpressureViolationType.BP_SIZE_THRESHOLD_BELOW_LIMIT,
getViolationMessage(BackpressureViolationType.BP_SIZE_THRESHOLD_BELOW_LIMIT, connection.getBackPressureDataSizeThreshold(), context.getProperty(SIZE_MIN).getValue())));
this.getClass().getSimpleName(),
connection.getBackPressureDataSizeThreshold(),
context.getProperty(SIZE_MIN).getValue()));
}
if (Double.compare(sizeThreshold, maxSize) > 0) {
results.add(buildViolation(connection,
idComponent.get(connection.getSource().getId()),
idComponent.get(connection.getDestination().getId()),
violations.add(new ConnectionViolation(connection,
BackpressureViolationType.BP_SIZE_THRESHOLD_ABOVE_LIMIT,
getViolationMessage(BackpressureViolationType.BP_SIZE_THRESHOLD_ABOVE_LIMIT, connection.getBackPressureDataSizeThreshold(), context.getProperty(SIZE_MAX).getValue())));
this.getClass().getSimpleName(),
connection.getBackPressureDataSizeThreshold(),
context.getProperty(SIZE_MAX).getValue()));
}
}
);

return results;
}

private GroupAnalysisResult buildViolation(final VersionedConnection connection, final VersionedComponent source,
final VersionedComponent destination, final BackpressureViolationType backpressureViolationType, final String violationMessage) {
// The reason why we want the violation to be on the processor when we have one
// (either as source or destination) is because in case the rule is "enforced"
// we want the corresponding component to be invalid. If this is not a processor
// (funnel, process group, etc) we cannot make it invalid so we put the
// violation on the connection itself.
if (!(source instanceof VersionedProcessor) && !(destination instanceof VersionedProcessor)) {
// connection between two components that are not processors and cannot be invalid, setting violation on connection
return GroupAnalysisResult.forComponent(connection,
connection.getIdentifier() + "_" + backpressureViolationType.getId(),
getLocationMessage(connection, source, destination) + violationMessage).build();
} else if (source instanceof VersionedProcessor) {
// defining violation on source processor
return GroupAnalysisResult.forComponent(source,
connection.getIdentifier() + "_" + backpressureViolationType.getId(),
getLocationMessage(connection, source, destination) + violationMessage).build();
} else {
// defining violation on destination processor
return GroupAnalysisResult.forComponent(destination,
connection.getIdentifier() + "_" + backpressureViolationType.getId(),
getLocationMessage(connection, source, destination) + violationMessage).build();
}
}

private String getLocationMessage(final VersionedConnection connection, final VersionedComponent source, final VersionedComponent destination) {
if (source == null || destination == null) {
return "The connection [" + connection.getIdentifier() + "] is violating the rule for backpressure settings. ";
}
return "The connection [" + connection.getIdentifier() + "] connecting " + source.getName() + " [" + source.getIdentifier() + "] to "
+ destination.getName() + " [" + destination.getIdentifier() + "] is violating the rule for backpressure settings. ";
}

private String getViolationMessage(final BackpressureViolationType backpressureViolationType, final String configured, final String limit) {
return switch (backpressureViolationType) {
case BP_COUNT_THRESHOLD_ABOVE_LIMIT -> "The connection is configured with a Backpressure Count Threshold of " + configured + " and it should be lesser or equal than " + limit + ".";
case BP_COUNT_THRESHOLD_BELOW_LIMIT -> "The connection is configured with a Backpressure Count Threshold of " + configured + " and it should be greater or equal than " + limit + ".";
case BP_SIZE_THRESHOLD_ABOVE_LIMIT -> "The connection is configured with a Backpressure Data Size Threshold of " + configured + " and it should be lesser or equal than " + limit + ".";
case BP_SIZE_THRESHOLD_BELOW_LIMIT -> "The connection is configured with a Backpressure Data Size Threshold of " + configured + " and it should be greater or equal than " + limit + ".";
};
return FlowAnalysisRuleUtils.convertToGroupAnalysisResults(pg, violations);
}

private enum BackpressureViolationType {
private enum BackpressureViolationType implements ViolationType {

BP_COUNT_THRESHOLD_BELOW_LIMIT("BackpressureCountThresholdTooLow"),
BP_COUNT_THRESHOLD_ABOVE_LIMIT("BackpressureCountThresholdTooHigh"),
BP_SIZE_THRESHOLD_BELOW_LIMIT("BackpressureSizeThresholdTooLow"),
BP_SIZE_THRESHOLD_ABOVE_LIMIT("BackpressureSizeThresholdTooHigh");
BP_COUNT_THRESHOLD_BELOW_LIMIT("BackpressureCountThresholdTooLow", "Back Pressure Count Threshold", "cannot be less than"),
BP_COUNT_THRESHOLD_ABOVE_LIMIT("BackpressureCountThresholdTooHigh", "Back Pressure Count Threshold", "cannot be greater than"),
BP_SIZE_THRESHOLD_BELOW_LIMIT("BackpressureSizeThresholdTooLow", "Back Pressure Data Size Threshold", "cannot be less than"),
BP_SIZE_THRESHOLD_ABOVE_LIMIT("BackpressureSizeThresholdTooHigh", "Back Pressure Data Size Threshold", "cannot be greater than");

private final String id;
private final String configurationItem;
private final String violationMessage;

BackpressureViolationType(String id) {
BackpressureViolationType(String id, String configurationItem, String violationMessage) {
this.id = id;
this.configurationItem = configurationItem;
this.violationMessage = violationMessage;
}

@Override
public String getId() {
return this.id;
}

@Override
public String getConfigurationItem() {
return this.configurationItem;
}

@Override
public String getViolationMessage() {
return this.violationMessage;
}

}
}
Loading

0 comments on commit 2ca59d7

Please sign in to comment.