Skip to content

Commit

Permalink
Fix race that causes "lease id cannot be null or empty" illegal argument
Browse files Browse the repository at this point in the history
exception from storage.
  • Loading branch information
JamesBirdsall committed Feb 27, 2017
1 parent 3a5370b commit 50fb1db
Showing 1 changed file with 25 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ private Boolean acquireLeaseSync(AzureBlobLease lease) throws Exception
this.host.logWithHostAndPartition(Level.FINE, lease.getPartitionId(), "Acquiring lease");

CloudBlockBlob leaseBlob = lease.getBlob();
boolean retval = true;
boolean succeeded = true;
String newLeaseId = EventProcessorHost.safeCreateUUID();
if ((newLeaseId == null) || newLeaseId.isEmpty())
{
Expand All @@ -400,31 +400,48 @@ private Boolean acquireLeaseSync(AzureBlobLease lease) throws Exception
if (leaseBlob.getProperties().getLeaseState() == LeaseState.LEASED)
{
this.host.logWithHostAndPartition(Level.FINER, lease.getPartitionId(), "changeLease");
newToken = leaseBlob.changeLease(newLeaseId, AccessCondition.generateLeaseCondition(lease.getToken()));
if ((lease.getToken() == null) || lease.getToken().isEmpty())
{
// We reach here in a race condition: when this instance of EventProcessorHost scanned the
// lease blobs, this partition was unowned (token is empty) but between then and now, another
// instance of EPH has established a lease (getLeaseState() is LEASED). We normally enforce
// that we only steal the lease if it is still owned by the instance which owned it when we
// scanned, but we can't do that when we don't know who owns it. The safest thing to do is just
// fail the acquisition. If that means that one EPH instance gets more partitions than it should,
// rebalancing will take care of that quickly enough.
succeeded = false;
}
else
{
newToken = leaseBlob.changeLease(newLeaseId, AccessCondition.generateLeaseCondition(lease.getToken()));
}
}
else
{
this.host.logWithHostAndPartition(Level.FINER, lease.getPartitionId(), "acquireLease");
newToken = leaseBlob.acquireLease(AzureStorageCheckpointLeaseManager.leaseDurationInSeconds, newLeaseId);
}
lease.setToken(newToken);
lease.setOwner(this.host.getHostName());
lease.incrementEpoch(); // Increment epoch each time lease is acquired or stolen by a new host
uploadLease(lease, leaseBlob, AccessCondition.generateLeaseCondition(lease.getToken()), UploadActivity.Acquire);
if (succeeded)
{
lease.setToken(newToken);
lease.setOwner(this.host.getHostName());
lease.incrementEpoch(); // Increment epoch each time lease is acquired or stolen by a new host
uploadLease(lease, leaseBlob, AccessCondition.generateLeaseCondition(lease.getToken()), UploadActivity.Acquire);
}
}
catch (StorageException se)
{
if (wasLeaseLost(se, lease.getPartitionId()))
{
retval = false;
succeeded = false;
}
else
{
throw se;
}
}

return retval;
return succeeded;
}

@Override
Expand Down

0 comments on commit 50fb1db

Please sign in to comment.