Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide Extension API to OpenSearch #74

Merged
merged 15 commits into from
Aug 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions DEVELOPER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ Every extension will require metadata stored in an extensions.yml file in order
- Navigate to the extensions folder using `cd extensions`.
- Manually create a file titled `extensions.yml` within the extensions directory using an IDE or an in-line text editor.

Sample extensions.yml file:
Sample `extensions.yml` file (the name must match the `extensionName` field in the corresponding `extension.yml`:

```
extensions:
- name: opensearch-sdk
- name: sample-extension
uniqueId: opensearch-sdk-1
hostName: 'sdk_host'
hostAddress: '127.0.0.1'
Expand Down
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -96,5 +96,6 @@ test {
systemProperty 'tests.security.manager', 'false'
testLogging {
events "passed", "skipped", "failed"
exceptionFormat "full"
}
}
62 changes: 62 additions & 0 deletions src/main/java/org/opensearch/sdk/ExtensionRestPaths.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sdk;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

/**
* This class encapsulates the API of an Extension.
*/
public class ExtensionRestPaths {

private List<String> restPaths = new ArrayList<>();

/**
* Placeholder field. Eventually will read this from spec file
*/
public static final String EXTENSION_REST_PATHS_DESCRIPTOR = "/extension_rest_paths.yml";

/**
* Jackson requires a default constructor.
*/
private ExtensionRestPaths() {
super();
}

/**
* Gets the REST API Path and Method Strings.
*
* @return a copy of the list containing the REST API Path Strings
*/
public List<String> getRestPaths() {
return new ArrayList<>(restPaths);
}

@Override
public String toString() {
return "ExtensionRestPaths{restPaths=" + restPaths + "}";
}

/**
* Instantiates an instance of this class by reading from a YAML file.
*
* @return An instance of this class.
* @throws IOException if there is an error reading the file.
*/
static ExtensionRestPaths readFromYaml() throws IOException {
File file = new File(ExtensionRestPaths.class.getResource(EXTENSION_REST_PATHS_DESCRIPTOR).getPath());
ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory());
return objectMapper.readValue(file, ExtensionRestPaths.class);
}
}
3 changes: 1 addition & 2 deletions src/main/java/org/opensearch/sdk/ExtensionSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public String getHostPort() {

@Override
public String toString() {
return "\nnodename: " + extensionName + "\nhostaddress: " + hostAddress + "\nhostPort: " + hostPort + "\n";
return "ExtensionSettings{extensionName=" + extensionName + ", hostAddress=" + hostAddress + ", hostPort=" + hostPort + "}";
}

}
104 changes: 80 additions & 24 deletions src/main/java/org/opensearch/sdk/ExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import org.opensearch.common.network.NetworkService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.extensions.DiscoveryExtension;
import org.opensearch.extensions.ExtensionBooleanResponse;
import org.opensearch.discovery.InitializeExtensionsRequest;
import org.opensearch.discovery.InitializeExtensionsResponse;
import org.opensearch.extensions.ExtensionRequest;
import org.opensearch.extensions.ExtensionsOrchestrator;
import org.opensearch.extensions.ExtensionBooleanResponse;
import org.opensearch.extensions.RegisterRestActionsRequest;
import org.opensearch.index.IndicesModuleRequest;
import org.opensearch.index.IndicesModuleResponse;
import org.opensearch.indices.IndicesModule;
Expand All @@ -37,14 +39,14 @@
import org.opensearch.sdk.handlers.ClusterSettingsResponseHandler;
import org.opensearch.sdk.handlers.ClusterStateResponseHandler;
import org.opensearch.sdk.handlers.LocalNodeResponseHandler;
import org.opensearch.sdk.handlers.RegisterRestActionsResponseHandler;
import org.opensearch.search.SearchModule;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ClusterConnectionManager;
import org.opensearch.transport.ConnectionManager;
import org.opensearch.transport.TransportInterceptor;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.TransportSettings;
import org.opensearch.transport.TransportInterceptor;
import org.opensearch.transport.TransportResponse;

import java.io.File;
Expand All @@ -65,7 +67,11 @@
*/
public class ExtensionsRunner {
private ExtensionSettings extensionSettings = readExtensionSettings();
private ExtensionRestPaths extensionRestPaths = ExtensionRestPaths.readFromYaml();
private String uniqueId;
private DiscoveryNode opensearchNode;
private TransportService extensionTransportService = null;

private final Settings settings = Settings.builder()
.put("node.name", extensionSettings.getExtensionName())
.put(TransportSettings.BIND_HOST.getKey(), extensionSettings.getHostAddress())
Expand All @@ -79,22 +85,33 @@ public class ExtensionsRunner {
/**
* Instantiates a new Extensions Runner.
*
* @throws IOException if the runner failed to connect to the OpenSearch cluster.
* @throws IOException if the runner failed to read settings or API.
*/
public ExtensionsRunner() throws IOException {}

private ExtensionSettings readExtensionSettings() throws IOException {
File file = new File(ExtensionSettings.EXTENSION_DESCRIPTOR);
ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory());
ExtensionSettings extensionSettings = objectMapper.readValue(file, ExtensionSettings.class);
return extensionSettings;
return objectMapper.readValue(file, ExtensionSettings.class);
}

void setExtensionTransportService(TransportService extensionTransportService) {
this.extensionTransportService = extensionTransportService;
}

private void setUniqueId(String id) {
this.uniqueId = id;
}

String getUniqueId() {
return uniqueId;
}

private void setOpensearchNode(DiscoveryNode opensearchNode) {
this.opensearchNode = opensearchNode;
}

public DiscoveryNode getOpensearchNode() {
DiscoveryNode getOpensearchNode() {
return opensearchNode;
}

Expand All @@ -108,8 +125,28 @@ InitializeExtensionsResponse handleExtensionInitRequest(InitializeExtensionsRequ
logger.info("Registering Extension Request received from OpenSearch");
InitializeExtensionsResponse initializeExtensionsResponse = new InitializeExtensionsResponse(extensionSettings.getExtensionName());
opensearchNode = extensionInitRequest.getSourceNode();
setOpensearchNode(opensearchNode);
return initializeExtensionsResponse;
// Fetch the unique ID
for (DiscoveryExtension de : extensionInitRequest.getExtensions()) {
if (de.getName().equals(extensionSettings.getExtensionName())) {
setUniqueId(de.getId());
break;
}
}
// We could avoid this check if we only send one extension in the initialize request, rather than the entire list
if (getUniqueId() == null) {
throw new IllegalArgumentException(
"Extension " + extensionSettings.getExtensionName() + " does not match any requested extension."
);
}
// Successfully initialized. Send the response.
try {
return initializeExtensionsResponse;
} finally {
// After sending successful response to initialization, send the REST API
setOpensearchNode(opensearchNode);
extensionTransportService.connectToNode(opensearchNode);
sendRegisterRestActionsRequest(extensionTransportService);
}
}

/**
Expand Down Expand Up @@ -140,10 +177,6 @@ TransportResponse handleOpenSearchRequest(OpenSearchRequest request) throws Exce
IndicesModuleResponse handleIndicesModuleRequest(IndicesModuleRequest indicesModuleRequest, TransportService transportService) {
logger.info("Registering Indices Module Request received from OpenSearch");
IndicesModuleResponse indicesModuleResponse = new IndicesModuleResponse(true, true, true);

// handleExtensionInitRequest will set the opensearchNode while bootstraping of OpenSearch
DiscoveryNode opensearchNode = getOpensearchNode();
transportService.connectToNode(opensearchNode);
return indicesModuleResponse;
}

Expand Down Expand Up @@ -200,21 +233,26 @@ public Netty4Transport getNetty4Transport(Settings settings, ThreadPool threadPo
}

/**
* Creates a TransportService object. This object will control communication between the extension and OpenSearch.
* Initializes the TransportService object for this extension. This object will control communication between the extension and OpenSearch.
*
* @param settings The transport settings to configure.
* @return The configured TransportService object.
* @return The initialized TransportService object.
*/
public TransportService createTransportService(Settings settings) {
public TransportService initializeExtensionTransportService(Settings settings) {

ThreadPool threadPool = new ThreadPool(settings);

Netty4Transport transport = getNetty4Transport(settings, threadPool);

final ConnectionManager connectionManager = new ClusterConnectionManager(settings, transport);

// Stop any existing transport service
if (extensionTransportService != null) {
extensionTransportService.stop();
}

// create transport service
return new TransportService(
extensionTransportService = new TransportService(
settings,
transport,
threadPool,
Expand All @@ -228,6 +266,8 @@ public TransportService createTransportService(Settings settings) {
emptySet(),
connectionManager
);
startTransportService(extensionTransportService);
return extensionTransportService;
}

/**
Expand Down Expand Up @@ -289,6 +329,26 @@ public void startTransportService(TransportService transportService) {

}

/**
* Requests that OpenSearch register the REST Actions for this extension.
*
* @param transportService The TransportService defining the connection to OpenSearch.
*/
public void sendRegisterRestActionsRequest(TransportService transportService) {
logger.info("Sending Register REST Actions request to OpenSearch for " + extensionRestPaths.getRestPaths());
RegisterRestActionsResponseHandler registerActionsResponseHandler = new RegisterRestActionsResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsOrchestrator.REQUEST_EXTENSION_REGISTER_REST_ACTIONS,
new RegisterRestActionsRequest(getUniqueId(), extensionRestPaths.getRestPaths()),
registerActionsResponseHandler
);
} catch (Exception e) {
logger.info("Failed to send Register REST Actions request to OpenSearch", e);
}
}

/**
* Requests the cluster state from OpenSearch. The result will be handled by a {@link ClusterStateResponseHandler}.
*
Expand Down Expand Up @@ -364,7 +424,7 @@ public void startActionListener(int timeout) {
}

/**
* Run the Extension. Imports settings, establishes a connection with an OpenSearch cluster via a Transport Service, and sets up a listener for responses.
* Run the Extension. Imports settings and sets up Transport Service listening for incoming connections.
*
* @param args Unused
* @throws IOException if the runner failed to connect to the OpenSearch cluster.
Expand All @@ -373,13 +433,9 @@ public static void main(String[] args) throws IOException {

ExtensionsRunner extensionsRunner = new ExtensionsRunner();

// configure and retrieve transport service with settings
Settings settings = extensionsRunner.getSettings();
TransportService transportService = extensionsRunner.createTransportService(settings);

// start transport service and action listener
extensionsRunner.startTransportService(transportService);
// initialize the transport service
extensionsRunner.initializeExtensionTransportService(extensionsRunner.getSettings());
// start listening on configured port and wait for connection from OpenSearch
extensionsRunner.startActionListener(0);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sdk.handlers;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.extensions.RegisterRestActionsResponse;
import org.opensearch.sdk.ExtensionsRunner;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;

import java.io.IOException;

/**
* This class handles the response from OpenSearch to a {@link ExtensionsRunner#sendRegisterRestActionsRequest(TransportService)} call.
*/
public class RegisterRestActionsResponseHandler implements TransportResponseHandler<RegisterRestActionsResponse> {
private static final Logger logger = LogManager.getLogger(RegisterRestActionsResponseHandler.class);

@Override
public void handleResponse(RegisterRestActionsResponse response) {
logger.info("received {}", response);
}

@Override
public void handleException(TransportException exp) {
logger.info("RegisterActionsRequest failed", exp);
}

@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}

@Override
public RegisterRestActionsResponse read(StreamInput in) throws IOException {
return new RegisterRestActionsResponse(in);
}
}
9 changes: 9 additions & 0 deletions src/main/resources/extension_rest_paths.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# This is a placeholder for a more formal API definition file, e.g., an OpenAPI
# spec file which can be used to generate documentation and streamline testing
# See https://github.com/opensearch-project/OpenSearch/issues/3090
# The API will include at least two components, a method (e.g., GET, PUT) and
# parts of a URI (e.g., /_extensions/_sample-extension/api_1)
restPaths:
- GET /api_1
- PUT /api_2
- POST /api_3
Loading