Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide Extension API to OpenSearch #74

Merged
merged 15 commits into from
Aug 20, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions DEVELOPER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ Every extension will require metadata stored in an extensions.yml file in order
- Navigate to the extensions folder using `cd extensions`.
- Manually create a file titled `extensions.yml` within the extensions directory using an IDE or an in-line text editor.

Sample extensions.yml file:
Sample `extensions.yml` file (the name must match the `extensionName` field in the corresponding `extension.yml`:

```
extensions:
- name: opensearch-sdk
- name: sample-extension
uniqueId: opensearch-sdk-1
hostName: 'sdk_host'
hostAddress: '127.0.0.1'
Expand Down
40 changes: 40 additions & 0 deletions src/main/java/org/opensearch/sdk/ExtensionApi.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sdk;

import java.util.ArrayList;
import java.util.List;

/**
* This class encapsulates the API of an Extension.
*/
public class ExtensionApi {

private List<String> api = new ArrayList<>();

/**
* Placeholder field. Change the location to extension_api.yml file of the extension.
*/
public static final String EXTENSION_API_DESCRIPTOR = "src/test/resources/extension_api.yml";

/**
* Jackson requires a default constructor.
*/
private ExtensionApi() {
super();
}

public List<String> getApi() {
return new ArrayList<>(api);
}

@Override
public String toString() {
return "ExtensionApi{api=" + api + "}";
}
}
3 changes: 1 addition & 2 deletions src/main/java/org/opensearch/sdk/ExtensionSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public String getHostPort() {

@Override
public String toString() {
return "\nnodename: " + extensionName + "\nhostaddress: " + hostAddress + "\nhostPort: " + hostPort + "\n";
return "ExtensionSettings{extensionName=" + extensionName + ", hostAddress=" + hostAddress + ", hostPort=" + hostPort + "}";
}

}
105 changes: 82 additions & 23 deletions src/main/java/org/opensearch/sdk/ExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.discovery.PluginRequest;
import org.opensearch.discovery.PluginResponse;
import org.opensearch.extensions.DiscoveryExtension;
import org.opensearch.extensions.ExtensionBooleanResponse;
import org.opensearch.extensions.ExtensionRequest;
import org.opensearch.extensions.ExtensionsOrchestrator;
import org.opensearch.extensions.ExtensionBooleanResponse;
import org.opensearch.extensions.RegisterApiRequest;
import org.opensearch.index.IndicesModuleRequest;
import org.opensearch.index.IndicesModuleResponse;
import org.opensearch.indices.IndicesModule;
Expand Down Expand Up @@ -65,7 +67,11 @@
*/
public class ExtensionsRunner {
private ExtensionSettings extensionSettings = readExtensionSettings();
private ExtensionApi extensionApi = readExtensionApi();
private String uniqueId;
private DiscoveryNode opensearchNode;
private TransportService extensionTransportService = null;

private final Settings settings = Settings.builder()
.put("node.name", extensionSettings.getExtensionName())
.put(TransportSettings.BIND_HOST.getKey(), extensionSettings.getHostAddress())
Expand All @@ -83,33 +89,67 @@ public class ExtensionsRunner {
*/
public ExtensionsRunner() throws IOException {}

/**
* Instantiates a new Extensions Runner.
*
* @throws IOException if the runner failed to read settings or API.
*/
public ExtensionsRunner() throws IOException {}

private ExtensionSettings readExtensionSettings() throws IOException {
File file = new File(ExtensionSettings.EXTENSION_DESCRIPTOR);
ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory());
ExtensionSettings extensionSettings = objectMapper.readValue(file, ExtensionSettings.class);
return extensionSettings;
return objectMapper.readValue(file, ExtensionSettings.class);
}

private ExtensionApi readExtensionApi() throws IOException {
File file = new File(ExtensionApi.EXTENSION_API_DESCRIPTOR);
ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory());
return objectMapper.readValue(file, ExtensionApi.class);
}

private void setUniqueId(String id) {
this.uniqueId = id;
}

String getUniqueId() {
return uniqueId;
}

private void setOpensearchNode(DiscoveryNode opensearchNode) {
this.opensearchNode = opensearchNode;
}

public DiscoveryNode getOpensearchNode() {
DiscoveryNode getOpensearchNode() {
return opensearchNode;
}

/**
* Handles a plugin request from OpenSearch. This is the first request for the transport communication and will initialize the extension and will be a part of OpenSearch bootstrap.
* Handles a plugin request from OpenSearch. This is the first request for the transport communication as a part of OpenSearch bootstrap.
* It will initialize the opensearchNode and trigger an initial call to sendApiRequest.
*
* @param pluginRequest The request to handle.
* @return A response to OpenSearch validating that this is an extension.
*/
PluginResponse handlePluginsRequest(PluginRequest pluginRequest) {
logger.info("Registering Plugin Request received from OpenSearch");
PluginResponse pluginResponse = new PluginResponse(extensionSettings.getExtensionName());
opensearchNode = pluginRequest.getSourceNode();
// Fetch the unique ID
for (DiscoveryExtension de : pluginRequest.getExtensions()) {
if (de.getName().equals(extensionSettings.getExtensionName())) {
setUniqueId(de.getId());
break;
}
}
if (getUniqueId() == null) {
throw new IllegalArgumentException(
"Extension " + extensionSettings.getExtensionName() + " does not match any requested extension."
);
}
setOpensearchNode(opensearchNode);
return pluginResponse;
extensionTransportService.connectToNode(opensearchNode);
sendRegisterApiRequest(extensionTransportService);
return new PluginResponse(extensionSettings.getExtensionName());
}

/**
Expand Down Expand Up @@ -140,10 +180,6 @@ TransportResponse handleOpenSearchRequest(OpenSearchRequest request) throws Exce
IndicesModuleResponse handleIndicesModuleRequest(IndicesModuleRequest indicesModuleRequest, TransportService transportService) {
logger.info("Registering Indices Module Request received from OpenSearch");
IndicesModuleResponse indicesModuleResponse = new IndicesModuleResponse(true, true, true);

// handlePluginsRequest will set the opensearchNode while bootstraping of OpenSearch
DiscoveryNode opensearchNode = getOpensearchNode();
transportService.connectToNode(opensearchNode);
return indicesModuleResponse;
}

Expand Down Expand Up @@ -200,21 +236,26 @@ public Netty4Transport getNetty4Transport(Settings settings, ThreadPool threadPo
}

/**
* Creates a TransportService object. This object will control communication between the extension and OpenSearch.
* Initializes the TransportService object for this extension. This object will control communication between the extension and OpenSearch.
*
* @param settings The transport settings to configure.
* @return The configured TransportService object.
* @return The initialized TransportService object.
*/
public TransportService createTransportService(Settings settings) {
public TransportService initializeExtensionTransportService(Settings settings) {

ThreadPool threadPool = new ThreadPool(settings);

Netty4Transport transport = getNetty4Transport(settings, threadPool);

final ConnectionManager connectionManager = new ClusterConnectionManager(settings, transport);

// Stop any existing transport service
if (extensionTransportService != null) {
extensionTransportService.stop();
}

// create transport service
return new TransportService(
extensionTransportService = new TransportService(
settings,
transport,
threadPool,
Expand All @@ -228,6 +269,8 @@ public TransportService createTransportService(Settings settings) {
emptySet(),
connectionManager
);
startTransportService(extensionTransportService);
return extensionTransportService;
}

/**
Expand Down Expand Up @@ -289,6 +332,26 @@ public void startTransportService(TransportService transportService) {

}

/**
* Requests that OpenSearch register the API for this extension.
*
* @param transportService The TransportService defining the connection to OpenSearch.
*/
public void sendRegisterApiRequest(TransportService transportService) {
logger.info("Sending Register API request to OpenSearch for " + extensionApi.getApi());
RegisterApiResponseHandler registerApiResponseHandler = new RegisterApiResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsOrchestrator.REQUEST_EXTENSION_REGISTER_API,
new RegisterApiRequest(getUniqueId(), extensionApi.getApi()),
registerApiResponseHandler
);
} catch (Exception e) {
logger.info("Failed to send Register API request to OpenSearch", e);
}
}

/**
* Requests the cluster state from OpenSearch. The result will be handled by a {@link ClusterStateResponseHandler}.
*
Expand Down Expand Up @@ -364,7 +427,7 @@ public void startActionListener(int timeout) {
}

/**
* Run the Extension. Imports settings, establishes a connection with an OpenSearch cluster via a Transport Service, and sets up a listener for responses.
* Run the Extension. Imports settings and sets up Transport Service listening for incoming connections.
*
* @param args Unused
* @throws IOException if the runner failed to connect to the OpenSearch cluster.
Expand All @@ -373,13 +436,9 @@ public static void main(String[] args) throws IOException {

ExtensionsRunner extensionsRunner = new ExtensionsRunner();

// configure and retrieve transport service with settings
Settings settings = extensionsRunner.getSettings();
TransportService transportService = extensionsRunner.createTransportService(settings);

// start transport service and action listener
extensionsRunner.startTransportService(transportService);
// initialize the transport service
extensionsRunner.initializeExtensionTransportService(extensionsRunner.getSettings());
// start listening on configured port and wait for connection from OpenSearch
extensionsRunner.startActionListener(0);
}

}
46 changes: 46 additions & 0 deletions src/main/java/org/opensearch/sdk/RegisterApiResponseHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sdk;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.extensions.RegisterApiResponse;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;

import java.io.IOException;

/**
* This class handles the response from OpenSearch to a {@link ExtensionsRunner#sendRegisterApiRequest(TransportService)} call.
*/
public class RegisterApiResponseHandler implements TransportResponseHandler<RegisterApiResponse> {
private static final Logger logger = LogManager.getLogger(RegisterApiResponseHandler.class);

@Override
public void handleResponse(RegisterApiResponse response) {
logger.info("received {}", response);
}

@Override
public void handleException(TransportException exp) {
logger.info("RegisterApiRequest failed", exp);
}

@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}

@Override
public RegisterApiResponse read(StreamInput in) throws IOException {
return new RegisterApiResponse(in);
}
}
34 changes: 34 additions & 0 deletions src/test/java/org/opensearch/sdk/TestExtensionApi.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package org.opensearch.sdk;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.test.OpenSearchTestCase;

import java.io.File;
import java.util.Arrays;
import java.util.List;

public class TestExtensionApi extends OpenSearchTestCase {

private ExtensionApi extensionApi;

@Override
@BeforeEach
public void setUp() throws Exception {
super.setUp();
File file = new File(ExtensionApi.EXTENSION_API_DESCRIPTOR);
ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory());
extensionApi = objectMapper.readValue(file, ExtensionApi.class);
}

@Test
public void testExtensionApi() {
List<String> apiList = extensionApi.getApi();
List<String> expected = Arrays.asList("GET /api_1", "PUT /api_2", "POST /api_3");
assertEquals(expected.size(), apiList.size());
assertTrue(apiList.containsAll(expected));
assertTrue(expected.containsAll(apiList));
}
}
Loading