Skip to content

Commit

Permalink
Merge pull request #64 from Azure/javaEPHleaseid
Browse files Browse the repository at this point in the history
Fix race that causes "lease id cannot be null or empty" illegal argument exception from storage
  • Loading branch information
JamesBirdsall authored Mar 3, 2017
2 parents ed5b8d0 + 2830415 commit bc279a7
Showing 1 changed file with 25 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ private Boolean acquireLeaseSync(AzureBlobLease lease) throws Exception
this.host.logWithHostAndPartition(Level.FINE, lease.getPartitionId(), "Acquiring lease");

CloudBlockBlob leaseBlob = lease.getBlob();
boolean retval = true;
boolean succeeded = true;
String newLeaseId = EventProcessorHost.safeCreateUUID();
if ((newLeaseId == null) || newLeaseId.isEmpty())
{
Expand All @@ -400,31 +400,48 @@ private Boolean acquireLeaseSync(AzureBlobLease lease) throws Exception
if (leaseBlob.getProperties().getLeaseState() == LeaseState.LEASED)
{
this.host.logWithHostAndPartition(Level.FINER, lease.getPartitionId(), "changeLease");
newToken = leaseBlob.changeLease(newLeaseId, AccessCondition.generateLeaseCondition(lease.getToken()));
if ((lease.getToken() == null) || lease.getToken().isEmpty())
{
// We reach here in a race condition: when this instance of EventProcessorHost scanned the
// lease blobs, this partition was unowned (token is empty) but between then and now, another
// instance of EPH has established a lease (getLeaseState() is LEASED). We normally enforce
// that we only steal the lease if it is still owned by the instance which owned it when we
// scanned, but we can't do that when we don't know who owns it. The safest thing to do is just
// fail the acquisition. If that means that one EPH instance gets more partitions than it should,
// rebalancing will take care of that quickly enough.
succeeded = false;
}
else
{
newToken = leaseBlob.changeLease(newLeaseId, AccessCondition.generateLeaseCondition(lease.getToken()));
}
}
else
{
this.host.logWithHostAndPartition(Level.FINER, lease.getPartitionId(), "acquireLease");
newToken = leaseBlob.acquireLease(AzureStorageCheckpointLeaseManager.leaseDurationInSeconds, newLeaseId);
}
lease.setToken(newToken);
lease.setOwner(this.host.getHostName());
lease.incrementEpoch(); // Increment epoch each time lease is acquired or stolen by a new host
uploadLease(lease, leaseBlob, AccessCondition.generateLeaseCondition(lease.getToken()), UploadActivity.Acquire);
if (succeeded)
{
lease.setToken(newToken);
lease.setOwner(this.host.getHostName());
lease.incrementEpoch(); // Increment epoch each time lease is acquired or stolen by a new host
uploadLease(lease, leaseBlob, AccessCondition.generateLeaseCondition(lease.getToken()), UploadActivity.Acquire);
}
}
catch (StorageException se)
{
if (wasLeaseLost(se, lease.getPartitionId()))
{
retval = false;
succeeded = false;
}
else
{
throw se;
}
}

return retval;
return succeeded;
}

@Override
Expand Down

0 comments on commit bc279a7

Please sign in to comment.