Skip to content

Commit

Permalink
Separate ALREADY_COMMITTED state from SUCCEEDED (elastic#23)
Browse files Browse the repository at this point in the history
Previously, the assertion in onPossibleCommitFailure() would fail if there's an already-committed node, which was marked as SUCCEEDED.

Additionally, previously we didn't take into account that any already-committed nodes won't be voting for this publication, so if one of the other voting nodes fails then we might not have failed the leader straight away.

This fixes that.
  • Loading branch information
DaveCTurner authored Jan 10, 2018
1 parent fdac2dc commit 5a823ad
Showing 1 changed file with 10 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,8 @@ public enum PublicationTargetState {
WAITING_FOR_QUORUM,
SENT_CATCH_UP,
SENT_APPLY_COMMIT,
SUCCEEDED
APPLIED_COMMIT,
ALREADY_COMMITTED
}

/**
Expand Down Expand Up @@ -370,12 +371,13 @@ private void onPossibleCommitFailure() {
NodeCollection possiblySuccessfulNodes = new NodeCollection();
for (PublicationTarget publicationTarget : publicationTargets) {
if (publicationTarget.state == PublicationTargetState.SENT_PUBLISH_REQUEST
|| publicationTarget.state == PublicationTargetState.WAITING_FOR_QUORUM
|| publicationTarget.state == PublicationTargetState.SENT_CATCH_UP) {
|| publicationTarget.state == PublicationTargetState.SENT_CATCH_UP
|| publicationTarget.state == PublicationTargetState.WAITING_FOR_QUORUM) {

possiblySuccessfulNodes.add(publicationTarget.discoveryNode);
} else {
assert publicationTarget.state == PublicationTargetState.FAILED;
assert publicationTarget.state == PublicationTargetState.FAILED
|| publicationTarget.state == PublicationTargetState.ALREADY_COMMITTED;
}
}

Expand Down Expand Up @@ -412,7 +414,6 @@ void handlePublishResponse(PublishResponse publishResponse) {
Optional<ApplyCommit> optionalCommit = consensusState.handlePublishResponse(discoveryNode, publishResponse);
optionalCommit.ifPresent(Publication.this::onCommitted);
}
// TODO: handle negative votes and move to candidate if leader
}

public void sendApplyCommit() {
Expand Down Expand Up @@ -452,10 +453,11 @@ public void handleResponse(LegislatorPublishResponse response) {
state = PublicationTargetState.SENT_CATCH_UP;
transport.sendCatchUp(discoveryNode, catchUp, new CatchUpResponseHandler());
} else if (response.getFirstUncommittedSlot() > publishRequest.getSlot()) {
logger.debug("PublishResponseHandler.handleResponse: [{}] is at newer slot {} (vs {}), marking as successful",
logger.debug("PublishResponseHandler.handleResponse: [{}] is at newer slot {} (vs {}), marking ALREADY_COMMITTED",
discoveryNode, response.getFirstUncommittedSlot(), publishRequest.getSlot());
assert false == response.getPublishResponse().isPresent();
state = PublicationTargetState.SUCCEEDED;
state = PublicationTargetState.ALREADY_COMMITTED;
onPossibleCommitFailure();
} else {
assert response.getPublishResponse().isPresent();
assert response.getFirstUncommittedSlot() == publishRequest.getSlot();
Expand Down Expand Up @@ -524,7 +526,7 @@ public void handleResponse(TransportResponse.Empty response) {
}

assert state == PublicationTargetState.SENT_APPLY_COMMIT;
state = PublicationTargetState.SUCCEEDED;
state = PublicationTargetState.APPLIED_COMMIT;
}

@Override
Expand Down

0 comments on commit 5a823ad

Please sign in to comment.