Skip to content

Commit

Permalink
TemplateUpgraders should be called during rolling restart (#25263)
Browse files Browse the repository at this point in the history
In #24379 we added ability to upgrade templates on full cluster startup. This PR invokes the same update procedure also when a new node first joins the cluster allowing to update templates on a rolling cluster restart as well.

Closes #24680
  • Loading branch information
imotov authored Jun 22, 2017
1 parent 8dcb1f5 commit e6e5ae6
Show file tree
Hide file tree
Showing 7 changed files with 925 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class DeleteIndexTemplateResponse extends AcknowledgedResponse {
DeleteIndexTemplateResponse() {
}

DeleteIndexTemplateResponse(boolean acknowledged) {
protected DeleteIndexTemplateResponse(boolean acknowledged) {
super(acknowledged);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,14 @@ public static void toXContent(IndexTemplateMetaData indexTemplateMetaData, XCont
throws IOException {
builder.startObject(indexTemplateMetaData.name());

toInnerXContent(indexTemplateMetaData, builder, params);

builder.endObject();
}

public static void toInnerXContent(IndexTemplateMetaData indexTemplateMetaData, XContentBuilder builder, ToXContent.Params params)
throws IOException {

builder.field("order", indexTemplateMetaData.order());
if (indexTemplateMetaData.version() != null) {
builder.field("version", indexTemplateMetaData.version());
Expand Down Expand Up @@ -430,8 +438,6 @@ public static void toXContent(IndexTemplateMetaData indexTemplateMetaData, XCont
AliasMetaData.Builder.toXContent(cursor.value, builder, params);
}
builder.endObject();

builder.endObject();
}

public static IndexTemplateMetaData fromXContent(XContentParser parser, String templateName) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.metadata;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.indices.IndexTemplateMissingException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;

import static java.util.Collections.singletonMap;

/**
* Upgrades Templates on behalf of installed {@link Plugin}s when a node joins the cluster
*/
public class TemplateUpgradeService extends AbstractComponent implements ClusterStateListener {
private final UnaryOperator<Map<String, IndexTemplateMetaData>> indexTemplateMetaDataUpgraders;

public final ClusterService clusterService;

public final ThreadPool threadPool;

public final Client client;

private final AtomicInteger updatesInProgress = new AtomicInteger();

private ImmutableOpenMap<String, IndexTemplateMetaData> lastTemplateMetaData;

public TemplateUpgradeService(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool,
Collection<UnaryOperator<Map<String, IndexTemplateMetaData>>> indexTemplateMetaDataUpgraders) {
super(settings);
this.client = client;
this.clusterService = clusterService;
this.threadPool = threadPool;
this.indexTemplateMetaDataUpgraders = templates -> {
Map<String, IndexTemplateMetaData> upgradedTemplates = new HashMap<>(templates);
for (UnaryOperator<Map<String, IndexTemplateMetaData>> upgrader : indexTemplateMetaDataUpgraders) {
upgradedTemplates = upgrader.apply(upgradedTemplates);
}
return upgradedTemplates;
};
clusterService.addListener(this);
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
ClusterState state = event.state();
if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// wait until the gateway has recovered from disk, otherwise we think may not have the index templates,
// while they actually do exist
return;
}

if (updatesInProgress.get() > 0) {
// we are already running some updates - skip this cluster state update
return;
}

ImmutableOpenMap<String, IndexTemplateMetaData> templates = state.getMetaData().getTemplates();

if (templates == lastTemplateMetaData) {
// we already checked these sets of templates - no reason to check it again
// we can do identity check here because due to cluster state diffs the actual map will not change
// if there were no changes
return;
}

if (shouldLocalNodeUpdateTemplates(state.nodes()) == false) {
return;
}


lastTemplateMetaData = templates;
Optional<Tuple<Map<String, BytesReference>, Set<String>>> changes = calculateTemplateChanges(templates);
if (changes.isPresent()) {
if (updatesInProgress.compareAndSet(0, changes.get().v1().size() + changes.get().v2().size())) {
threadPool.generic().execute(() -> updateTemplates(changes.get().v1(), changes.get().v2()));
}
}
}

/**
* Checks if the current node should update the templates
*
* If the master has the newest verison in the cluster - it will be dedicated template updater.
* Otherwise the node with the highest id among nodes with the highest version should update the templates
*/
boolean shouldLocalNodeUpdateTemplates(DiscoveryNodes nodes) {
DiscoveryNode localNode = nodes.getLocalNode();
// Only data and master nodes should update the template
if (localNode.isDataNode() || localNode.isMasterNode()) {
Version maxVersion = nodes.getLargestNonClientNodeVersion();
if (maxVersion.equals(nodes.getMasterNode().getVersion())) {
// If the master has the latest version - we will allow it to handle the update
return nodes.isLocalNodeElectedMaster();
} else {
if (maxVersion.equals(localNode.getVersion()) == false) {
// The localhost node doesn't have the latest version - not going to update
return false;
}
for (ObjectCursor<DiscoveryNode> node : nodes.getMasterAndDataNodes().values()) {
if (node.value.getVersion().equals(maxVersion) && node.value.getId().compareTo(localNode.getId()) > 0) {
// We have a node with higher id then mine - it should update
return false;
}
}
// We have the highest version and highest id - we should perform the update
return true;
}
} else {
return false;
}
}

void updateTemplates(Map<String, BytesReference> changes, Set<String> deletions) {
for (Map.Entry<String, BytesReference> change : changes.entrySet()) {
PutIndexTemplateRequest request =
new PutIndexTemplateRequest(change.getKey()).source(change.getValue(), XContentType.JSON);
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
client.admin().indices().putTemplate(request, new ActionListener<PutIndexTemplateResponse>() {
@Override
public void onResponse(PutIndexTemplateResponse response) {
updatesInProgress.decrementAndGet();
if (response.isAcknowledged() == false) {
logger.warn("Error updating template [{}], request was not acknowledged", change.getKey());
}
}

@Override
public void onFailure(Exception e) {
updatesInProgress.decrementAndGet();
logger.warn(new ParameterizedMessage("Error updating template [{}]", change.getKey()), e);
}
});
}

for (String template : deletions) {
DeleteIndexTemplateRequest request = new DeleteIndexTemplateRequest(template);
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
client.admin().indices().deleteTemplate(request, new ActionListener<DeleteIndexTemplateResponse>() {
@Override
public void onResponse(DeleteIndexTemplateResponse response) {
updatesInProgress.decrementAndGet();
if (response.isAcknowledged() == false) {
logger.warn("Error deleting template [{}], request was not acknowledged", template);
}
}

@Override
public void onFailure(Exception e) {
updatesInProgress.decrementAndGet();
if (e instanceof IndexTemplateMissingException == false) {
// we might attempt to delete the same template from different nodes - so that's ok if template doesn't exist
// otherwise we need to warn
logger.warn(new ParameterizedMessage("Error deleting template [{}]", template), e);
}
}
});
}
}

int getUpdatesInProgress() {
return updatesInProgress.get();
}

Optional<Tuple<Map<String, BytesReference>, Set<String>>> calculateTemplateChanges(
ImmutableOpenMap<String, IndexTemplateMetaData> templates) {
// collect current templates
Map<String, IndexTemplateMetaData> existingMap = new HashMap<>();
for (ObjectObjectCursor<String, IndexTemplateMetaData> customCursor : templates) {
existingMap.put(customCursor.key, customCursor.value);
}
// upgrade global custom meta data
Map<String, IndexTemplateMetaData> upgradedMap = indexTemplateMetaDataUpgraders.apply(existingMap);
if (upgradedMap.equals(existingMap) == false) {
Set<String> deletes = new HashSet<>();
Map<String, BytesReference> changes = new HashMap<>();
// remove templates if needed
existingMap.keySet().forEach(s -> {
if (upgradedMap.containsKey(s) == false) {
deletes.add(s);
}
});
upgradedMap.forEach((key, value) -> {
if (value.equals(existingMap.get(key)) == false) {
changes.put(key, toBytesReference(value));
}
});
return Optional.of(new Tuple<>(changes, deletes));
}
return Optional.empty();
}

private static final ToXContent.Params PARAMS = new ToXContent.MapParams(singletonMap("reduce_mappings", "true"));

private BytesReference toBytesReference(IndexTemplateMetaData templateMetaData) {
try {
return XContentHelper.toXContent((builder, params) -> {
IndexTemplateMetaData.Builder.toInnerXContent(templateMetaData, builder, params);
return builder;
}, XContentType.JSON, PARAMS, false);
} catch (IOException ex) {
throw new IllegalStateException("Cannot serialize template [" + templateMetaData.getName() + "]", ex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,22 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
private final String masterNodeId;
private final String localNodeId;
private final Version minNonClientNodeVersion;
private final Version maxNonClientNodeVersion;
private final Version maxNodeVersion;
private final Version minNodeVersion;

private DiscoveryNodes(ImmutableOpenMap<String, DiscoveryNode> nodes, ImmutableOpenMap<String, DiscoveryNode> dataNodes,
ImmutableOpenMap<String, DiscoveryNode> masterNodes, ImmutableOpenMap<String, DiscoveryNode> ingestNodes,
String masterNodeId, String localNodeId, Version minNonClientNodeVersion, Version maxNodeVersion,
Version minNodeVersion) {
String masterNodeId, String localNodeId, Version minNonClientNodeVersion, Version maxNonClientNodeVersion,
Version maxNodeVersion, Version minNodeVersion) {
this.nodes = nodes;
this.dataNodes = dataNodes;
this.masterNodes = masterNodes;
this.ingestNodes = ingestNodes;
this.masterNodeId = masterNodeId;
this.localNodeId = localNodeId;
this.minNonClientNodeVersion = minNonClientNodeVersion;
this.maxNonClientNodeVersion = maxNonClientNodeVersion;
this.minNodeVersion = minNodeVersion;
this.maxNodeVersion = maxNodeVersion;
}
Expand Down Expand Up @@ -234,12 +236,25 @@ public boolean isAllNodes(String... nodesIds) {
/**
* Returns the version of the node with the oldest version in the cluster that is not a client node
*
* If there are no non-client nodes, Version.CURRENT will be returned.
*
* @return the oldest version in the cluster
*/
public Version getSmallestNonClientNodeVersion() {
return minNonClientNodeVersion;
}

/**
* Returns the version of the node with the youngest version in the cluster that is not a client node.
*
* If there are no non-client nodes, Version.CURRENT will be returned.
*
* @return the youngest version in the cluster
*/
public Version getLargestNonClientNodeVersion() {
return maxNonClientNodeVersion;
}

/**
* Returns the version of the node with the oldest version in the cluster.
*
Expand All @@ -252,7 +267,7 @@ public Version getMinNodeVersion() {
/**
* Returns the version of the node with the youngest version in the cluster
*
* @return the oldest version in the cluster
* @return the youngest version in the cluster
*/
public Version getMaxNodeVersion() {
return maxNodeVersion;
Expand Down Expand Up @@ -654,15 +669,25 @@ public DiscoveryNodes build() {
ImmutableOpenMap.Builder<String, DiscoveryNode> ingestNodesBuilder = ImmutableOpenMap.builder();
Version minNodeVersion = Version.CURRENT;
Version maxNodeVersion = Version.CURRENT;
Version minNonClientNodeVersion = Version.CURRENT;
// The node where we are building this on might not be a master or a data node, so we cannot assume
// that there is a node with the current version as a part of the cluster.
Version minNonClientNodeVersion = null;
Version maxNonClientNodeVersion = null;
for (ObjectObjectCursor<String, DiscoveryNode> nodeEntry : nodes) {
if (nodeEntry.value.isDataNode()) {
dataNodesBuilder.put(nodeEntry.key, nodeEntry.value);
minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion());
}
if (nodeEntry.value.isMasterNode()) {
masterNodesBuilder.put(nodeEntry.key, nodeEntry.value);
minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion());
}
if (nodeEntry.value.isDataNode() || nodeEntry.value.isMasterNode()) {
if (minNonClientNodeVersion == null) {
minNonClientNodeVersion = nodeEntry.value.getVersion();
maxNonClientNodeVersion = nodeEntry.value.getVersion();
} else {
minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion());
maxNonClientNodeVersion = Version.max(maxNonClientNodeVersion, nodeEntry.value.getVersion());
}
}
if (nodeEntry.value.isIngestNode()) {
ingestNodesBuilder.put(nodeEntry.key, nodeEntry.value);
Expand All @@ -673,7 +698,8 @@ public DiscoveryNodes build() {

return new DiscoveryNodes(
nodes.build(), dataNodesBuilder.build(), masterNodesBuilder.build(), ingestNodesBuilder.build(),
masterNodeId, localNodeId, minNonClientNodeVersion, maxNodeVersion, minNodeVersion
masterNodeId, localNodeId, minNonClientNodeVersion == null ? Version.CURRENT : minNonClientNodeVersion,
maxNonClientNodeVersion == null ? Version.CURRENT : maxNonClientNodeVersion, maxNodeVersion, minNodeVersion
);
}

Expand Down
Loading

0 comments on commit e6e5ae6

Please sign in to comment.