Skip to content

Commit

Permalink
Sync translog after global checkpoint sync
Browse files Browse the repository at this point in the history
This commit causes the translog to be synced after every global
checkpoint sync and removes syncing of the global checkpoint from the
indexing path.
  • Loading branch information
jasontedor committed Nov 7, 2016
1 parent 19d4db0 commit 86ec4d0
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,7 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
engineConfig.getIndexSettings(),
seqNoStats.getMaxSeqNo(),
seqNoStats.getLocalCheckpoint(),
seqNoStats.getGlobalCheckpoint(),
this::onGlobalCheckpointUpdate
);
seqNoStats.getGlobalCheckpoint());
indexWriter = writer;
translog = openTranslog(engineConfig, writer);
assert translog.getGeneration() != null;
Expand Down Expand Up @@ -360,19 +358,6 @@ private static SeqNoStats loadSeqNoStats(final EngineConfig engineConfig, final
return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
}

/**
* Syncs the translog after the global checkpoint is updated. The sync is performed while holding
* {@code readLock} and only after {@code ensureOpen()} has returned.
*
* @throws EngineException if syncing the translog fails with an {@link IOException}; the failure is passed to
* {@code maybeFailEngine} before the exception is thrown
*/
void onGlobalCheckpointUpdate() {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
translog.sync();
} catch (final IOException e) {
// report the I/O failure through maybeFailEngine first, then surface it to the caller with the original cause
maybeFailEngine("on global checkpoint update", e);
throw new EngineException(shardId, "failed on global checkpoint update", e);
}
}

private SearcherManager createSearcherManager() throws EngineException {
boolean success = false;
SearcherManager searcherManager = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.UnsupportedEncodingException;

public class GlobalCheckpointSyncAction extends TransportReplicationAction<GlobalCheckpointSyncAction.PrimaryRequest,
GlobalCheckpointSyncAction.ReplicaRequest, ReplicationResponse> {
Expand Down Expand Up @@ -68,6 +70,7 @@ protected PrimaryResult shardOperationOnPrimary(PrimaryRequest request) {
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = indexService.getShard(request.shardId().id());
long checkpoint = indexShard.getGlobalCheckpoint();
syncTranslog(indexShard);
return new PrimaryResult(new ReplicaRequest(request, checkpoint), new ReplicationResponse());
}

Expand All @@ -76,9 +79,18 @@ protected ReplicaResult shardOperationOnReplica(ReplicaRequest request) {
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = indexService.getShard(request.shardId().id());
indexShard.updateGlobalCheckpointOnReplica(request.checkpoint);
syncTranslog(indexShard);
return new ReplicaResult();
}

/**
 * Syncs the translog of the given shard; invoked after the global checkpoint has been read on the primary or
 * applied on a replica.
 *
 * @param indexShard the shard whose translog is synced
 * @throws UncheckedIOException wrapping the {@link IOException} raised by the translog sync, if any
 */
private void syncTranslog(final IndexShard indexShard) {
    try {
        indexShard.getTranslog().sync();
    } catch (final IOException cause) {
        // the message is built only on failure so the shard id is resolved lazily, exactly as before
        final String message = "failed to sync translog after updating global checkpoint for shard " + indexShard.shardId();
        throw new UncheckedIOException(message, cause);
    }
}

public void updateCheckpointForShard(ShardId shardId) {
execute(new PrimaryRequest(shardId), new ActionListener<ReplicationResponse>() {
@Override
Expand Down Expand Up @@ -135,4 +147,5 @@ public long getCheckpoint() {
return checkpoint;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {

final LocalCheckpointService localCheckpointService;
final GlobalCheckpointService globalCheckpointService;
private final Runnable onGlobalCheckpointUpdate;

/**
* Initialize the sequence number service. The {@code maxSeqNo} should be set to the last sequence number assigned by this shard, or
Expand All @@ -51,19 +50,16 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
* @param maxSeqNo the last sequence number assigned by this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
* @param localCheckpoint the last known local checkpoint for this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}
* @param onGlobalCheckpointUpdate invoked when the global checkpoint is updated
*/
public SequenceNumbersService(
final ShardId shardId,
final IndexSettings indexSettings,
final long maxSeqNo,
final long localCheckpoint,
final long globalCheckpoint,
final Runnable onGlobalCheckpointUpdate) {
final long globalCheckpoint) {
super(shardId, indexSettings);
localCheckpointService = new LocalCheckpointService(shardId, indexSettings, maxSeqNo, localCheckpoint);
globalCheckpointService = new GlobalCheckpointService(shardId, indexSettings, globalCheckpoint);
this.onGlobalCheckpointUpdate = onGlobalCheckpointUpdate;
}

/**
Expand Down Expand Up @@ -123,12 +119,21 @@ public long getGlobalCheckpoint() {
return globalCheckpointService.getCheckpoint();
}

/**
* Scans through the currently known local checkpoints and updates the global checkpoint accordingly. The work is
* delegated to {@code globalCheckpointService.updateCheckpointOnPrimary()}.
*
* @return {@code true} if the checkpoint has been updated, or if it cannot be updated because the local checkpoint
* of one of the active allocations is not known
*/
public boolean updateGlobalCheckpointOnPrimary() {
return globalCheckpointService.updateCheckpointOnPrimary();
}

/**
* Updates the global checkpoint on a replica shard (after it has been updated by the primary).
*
* @param checkpoint the global checkpoint value passed on to {@code globalCheckpointService.updateCheckpointOnReplica}
*/
public void updateGlobalCheckpointOnReplica(long checkpoint) {
globalCheckpointService.updateCheckpointOnReplica(checkpoint);
onGlobalCheckpointUpdate.run();
}

/**
Expand All @@ -142,17 +147,4 @@ public void updateAllocationIdsFromMaster(Set<String> activeAllocationIds, Set<S
globalCheckpointService.updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds);
}

/**
 * Asks the global checkpoint service to recompute the global checkpoint from the currently known local
 * checkpoints and, when the service reports {@code true}, runs the {@code onGlobalCheckpointUpdate} callback.
 *
 * @return {@code true} if the checkpoint has been updated or if it can not be updated since the local checkpoint
 *         of one of the active allocations is not known; {@code false} otherwise
 */
public boolean updateGlobalCheckpointOnPrimary() {
    if (globalCheckpointService.updateCheckpointOnPrimary() == false) {
        // nothing changed: skip the callback entirely
        return false;
    }
    onGlobalCheckpointUpdate.run();
    return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ public boolean ensureSynced(Location location) throws IOException {

/**
* Ensures that all locations in the given stream have been synced / written to the underlying storage.
* This method allows for internal optimization to minimize the amout of fsync operations if multiple
* This method allows for internal optimization to minimize the amount of fsync operations if multiple
* locations must be synced.
*
* @return Returns <code>true</code> iff this call caused an actual sync operation otherwise <code>false</code>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,8 +588,7 @@ public SequenceNumbersService seqNoService() {
this.config().getIndexSettings(),
maxSeqNo.get(),
localCheckpoint.get(),
globalCheckpoint.get(),
() -> {});
globalCheckpoint.get());
}
};
CommitStats stats1 = engine.commitStats();
Expand Down Expand Up @@ -1712,6 +1711,7 @@ public void testSeqNoAndCheckpoints() throws IOException {
assertThat(
Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)),
equalTo(localCheckpoint));
initialEngine.getTranslog().sync(); // to guarantee the global checkpoint is written to the translog checkpoint
assertThat(
initialEngine.getTranslog().getLastSyncedGlobalCheckpoint(),
equalTo(globalCheckpoint));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.elasticsearch.index.seqno;

import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashSet;

import static org.elasticsearch.mock.orig.Mockito.when;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

public class GlobalCheckpointSyncActionTests extends ESTestCase {

private ThreadPool threadPool;
private Transport transport;
private ClusterService clusterService;
private TransportService transportService;
private ShardStateAction shardStateAction;

public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool(getClass().getName());
transport = new CapturingTransport();
clusterService = createClusterService(threadPool);
transportService = new TransportService(clusterService.getSettings(), transport, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR);
transportService.start();
transportService.acceptIncomingRequests();
shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null, threadPool);
}

public void tearDown() throws Exception {
try {
IOUtils.close(transportService, clusterService, transport);
} finally {
terminate(threadPool);
}
super.tearDown();
}

public void testTranslogSyncAfterGlobalCheckpointSync() throws IOException {
final IndicesService indicesService = mock(IndicesService.class);

final Index index = new Index("index", "uuid");
final IndexService indexService = mock(IndexService.class);
when(indicesService.indexServiceSafe(index)).thenReturn(indexService);

final int id = randomIntBetween(0, 4);
final IndexShard indexShard = mock(IndexShard.class);
when(indexService.getShard(id)).thenReturn(indexShard);

final Translog translog = mock(Translog.class);
when(indexShard.getTranslog()).thenReturn(translog);

final GlobalCheckpointSyncAction action = new GlobalCheckpointSyncAction(
Settings.EMPTY,
transportService,
clusterService,
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver(Settings.EMPTY));
final ShardId shardId = new ShardId(index, id);
final GlobalCheckpointSyncAction.PrimaryRequest primaryRequest = new GlobalCheckpointSyncAction.PrimaryRequest(shardId);
if (randomBoolean()) {
action.shardOperationOnPrimary(primaryRequest);
} else {
action.shardOperationOnReplica(new GlobalCheckpointSyncAction.ReplicaRequest(primaryRequest, randomPositiveLong()));
}

verify(translog).sync();
}

}

0 comments on commit 86ec4d0

Please sign in to comment.