Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race that causes "lease id cannot be null or empty" illegal argument exception from storage #64

Merged
merged 2 commits into from
Mar 3, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ private Boolean acquireLeaseSync(AzureBlobLease lease) throws Exception
this.host.logWithHostAndPartition(Level.FINE, lease.getPartitionId(), "Acquiring lease");

CloudBlockBlob leaseBlob = lease.getBlob();
boolean retval = true;
boolean succeeded = true;
String newLeaseId = EventProcessorHost.safeCreateUUID();
if ((newLeaseId == null) || newLeaseId.isEmpty())
{
Expand All @@ -400,31 +400,48 @@ private Boolean acquireLeaseSync(AzureBlobLease lease) throws Exception
if (leaseBlob.getProperties().getLeaseState() == LeaseState.LEASED)
{
this.host.logWithHostAndPartition(Level.FINER, lease.getPartitionId(), "changeLease");
newToken = leaseBlob.changeLease(newLeaseId, AccessCondition.generateLeaseCondition(lease.getToken()));
if ((lease.getToken() == null) || lease.getToken().isEmpty())
{
// We reach here in a race condition: when this instance of EventProcessorHost scanned the
// lease blobs, this partition was unowned (token is empty) but between then and now, another
// instance of EPH has established a lease (getLeaseState() is LEASED). We normally enforce
// that we only steal the lease if it is still owned by the instance which owned it when we
// scanned, but we can't do that when we don't know who owns it. The safest thing to do is just
// fail the acquisition. If that means that one EPH instance gets more partitions than it should,
// rebalancing will take care of that quickly enough.
succeeded = false;
}
else
{
newToken = leaseBlob.changeLease(newLeaseId, AccessCondition.generateLeaseCondition(lease.getToken()));
Copy link
Member

@serkantkaraca serkantkaraca Mar 2, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lease.getToken() [](start = 92, length = 16)

Due to the same race, this can still throw NullRef, right? #Closed

}
}
else
{
this.host.logWithHostAndPartition(Level.FINER, lease.getPartitionId(), "acquireLease");
newToken = leaseBlob.acquireLease(AzureStorageCheckpointLeaseManager.leaseDurationInSeconds, newLeaseId);
}
lease.setToken(newToken);
lease.setOwner(this.host.getHostName());
lease.incrementEpoch(); // Increment epoch each time lease is acquired or stolen by a new host
uploadLease(lease, leaseBlob, AccessCondition.generateLeaseCondition(lease.getToken()), UploadActivity.Acquire);
if (succeeded)
{
lease.setToken(newToken);
lease.setOwner(this.host.getHostName());
lease.incrementEpoch(); // Increment epoch each time lease is acquired or stolen by a new host
uploadLease(lease, leaseBlob, AccessCondition.generateLeaseCondition(lease.getToken()), UploadActivity.Acquire);
}
}
catch (StorageException se)
{
if (wasLeaseLost(se, lease.getPartitionId()))
{
retval = false;
succeeded = false;
}
else
{
throw se;
}
}

return retval;
return succeeded;
}

@Override
Expand Down