Add a _freeze / _unfreeze API
This commit adds a rest endpoint for freezing and unfreezing an index.
Among other cleanups, mainly fixing an issue with accessing package-private APIs
from a plugin that was caught by integration tests, this change also adds
documentation for frozen indices.
Note: frozen indices are marked as `beta` and available as a basic feature.

Relates to elastic#34352
s1monw committed Nov 15, 2018
1 parent 0b7d18d commit 106efb4
Showing 18 changed files with 699 additions and 113 deletions.
82 changes: 82 additions & 0 deletions docs/reference/frozen-indices.asciidoc
@@ -0,0 +1,82 @@
[role="xpack"]
[testenv="basic"]
[[frozen-indices]]
= Frozen Indices

Elasticsearch indices can require a significant amount of memory in order to be open and searchable. Yet, not all indices need
to be writable at the same time, and access patterns differ over time. For example, indices in time-series or logging use cases
are unlikely to be queried once they age out, but still need to be kept around for retention purposes.

In order to keep indices around for a longer time period while reducing their hardware requirements, they can be transitioned
into a frozen state. Once an index is frozen, all of its transient shard memory, aside from mappings and the in-memory structures
necessary for analysis, is moved to persistent storage. This allows for a much higher disk-to-heap storage ratio on individual
nodes. Frozen indices are read-only, and the data structures required for query execution are loaded on demand. A search request
that hits one or more shards of a frozen index executes through a throttling component that ensures we never run more than
`N` (`1` by default) searches concurrently. This protects nodes from exceeding the available memory due to incoming search requests.

In contrast to ordinary open indices, frozen indices are expected to execute searches slowly and are not designed for high query
load. Parallelism is gained only at the per-node level, and loading data structures on demand is expected to be one or more
orders of magnitude slower than query execution at the per-shard level. Depending on the index and its data, executing a search
on a frozen index is expected to take seconds or even minutes, compared to milliseconds on a non-frozen index.

== Best Practices

Since frozen indices provide a much higher disk-to-heap ratio at the expense of latency, it is recommended to allocate frozen
indices on dedicated nodes to prevent searches on frozen indices from influencing traffic on low-latency nodes. There is
significant overhead in loading data structures on demand, which can cause page faults and garbage collection that further slow
down query execution.

Since indices that are eligible for freezing are unlikely to change in the future, disk space can be optimized beforehand. Many
strategies are outlined in <<tune-for-disk-usage>>.
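
One of those strategies, for example, is to force merge an index down to a single segment before freezing it, which reduces both
its disk footprint and the number of per-segment data structures that later have to be loaded on demand (shown here with a
hypothetical `my_index`; this step is optional and not required by the freeze API):

[source,js]
--------------------------------------------------
POST /my_index/_forcemerge?max_num_segments=1
--------------------------------------------------
// CONSOLE
// TEST[s/^/PUT my_index\n/]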

== Freezing and unfreezing an index

The freeze and unfreeze index APIs allow freezing an index, and later on
unfreezing it. A frozen index has almost no overhead on the cluster (except
for maintaining its metadata in memory) and is blocked for write operations.
When a frozen index is unfrozen, it goes through the normal recovery process.

The REST endpoints are `/{index}/_freeze` and `/{index}/_unfreeze`. For
example:

[source,js]
--------------------------------------------------
POST /my_index/_freeze
POST /my_index/_unfreeze
--------------------------------------------------
// CONSOLE
// TEST[s/^/PUT my_index\n/]


[IMPORTANT]
================================
Freezing an index closes the index and reopens it within the same API call. This causes primaries not to be allocated for a short
amount of time and causes the cluster to go red until the primaries are allocated again. This limitation may be removed in the future.
================================
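
Until that limitation is removed, one way to block until the primaries have been allocated again after freezing is to wait on
cluster health for the affected index (shown here with a hypothetical `my_index`):

[source,js]
--------------------------------------------------
GET /_cluster/health/my_index?wait_for_status=green&timeout=30s
--------------------------------------------------
// CONSOLE
// TEST[s/^/PUT my_index\n/]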

== Searching a frozen index

Frozen indices are throttled in order to limit memory consumption per node. The number of concurrently loaded frozen indices per
node is limited by the number of threads in the `search_throttled` <<modules-threadpool,threadpool>>, which is `1` by default. At
the same time, search requests that hit many indices on a cluster, whether through concrete index lists, expanded wildcards, or a
date pattern, exclude frozen indices by default to prevent accidental slowdowns when a frozen index is hit. To include frozen
indices, a search request must be executed with `ignore_throttled=false`.

[source,js]
--------------------------------------------------
GET /twitter/_search?q=user:kimchy&ignore_throttled=false
--------------------------------------------------
// CONSOLE
// TEST[setup:twitter]

[IMPORTANT]
================================
While frozen indices are slow to search, they offer an efficient pre-filter phase. The request parameter `pre_filter_shard_size`
specifies a threshold that, when exceeded by the number of shards a search request expands to, enforces a round-trip to filter
search shards based on query rewriting. This filter phase can limit the number of shards significantly if, for instance, a shard
cannot match any documents based on its rewrite method, e.g. if date filters are mandatory to match but the shard bounds and the
query are disjoint. The default value for `pre_filter_shard_size` is `128`, but for searching frozen indices it is recommended to
set it to `1`. There is no significant overhead associated with this pre-filter phase.
================================
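
Putting the two request parameters together, a search against one or more frozen indices could for instance be sent as follows
(reusing the `twitter` example from above):

[source,js]
--------------------------------------------------
GET /twitter/_search?q=user:kimchy&ignore_throttled=false&pre_filter_shard_size=1
--------------------------------------------------
// CONSOLE
// TEST[setup:twitter]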


4 changes: 4 additions & 0 deletions docs/reference/modules/threadpool.asciidoc
@@ -19,6 +19,10 @@ There are several thread pools, but the important ones include:
`int((# of available_processors * 3) / 2) + 1`, and initial queue_size of
`1000`.

`search_throttled`::
For count/search/suggest/get operations on `search_throttled` indices. Thread pool type is
`fixed_auto_queue_size` with a size of `1`, and initial queue_size of `100`.

`get`::
For get operations. Thread pool type is `fixed`
with a size of `# of available processors`,
@@ -28,7 +28,7 @@ public class OpenIndexClusterStateUpdateRequest extends IndicesClusterStateUpdat

private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;

OpenIndexClusterStateUpdateRequest() {
public OpenIndexClusterStateUpdateRequest() {

}

@@ -40,10 +40,10 @@ public class OpenIndexResponse extends ShardsAcknowledgedResponse {
declareAcknowledgedAndShardsAcknowledgedFields(PARSER);
}

OpenIndexResponse() {
public OpenIndexResponse() {
}

OpenIndexResponse(boolean acknowledged, boolean shardsAcknowledged) {
public OpenIndexResponse(boolean acknowledged, boolean shardsAcknowledged) {
super(acknowledged, shardsAcknowledged);
}

@@ -100,46 +100,50 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {

@Override
public ClusterState execute(ClusterState currentState) {
Set<IndexMetaData> indicesToClose = new HashSet<>();
for (Index index : request.indices()) {
final IndexMetaData indexMetaData = currentState.metaData().getIndexSafe(index);
if (indexMetaData.getState() != IndexMetaData.State.CLOSE) {
indicesToClose.add(indexMetaData);
}
}
return closeIndex(currentState, request.indices(), indicesAsString);
}
});
}

if (indicesToClose.isEmpty()) {
return currentState;
}
public ClusterState closeIndex(ClusterState currentState, final Index[] indices, String indicesAsString) {
Set<IndexMetaData> indicesToClose = new HashSet<>();
for (Index index : indices) {
final IndexMetaData indexMetaData = currentState.metaData().getIndexSafe(index);
if (indexMetaData.getState() != IndexMetaData.State.CLOSE) {
indicesToClose.add(indexMetaData);
}
}

// Check if index closing conflicts with any running restores
RestoreService.checkIndexClosing(currentState, indicesToClose);
// Check if index closing conflicts with any running snapshots
SnapshotsService.checkIndexClosing(currentState, indicesToClose);
logger.info("closing indices [{}]", indicesAsString);
if (indicesToClose.isEmpty()) {
return currentState;
}

MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
ClusterBlocks.Builder blocksBuilder = ClusterBlocks.builder()
.blocks(currentState.blocks());
for (IndexMetaData openIndexMetadata : indicesToClose) {
final String indexName = openIndexMetadata.getIndex().getName();
mdBuilder.put(IndexMetaData.builder(openIndexMetadata).state(IndexMetaData.State.CLOSE));
blocksBuilder.addIndexBlock(indexName, INDEX_CLOSED_BLOCK);
}
// Check if index closing conflicts with any running restores
RestoreService.checkIndexClosing(currentState, indicesToClose);
// Check if index closing conflicts with any running snapshots
SnapshotsService.checkIndexClosing(currentState, indicesToClose);
logger.info("closing indices [{}]", indicesAsString);

MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
ClusterBlocks.Builder blocksBuilder = ClusterBlocks.builder()
.blocks(currentState.blocks());
for (IndexMetaData openIndexMetadata : indicesToClose) {
final String indexName = openIndexMetadata.getIndex().getName();
mdBuilder.put(IndexMetaData.builder(openIndexMetadata).state(IndexMetaData.State.CLOSE));
blocksBuilder.addIndexBlock(indexName, INDEX_CLOSED_BLOCK);
}

ClusterState updatedState = ClusterState.builder(currentState).metaData(mdBuilder).blocks(blocksBuilder).build();
ClusterState updatedState = ClusterState.builder(currentState).metaData(mdBuilder).blocks(blocksBuilder).build();

RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable());
for (IndexMetaData index : indicesToClose) {
rtBuilder.remove(index.getIndex().getName());
}
RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable());
for (IndexMetaData index : indicesToClose) {
rtBuilder.remove(index.getIndex().getName());
}

//no explicit wait for other nodes needed as we use AckedClusterStateUpdateTask
return allocationService.reroute(
ClusterState.builder(updatedState).routingTable(rtBuilder.build()).build(),
"indices closed [" + indicesAsString + "]");
}
});
//no explicit wait for other nodes needed as we use AckedClusterStateUpdateTask
return allocationService.reroute(
ClusterState.builder(updatedState).routingTable(rtBuilder.build()).build(),
"indices closed [" + indicesAsString + "]");
}

public void openIndex(final OpenIndexClusterStateUpdateRequest request,
@@ -20,6 +20,7 @@

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
@@ -66,7 +67,7 @@ public class ReadOnlyEngine extends Engine {
private final IndexCommit indexCommit;
private final Lock indexWriterLock;
private final DocsStats docsStats;
protected final RamAccountingSearcherFactory searcherFactory;
private final RamAccountingSearcherFactory searcherFactory;

/**
* Creates a new ReadOnlyEngine. This ctor can also be used to open a read-only engine on top of an already opened
@@ -414,4 +415,8 @@ public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {
public void initializeMaxSeqNoOfUpdatesOrDeletes() {
advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats.getMaxSeqNo());
}

protected void processReaders(IndexReader reader, IndexReader previousReader) {
searcherFactory.processReaders(reader, previousReader);
}
}
@@ -389,14 +389,26 @@ protected IndexShard reinitShard(IndexShard current, IndexingOperationListener..
* @param listeners new listeners to use for the newly created shard
*/
protected IndexShard reinitShard(IndexShard current, ShardRouting routing, IndexingOperationListener... listeners) throws IOException {
return reinitShard(current, routing, current.engineFactory, listeners);
}

/**
* Takes an existing shard, closes it and starts a new initialing shard at the same location
*
* @param routing the shard routing to use for the newly created shard.
* @param listeners new listeners to use for the newly created shard
* @param engineFactory the engine factory for the new shard
*/
protected IndexShard reinitShard(IndexShard current, ShardRouting routing, EngineFactory engineFactory,
IndexingOperationListener... listeners) throws IOException {
closeShards(current);
return newShard(
routing,
current.shardPath(),
current.indexSettings().getIndexMetaData(),
null,
null,
current.engineFactory,
engineFactory,
current.getGlobalCheckpointSyncer(),
EMPTY_EVENT_LISTENER, listeners);
}
@@ -158,7 +158,7 @@ private synchronized DirectoryReader getOrOpenReader() throws IOException {
listeners.beforeRefresh();
}
reader = DirectoryReader.open(engineConfig.getStore().directory());
searcherFactory.processReaders(reader, null);
processReaders(reader, null);
reader = lastOpenedReader = wrapReader(reader, Function.identity());
reader.getReaderCacheHelper().addClosedListener(this::onReaderClosed);
for (ReferenceManager.RefreshListener listeners : config().getInternalRefreshListener()) {
@@ -6,13 +6,15 @@
package org.elasticsearch.xpack.core;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.license.LicensingClient;
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
import org.elasticsearch.protocol.xpack.XPackInfoResponse;
import org.elasticsearch.xpack.core.action.TransportFreezeIndexAction;
import org.elasticsearch.xpack.core.action.TransportFreezeIndexAction.FreezeIndexAction;
import org.elasticsearch.xpack.core.action.TransportFreezeIndexAction.FreezeRequest;
import org.elasticsearch.xpack.core.action.TransportFreezeIndexAction.FreezeResponse;
import org.elasticsearch.xpack.core.action.XPackInfoAction;
import org.elasticsearch.xpack.core.action.XPackInfoRequestBuilder;
import org.elasticsearch.xpack.core.ccr.client.CcrClient;
@@ -25,6 +27,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.BASIC_AUTH_HEADER;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
@@ -106,7 +109,20 @@ public void info(XPackInfoRequest request, ActionListener<XPackInfoResponse> lis
client.execute(XPackInfoAction.INSTANCE, request, listener);
}

public void freeze(TransportFreezeIndexAction.FreezeRequest request, ActionListener<AcknowledgedResponse> listener) {
client.execute(TransportFreezeIndexAction.FreezeIndexAction.INSTANCE, request, listener);
/**
* Freezes or unfreezes one or more indices.
*/
public void freeze(FreezeRequest request, ActionListener<FreezeResponse> listener) {
client.execute(FreezeIndexAction.INSTANCE, request, listener);
}

/**
* Freezes or unfreezes one or more indices and blocks until the response is available.
*/
public FreezeResponse freeze(FreezeRequest request)
throws ExecutionException, InterruptedException {
PlainActionFuture<FreezeResponse> future = new PlainActionFuture<>();
freeze(request, future);
return future.get();
}
}
@@ -37,6 +37,7 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.xpack.core.action.TransportFreezeIndexAction;
import org.elasticsearch.xpack.core.action.XPackInfoAction;
import org.elasticsearch.xpack.core.action.XPackUsageAction;
import org.elasticsearch.xpack.core.beats.BeatsFeatureSetUsage;
@@ -344,7 +345,9 @@ public List<Action<? extends ActionResponse>> getClientActions() {
ExplainLifecycleAction.INSTANCE,
RemoveIndexLifecyclePolicyAction.INSTANCE,
MoveToStepAction.INSTANCE,
RetryAction.INSTANCE
RetryAction.INSTANCE,
// Frozen indices
TransportFreezeIndexAction.FreezeIndexAction.INSTANCE
);
}

@@ -63,6 +63,7 @@
import org.elasticsearch.xpack.core.action.XPackInfoAction;
import org.elasticsearch.xpack.core.action.XPackUsageAction;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.rest.action.RestFreezeIndexAction;
import org.elasticsearch.xpack.core.rest.action.RestXPackInfoAction;
import org.elasticsearch.xpack.core.rest.action.RestXPackUsageAction;
import org.elasticsearch.xpack.core.security.authc.TokenMetaData;
@@ -297,6 +298,7 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
List<RestHandler> handlers = new ArrayList<>();
handlers.add(new RestXPackInfoAction(settings, restController));
handlers.add(new RestXPackUsageAction(settings, restController));
handlers.add(new RestFreezeIndexAction(settings, restController));
handlers.addAll(licensing.getRestHandlers(settings, restController, clusterSettings, indexScopedSettings, settingsFilter,
indexNameExpressionResolver, nodesInCluster));
return handlers;