Skip to content

Commit

Permalink
NIFI-14252 Refactored to use enhanced switch several framework classes (
Browse files Browse the repository at this point in the history
#9703)

Signed-off-by: David Handermann <[email protected]>
  • Loading branch information
dan-s1 authored Feb 8, 2025
1 parent cc6bd07 commit 6cdc5b6
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 219 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1453,16 +1453,11 @@ public TransactionResultEntity commitReceivingFlowFiles(final String transaction
logger.debug("commitReceivingFlowFiles responseCode={}", responseCode);

try (InputStream content = response.getEntity().getContent()) {
switch (responseCode) {
case RESPONSE_CODE_OK:
return readResponse(content);

case RESPONSE_CODE_BAD_REQUEST:
return readResponse(content);

default:
throw handleErrResponse(responseCode, content);
}
return switch (responseCode) {
case RESPONSE_CODE_OK -> readResponse(content);
case RESPONSE_CODE_BAD_REQUEST -> readResponse(content);
default -> throw handleErrResponse(responseCode, content);
};
}
}

Expand All @@ -1483,16 +1478,11 @@ public TransactionResultEntity commitTransferFlowFiles(final String transactionU
logger.debug("commitTransferFlowFiles responseCode={}", responseCode);

try (InputStream content = response.getEntity().getContent()) {
switch (responseCode) {
case RESPONSE_CODE_OK:
return readResponse(content);

case RESPONSE_CODE_BAD_REQUEST:
return readResponse(content);

default:
throw handleErrResponse(responseCode, content);
}
return switch (responseCode) {
case RESPONSE_CODE_OK -> readResponse(content);
case RESPONSE_CODE_BAD_REQUEST -> readResponse(content);
default -> throw handleErrResponse(responseCode, content);
};
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,48 +546,21 @@ public String getName() {

@Override
public String getSafeDescription() {
final String componentType;
switch (resourceType) {
case ControllerService:
componentType = "Controller Service";
break;
case ProcessGroup:
componentType = "Process Group";
break;
case Funnel:
componentType = "Funnel";
break;
case InputPort:
componentType = "Input Port";
break;
case OutputPort:
componentType = "Output Port";
break;
case Processor:
componentType = "Processor";
break;
case RemoteProcessGroup:
componentType = "Remote Process Group";
break;
case ReportingTask:
componentType = "Reporting Task";
break;
case FlowAnalysisRule:
componentType = "Flow Analysis Rule";
break;
case Label:
componentType = "Label";
break;
case ParameterContext:
componentType = "Parameter Context";
break;
case ParameterProvider:
componentType = "Parameter Provider";
break;
default:
componentType = "Component";
break;
}
final String componentType = switch (resourceType) {
case ControllerService -> "Controller Service";
case ProcessGroup -> "Process Group";
case Funnel -> "Funnel";
case InputPort -> "Input Port";
case OutputPort -> "Output Port";
case Processor -> "Processor";
case RemoteProcessGroup -> "Remote Process Group";
case ReportingTask -> "Reporting Task";
case FlowAnalysisRule -> "Flow Analysis Rule";
case Label -> "Label";
case ParameterContext -> "Parameter Context";
case ParameterProvider -> "Parameter Provider";
default -> "Component";
};
return componentType + " with ID " + identifier;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -595,20 +595,11 @@ public void disconnectionRequestedByNode(final NodeIdentifier nodeId, final Disc
logger.info("{} requested disconnection from cluster due to {}", nodeId, explanation == null ? disconnectionCode : explanation);
updateNodeStatus(new NodeConnectionStatus(nodeId, disconnectionCode, explanation), false);

final Severity severity;
switch (disconnectionCode) {
case STARTUP_FAILURE:
case MISMATCHED_FLOWS:
case UNKNOWN:
severity = Severity.ERROR;
break;
case LACK_OF_HEARTBEAT:
severity = Severity.WARNING;
break;
default:
severity = Severity.INFO;
break;
}
final Severity severity = switch (disconnectionCode) {
case STARTUP_FAILURE, MISMATCHED_FLOWS, UNKNOWN -> Severity.ERROR;
case LACK_OF_HEARTBEAT -> Severity.WARNING;
default -> Severity.INFO;
};

reportEvent(nodeId, severity, "Node disconnected from cluster due to " + explanation);
}
Expand Down Expand Up @@ -1117,17 +1108,17 @@ private void requestReconnectionAsynchronously(final ReconnectionRequestMessage

@Override
public ProtocolMessage handle(final ProtocolMessage protocolMessage, final Set<String> nodeIdentities) throws ProtocolException {
switch (protocolMessage.getType()) {
case CONNECTION_REQUEST:
return handleConnectionRequest((ConnectionRequestMessage) protocolMessage, nodeIdentities);
case NODE_STATUS_CHANGE:
return switch (protocolMessage.getType()) {
case CONNECTION_REQUEST ->
handleConnectionRequest((ConnectionRequestMessage) protocolMessage, nodeIdentities);
case NODE_STATUS_CHANGE -> {
handleNodeStatusChange((NodeStatusChangeMessage) protocolMessage);
return null;
case NODE_CONNECTION_STATUS_REQUEST:
return handleNodeConnectionStatusRequest();
default:
throw new ProtocolException("Cannot handle Protocol Message " + protocolMessage + " because it is not of the correct type");
}
yield null;
}
case NODE_CONNECTION_STATUS_REQUEST -> handleNodeConnectionStatusRequest();
default ->
throw new ProtocolException("Cannot handle Protocol Message " + protocolMessage + " because it is not of the correct type");
};
}

private NodeConnectionStatusResponseMessage handleNodeConnectionStatusRequest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,18 +555,14 @@ public List<ConfigVerificationResult> verifyConfiguration(final ConfigurationCon

@Override
public boolean isValidationNecessary() {
switch (getState()) {
case DISABLED:
case DISABLING:
return true;
case ENABLING:
return switch (getState()) {
case DISABLED, DISABLING -> true;
case ENABLING ->
// If enabling and currently not valid, then we must trigger validation to occur. This allows the #enable method
// to continue running in the background and complete enabling when the service becomes valid.
return getValidationStatus() != ValidationStatus.VALID;
case ENABLED:
default:
return false;
}
getValidationStatus() != ValidationStatus.VALID;
default -> false;
};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,15 +620,10 @@ public StatelessGroupScheduledState getStatelessScheduledState() {
}

final ScheduledState currentState = statelessGroupNode.getCurrentState();
switch (currentState) {
case RUNNING:
case RUN_ONCE:
case STARTING:
case STOPPING:
return StatelessGroupScheduledState.RUNNING;
default:
return StatelessGroupScheduledState.STOPPED;
}
return switch (currentState) {
case RUNNING, RUN_ONCE, STARTING, STOPPING -> StatelessGroupScheduledState.RUNNING;
default -> StatelessGroupScheduledState.STOPPED;
};
}

@Override
Expand All @@ -638,13 +633,10 @@ public StatelessGroupScheduledState getDesiredStatelessScheduledState() {
}

final ScheduledState currentState = statelessGroupNode.getDesiredState();
switch (currentState) {
case RUNNING:
case STARTING:
return StatelessGroupScheduledState.RUNNING;
default:
return StatelessGroupScheduledState.STOPPED;
}
return switch (currentState) {
case RUNNING, STARTING -> StatelessGroupScheduledState.RUNNING;
default -> StatelessGroupScheduledState.STOPPED;
};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,15 +362,10 @@ public void stop(final boolean force) {

@Override
public boolean canHandle(final ProtocolMessage msg) {
switch (msg.getType()) {
case RECONNECTION_REQUEST:
case OFFLOAD_REQUEST:
case DISCONNECTION_REQUEST:
case FLOW_REQUEST:
return true;
default:
return false;
}
return switch (msg.getType()) {
case RECONNECTION_REQUEST, OFFLOAD_REQUEST, DISCONNECTION_REQUEST, FLOW_REQUEST -> true;
default -> false;
};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,23 +213,13 @@ public synchronized void setLoadBalanceStrategy(final LoadBalanceStrategy strate
}

private FlowFilePartitioner getPartitionerForLoadBalancingStrategy(LoadBalanceStrategy strategy, String partitioningAttribute) {
FlowFilePartitioner partitioner;
switch (strategy) {
case DO_NOT_LOAD_BALANCE:
partitioner = new LocalPartitionPartitioner();
break;
case PARTITION_BY_ATTRIBUTE:
partitioner = new CorrelationAttributePartitioner(partitioningAttribute);
break;
case ROUND_ROBIN:
partitioner = new RoundRobinPartitioner();
break;
case SINGLE_NODE:
partitioner = new FirstNodePartitioner();
break;
default:
throw new IllegalArgumentException();
}
FlowFilePartitioner partitioner = switch (strategy) {
case DO_NOT_LOAD_BALANCE -> new LocalPartitionPartitioner();
case PARTITION_BY_ATTRIBUTE -> new CorrelationAttributePartitioner(partitioningAttribute);
case ROUND_ROBIN -> new RoundRobinPartitioner();
case SINGLE_NODE -> new FirstNodePartitioner();
default -> throw new IllegalArgumentException();
};
return partitioner;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4005,40 +4005,20 @@ private boolean authorizeBulletin(final Bulletin bulletin) {

final Authorizable authorizable;
try {
switch (type) {
case PROCESSOR:
authorizable = authorizableLookup.getProcessor(sourceId).getAuthorizable();
break;
case REPORTING_TASK:
authorizable = authorizableLookup.getReportingTask(sourceId).getAuthorizable();
break;
case FLOW_ANALYSIS_RULE:
authorizable = authorizableLookup.getFlowAnalysisRule(sourceId).getAuthorizable();
break;
case PARAMETER_PROVIDER:
authorizable = authorizableLookup.getParameterProvider(sourceId).getAuthorizable();
break;
case CONTROLLER_SERVICE:
authorizable = authorizableLookup.getControllerService(sourceId).getAuthorizable();
break;
case FLOW_CONTROLLER:
authorizable = controllerFacade;
break;
case INPUT_PORT:
authorizable = authorizableLookup.getInputPort(sourceId);
break;
case OUTPUT_PORT:
authorizable = authorizableLookup.getOutputPort(sourceId);
break;
case REMOTE_PROCESS_GROUP:
authorizable = authorizableLookup.getRemoteProcessGroup(sourceId);
break;
case PROCESS_GROUP:
authorizable = authorizableLookup.getProcessGroup(sourceId).getAuthorizable();
break;
default:
throw new WebApplicationException(Response.serverError().entity("An unexpected type of component is the source of this bulletin.").build());
}
authorizable = switch (type) {
case PROCESSOR -> authorizableLookup.getProcessor(sourceId).getAuthorizable();
case REPORTING_TASK -> authorizableLookup.getReportingTask(sourceId).getAuthorizable();
case FLOW_ANALYSIS_RULE -> authorizableLookup.getFlowAnalysisRule(sourceId).getAuthorizable();
case PARAMETER_PROVIDER -> authorizableLookup.getParameterProvider(sourceId).getAuthorizable();
case CONTROLLER_SERVICE -> authorizableLookup.getControllerService(sourceId).getAuthorizable();
case FLOW_CONTROLLER -> controllerFacade;
case INPUT_PORT -> authorizableLookup.getInputPort(sourceId);
case OUTPUT_PORT -> authorizableLookup.getOutputPort(sourceId);
case REMOTE_PROCESS_GROUP -> authorizableLookup.getRemoteProcessGroup(sourceId);
case PROCESS_GROUP -> authorizableLookup.getProcessGroup(sourceId).getAuthorizable();
default ->
throw new WebApplicationException(Response.serverError().entity("An unexpected type of component is the source of this bulletin.").build());
};
} catch (final ResourceNotFoundException e) {
// if the underlying component is gone, disallow
return false;
Expand Down
Loading

0 comments on commit 6cdc5b6

Please sign in to comment.