Skip to content

Commit

Permalink
Merge branch 'master' into feature/seq_no
Browse files Browse the repository at this point in the history
* master:
  ShardActiveResponseHandler shouldn't hold to an entire cluster state
  Ensures cleanup of temporary index-* generational blobs during snapshotting (#21469)
  Remove (again) test uses of onModule (#21414)
  [TEST] Add assertBusy when checking for pending operation counter after tests
  Revert "Add trace logging when aquiring and releasing operation locks for replication requests"
  Allows multiple patterns to be specified for index templates (#21009)
  [TEST] fixes rebalance single shard check as it isn't guaranteed that a rebalance makes sense and the method only tests if rebalance is allowed
  Document _reindex with random_score
  • Loading branch information
jasontedor committed Nov 11, 2016
2 parents d3417fb + 06a50fa commit 1e7c424
Show file tree
Hide file tree
Showing 67 changed files with 823 additions and 383 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("forced_refresh", forcedRefresh);
}
shardInfo.toXContent(builder, params);
//nocommit: i'm not sure we want to expose it in the api but it will be handy for debugging while we work...
// nocommit i'm not sure we want to expose it in the api but it will be handy for debugging while we work... remove this
builder.field("_shard_id", shardId.id());
if (getSeqNo() >= 0) {
builder.field("_seq_no", getSeqNo());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.alias.Alias;
Expand All @@ -32,6 +33,8 @@
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
Expand All @@ -41,10 +44,13 @@
import org.elasticsearch.common.xcontent.support.XContentMapValues;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.common.settings.Settings.readSettingsFromStream;
Expand All @@ -56,11 +62,15 @@
*/
public class PutIndexTemplateRequest extends MasterNodeRequest<PutIndexTemplateRequest> implements IndicesRequest {

private static final DeprecationLogger DEPRECATION_LOGGER = new DeprecationLogger(Loggers.getLogger(PutIndexTemplateRequest.class));

public static final Version V_5_1_0 = Version.fromId(5010099);

private String name;

private String cause = "";

private String template;
private List<String> indexPatterns;

private int order;

Expand Down Expand Up @@ -92,8 +102,8 @@ public ActionRequestValidationException validate() {
if (name == null) {
validationException = addValidationError("name is missing", validationException);
}
if (template == null) {
validationException = addValidationError("template is missing", validationException);
if (indexPatterns == null || indexPatterns.size() == 0) {
validationException = addValidationError("pattern is missing", validationException);
}
return validationException;
}
Expand All @@ -113,13 +123,13 @@ public String name() {
return this.name;
}

public PutIndexTemplateRequest template(String template) {
this.template = template;
public PutIndexTemplateRequest patterns(List<String> indexPatterns) {
this.indexPatterns = indexPatterns;
return this;
}

public String template() {
return this.template;
public List<String> patterns() {
return this.indexPatterns;
}

public PutIndexTemplateRequest order(int order) {
Expand Down Expand Up @@ -286,7 +296,20 @@ public PutIndexTemplateRequest source(Map templateSource) {
for (Map.Entry<String, Object> entry : source.entrySet()) {
String name = entry.getKey();
if (name.equals("template")) {
template(entry.getValue().toString());
// This is needed to allow for bwc (beats, logstash) with pre-5.0 templates (#21009)
if(entry.getValue() instanceof String) {
DEPRECATION_LOGGER.deprecated("Deprecated field [template] used, replaced by [index_patterns]");
patterns(Collections.singletonList((String) entry.getValue()));
}
} else if (name.equals("index_patterns")) {
if(entry.getValue() instanceof String) {
patterns(Collections.singletonList((String) entry.getValue()));
} else if (entry.getValue() instanceof List) {
List<String> elements = ((List<?>) entry.getValue()).stream().map(Object::toString).collect(Collectors.toList());
patterns(elements);
} else {
throw new IllegalArgumentException("Malformed [template] value, should be a string or a list of strings");
}
} else if (name.equals("order")) {
order(XContentMapValues.nodeIntegerValue(entry.getValue(), order()));
} else if ("version".equals(name)) {
Expand All @@ -295,7 +318,7 @@ public PutIndexTemplateRequest source(Map templateSource) {
}
version((Integer)entry.getValue());
} else if (name.equals("settings")) {
if (!(entry.getValue() instanceof Map)) {
if ((entry.getValue() instanceof Map) == false) {
throw new IllegalArgumentException("Malformed [settings] section, should include an inner object");
}
settings((Map<String, Object>) entry.getValue());
Expand Down Expand Up @@ -436,7 +459,7 @@ public PutIndexTemplateRequest alias(Alias alias) {

@Override
public String[] indices() {
return new String[]{template};
return indexPatterns.toArray(new String[indexPatterns.size()]);
}

@Override
Expand All @@ -449,7 +472,12 @@ public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
cause = in.readString();
name = in.readString();
template = in.readString();

if (in.getVersion().onOrAfter(V_5_1_0)) {
indexPatterns = in.readList(StreamInput::readString);
} else {
indexPatterns = Collections.singletonList(in.readString());
}
order = in.readInt();
create = in.readBoolean();
settings = readSettingsFromStream(in);
Expand All @@ -475,7 +503,11 @@ public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(cause);
out.writeString(name);
out.writeString(template);
if (out.getVersion().onOrAfter(V_5_1_0)) {
out.writeStringList(indexPatterns);
} else {
out.writeString(indexPatterns.size() > 0 ? indexPatterns.get(0) : "");
}
out.writeInt(order);
out.writeBoolean(create);
writeSettingsToStream(settings, out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public class PutIndexTemplateRequestBuilder
Expand All @@ -39,10 +41,20 @@ public PutIndexTemplateRequestBuilder(ElasticsearchClient client, PutIndexTempla
}

/**
* Sets the template match expression that will be used to match on indices created.
* Sets the match expression that will be used to match on indices created.
*
* @deprecated Replaced by {@link #setPatterns(List)}
*/
@Deprecated
public PutIndexTemplateRequestBuilder setTemplate(String indexPattern) {
return setPatterns(Collections.singletonList(indexPattern));
}

/**
* Sets the match expression that will be used to match on indices created.
*/
public PutIndexTemplateRequestBuilder setTemplate(String template) {
request.template(template);
public PutIndexTemplateRequestBuilder setPatterns(List<String> indexPatterns) {
request.patterns(indexPatterns);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ protected void masterOperation(final PutIndexTemplateRequest request, final Clus
templateSettingsBuilder.put(request.settings()).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX);
indexScopedSettings.validate(templateSettingsBuilder);
indexTemplateService.putTemplate(new MetaDataIndexTemplateService.PutRequest(cause, request.name())
.template(request.template())
.patterns(request.patterns())
.order(request.order())
.settings(templateSettingsBuilder.build())
.mappings(request.mappings())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh
} catch (Exception e) {
// nocommit: since we now have RetryOnPrimaryException, retrying doesn't always mean the shard is closed.
// some operations were already perform and have a seqno assigned. we shouldn't just reindex them
// add to the meta-issue
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
if (retryPrimaryException(e)) {
// restore updated versions...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,6 @@ public class ClusterModule extends AbstractModule {
final Collection<AllocationDecider> allocationDeciders;
final ShardsAllocator shardsAllocator;

// pkg private so tests can mock
Class<? extends ClusterInfoService> clusterInfoServiceImpl = InternalClusterInfoService.class;

public ClusterModule(Settings settings, ClusterService clusterService, List<ClusterPlugin> clusterPlugins) {
this.settings = settings;
this.allocationDeciders = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
Expand Down Expand Up @@ -159,7 +156,6 @@ private static ShardsAllocator createShardsAllocator(Settings settings, ClusterS

@Override
protected void configure() {
bind(ClusterInfoService.class).to(clusterInfoServiceImpl).asEagerSingleton();
bind(GatewayAllocator.class).asEagerSingleton();
bind(AllocationService.class).asEagerSingleton();
bind(ClusterService.class).toInstance(clusterService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
IndexTemplateMetaData templateMetaData = cursor.value;
builder.startObject(templateMetaData.name());

builder.field("template", templateMetaData.template());
builder.field("index_patterns", templateMetaData.patterns());
builder.field("order", templateMetaData.order());

builder.startObject("settings");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,21 @@

package org.elasticsearch.cluster;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.cluster.node.stats.TransportNodesStatsAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
Expand All @@ -39,7 +43,6 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
Expand All @@ -50,11 +53,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
* InternalClusterInfoService provides the ClusterInfoService interface,
* routinely updated on a timer. The timer can be dynamically changed by
Expand Down Expand Up @@ -84,29 +82,24 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
private volatile boolean isMaster = false;
private volatile boolean enabled;
private volatile TimeValue fetchTimeout;
private final TransportNodesStatsAction transportNodesStatsAction;
private final TransportIndicesStatsAction transportIndicesStatsAction;
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final NodeClient client;
private final List<Listener> listeners = new CopyOnWriteArrayList<>();

@Inject
public InternalClusterInfoService(Settings settings, ClusterSettings clusterSettings,
TransportNodesStatsAction transportNodesStatsAction,
TransportIndicesStatsAction transportIndicesStatsAction, ClusterService clusterService,
ThreadPool threadPool) {
public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) {
super(settings);
this.leastAvailableSpaceUsages = ImmutableOpenMap.of();
this.mostAvailableSpaceUsages = ImmutableOpenMap.of();
this.shardRoutingToDataPath = ImmutableOpenMap.of();
this.shardSizes = ImmutableOpenMap.of();
this.transportNodesStatsAction = transportNodesStatsAction;
this.transportIndicesStatsAction = transportIndicesStatsAction;
this.clusterService = clusterService;
this.threadPool = threadPool;
this.client = client;
this.updateFrequency = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings);
this.fetchTimeout = INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING.get(settings);
this.enabled = DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings);
ClusterSettings clusterSettings = clusterService.getClusterSettings();
clusterSettings.addSettingsUpdateConsumer(INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING, this::setFetchTimeout);
clusterSettings.addSettingsUpdateConsumer(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING, this::setUpdateFrequency);
clusterSettings.addSettingsUpdateConsumer(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, this::setEnabled);
Expand Down Expand Up @@ -259,8 +252,7 @@ protected CountDownLatch updateNodeStats(final ActionListener<NodesStatsResponse
nodesStatsRequest.clear();
nodesStatsRequest.fs(true);
nodesStatsRequest.timeout(fetchTimeout);

transportNodesStatsAction.execute(nodesStatsRequest, new LatchedActionListener<>(listener, latch));
client.admin().cluster().nodesStats(nodesStatsRequest, new LatchedActionListener<>(listener, latch));
return latch;
}

Expand All @@ -274,7 +266,7 @@ protected CountDownLatch updateIndicesStats(final ActionListener<IndicesStatsRes
indicesStatsRequest.clear();
indicesStatsRequest.store(true);

transportIndicesStatsAction.execute(indicesStatsRequest, new LatchedActionListener<>(listener, latch));
client.admin().indices().stats(indicesStatsRequest, new LatchedActionListener<>(listener, latch));
return latch;
}

Expand Down
Loading

0 comments on commit 1e7c424

Please sign in to comment.