Skip to content

Commit

Permalink
Initialize checkpoint tracker with allocation ID
Browse files Browse the repository at this point in the history
This commit pushes the allocation ID down through to the global
checkpoint tracker at construction rather than when activated as a
primary.

Relates #26630
  • Loading branch information
jasontedor committed Sep 13, 2017
1 parent c2acdcb commit dcb60ab
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 59 deletions.
2 changes: 2 additions & 0 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public abstract class Engine implements Closeable {
public static final String SYNC_COMMIT_ID = "sync_id";

protected final ShardId shardId;
protected final String allocationId;
protected final Logger logger;
protected final EngineConfig engineConfig;
protected final Store store;
Expand Down Expand Up @@ -126,6 +127,7 @@ protected Engine(EngineConfig engineConfig) {

this.engineConfig = engineConfig;
this.shardId = engineConfig.getShardId();
this.allocationId = engineConfig.getAllocationId();
this.store = engineConfig.getStore();
this.logger = Loggers.getLogger(Engine.class, // we use the engine class directly here to make sure all subclasses have the same logger name
engineConfig.getIndexSettings().getSettings(), engineConfig.getShardId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
*/
public final class EngineConfig {
private final ShardId shardId;
private final String allocationId;
private final IndexSettings indexSettings;
private final ByteSizeValue indexingBufferSize;
private volatile boolean enableGcDeletes = true;
Expand Down Expand Up @@ -109,7 +110,7 @@ public final class EngineConfig {
/**
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
*/
public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool,
public EngineConfig(OpenMode openMode, ShardId shardId, String allocationId, ThreadPool threadPool,
IndexSettings indexSettings, Engine.Warmer warmer, Store store,
MergePolicy mergePolicy, Analyzer analyzer,
Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
Expand All @@ -120,6 +121,7 @@ public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool,
throw new IllegalArgumentException("openMode must not be null");
}
this.shardId = shardId;
this.allocationId = allocationId;
this.indexSettings = indexSettings;
this.threadPool = threadPool;
this.warmer = warmer == null ? (a) -> {} : warmer;
Expand Down Expand Up @@ -240,6 +242,15 @@ public IndexSettings getIndexSettings() {
*/
public ShardId getShardId() { return shardId; }

/**
* Returns the allocation ID for the shard.
*
* @return the allocation ID
*/
public String getAllocationId() {
return allocationId;
}

/**
* Returns the analyzer as the default analyzer in the engines {@link org.apache.lucene.index.IndexWriter}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
throw new IllegalArgumentException(openMode.toString());
}
logger.trace("recovered [{}]", seqNoStats);
seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats);
seqNoService = sequenceNumberService(shardId, allocationId, engineConfig.getIndexSettings(), seqNoStats);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
indexWriter = writer;
translog = openTranslog(engineConfig, writer, translogDeletionPolicy, () -> seqNoService().getGlobalCheckpoint());
Expand Down Expand Up @@ -283,10 +283,12 @@ private void updateMaxUnsafeAutoIdTimestampFromWriter(IndexWriter writer) {

private static SequenceNumbersService sequenceNumberService(
final ShardId shardId,
final String allocationId,
final IndexSettings indexSettings,
final SeqNoStats seqNoStats) {
return new SequenceNumbersService(
shardId,
allocationId,
indexSettings,
seqNoStats.getMaxSeqNo(),
seqNoStats.getLocalCheckpoint(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
*/
public class GlobalCheckpointTracker extends AbstractIndexShardComponent {

private final String allocationId;

/**
* The global checkpoint tracker can operate in two modes:
* - primary: this shard is in charge of collecting local checkpoint information from all shard copies and computing the global
Expand Down Expand Up @@ -245,12 +247,18 @@ private boolean invariant() {
* {@link SequenceNumbers#UNASSIGNED_SEQ_NO}.
*
* @param shardId the shard ID
* @param allocationId the allocation ID
* @param indexSettings the index settings
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
*/
GlobalCheckpointTracker(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) {
GlobalCheckpointTracker(
final ShardId shardId,
final String allocationId,
final IndexSettings indexSettings,
final long globalCheckpoint) {
super(shardId, indexSettings);
assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
this.allocationId = allocationId;
this.primaryMode = false;
this.handoffInProgress = false;
this.appliedClusterStateVersion = -1L;
Expand Down Expand Up @@ -310,7 +318,7 @@ public synchronized void updateGlobalCheckpointOnReplica(final long globalCheckp
/**
* Initializes the global checkpoint tracker in primary mode (see {@link #primaryMode}. Called on primary activation or promotion.
*/
public synchronized void activatePrimaryMode(final String allocationId, final long localCheckpoint) {
public synchronized void activatePrimaryMode(final long localCheckpoint) {
assert invariant();
assert primaryMode == false;
assert localCheckpoints.get(allocationId) != null && localCheckpoints.get(allocationId).inSync &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,14 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
*/
public SequenceNumbersService(
final ShardId shardId,
final String allocationId,
final IndexSettings indexSettings,
final long maxSeqNo,
final long localCheckpoint,
final long globalCheckpoint) {
super(shardId, indexSettings);
localCheckpointTracker = new LocalCheckpointTracker(indexSettings, maxSeqNo, localCheckpoint);
globalCheckpointTracker = new GlobalCheckpointTracker(shardId, indexSettings, globalCheckpoint);
globalCheckpointTracker = new GlobalCheckpointTracker(shardId, allocationId, indexSettings, globalCheckpoint);
}

/**
Expand Down Expand Up @@ -201,7 +202,7 @@ public synchronized long getTrackedLocalCheckpointForShard(final String allocati
* Called on primary activation or promotion.
*/
public void activatePrimaryMode(final String allocationId, final long localCheckpoint) {
globalCheckpointTracker.activatePrimaryMode(allocationId, localCheckpoint);
globalCheckpointTracker.activatePrimaryMode(localCheckpoint);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2074,7 +2074,7 @@ private DocumentMapperForType docMapper(String type) {

private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) {
Sort indexSort = indexSortSupplier.get();
return new EngineConfig(openMode, shardId,
return new EngineConfig(openMode, shardId, shardRouting.allocationId().getId(),
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener,
indexCache.query(), cachingPolicy, translogConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,15 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
Expand Down Expand Up @@ -198,6 +200,7 @@
public class InternalEngineTests extends ESTestCase {

protected final ShardId shardId = new ShardId(new Index("index", "_na_"), 0);
protected final AllocationId allocationId = AllocationId.newInitializing();
private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY);

protected ThreadPool threadPool;
Expand Down Expand Up @@ -264,11 +267,11 @@ public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode) {
}

public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, Analyzer analyzer) {
return new EngineConfig(openMode, config.getShardId(), config.getThreadPool(), config.getIndexSettings(), config.getWarmer(),
config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), config.getQueryCache(),
config.getQueryCachingPolicy(), config.getTranslogConfig(),
config.getFlushMergesAfter(), config.getRefreshListeners(), config.getIndexSort(), config.getTranslogRecoveryRunner());
return new EngineConfig(openMode, config.getShardId(), config.getAllocationId(), config.getThreadPool(), config.getIndexSettings(),
config.getWarmer(), config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(),
config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(), config.getIndexSort(),
config.getTranslogRecoveryRunner());
}

@Override
Expand Down Expand Up @@ -447,7 +450,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) {
indexSettings.getSettings()));
final List<ReferenceManager.RefreshListener> refreshListenerList =
refreshListener == null ? emptyList() : Collections.singletonList(refreshListener);
EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store,
EngineConfig config = new EngineConfig(openMode, shardId, allocationId.getId(), threadPool, indexSettings, null, store,
mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
TimeValue.timeValueMinutes(5), refreshListenerList, indexSort, handler);
Expand Down Expand Up @@ -728,6 +731,7 @@ public void testCommitStats() throws IOException {
Store store = createStore();
InternalEngine engine = createEngine(store, createTempDir(), (config) -> new SequenceNumbersService(
config.getShardId(),
config.getAllocationId(),
config.getIndexSettings(),
maxSeqNo.get(),
localCheckpoint.get(),
Expand Down Expand Up @@ -901,6 +905,7 @@ public void testTranslogRecoveryWithMultipleGenerations() throws IOException {
initialEngine = createEngine(store, createTempDir(), (config) ->
new SequenceNumbersService(
config.getShardId(),
config.getAllocationId(),
config.getIndexSettings(),
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
Expand Down Expand Up @@ -2028,7 +2033,7 @@ public void testSeqNoAndCheckpoints() throws IOException {

try {
initialEngine = engine;
final ShardRouting primary = TestShardRouting.newShardRouting(shardId, "node1", true, ShardRoutingState.STARTED);
final ShardRouting primary = TestShardRouting.newShardRouting("test", shardId.id(), "node1", null, true, ShardRoutingState.STARTED, allocationId);
final ShardRouting replica = TestShardRouting.newShardRouting(shardId, "node2", false, ShardRoutingState.STARTED);
initialEngine.seqNoService().updateAllocationIdsFromMaster(1L, new HashSet<>(Arrays.asList(primary.allocationId().getId(),
replica.allocationId().getId())),
Expand Down Expand Up @@ -2788,12 +2793,11 @@ public void testRecoverFromForeignTranslog() throws IOException {
TranslogConfig translogConfig = new TranslogConfig(shardId, translog.location(), config.getIndexSettings(),
BigArrays.NON_RECYCLING_INSTANCE);

EngineConfig brokenConfig = new EngineConfig(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, shardId, threadPool,
config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(),
config.getSimilarity(), new CodecService(null, logger), config.getEventListener(),
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
TimeValue.timeValueMinutes(5), config.getRefreshListeners(), null,
config.getTranslogRecoveryRunner());
EngineConfig brokenConfig = new EngineConfig(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, shardId, allocationId.getId(),
threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), config.getRefreshListeners(),
null, config.getTranslogRecoveryRunner());

try {
InternalEngine internalEngine = new InternalEngine(brokenConfig);
Expand Down Expand Up @@ -3628,6 +3632,7 @@ private SequenceNumbersService getStallingSeqNoService(
final AtomicLong expectedLocalCheckpoint) {
return new SequenceNumbersService(
shardId,
allocationId.getId(),
defaultSettings,
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
Expand Down Expand Up @@ -3839,7 +3844,7 @@ public void testNoOps() throws IOException {
final int globalCheckpoint = randomIntBetween(0, localCheckpoint);
try {
final SequenceNumbersService seqNoService =
new SequenceNumbersService(shardId, defaultSettings, maxSeqNo, localCheckpoint, globalCheckpoint) {
new SequenceNumbersService(shardId, allocationId.getId(), defaultSettings, maxSeqNo, localCheckpoint, globalCheckpoint) {
@Override
public long generateSeqNo() {
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -3986,6 +3991,7 @@ public void testRestoreLocalCheckpointFromTranslog() throws IOException {
final SequenceNumbersService seqNoService =
new SequenceNumbersService(
shardId,
allocationId.getId(),
defaultSettings,
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
Expand Down
Loading

0 comments on commit dcb60ab

Please sign in to comment.