Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Feed Processor: Fixes LeaseLostException leaks on notification APIs for Renew scenarios #3775

Merged
merged 2 commits into from
Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,17 @@ await this.leaseUpdater.UpdateLeaseAsync(
if (serverLease.Owner != lease.Owner)
{
DefaultTrace.TraceInformation("Lease with token {0} no need to release lease. The lease was already taken by another host '{1}'.", lease.CurrentLeaseToken, serverLease.Owner);
throw new LeaseLostException(lease);
throw new LeaseLostException(
lease,
CosmosExceptionFactory.Create(
statusCode: HttpStatusCode.PreconditionFailed,
message: $"{lease.CurrentLeaseToken} lease token was taken over by owner '{serverLease.Owner}'",
headers: new Headers(),
stackTrace: default,
trace: NoOpTrace.Singleton,
error: default,
innerException: default),
isGone: false);
}
serverLease.Owner = null;
return serverLease;
Expand Down Expand Up @@ -232,7 +242,17 @@ public override async Task<DocumentServiceLease> RenewAsync(DocumentServiceLease
if (serverLease.Owner != lease.Owner)
{
DefaultTrace.TraceInformation("Lease with token {0} was taken over by owner '{1}'", lease.CurrentLeaseToken, serverLease.Owner);
throw new LeaseLostException(lease);
throw new LeaseLostException(
lease,
CosmosExceptionFactory.Create(
statusCode: HttpStatusCode.PreconditionFailed,
message: $"{lease.CurrentLeaseToken} lease token was taken over by owner '{serverLease.Owner}'",
headers: new Headers(),
stackTrace: default,
trace: NoOpTrace.Singleton,
error: default,
innerException: default),
isGone: false);
}
return serverLease;
}).ConfigureAwait(false);
Expand All @@ -245,7 +265,17 @@ public override async Task<DocumentServiceLease> UpdatePropertiesAsync(DocumentS
if (lease.Owner != this.options.HostName)
{
DefaultTrace.TraceInformation("Lease with token '{0}' was taken over by owner '{1}' before lease properties update", lease.CurrentLeaseToken, lease.Owner);
throw new LeaseLostException(lease);
throw new LeaseLostException(
lease,
CosmosExceptionFactory.Create(
statusCode: HttpStatusCode.PreconditionFailed,
message: $"{lease.CurrentLeaseToken} lease token was taken over by owner '{lease.Owner}'",
headers: new Headers(),
stackTrace: default,
trace: NoOpTrace.Singleton,
error: default,
innerException: default),
isGone: false);
}

return await this.leaseUpdater.UpdateLeaseAsync(
Expand All @@ -257,7 +287,17 @@ public override async Task<DocumentServiceLease> UpdatePropertiesAsync(DocumentS
if (serverLease.Owner != lease.Owner)
{
DefaultTrace.TraceInformation("Lease with token '{0}' was taken over by owner '{1}'", lease.CurrentLeaseToken, serverLease.Owner);
throw new LeaseLostException(lease);
throw new LeaseLostException(
lease,
CosmosExceptionFactory.Create(
statusCode: HttpStatusCode.PreconditionFailed,
message: $"{lease.CurrentLeaseToken} lease token was taken over by owner '{serverLease.Owner}'",
headers: new Headers(),
stackTrace: default,
trace: NoOpTrace.Singleton,
error: default,
innerException: default),
isGone: false);
}
serverLease.Properties = lease.Properties;
return serverLease;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,201 @@ public async Task IfOwnerChangedThrow()
&& innerCosmosException.StatusCode == HttpStatusCode.PreconditionFailed);
}

/// <summary>
/// Verifies that if the renewed read a different Owner from the captured in memory, throws a LeaseLost
/// </summary>
[TestMethod]
public async Task IfOwnerChangedThrowOnRenew()
{
DocumentServiceLeaseStoreManagerOptions options = new DocumentServiceLeaseStoreManagerOptions
{
HostName = Guid.NewGuid().ToString()
};

DocumentServiceLeaseCore lease = new DocumentServiceLeaseCore()
{
LeaseToken = "0",
Owner = Guid.NewGuid().ToString(),
FeedRange = new FeedRangePartitionKeyRange("0")
};

Mock<DocumentServiceLeaseUpdater> mockUpdater = new Mock<DocumentServiceLeaseUpdater>();

Func<Func<DocumentServiceLease, DocumentServiceLease>, bool> validateUpdater = (Func<DocumentServiceLease, DocumentServiceLease> updater) =>
{
// Simulate dirty read from db
DocumentServiceLeaseCore serverLease = new DocumentServiceLeaseCore()
{
LeaseToken = "0",
Owner = Guid.NewGuid().ToString(),
FeedRange = new FeedRangePartitionKeyRange("0")
};
DocumentServiceLease afterUpdateLease = updater(serverLease);
return true;
};

mockUpdater.Setup(c => c.UpdateLeaseAsync(
It.IsAny<DocumentServiceLease>(),
It.IsAny<string>(),
It.IsAny<PartitionKey>(),
It.Is<Func<DocumentServiceLease, DocumentServiceLease>>(f => validateUpdater(f))))
.ReturnsAsync(lease);

ResponseMessage leaseResponse = new ResponseMessage(System.Net.HttpStatusCode.OK)
{
Content = new CosmosJsonDotNetSerializer().ToStream(lease)
};

Mock<ContainerInternal> leaseContainer = new Mock<ContainerInternal>();
leaseContainer.Setup(c => c.ReadItemStreamAsync(
It.IsAny<string>(),
It.IsAny<PartitionKey>(),
It.IsAny<ItemRequestOptions>(),
It.IsAny<CancellationToken>())).ReturnsAsync(leaseResponse);

DocumentServiceLeaseManagerCosmos documentServiceLeaseManagerCosmos = new DocumentServiceLeaseManagerCosmos(
Mock.Of<ContainerInternal>(),
leaseContainer.Object,
mockUpdater.Object,
options,
Mock.Of<RequestOptionsFactory>());

LeaseLostException leaseLost = await Assert.ThrowsExceptionAsync<LeaseLostException>(() => documentServiceLeaseManagerCosmos.RenewAsync(lease));

Assert.IsTrue(leaseLost.InnerException is CosmosException innerCosmosException
&& innerCosmosException.StatusCode == HttpStatusCode.PreconditionFailed);
}

/// <summary>
/// Verifies that if the update properties read a different Owner from the captured in memory, throws a LeaseLost
/// </summary>
[TestMethod]
public async Task IfOwnerChangedThrowOnUpdateProperties()
{
DocumentServiceLeaseCore lease = new DocumentServiceLeaseCore()
{
LeaseToken = "0",
Owner = Guid.NewGuid().ToString(),
FeedRange = new FeedRangePartitionKeyRange("0")
};

DocumentServiceLeaseStoreManagerOptions options = new DocumentServiceLeaseStoreManagerOptions
{
HostName = lease.Owner
};

Mock<DocumentServiceLeaseUpdater> mockUpdater = new Mock<DocumentServiceLeaseUpdater>();

Func<Func<DocumentServiceLease, DocumentServiceLease>, bool> validateUpdater = (Func<DocumentServiceLease, DocumentServiceLease> updater) =>
{
// Simulate dirty read from db
DocumentServiceLeaseCore serverLease = new DocumentServiceLeaseCore()
{
LeaseToken = "0",
Owner = Guid.NewGuid().ToString(),
FeedRange = new FeedRangePartitionKeyRange("0")
};
DocumentServiceLease afterUpdateLease = updater(serverLease);
return true;
};

mockUpdater.Setup(c => c.UpdateLeaseAsync(
It.IsAny<DocumentServiceLease>(),
It.IsAny<string>(),
It.IsAny<PartitionKey>(),
It.Is<Func<DocumentServiceLease, DocumentServiceLease>>(f => validateUpdater(f))))
.ReturnsAsync(lease);

ResponseMessage leaseResponse = new ResponseMessage(System.Net.HttpStatusCode.OK)
{
Content = new CosmosJsonDotNetSerializer().ToStream(lease)
};

Mock<ContainerInternal> leaseContainer = new Mock<ContainerInternal>();
leaseContainer.Setup(c => c.ReadItemStreamAsync(
It.IsAny<string>(),
It.IsAny<PartitionKey>(),
It.IsAny<ItemRequestOptions>(),
It.IsAny<CancellationToken>())).ReturnsAsync(leaseResponse);

DocumentServiceLeaseManagerCosmos documentServiceLeaseManagerCosmos = new DocumentServiceLeaseManagerCosmos(
Mock.Of<ContainerInternal>(),
leaseContainer.Object,
mockUpdater.Object,
options,
Mock.Of<RequestOptionsFactory>());

LeaseLostException leaseLost = await Assert.ThrowsExceptionAsync<LeaseLostException>(() => documentServiceLeaseManagerCosmos.UpdatePropertiesAsync(lease));

Assert.IsTrue(leaseLost.InnerException is CosmosException innerCosmosException
&& innerCosmosException.StatusCode == HttpStatusCode.PreconditionFailed);
}

/// <summary>
/// Verifies that if the update properties read a different Owner from the captured in memory, throws a LeaseLost
/// </summary>
[TestMethod]
public async Task IfOwnerChangedThrowOnRelease()
{
DocumentServiceLeaseStoreManagerOptions options = new DocumentServiceLeaseStoreManagerOptions
{
HostName = Guid.NewGuid().ToString()
};

DocumentServiceLeaseCore lease = new DocumentServiceLeaseCore()
{
LeaseToken = "0",
Owner = Guid.NewGuid().ToString(),
FeedRange = new FeedRangePartitionKeyRange("0")
};

Mock<DocumentServiceLeaseUpdater> mockUpdater = new Mock<DocumentServiceLeaseUpdater>();

Func<Func<DocumentServiceLease, DocumentServiceLease>, bool> validateUpdater = (Func<DocumentServiceLease, DocumentServiceLease> updater) =>
{
// Simulate dirty read from db
DocumentServiceLeaseCore serverLease = new DocumentServiceLeaseCore()
{
LeaseToken = "0",
Owner = Guid.NewGuid().ToString(),
FeedRange = new FeedRangePartitionKeyRange("0")
};
DocumentServiceLease afterUpdateLease = updater(serverLease);
return true;
};

mockUpdater.Setup(c => c.UpdateLeaseAsync(
It.IsAny<DocumentServiceLease>(),
It.IsAny<string>(),
It.IsAny<PartitionKey>(),
It.Is<Func<DocumentServiceLease, DocumentServiceLease>>(f => validateUpdater(f))))
.ReturnsAsync(lease);

ResponseMessage leaseResponse = new ResponseMessage(System.Net.HttpStatusCode.OK)
{
Content = new CosmosJsonDotNetSerializer().ToStream(lease)
};

Mock<ContainerInternal> leaseContainer = new Mock<ContainerInternal>();
leaseContainer.Setup(c => c.ReadItemStreamAsync(
It.IsAny<string>(),
It.IsAny<PartitionKey>(),
It.IsAny<ItemRequestOptions>(),
It.IsAny<CancellationToken>())).ReturnsAsync(leaseResponse);

DocumentServiceLeaseManagerCosmos documentServiceLeaseManagerCosmos = new DocumentServiceLeaseManagerCosmos(
Mock.Of<ContainerInternal>(),
leaseContainer.Object,
mockUpdater.Object,
options,
Mock.Of<RequestOptionsFactory>());

LeaseLostException leaseLost = await Assert.ThrowsExceptionAsync<LeaseLostException>(() => documentServiceLeaseManagerCosmos.ReleaseAsync(lease));

Assert.IsTrue(leaseLost.InnerException is CosmosException innerCosmosException
&& innerCosmosException.StatusCode == HttpStatusCode.PreconditionFailed);
}

/// <summary>
/// When a lease is missing the range information, check that we are adding it
/// </summary>
Expand Down