Introduce primary context #25122
Conversation
@bleskes I'm opening this for discussion on coming up with an effective strategy for testing.
The target of a primary relocation is not aware of the state of the replication group. In particular, it is not tracking in-sync and initializing shards and their checkpoints. This means that after the target shard is started, its knowledge of the replication group could differ from that of the relocation source. In particular, this differing view can lead to it computing a global checkpoint that moves backwards after it becomes aware of the state of the entire replication group. This commit addresses this issue by transferring a primary context during relocation handoff.
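For illustration, a minimal sketch of what such a primary context might carry, using the field names that appear in the diff excerpts below; the serialization is elided and the class shape is not the final API:

import com.carrotsearch.hppc.ObjectLongMap;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;

import java.io.IOException;

/**
 * Illustrative sketch only: the view of the replication group that the relocation
 * source hands off to the relocation target.
 */
public class PrimaryContext implements Writeable {

    // cluster state version the source had applied when the context was captured
    private final long appliedClusterStateVersion;

    // allocation id -> local checkpoint of the in-sync shard copies
    private final ObjectLongMap<String> inSyncLocalCheckpoints;

    // allocation id -> local checkpoint of the initializing (tracked) shard copies
    private final ObjectLongMap<String> trackingLocalCheckpoints;

    public PrimaryContext(final long appliedClusterStateVersion,
                          final ObjectLongMap<String> inSyncLocalCheckpoints,
                          final ObjectLongMap<String> trackingLocalCheckpoints) {
        this.appliedClusterStateVersion = appliedClusterStateVersion;
        this.inSyncLocalCheckpoints = inSyncLocalCheckpoints;
        this.trackingLocalCheckpoints = trackingLocalCheckpoints;
    }

    @Override
    public void writeTo(final StreamOutput out) throws IOException {
        out.writeVLong(appliedClusterStateVersion);
        // writing of the two checkpoint maps is elided in this sketch
    }
}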
I did an initial pass and left some comments, until we discuss the testing aspect.
 *
 * @param seqNoPrimaryContext the sequence number context
 */
synchronized void updateAllocationIdsFromPrimaryContext(final SeqNoPrimaryContext seqNoPrimaryContext) {
I think this is tricky. The master is the one that drives the set of shards that are allocated. If the copy was removed by the master, we shouldn't re-add it because of a primary handoff that happens concurrently. I think we should make the primary context a recovery-level thing that uses the existing shard API (updateLocalCheckpoint/markAllocationIdAsInSync).
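For illustration, a rough sketch of that recovery-level application, reusing only calls that appear elsewhere in this diff; the method name and the context accessors are assumptions:

// rough sketch, not the actual change: apply the handed-off context on the target
// through the existing shard-level API rather than by re-adding allocation ids
void applyPrimaryContext(final SeqNoPrimaryContext context) throws InterruptedException {
    // initializing copies: only feed their checkpoints into the existing tracking machinery
    for (final ObjectLongCursor<String> cursor : context.trackingLocalCheckpoints()) {
        updateLocalCheckpoint(cursor.key, cursor.value);
    }
    // in-sync copies: promote them through the existing markAllocationIdAsInSync path
    for (final ObjectLongCursor<String> cursor : context.inSyncLocalCheckpoints()) {
        markAllocationIdAsInSync(cursor.key, cursor.value);
    }
}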
import java.io.IOException;

public class SeqNoPrimaryContext implements Writeable {
java docs pls?
 */
public PrimaryContext primaryContext() {
    verifyPrimary();
    assert shardRouting.relocating();
add the shardRouting as a message?
public PrimaryContext primaryContext() {
    verifyPrimary();
    assert shardRouting.relocating();
    assert !shardRouting.isRelocationTarget();
please add a message
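For instance (message text is illustrative):

assert shardRouting.relocating() : "expected a relocating shard, but was: " + shardRouting;
assert !shardRouting.isRelocationTarget() : "expected the relocation source, but was the target: " + shardRouting;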
Back to you @bleskes.
Thanks @jasontedor. I left a bunch of comments.
@@ -23,10 +23,12 @@
import com.carrotsearch.hppc.ObjectLongMap;
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.collect.HppcMaps;
This seems unused
Yes.
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;

import java.util.Arrays;
As does this
Yes.
 */
for (final ObjectLongCursor<String> cursor : seqNoPrimaryContext.inSyncLocalCheckpoints()) {
    updateLocalCheckpoint(cursor.key, cursor.value);
    assert cursor.value >= globalCheckpoint
can we also assert that the key is not found in the in-sync map? in fact, the in-sync map should be empty, no?
Yes.
Thinking about this more, why would the in-sync map be empty? The target could have applied a cluster state containing the source as an active allocation ID? I don't think we can make any assertion here at all?
You are right. What I was thinking is that any "promotion" to in-sync should go through the primary, but that is currently not the case. I do think that this is a better & simpler model that we can switch to as a follow-up (the current model is based on the thinking we had back when we built around a background sync and didn't have the locked-down clean handoffs we have now). For now, I think we can assert that all the values in the in-sync map are unknown. Not sure how much it's worth though. Up to you.
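For illustration, such an assertion might look like this; using UNASSIGNED_SEQ_NO as the sentinel for "no checkpoint reported yet" is an assumption:

// sketch of the suggested assertion: every copy already present in the in-sync map
// should not have reported a local checkpoint yet
for (final ObjectLongCursor<String> cursor : inSyncLocalCheckpoints) {
    assert cursor.value == SequenceNumbersService.UNASSIGNED_SEQ_NO
        : "expected the in-sync checkpoint for [" + cursor.key + "] to be unknown, but was [" + cursor.value + "]";
}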
    assert cursor.value >= globalCheckpoint
        : "local checkpoint [" + cursor.value + "] violates being at least the global checkpoint [" + globalCheckpoint + "]";
    try {
        markAllocationIdAsInSync(cursor.key, cursor.value);
This works for started shards because we have a check in markAllocationIdAsInSync that ignores the call if it can't find the aId in the tracking map.
Given that the in-sync map is empty, I wonder if this would be simpler to work with if we changed seqNoPrimaryContext to be a map of aId -> local checkpoint plus a set of in-sync aIds. We would then loop over the map and call updateLocalCheckpoint for everything in it. Then we can do the promotion manually, instead of calling markAllocationIdAsInSync and suffering everything it brings with it. Concretely we'd just do:

if (trackingLocalCheckpoints.containsKey(allocationId)) {
    long current = trackingLocalCheckpoints.remove(allocationId);
    inSyncLocalCheckpoints.put(allocationId, current);
}
I would prefer the sequence number related state of the primary context look like the state of the global checkpoint tracker. It's easier to think about.
> I would prefer the sequence number related state of the primary context look like the state of the global checkpoint tracker

Ok. I would still prefer not using markAllocationIdAsInSync as is and all the complexity it brings.
Sure.
 * @return the sequence number primary context
 */
public SeqNoPrimaryContext seqNoPrimaryContext() {
    synchronized (globalCheckpointTracker) {
I think we should push this down to the tracker. Then construction and application are the same. Also the external lock is ugly :)
Yeah, I figured you'd say that.
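A minimal sketch of pushing the construction down into the tracker, so it sits behind the same monitor used when applying a context; the method body is illustrative:

// sketch: the tracker builds the context itself, so snapshotting and applying it
// are guarded by the same monitor and no external lock is needed
synchronized PrimaryContext primaryContext() {
    return new PrimaryContext(
        appliedClusterStateVersion,
        new ObjectLongHashMap<>(inSyncLocalCheckpoints),
        new ObjectLongHashMap<>(trackingLocalCheckpoints));
}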
try {
    getEngine().seqNoService().markAllocationIdAsInSync(allocationId, localCheckpoint);
    /*
     * We could have blocked so long waiting for the replica to catch up that we fell idle and there will not be a
can you explain why this is needed? is it because we don't have the background sync? what happens?
This comment was only moved; I'm not addressing it in this PR.
/**
 * Represents the primary context which encapsulates the view the primary shard has of its replication group.
 */
public class PrimaryContext implements Writeable {
Shall we remove this for now? I think it's a leftover from my previous, cancelled ask?
No, this is intentional and was always this way even before your cancelled ask.
Can you explain? It is an empty abstraction at the moment. We can always add it when we need it?
I removed it in 4ba8d5c.
public class RecoveryHandoffPrimaryContextRequest extends TransportRequest {

    private long recoveryId;
    private ShardId shardId;
can we add a toString? I don't see any other usage for this shardId field
Sure.
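For example, something along these lines (format is illustrative and only the fields shown above are included):

@Override
public String toString() {
    return "RecoveryHandoffPrimaryContextRequest{" +
        "recoveryId=" + recoveryId +
        ", shardId=" + shardId +
        '}';
}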
 * Initialize an empty request (used to serialize into when reading from a stream).
 */
@SuppressWarnings("WeakerAccess")
public RecoveryHandoffPrimaryContextRequest() {
I think this can be package-private and then the SuppressWarnings makes sense?
That's weird, there's no other reason to add the suppression unless I had already made it package private at some point. I'm not sure why I would have changed it to public. I will change it back.
@@ -448,7 +451,23 @@ public void finalizeRecovery(final long targetLocalCheckpoint) {
    StopWatch stopWatch = new StopWatch().start();
    logger.trace("finalizing recovery");
    cancellableThreads.execute(() -> {
        shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint);
        final CountDownLatch latch = new CountDownLatch(1);
see other comment
Okay.
@ywelsch Back to you.
Left one more question
import org.elasticsearch.index.shard.ShardId;

import java.security.Security;
import gone wrong :)
    sealed = false;
    throw e;
}
return () -> {
I'm confused why we need both a consumer and a releasable here. Maybe it's good enough to have a "releaseFailedPrimaryContext" method that sets sealed back to false.
I would like the sealing to remain active if the relocation successfully completed (i.e. the source shard has been successfully marked as "relocating"). This validates that no one is advancing the global checkpoint on a primary that is no longer the active primary. Similarly (not for this PR, but in a follow-up) I would like the GlobalCheckPointTracker to be initially sealed and only be unsealed when a primary shard gets activated (whether by becoming active after recovery or being initialized with a primary context during relocation handoff).
Well, it was your suggestion to return a Releasable. Yet, I will change this. 😇
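A minimal sketch of the proposed method, assuming a sealed flag on the tracker; the assertion is illustrative:

// sketch: only a failed handoff unseals the tracker again; a successful handoff
// leaves it sealed so nothing can advance the global checkpoint on the old primary
synchronized void releaseFailedPrimaryContext() {
    assert sealed : "expected the tracker to be sealed when releasing a failed primary context";
    sealed = false;
}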
final ObjectLongMap<String> inSyncLocalCheckpoints = new ObjectLongHashMap<>(this.inSyncLocalCheckpoints);
final ObjectLongMap<String> trackingLocalCheckpoints = new ObjectLongHashMap<>(this.trackingLocalCheckpoints);
try {
    consumer.accept(new PrimaryContext(appliedClusterStateVersion, inSyncLocalCheckpoints, trackingLocalCheckpoints));
why run the handoff under lock? This will block cluster state updates that call updateAllocationIdsFromMaster.
This was only an inadvertent side-effect of turning the context inside out to return a Releasable, but it definitely has to go!
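One shape this could take, as a sketch: snapshot the tracker state under the monitor but invoke the consumer outside of it, so concurrent cluster state updates are not blocked.

// sketch only: copy the state under the tracker's monitor, run the handoff outside it
final PrimaryContext primaryContext;
synchronized (this) {
    primaryContext = new PrimaryContext(
        appliedClusterStateVersion,
        new ObjectLongHashMap<>(inSyncLocalCheckpoints),
        new ObjectLongHashMap<>(trackingLocalCheckpoints));
}
consumer.accept(primaryContext);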
        final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
        final long applyingClusterStateVersion, final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
    if (sealed) {
        throw new IllegalStateException("global checkpoint tracker is sealed");
Cluster state updates can still happen during a relocation handoff (or after), so I don't think that we should check for sealing here. In fact, as the handoff can fail and the primary continue operating as usual, we also want to take all cluster state updates that happened during handoff into consideration.
I pushed a change that addresses this.
LGTM
The target of a primary relocation is not aware of the state of the replication group. In particular, it is not tracking in-sync and initializing shards and their checkpoints. This means that after the target shard is started, its knowledge of the replication group could differ from that of the relocation source. In particular, this differing view can lead to it computing a global checkpoint that moves backwards after it becomes aware of the state of the entire replication group. This commit addresses this issue by transferring a primary context during relocation handoff.
Relates #10708, relates #25355