Skip to content

Commit

Permalink
Merge branch 'main' into registerApi
Browse files Browse the repository at this point in the history
  • Loading branch information
dbwiddis authored Aug 13, 2022
2 parents 49ba520 + bdd4641 commit 87b5950
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 29 deletions.
4 changes: 2 additions & 2 deletions src/main/java/org/opensearch/sdk/ActionListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class ActionListener {
* @throws UnknownHostException if the local host name could not be resolved into an address.
*/
@SuppressForbidden(reason = "need local ephemeral port")
protected static InetSocketAddress getLocalEphemeral() throws UnknownHostException {
private static InetSocketAddress createLocalEphemeralAddress() throws UnknownHostException {
return new InetSocketAddress(InetAddress.getLocalHost(), 0);
}

Expand All @@ -50,7 +50,7 @@ public void runActionListener(boolean flag, int timeout) {
// for testing considerations, otherwise zero which is interpreted as an infinite timeout
socket.setSoTimeout(timeout);

socket.bind(getLocalEphemeral(), 1);
socket.bind(createLocalEphemeralAddress(), 1);
socket.setReuseAddress(true);

Thread t = new Thread() {
Expand Down
26 changes: 14 additions & 12 deletions src/main/java/org/opensearch/sdk/ExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.opensearch.discovery.PluginResponse;
import org.opensearch.extensions.DiscoveryExtension;
import org.opensearch.extensions.ExtensionBooleanResponse;
import org.opensearch.discovery.InitializeExtensionsRequest;
import org.opensearch.discovery.InitializeExtensionsResponse;
import org.opensearch.extensions.ExtensionRequest;
import org.opensearch.extensions.ExtensionsOrchestrator;
import org.opensearch.extensions.RegisterApiRequest;
Expand Down Expand Up @@ -125,17 +127,17 @@ DiscoveryNode getOpensearchNode() {
}

/**
* Handles a plugin request from OpenSearch. This is the first request for the transport communication as a part of OpenSearch bootstrap.
* It will initialize the opensearchNode and trigger an initial call to sendApiRequest.
* Handles a extension request from OpenSearch. This is the first request for the transport communication and will initialize the extension and will be a part of OpenSearch bootstrap.
*
* @param pluginRequest The request to handle.
* @param extensionInitRequest The request to handle.
* @return A response to OpenSearch validating that this is an extension.
*/
PluginResponse handlePluginsRequest(PluginRequest pluginRequest) {
logger.info("Registering Plugin Request received from OpenSearch");
opensearchNode = pluginRequest.getSourceNode();
InitializeExtensionsResponse handleExtensionInitRequest(InitializeExtensionsRequest extensionInitRequest) {
logger.info("Registering Extension Request received from OpenSearch");
InitializeExtensionsResponse initializeExtensionsResponse = new InitializeExtensionsResponse(extensionSettings.getExtensionName());
opensearchNode = extensionInitRequest.getSourceNode();
// Fetch the unique ID
for (DiscoveryExtension de : pluginRequest.getExtensions()) {
for (DiscoveryExtension de : extensionInitRequest.getExtensions()) {
if (de.getName().equals(extensionSettings.getExtensionName())) {
setUniqueId(de.getId());
break;
Expand All @@ -149,7 +151,7 @@ PluginResponse handlePluginsRequest(PluginRequest pluginRequest) {
setOpensearchNode(opensearchNode);
extensionTransportService.connectToNode(opensearchNode);
sendRegisterApiRequest(extensionTransportService);
return new PluginResponse(extensionSettings.getExtensionName());
return initializeExtensionsResponse;
}

/**
Expand All @@ -171,7 +173,7 @@ TransportResponse handleOpenSearchRequest(OpenSearchRequest request) throws Exce
}

/**
* Handles a request for extension point indices from OpenSearch. The {@link #handlePluginsRequest(PluginRequest)} method must have been called first to initialize the extension.
* Handles a request for extension point indices from OpenSearch. The {@link #handleExtensionInitRequest(InitializeExtensionsRequest)} method must have been called first to initialize the extension.
*
* @param indicesModuleRequest The request to handle.
* @param transportService The transport service communicating with OpenSearch.
Expand All @@ -184,7 +186,7 @@ IndicesModuleResponse handleIndicesModuleRequest(IndicesModuleRequest indicesMod
}

/**
* Handles a request for extension name from OpenSearch. The {@link #handlePluginsRequest(PluginRequest)} method must have been called first to initialize the extension.
* Handles a request for extension name from OpenSearch. The {@link #handleExtensionInitRequest(InitializeExtensionsRequest)} method must have been called first to initialize the extension.
*
* @param indicesModuleRequest The request to handle.
* @return A response acknowledging the request.
Expand Down Expand Up @@ -290,8 +292,8 @@ public void startTransportService(TransportService transportService) {
ThreadPool.Names.GENERIC,
false,
false,
PluginRequest::new,
(request, channel, task) -> channel.sendResponse(handlePluginsRequest(request))
InitializeExtensionsRequest::new,
(request, channel, task) -> channel.sendResponse(handleExtensionInitRequest(request))
);

transportService.registerRequestHandler(
Expand Down
24 changes: 9 additions & 15 deletions src/test/java/org/opensearch/sdk/TestExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
import org.opensearch.common.io.stream.NamedWriteableRegistryResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.TransportAddress;
import org.opensearch.discovery.PluginRequest;
import org.opensearch.discovery.PluginResponse;
import org.opensearch.extensions.DiscoveryExtension;
import org.opensearch.discovery.InitializeExtensionsRequest;
import org.opensearch.discovery.InitializeExtensionsResponse;
import org.opensearch.extensions.OpenSearchRequest;
import org.opensearch.extensions.ExtensionsOrchestrator.OpenSearchRequestType;
import org.opensearch.sdk.handlers.ClusterSettingsResponseHandler;
Expand Down Expand Up @@ -93,7 +93,7 @@ public void testRegisterRequestHandler() {
}

@Test
public void testHandlePluginsRequest() throws UnknownHostException {
public void testHandleExtensionInitRequest() throws UnknownHostException {
DiscoveryNode sourceNode = new DiscoveryNode(
"test_node",
new TransportAddress(InetAddress.getByName("localhost"), 9876),
Expand All @@ -112,19 +112,13 @@ public void testHandlePluginsRequest() throws UnknownHostException {
assertEquals("opensearch-sdk-1", extensionsRunner.getUniqueId());
// Test if the source node is set after handlePluginRequest() is called during OpenSearch bootstrap
assertEquals(sourceNode, extensionsRunner.getOpensearchNode());
}

@Test
public void testHandlePluginsRequestInvalidName() throws UnknownHostException {
DiscoveryNode sourceNode = new DiscoveryNode(
"test_node",
new TransportAddress(InetAddress.getByName("localhost"), 9876),
emptyMap(),
emptySet(),
Version.CURRENT
);
PluginRequest pluginRequest = new PluginRequest(sourceNode, Collections.emptyList());
expectThrows(IllegalArgumentException.class, () -> extensionsRunner.handlePluginsRequest(pluginRequest));
InitializeExtensionsRequest extensionInitRequest = new InitializeExtensionsRequest(sourceNode, null);
InitializeExtensionsResponse response = extensionsRunner.handleExtensionInitRequest(extensionInitRequest);
assertEquals(response.getName(), "extension");

// Test if the source node is set after handleExtensionInitRequest()) is called during OpenSearch bootstrap
assertEquals(extensionsRunner.getOpensearchNode(), sourceNode);
}

@Test
Expand Down

0 comments on commit 87b5950

Please sign in to comment.