Skip to content

Commit

Permalink
Adding support for append only indices (#17039)
Browse files Browse the repository at this point in the history
Signed-off-by: RS146BIJAY <[email protected]>
  • Loading branch information
RS146BIJAY authored Jan 28, 2025
1 parent e77fc79 commit 5e12737
Show file tree
Hide file tree
Showing 9 changed files with 428 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add stats for remote publication failure and move download failure stats to remote methods([#16682](https://github.com/opensearch-project/OpenSearch/pull/16682/))
- Update script supports java.lang.String.sha1() and java.lang.String.sha256() methods ([#16923](https://github.com/opensearch-project/OpenSearch/pull/16923))
- Added a precaution to handle extreme date values during sorting to prevent `arithmetic_exception: long overflow` ([#16812](https://github.com/opensearch-project/OpenSearch/pull/16812)).
- Add support for append only indices ([#17039](https://github.com/opensearch-project/OpenSearch/pull/17039))
- Add `verbose_pipeline` parameter to output each processor's execution details ([#16843](https://github.com/opensearch-project/OpenSearch/pull/16843)).
- Add search replica stats to segment replication stats API ([#16678](https://github.com/opensearch-project/OpenSearch/pull/16678))
- Introduce a setting to disable download of full cluster state from remote on term mismatch([#16798](https://github.com/opensearch-project/OpenSearch/pull/16798/))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.core.index;

import org.opensearch.OpenSearchException;

/**
 * This exception indicates that a retry has been made during indexing for an append-only index. If the response of any
 * indexing request contains this exception, we do not need to add a translog entry for this request.
 *
 * NOTE(review): no {@code StreamInput} constructor is declared here; if instances of this exception ever cross the
 * transport layer they presumably deserialize as a generic {@code OpenSearchException} — confirm that is intended.
 *
 * @opensearch.internal
 */
public class AppendOnlyIndexOperationRetryException extends OpenSearchException {
    // Carries only the message; all behavior is inherited from OpenSearchException.
    public AppendOnlyIndexOperationRetryException(String message) {
        super(message);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.bulk;

import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.ingest.IngestTestPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.ConnectTransportException;
import org.opensearch.transport.TransportService;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING;
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.containsString;

/**
 * Integration tests for append-only indices ({@code index.append_only.enabled}): custom doc ids,
 * update and delete operations are rejected, and internal retries neither bump versions nor
 * duplicate documents.
 */
public class AppendOnlyIndicesIT extends OpenSearchIntegTestCase {

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return Arrays.asList(IngestTestPlugin.class, MockTransportService.TestPlugin.class);
    }

    /**
     * Creates the single-shard, zero-replica index named {@code index} with the append-only
     * setting enabled, and waits for it to go green. Shared by every test in this class.
     */
    private void createAppendOnlyIndex() {
        assertAcked(
            client().admin()
                .indices()
                .prepareCreate("index")
                .setSettings(
                    Settings.builder()
                        .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
                        .put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
                        .put(IndexMetadata.INDEX_APPEND_ONLY_ENABLED_SETTING.getKey(), true)
                )
        );
        ensureGreen("index");
    }

    /** Indexing with a caller-supplied document id must be rejected on an append-only index. */
    public void testIndexDocumentWithACustomDocIdForAppendOnlyIndices() throws Exception {
        Client client = internalCluster().coordOnlyNodeClient();
        createAppendOnlyIndex();

        BulkRequestBuilder bulkBuilder = client.prepareBulk();
        XContentBuilder doc = jsonBuilder().startObject().field("foo", "bar").endObject();
        bulkBuilder.add(client.prepareIndex("index").setId(Integer.toString(0)).setSource(doc));

        BulkResponse response = bulkBuilder.get();
        assertThat(
            response.getItems()[0].getFailureMessage(),
            containsString(
                "Operation [INDEX] is not allowed with a custom document id 0 as setting `"
                    + IndexMetadata.INDEX_APPEND_ONLY_ENABLED_SETTING.getKey()
                    + "` is enabled for this index: index;"
            )
        );
    }

    /** UPDATE and DELETE bulk items must both fail on an append-only index. */
    public void testUpdateDeleteDocumentForAppendOnlyIndices() throws Exception {
        Client client = internalCluster().coordOnlyNodeClient();
        createAppendOnlyIndex();

        BulkRequestBuilder bulkBuilder = client.prepareBulk();
        XContentBuilder doc = jsonBuilder().startObject().field("foo", "bar").endObject();
        bulkBuilder.add(client.prepareIndex("index").setSource(doc));
        bulkBuilder.get();

        BulkResponse response = client().prepareBulk().add(client().prepareUpdate("index", "0").setDoc("foo", "updated")).get();
        assertThat(
            response.getItems()[0].getFailureMessage(),
            containsString(
                "Operation [UPDATE] is not allowed as setting `"
                    + IndexMetadata.INDEX_APPEND_ONLY_ENABLED_SETTING.getKey()
                    + "` is enabled for this index"
            )
        );

        response = client().prepareBulk().add(client().prepareDelete("index", "0")).get();
        assertThat(
            response.getItems()[0].getFailureMessage(),
            containsString(
                "Operation [DELETE] is not allowed as setting `"
                    + IndexMetadata.INDEX_APPEND_ONLY_ENABLED_SETTING.getKey()
                    + "` is enabled for this index"
            )
        );
    }

    /**
     * A transport-level retry of a shard bulk request must not create a new document version:
     * every item in the bulk response stays at version 1 even though one request was retried.
     */
    public void testRetryForAppendOnlyIndices() throws Exception {
        final AtomicBoolean exceptionThrown = new AtomicBoolean(false);
        int numDocs = scaledRandomIntBetween(100, 1000);
        Client client = internalCluster().coordOnlyNodeClient();
        NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().get();
        NodeStats unluckyNode = randomFrom(
            nodeStats.getNodes().stream().filter((s) -> s.getNode().isDataNode()).collect(Collectors.toList())
        );
        createAppendOnlyIndex();
        logger.info("unlucky node: {}", unluckyNode.getNode());
        // Install a send behavior on every node that throws a ConnectTransportException for
        // exactly one shard bulk request to the unlucky node, forcing a coordinator retry.
        // (The original `if (exceptionThrown.get()) break;` guard was dead code: the flag can
        // only flip later, while the bulk below is executing — removed.)
        for (NodeStats dataNode : nodeStats.getNodes()) {
            MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
                TransportService.class,
                dataNode.getNode().getName()
            ));
            mockTransportService.addSendBehavior(
                internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()),
                (connection, requestId, action, request, options) -> {
                    // Deliver the request first so the primary actually executes it, then fail
                    // the send once; compareAndSet guarantees a single throw across all nodes.
                    connection.sendRequest(requestId, action, request, options);
                    if (action.equals(TransportShardBulkAction.ACTION_NAME) && exceptionThrown.compareAndSet(false, true)) {
                        logger.debug("Throw ConnectTransportException");
                        throw new ConnectTransportException(connection.getNode(), action);
                    }
                }
            );
        }

        BulkRequestBuilder bulkBuilder = client.prepareBulk();
        for (int i = 0; i < numDocs; i++) {
            XContentBuilder doc = jsonBuilder().startObject().field("foo", "bar").endObject();
            bulkBuilder.add(client.prepareIndex("index").setSource(doc));
        }

        BulkResponse response = bulkBuilder.get();
        for (BulkItemResponse singleIndexResponse : response.getItems()) {
            // Retry will not create a new version.
            assertThat(singleIndexResponse.getVersion(), equalTo(1L));
        }
    }

    /** All documents must survive a data-node restart without loss or duplication. */
    public void testNodeReboot() throws Exception {
        int numDocs = scaledRandomIntBetween(100, 1000);
        Client client = internalCluster().coordOnlyNodeClient();
        createAppendOnlyIndex();

        BulkRequestBuilder bulkBuilder = client.prepareBulk();
        for (int i = 0; i < numDocs; i++) {
            XContentBuilder doc = jsonBuilder().startObject().field("foo", "bar").endObject();
            bulkBuilder.add(client.prepareIndex("index").setSource(doc));
        }

        BulkResponse response = bulkBuilder.get();
        assertFalse(response.hasFailures());
        internalCluster().restartRandomDataNode();
        ensureGreen("index");
        refresh();
        // Bug fix: run the search inside assertBusy so each retry issues a fresh query.
        // The original captured a single SearchResponse before assertBusy, so retries
        // re-asserted the same stale response and the busy-wait could never succeed late.
        assertBusy(() -> {
            SearchResponse searchResponse = client().prepareSearch()
                .setQuery(QueryBuilders.matchAllQuery())
                .setIndices("index")
                .setSize(numDocs)
                .get();
            assertHitCount(searchResponse, numDocs);
        }, 20L, TimeUnit.SECONDS);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.replication.ReplicationResponse;
import org.opensearch.action.support.replication.TransportWriteAction;
import org.opensearch.core.index.AppendOnlyIndexOperationRetryException;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.translog.Translog;
Expand Down Expand Up @@ -297,20 +298,36 @@ public void markOperationAsExecuted(Engine.Result result) {
locationToSync = TransportWriteAction.locationToSync(locationToSync, result.getTranslogLocation());
break;
case FAILURE:
executionResult = new BulkItemResponse(
current.id(),
docWriteRequest.opType(),
// Make sure to use request.index() here, if you
// use docWriteRequest.index() it will use the
// concrete index instead of an alias if used!
new BulkItemResponse.Failure(
request.index(),
docWriteRequest.id(),
result.getFailure(),
if (result.getFailure() instanceof AppendOnlyIndexOperationRetryException) {
Engine.IndexResult indexResult = (Engine.IndexResult) result;
DocWriteResponse indexResponse = new IndexResponse(
primary.shardId(),
requestToExecute.id(),
result.getSeqNo(),
result.getTerm()
)
);
result.getTerm(),
indexResult.getVersion(),
indexResult.isCreated()
);

executionResult = new BulkItemResponse(current.id(), current.request().opType(), indexResponse);
// set a blank ShardInfo so we can safely send it to the replicas. We won't use it in the real response though.
executionResult.getResponse().setShardInfo(new ReplicationResponse.ShardInfo());
} else {
executionResult = new BulkItemResponse(
current.id(),
docWriteRequest.opType(),
// Make sure to use request.index() here, if you
// use docWriteRequest.index() it will use the
// concrete index instead of an alias if used!
new BulkItemResponse.Failure(
request.index(),
docWriteRequest.id(),
result.getFailure(),
result.getSeqNo(),
result.getTerm()
)
);
}
break;
default:
throw new AssertionError("unknown result type for " + getCurrentItem() + ": " + result.getResultType());
Expand Down
Loading

0 comments on commit 5e12737

Please sign in to comment.