Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add global checkpoint to translog checkpoints #21254

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,7 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
engineConfig.getIndexSettings(),
seqNoStats.getMaxSeqNo(),
seqNoStats.getLocalCheckpoint(),
seqNoStats.getGlobalCheckpoint(),
this::onGlobalCheckpointUpdate
);
seqNoStats.getGlobalCheckpoint());
indexWriter = writer;
translog = openTranslog(engineConfig, writer);
assert translog.getGeneration() != null;
Expand Down Expand Up @@ -360,19 +358,6 @@ private static SeqNoStats loadSeqNoStats(final EngineConfig engineConfig, final
return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
}

/**
* Syncs the translog; per the original note this is meant to run after the global checkpoint is updated.
*
* @throws EngineException wrapping any {@link IOException} raised inside the try block
*/
void onGlobalCheckpointUpdate() {
// try-with-resources: the lock returned by readLock.acquire() is held for the open check and the sync,
// and is released before the catch block below executes
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
translog.sync();
} catch (final IOException e) {
// NOTE(review): maybeFailEngine presumably decides whether this failure should fail the engine - confirm;
// the exception is rethrown to the caller either way, with the original cause preserved
maybeFailEngine("on global checkpoint update", e);
throw new EngineException(shardId, "failed on global checkpoint update", e);
}
}

private SearcherManager createSearcherManager() throws EngineException {
boolean success = false;
SearcherManager searcherManager = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.UnsupportedEncodingException;

public class GlobalCheckpointSyncAction extends TransportReplicationAction<GlobalCheckpointSyncAction.PrimaryRequest,
GlobalCheckpointSyncAction.ReplicaRequest, ReplicationResponse> {
Expand Down Expand Up @@ -68,6 +70,7 @@ protected PrimaryResult shardOperationOnPrimary(PrimaryRequest request) {
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = indexService.getShard(request.shardId().id());
long checkpoint = indexShard.getGlobalCheckpoint();
syncTranslog(indexShard);
return new PrimaryResult(new ReplicaRequest(request, checkpoint), new ReplicationResponse());
}

Expand All @@ -76,9 +79,18 @@ protected ReplicaResult shardOperationOnReplica(ReplicaRequest request) {
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = indexService.getShard(request.shardId().id());
indexShard.updateGlobalCheckpointOnReplica(request.checkpoint);
syncTranslog(indexShard);
return new ReplicaResult();
}

private void syncTranslog(final IndexShard indexShard) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need the conversion to an unchecked exception? Doesn't shardOperationOnX allow throwing checked exceptions?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bleskes This is correct for shardOperationOnPrimary (declares checked Exception) but not shardOperationOnReplica (does not declare any checked exceptions). I did push d742a68 since this was missing from the signature for GlobalCheckpointSyncAction#shardOperationOnPrimary.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In master both shardOperationOnPrimary and shardOperationOnReplica declare the top-level checked exception. We can pick this up and remove the wrapping after integrating master into feature/seq_no. I pushed 6c1338c.

try {
indexShard.getTranslog().sync();
} catch (final IOException e) {
throw new UncheckedIOException("failed to sync translog after updating global checkpoint for shard " + indexShard.shardId(), e);
}
}

public void updateCheckpointForShard(ShardId shardId) {
execute(new PrimaryRequest(shardId), new ActionListener<ReplicationResponse>() {
@Override
Expand Down Expand Up @@ -135,4 +147,5 @@ public long getCheckpoint() {
return checkpoint;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {

final LocalCheckpointService localCheckpointService;
final GlobalCheckpointService globalCheckpointService;
private final Runnable onGlobalCheckpointUpdate;

/**
* Initialize the sequence number service. The {@code maxSeqNo} should be set to the last sequence number assigned by this shard, or
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OMG what happened here :)

Expand All @@ -51,19 +50,16 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
* @param maxSeqNo the last sequence number assigned by this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
* @param localCheckpoint the last known local checkpoint for this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}
* @param onGlobalCheckpointUpdate invoked when the global checkpoint is updated
*/
public SequenceNumbersService(
final ShardId shardId,
final IndexSettings indexSettings,
final long maxSeqNo,
final long localCheckpoint,
final long globalCheckpoint,
final Runnable onGlobalCheckpointUpdate) {
final long globalCheckpoint) {
super(shardId, indexSettings);
localCheckpointService = new LocalCheckpointService(shardId, indexSettings, maxSeqNo, localCheckpoint);
globalCheckpointService = new GlobalCheckpointService(shardId, indexSettings, globalCheckpoint);
this.onGlobalCheckpointUpdate = onGlobalCheckpointUpdate;
}

/**
Expand Down Expand Up @@ -123,12 +119,21 @@ public long getGlobalCheckpoint() {
return globalCheckpointService.getCheckpoint();
}

/**
 * Delegates to the global checkpoint service, which scans through the currently known local checkpoints and
 * updates the global checkpoint accordingly.
 *
 * @return the service's answer: true if the checkpoint has been updated, or if it can not be updated because the
 *         local checkpoint of one of the active allocations is not known
 */
public boolean updateGlobalCheckpointOnPrimary() {
    final boolean result = globalCheckpointService.updateCheckpointOnPrimary();
    return result;
}

/**
* Updates the global checkpoint on a replica shard (after it has been updated by the primary) and then runs the
* {@code onGlobalCheckpointUpdate} callback.
*
* @param checkpoint the global checkpoint value handed to the global checkpoint service
*/
public void updateGlobalCheckpointOnReplica(long checkpoint) {
globalCheckpointService.updateCheckpointOnReplica(checkpoint);
// the callback is invoked unconditionally, once the service call above has returned
onGlobalCheckpointUpdate.run();
}

/**
Expand All @@ -142,17 +147,4 @@ public void updateAllocationIdsFromMaster(Set<String> activeAllocationIds, Set<S
globalCheckpointService.updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds);
}

/**
 * Asks the global checkpoint service to scan through the currently known local checkpoints and update the global
 * checkpoint accordingly. When the service answers true, the {@code onGlobalCheckpointUpdate} callback is run
 * before returning.
 *
 * @return true if the checkpoint has been updated or if it can not be updated since one of the local checkpoints
 *         of one of the active allocations is not known
 */
public boolean updateGlobalCheckpointOnPrimary() {
    if (globalCheckpointService.updateCheckpointOnPrimary() == false) {
        // the service reported false: the callback is not invoked
        return false;
    }
    onGlobalCheckpointUpdate.run();
    return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ public boolean ensureSynced(Location location) throws IOException {

/**
* Ensures that all locations in the given stream have been synced / written to the underlying storage.
* This method allows for internal optimization to minimize the amout of fsync operations if multiple
* This method allows for internal optimization to minimize the amount of fsync operations if multiple
* locations must be synced.
*
* @return Returns <code>true</code> iff this call caused an actual sync operation otherwise <code>false</code>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,8 +588,7 @@ public SequenceNumbersService seqNoService() {
this.config().getIndexSettings(),
maxSeqNo.get(),
localCheckpoint.get(),
globalCheckpoint.get(),
() -> {});
globalCheckpoint.get());
}
};
CommitStats stats1 = engine.commitStats();
Expand Down Expand Up @@ -1712,6 +1711,7 @@ public void testSeqNoAndCheckpoints() throws IOException {
assertThat(
Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)),
equalTo(localCheckpoint));
initialEngine.getTranslog().sync(); // to guarantee the global checkpoint is written to the translog checkpoint
assertThat(
initialEngine.getTranslog().getLastSyncedGlobalCheckpoint(),
equalTo(globalCheckpoint));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.elasticsearch.index.seqno;

import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashSet;

import static org.elasticsearch.mock.orig.Mockito.when;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

public class GlobalCheckpointSyncActionTests extends ESTestCase {

private ThreadPool threadPool;
private Transport transport;
private ClusterService clusterService;
private TransportService transportService;
private ShardStateAction shardStateAction;

public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool(getClass().getName());
transport = new CapturingTransport();
clusterService = createClusterService(threadPool);
transportService = new TransportService(clusterService.getSettings(), transport, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR);
transportService.start();
transportService.acceptIncomingRequests();
shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null, threadPool);
}

public void tearDown() throws Exception {
try {
IOUtils.close(transportService, clusterService, transport);
} finally {
terminate(threadPool);
}
super.tearDown();
}

public void testTranslogSyncAfterGlobalCheckpointSync() throws IOException {
final IndicesService indicesService = mock(IndicesService.class);

final Index index = new Index("index", "uuid");
final IndexService indexService = mock(IndexService.class);
when(indicesService.indexServiceSafe(index)).thenReturn(indexService);

final int id = randomIntBetween(0, 4);
final IndexShard indexShard = mock(IndexShard.class);
when(indexService.getShard(id)).thenReturn(indexShard);

final Translog translog = mock(Translog.class);
when(indexShard.getTranslog()).thenReturn(translog);

final GlobalCheckpointSyncAction action = new GlobalCheckpointSyncAction(
Settings.EMPTY,
transportService,
clusterService,
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver(Settings.EMPTY));
final ShardId shardId = new ShardId(index, id);
final GlobalCheckpointSyncAction.PrimaryRequest primaryRequest = new GlobalCheckpointSyncAction.PrimaryRequest(shardId);
if (randomBoolean()) {
action.shardOperationOnPrimary(primaryRequest);
} else {
action.shardOperationOnReplica(new GlobalCheckpointSyncAction.ReplicaRequest(primaryRequest, randomPositiveLong()));
}

verify(translog).sync();
}

}