Merge remote-tracking branch 'upstream/main' into patch-1
Signed-off-by: Peter Zhu <[email protected]>
peterzhuamazon committed Apr 10, 2023
2 parents 93eaf16 + e3339e8 commit 476daf8
Showing 98 changed files with 6,881 additions and 63 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -16,7 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `woodstox-core` from 6.3.0 to 6.3.1
- Bump `xmlbeans` from 5.1.0 to 5.1.1 ([#4354](https://github.com/opensearch-project/OpenSearch/pull/4354))
- Bump `azure-storage-common` from 12.18.0 to 12.18.1 ([#4664](https://github.com/opensearch-project/OpenSearch/pull/4664))
- Bump `org.gradle.test-retry` from 1.4.0 to 1.4.1 ([#4411](https://github.com/opensearch-project/OpenSearch/pull/4411))
- Bump `org.gradle.test-retry` from 1.4.0 to 1.5.2
- Bump `reactor-netty-core` from 1.0.19 to 1.0.22 ([#4447](https://github.com/opensearch-project/OpenSearch/pull/4447))
- Bump `reactive-streams` from 1.0.3 to 1.0.4 ([#4488](https://github.com/opensearch-project/OpenSearch/pull/4488))
- Bump `reactor-core` from 3.4.23 to 3.5.1 ([#5604](https://github.com/opensearch-project/OpenSearch/pull/5604))
@@ -43,6 +43,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `com.netflix.nebula:nebula-publishing-plugin` from 19.2.0 to 20.2.0
- Bump `net.minidev:json-smart` from 2.4.9 to 2.4.10
- Bump `com.netflix.nebula.ospackage-base` from 11.0.0 to 11.2.0
- Bump `org.apache.hadoop:hadoop-minicluster` from 3.3.4 to 3.3.5

### Changed
- [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948))
@@ -86,6 +87,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- The truncation limit of the OpenSearchJsonLayout logger is now configurable ([#6569](https://github.com/opensearch-project/OpenSearch/pull/6569))
- Add 'base_path' setting to File System Repository ([#6558](https://github.com/opensearch-project/OpenSearch/pull/6558))
- Return success on DeletePits when no PITs exist. ([#6544](https://github.com/opensearch-project/OpenSearch/pull/6544))
- Add initial search pipelines ([#6587](https://github.com/opensearch-project/OpenSearch/pull/6587))
- Add node repurpose command for search nodes ([#6517](https://github.com/opensearch-project/OpenSearch/pull/6517))
- Add wait_for_completion parameter to resize, open, and forcemerge APIs ([#6434](https://github.com/opensearch-project/OpenSearch/pull/6434))
- [Segment Replication] Apply backpressure when replicas fall behind ([#6563](https://github.com/opensearch-project/OpenSearch/pull/6563))
@@ -118,6 +120,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Require MediaType in Strings.toString API ([#6009](https://github.com/opensearch-project/OpenSearch/pull/6009))
- [Refactor] XContent base classes from xcontent to core library ([#5902](https://github.com/opensearch-project/OpenSearch/pull/5902))
- Changed `opensearch-env` to respect already set `OPENSEARCH_HOME` environment variable ([#6956](https://github.com/opensearch-project/OpenSearch/pull/6956/))
- Added a new field type: flat_object ([#6507](https://github.com/opensearch-project/OpenSearch/pull/6507))

### Deprecated
- Map, List, and Set in org.opensearch.common.collect ([#6609](https://github.com/opensearch-project/OpenSearch/pull/6609))
2 changes: 1 addition & 1 deletion build.gradle
@@ -54,7 +54,7 @@ plugins {
id 'opensearch.docker-support'
id 'opensearch.global-build-info'
id "com.diffplug.spotless" version "6.17.0" apply false
id "org.gradle.test-retry" version "1.5.1" apply false
id "org.gradle.test-retry" version "1.5.2" apply false
id "test-report-aggregation"
id 'jacoco-report-aggregation'
}
3 changes: 3 additions & 0 deletions client/rest-high-level/build.gradle
@@ -103,6 +103,9 @@ testClusters.all {
extraConfigFile nodeCert.name, nodeCert
extraConfigFile nodeTrustStore.name, nodeTrustStore
extraConfigFile pkiTrustCert.name, pkiTrustCert

// Enable APIs behind feature flags
setting 'opensearch.experimental.feature.search_pipeline.enabled', 'true'
}

thirdPartyAudit.ignoreMissingClasses(
RestHighLevelClient.java
@@ -268,6 +268,7 @@ public class RestHighLevelClient implements Closeable {
private final IngestClient ingestClient = new IngestClient(this);
private final SnapshotClient snapshotClient = new SnapshotClient(this);
private final TasksClient tasksClient = new TasksClient(this);
private final SearchPipelineClient searchPipelineClient = new SearchPipelineClient(this);

/**
* Creates a {@link RestHighLevelClient} given the low level {@link RestClientBuilder} that allows to build the
@@ -354,6 +355,10 @@ public final TasksClient tasks() {
return tasksClient;
}

public final SearchPipelineClient searchPipeline() {
return searchPipelineClient;
}

/**
* Executes a bulk request using the Bulk API.
* @param bulkRequest the request
SearchPipelineClient.java (new file)
@@ -0,0 +1,149 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client;

import org.opensearch.action.ActionListener;
import org.opensearch.action.search.DeleteSearchPipelineRequest;
import org.opensearch.action.search.GetSearchPipelineRequest;
import org.opensearch.action.search.GetSearchPipelineResponse;
import org.opensearch.action.search.PutSearchPipelineRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;

import java.io.IOException;
import java.util.Collections;

import static java.util.Collections.emptySet;

public final class SearchPipelineClient {
private final RestHighLevelClient restHighLevelClient;

SearchPipelineClient(RestHighLevelClient restHighLevelClient) {
this.restHighLevelClient = restHighLevelClient;
}

/**
* Add a pipeline or update an existing pipeline.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public AcknowledgedResponse put(PutSearchPipelineRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(
request,
SearchPipelineRequestConverters::putPipeline,
options,
AcknowledgedResponse::fromXContent,
emptySet()
);
}

/**
* Asynchronously add a pipeline or update an existing pipeline.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return cancellable that may be used to cancel the request
*/
public Cancellable putAsync(PutSearchPipelineRequest request, RequestOptions options, ActionListener<AcknowledgedResponse> listener) {
return restHighLevelClient.performRequestAsyncAndParseEntity(
request,
SearchPipelineRequestConverters::putPipeline,
options,
AcknowledgedResponse::fromXContent,
listener,
emptySet()
);
}

/**
* Get existing pipelines.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public GetSearchPipelineResponse get(GetSearchPipelineRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(
request,
SearchPipelineRequestConverters::getPipeline,
options,
GetSearchPipelineResponse::fromXContent,
Collections.singleton(404)
);
}

/**
* Asynchronously get existing pipelines.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return cancellable that may be used to cancel the request
*/
public Cancellable getAsync(
GetSearchPipelineRequest request,
RequestOptions options,
ActionListener<GetSearchPipelineResponse> listener
) {
return restHighLevelClient.performRequestAsyncAndParseEntity(
request,
SearchPipelineRequestConverters::getPipeline,
options,
GetSearchPipelineResponse::fromXContent,
listener,
Collections.singleton(404)
);
}

/**
* Delete an existing pipeline.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public AcknowledgedResponse delete(DeleteSearchPipelineRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(
request,
SearchPipelineRequestConverters::deletePipeline,
options,
AcknowledgedResponse::fromXContent,
emptySet()
);
}

/**
* Asynchronously delete an existing pipeline.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return cancellable that may be used to cancel the request
*/
public Cancellable deleteAsync(
DeleteSearchPipelineRequest request,
RequestOptions options,
ActionListener<AcknowledgedResponse> listener
) {
return restHighLevelClient.performRequestAsyncAndParseEntity(
request,
SearchPipelineRequestConverters::deletePipeline,
options,
AcknowledgedResponse::fromXContent,
listener,
emptySet()
);
}

}
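
For orientation, here is a rough usage sketch of the client added above (not part of the commit). It assumes an already-built RestHighLevelClient named client, a hypothetical pipeline id my_pipeline, and builds the request body from a raw JSON string via BytesArray; the put/get/delete request and response types are the ones referenced in this diff.

    // Sketch: register, fetch, and delete a search pipeline through the high-level client.
    static void searchPipelineRoundTrip(RestHighLevelClient client) throws IOException {
        String definition = "{\"request_processors\":[{\"filter_query\":{\"query\":{\"term\":{\"field\":\"value\"}}}}]}";

        // Add a pipeline (or update it if a pipeline with this id already exists).
        PutSearchPipelineRequest putRequest = new PutSearchPipelineRequest(
            "my_pipeline", new BytesArray(definition), XContentType.JSON);
        AcknowledgedResponse putAck = client.searchPipeline().put(putRequest, RequestOptions.DEFAULT);

        // Read the pipeline back by id; get() passes Collections.singleton(404) as ignorable
        // status codes, so a missing pipeline is reported as "not found" rather than thrown.
        GetSearchPipelineResponse getResponse = client.searchPipeline()
            .get(new GetSearchPipelineRequest("my_pipeline"), RequestOptions.DEFAULT);

        // Remove the pipeline again.
        AcknowledgedResponse deleteAck = client.searchPipeline()
            .delete(new DeleteSearchPipelineRequest("my_pipeline"), RequestOptions.DEFAULT);
    }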
SearchPipelineRequestConverters.java (new file)
@@ -0,0 +1,61 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client;

import org.apache.hc.client5.http.classic.methods.HttpDelete;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.classic.methods.HttpPut;
import org.opensearch.action.search.DeleteSearchPipelineRequest;
import org.opensearch.action.search.GetSearchPipelineRequest;
import org.opensearch.action.search.PutSearchPipelineRequest;

import java.io.IOException;

final class SearchPipelineRequestConverters {
private SearchPipelineRequestConverters() {}

static Request putPipeline(PutSearchPipelineRequest putPipelineRequest) throws IOException {
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline")
.addPathPart(putPipelineRequest.getId())
.build();
Request request = new Request(HttpPut.METHOD_NAME, endpoint);

RequestConverters.Params params = new RequestConverters.Params();
params.withTimeout(putPipelineRequest.timeout());
params.withClusterManagerTimeout(putPipelineRequest.clusterManagerNodeTimeout());
request.addParameters(params.asMap());
request.setEntity(RequestConverters.createEntity(putPipelineRequest, RequestConverters.REQUEST_BODY_CONTENT_TYPE));
return request;
}

static Request deletePipeline(DeleteSearchPipelineRequest deletePipelineRequest) {
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline")
.addPathPart(deletePipelineRequest.getId())
.build();
Request request = new Request(HttpDelete.METHOD_NAME, endpoint);

RequestConverters.Params parameters = new RequestConverters.Params();
parameters.withTimeout(deletePipelineRequest.timeout());
parameters.withClusterManagerTimeout(deletePipelineRequest.clusterManagerNodeTimeout());
request.addParameters(parameters.asMap());
return request;
}

static Request getPipeline(GetSearchPipelineRequest getPipelineRequest) {
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline")
.addCommaSeparatedPathParts(getPipelineRequest.getIds())
.build();
Request request = new Request(HttpGet.METHOD_NAME, endpoint);

RequestConverters.Params parameters = new RequestConverters.Params();
parameters.withClusterManagerTimeout(getPipelineRequest.clusterManagerNodeTimeout());
request.addParameters(parameters.asMap());
return request;
}
}
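
The converters above all target the _search/pipeline endpoint family. As a point of reference, a roughly equivalent set of calls against the low-level RestClient (assuming an instance named lowLevelClient and a hypothetical pipeline id my_pipeline) would look like this sketch:

    // PUT /_search/pipeline/{id} with the pipeline definition as the JSON request body.
    Request putRequest = new Request("PUT", "/_search/pipeline/my_pipeline");
    putRequest.setJsonEntity("{\"request_processors\":[{\"filter_query\":{\"query\":{\"term\":{\"field\":\"value\"}}}}]}");
    Response putResponse = lowLevelClient.performRequest(putRequest);

    // GET /_search/pipeline/{ids} accepts a comma-separated list of ids; DELETE /_search/pipeline/{id} removes one.
    Response getResponse = lowLevelClient.performRequest(new Request("GET", "/_search/pipeline/my_pipeline"));
    Response deleteResponse = lowLevelClient.performRequest(new Request("DELETE", "/_search/pipeline/my_pipeline"));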
SearchPipelineClientIT.java (new file)
@@ -0,0 +1,115 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client;

import org.opensearch.action.search.DeleteSearchPipelineRequest;
import org.opensearch.action.search.GetSearchPipelineRequest;
import org.opensearch.action.search.GetSearchPipelineResponse;
import org.opensearch.action.search.PutSearchPipelineRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;

public class SearchPipelineClientIT extends OpenSearchRestHighLevelClientTestCase {

public void testPutPipeline() throws IOException {
String id = "some_pipeline_id";
XContentBuilder pipelineBuilder = buildSearchPipeline();
PutSearchPipelineRequest request = new PutSearchPipelineRequest(
id,
BytesReference.bytes(pipelineBuilder),
pipelineBuilder.contentType()
);
createPipeline(request);
}

private static void createPipeline(PutSearchPipelineRequest request) throws IOException {
AcknowledgedResponse response = execute(
request,
highLevelClient().searchPipeline()::put,
highLevelClient().searchPipeline()::putAsync
);
assertTrue(response.isAcknowledged());
}

public void testGetPipeline() throws IOException {
String id = "some_pipeline_id";
XContentBuilder pipelineBuilder = buildSearchPipeline();
PutSearchPipelineRequest request = new PutSearchPipelineRequest(
id,
BytesReference.bytes(pipelineBuilder),
pipelineBuilder.contentType()
);
createPipeline(request);

GetSearchPipelineRequest getRequest = new GetSearchPipelineRequest(id);
GetSearchPipelineResponse response = execute(
getRequest,
highLevelClient().searchPipeline()::get,
highLevelClient().searchPipeline()::getAsync
);
assertTrue(response.isFound());
assertEquals(1, response.pipelines().size());
assertEquals(id, response.pipelines().get(0).getId());
}

public void testDeletePipeline() throws IOException {
String id = "some_pipeline_id";
XContentBuilder pipelineBuilder = buildSearchPipeline();
PutSearchPipelineRequest request = new PutSearchPipelineRequest(
id,
BytesReference.bytes(pipelineBuilder),
pipelineBuilder.contentType()
);
createPipeline(request);

DeleteSearchPipelineRequest deleteRequest = new DeleteSearchPipelineRequest(id);
AcknowledgedResponse response = execute(
deleteRequest,
highLevelClient().searchPipeline()::delete,
highLevelClient().searchPipeline()::deleteAsync
);
assertTrue(response.isAcknowledged());
}

private static XContentBuilder buildSearchPipeline() throws IOException {
XContentType xContentType = randomFrom(XContentType.values());
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent());
return buildSearchPipeline(pipelineBuilder);
}

private static XContentBuilder buildSearchPipeline(XContentBuilder builder) throws IOException {
builder.startObject();
{
builder.field("description", "a pipeline description");
builder.startArray("request_processors");
{
builder.startObject().startObject("filter_query");
{
builder.startObject("query");
{
builder.startObject("term");
{
builder.field("field", "value");
}
builder.endObject();
}
builder.endObject();
}
builder.endObject().endObject();
}
builder.endArray();
}
builder.endObject();
return builder;
}
}
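
For readability, the nested XContentBuilder calls in buildSearchPipeline above assemble the following pipeline body, shown here as its JSON rendering in a Java string (the test actually serializes it with a randomly chosen XContentType, and "field"/"value" are the test's placeholder term-query arguments):

    // JSON equivalent of the definition produced by buildSearchPipeline().
    String pipelineJson = "{"
        + "\"description\":\"a pipeline description\","
        + "\"request_processors\":[{\"filter_query\":{\"query\":{\"term\":{\"field\":\"value\"}}}}]"
        + "}";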
(Diffs for the remaining changed files are not shown here.)
