Skip to content

Commit

Permalink
Implement asynchronous support in ODataJsonLightBatchWriter
Browse files Browse the repository at this point in the history
  • Loading branch information
John Gathogo committed Jun 24, 2021
1 parent 729579d commit fca52e2
Show file tree
Hide file tree
Showing 5 changed files with 907 additions and 22 deletions.
5 changes: 3 additions & 2 deletions src/Microsoft.OData.Core/Batch/ODataBatchUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,13 @@ internal static ODataReadStream CreateBatchOperationReadStream(
/// <returns>A new <see cref="ODataWriteStream"/> instance.</returns>
internal static ODataWriteStream CreateBatchOperationWriteStream(
Stream outputStream,
IODataStreamListener operationListener)
IODataStreamListener operationListener,
bool synchronous = true)
{
Debug.Assert(outputStream != null, "outputStream != null");
Debug.Assert(operationListener != null, "operationListener != null");

return new ODataWriteStream(outputStream, operationListener);
return new ODataWriteStream(outputStream, operationListener, synchronous);
}

/// <summary>
Expand Down
157 changes: 145 additions & 12 deletions src/Microsoft.OData.Core/Batch/ODataBatchWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public void WriteStartBatch()
public Task WriteStartBatchAsync()
{
this.VerifyCanWriteStartBatch(false);
return TaskUtils.GetTaskForSynchronousOperation(this.WriteStartBatchImplementation);
return this.WriteStartBatchImplementationAsync();
}

/// <summary>Ends a batch; can only be called after WriteStartBatch has been called and if no other active changeset or operation exist.</summary>
Expand All @@ -188,8 +188,7 @@ public void WriteEndBatch()
public Task WriteEndBatchAsync()
{
this.VerifyCanWriteEndBatch(false);
return TaskUtils.GetTaskForSynchronousOperation(this.WriteEndBatchImplementation)

return this.WriteEndBatchImplementationAsync()
// Note that we intentionally go through the public API so that if the Flush fails the writer moves to the Error state.
.FollowOnSuccessWithTask(task => this.FlushAsync());
}
Expand Down Expand Up @@ -236,7 +235,7 @@ public Task WriteStartChangesetAsync(string changesetId)
ExceptionUtils.CheckArgumentNotNull(changesetId, "changesetId");

this.VerifyCanWriteStartChangeset(false);
return TaskUtils.GetTaskForSynchronousOperation(() => this.WriteStartChangesetImplementation(changesetId))
return this.WriteStartChangesetImplementationAsync(changesetId)
.FollowOnSuccessWith(t => this.FinishWriteStartChangeset());
}

Expand All @@ -253,7 +252,7 @@ public void WriteEndChangeset()
public Task WriteEndChangesetAsync()
{
this.VerifyCanWriteEndChangeset(false);
return TaskUtils.GetTaskForSynchronousOperation(this.WriteEndChangesetImplementation)
return this.WriteEndChangesetImplementationAsync()
.FollowOnSuccessWith(t => this.FinishWriteEndChangeset());
}

Expand Down Expand Up @@ -340,9 +339,7 @@ public Task<ODataBatchOperationRequestMessage> CreateOperationRequestMessageAsyn
BatchPayloadUriOption payloadUriOption, IList<string> dependsOnIds)
{
this.VerifyCanCreateOperationRequestMessage(false, method, uri, contentId);

return TaskUtils.GetTaskForSynchronousOperation<ODataBatchOperationRequestMessage>(() =>
CreateOperationRequestMessageInternal(method, uri, contentId, payloadUriOption, dependsOnIds));
return this.CreateOperationRequestMessageInternalAsync(method, uri, contentId, payloadUriOption, dependsOnIds);
}

/// <summary>Creates a message for writing an operation of a batch response.</summary>
Expand All @@ -360,8 +357,7 @@ public ODataBatchOperationResponseMessage CreateOperationResponseMessage(string
public Task<ODataBatchOperationResponseMessage> CreateOperationResponseMessageAsync(string contentId)
{
this.VerifyCanCreateOperationResponseMessage(false);
return TaskUtils.GetTaskForSynchronousOperation<ODataBatchOperationResponseMessage>(
() => this.CreateOperationResponseMessageImplementation(contentId));
return this.CreateOperationResponseMessageImplementationAsync(contentId);
}

/// <summary>Flushes the write buffer to the underlying stream.</summary>
Expand Down Expand Up @@ -553,7 +549,8 @@ protected ODataBatchOperationRequestMessage BuildOperationRequestMessage(Stream

ODataBatchUtils.ValidateReferenceUri(uri, requestIdsForUrlReferenceValidation, this.outputContext.MessageWriterSettings.BaseUri);

Func<Stream> streamCreatorFunc = () => ODataBatchUtils.CreateBatchOperationWriteStream(outputStream, this);
Func<Stream> streamCreatorFunc = () => ODataBatchUtils.CreateBatchOperationWriteStream(
outputStream, this, this.outputContext.Synchronous);
ODataBatchOperationRequestMessage requestMessage =
new ODataBatchOperationRequestMessage(streamCreatorFunc, method, uri, /*headers*/ null, this, contentId,
this.payloadUriConverter, /*writing*/ true, this.container, dependsOnIds, groupId);
Expand All @@ -573,11 +570,91 @@ protected ODataBatchOperationRequestMessage BuildOperationRequestMessage(Stream
protected ODataBatchOperationResponseMessage BuildOperationResponseMessage(Stream outputStream,
string contentId, string groupId)
{
Func<Stream> streamCreatorFunc = () => ODataBatchUtils.CreateBatchOperationWriteStream(outputStream, this);
Func<Stream> streamCreatorFunc = () => ODataBatchUtils.CreateBatchOperationWriteStream(
outputStream, this, this.outputContext.Synchronous);
return new ODataBatchOperationResponseMessage(streamCreatorFunc, /*headers*/ null, this, contentId,
this.payloadUriConverter.BatchMessagePayloadUriConverter, /*writing*/ true, this.container, groupId);
}

/// <summary>
/// Asnchronously starts a new batch.
/// </summary>
/// <returns>A task that represents the asynchronous write operation.</returns>
protected virtual Task WriteStartBatchImplementationAsync()
{
return TaskUtils.GetTaskForSynchronousOperation(this.WriteStartBatchImplementation);
}

/// <summary>
/// Asynchronously ends a batch.
/// </summary>
/// <returns>A task that represents the asynchronous write operation.</returns>
protected virtual Task WriteEndBatchImplementationAsync()
{
return TaskUtils.GetTaskForSynchronousOperation(this.WriteEndBatchImplementation);
}

/// <summary>
/// Asynchronously starts a new changeset.
/// </summary>
/// <param name="groupOrChangesetId">The atomic group id, aka changeset GUID of the batch request.</param>
/// <returns>A task that represents the asynchronous write operation.</returns>
protected virtual Task WriteStartChangesetImplementationAsync(string groupOrChangesetId)
{
return TaskUtils.GetTaskForSynchronousOperation(
() => this.WriteStartChangesetImplementation(groupOrChangesetId));
}

/// <summary>
/// Asynchronously ends an active changeset.
/// </summary>
/// <returns>A task that represents the asynchronous write operation.</returns>
protected virtual Task WriteEndChangesetImplementationAsync()
{
return TaskUtils.GetTaskForSynchronousOperation(this.WriteEndChangesetImplementation);
}

/// <summary>
/// Asynchronously creates an <see cref="ODataBatchOperationRequestMessage"/> for writing an operation of a batch request.
/// </summary>
/// <param name="method">The Http method to be used for this request operation.</param>
/// <param name="uri">The Uri to be used for this request operation.</param>
/// <param name="contentId">The Content-ID value to write in ChangeSet head.</param>
/// <param name="payloadUriOption">
/// The format of operation Request-URI, which could be AbsoluteUri, AbsoluteResourcePathAndHost, or RelativeResourcePath.</param>
/// <param name="dependsOnIds">The prerequisite request ids of this request.</param>
/// <returns>A task that represents the asynchronous operation.
/// The value of the TResult parameter contains an <see cref="ODataBatchOperationRequestMessage"/>
/// that can be used to write the request operation.</returns>
protected virtual Task<ODataBatchOperationRequestMessage> CreateOperationRequestMessageImplementationAsync(
string method,
Uri uri,
string contentId,
BatchPayloadUriOption payloadUriOption,
IEnumerable<string> dependsOnIds)
{
return TaskUtils.GetTaskForSynchronousOperation(
() => this.CreateOperationRequestMessageImplementation(
method,
uri,
contentId,
payloadUriOption,
dependsOnIds));
}

/// <summary>
/// Asynchronously creates an <see cref="ODataBatchOperationResponseMessage"/> for writing an operation of a batch response.
/// </summary>
/// <param name="contentId">The Content-ID value to write in ChangeSet head.</param>
/// <returns>A task that represents the asynchronous operation.
/// The value of the TResult parameter contains an <see cref="ODataBatchOperationResponseMessage"/>
/// that can be used to write the response operation.</returns>
protected virtual Task<ODataBatchOperationResponseMessage> CreateOperationResponseMessageImplementationAsync(string contentId)
{
return TaskUtils.GetTaskForSynchronousOperation(
() => this.CreateOperationResponseMessageImplementation(contentId));
}

/// <summary>
/// Catch any exception thrown by the action passed in; in the exception case move the writer into
/// state ExceptionThrown and then re-throw the exception.
Expand Down Expand Up @@ -662,6 +739,62 @@ private ODataBatchOperationRequestMessage CreateOperationRequestMessageInternal(
return this.CurrentOperationRequestMessage;
}

/// <summary>
/// Internal method to create an <see cref="Microsoft.OData.ODataBatchOperationRequestMessage" /> for writing
/// an operation of a batch request.
/// </summary>
/// <param name="method">The Http method to be used for this request operation.</param>
/// <param name="uri">The Uri to be used for this request operation.</param>
/// <param name="contentId">
/// For batch in multipart format, the Content-ID value to write in ChangeSet header, would be ignored if
/// <paramref name="method"/> is "GET".
/// For batch in Json format, if the value passed in is null, an GUID will be generated and used as the request id.
/// </param>
/// <param name="payloadUriOption">
/// The format of operation Request-URI, which could be AbsoluteUri, AbsoluteResourcePathAndHost, or RelativeResourcePath.</param>
/// <param name="dependsOnIds">The prerequisite request ids of this request.</param>
/// <returns>The message that can be used to write the request operation.</returns>
private async Task<ODataBatchOperationRequestMessage> CreateOperationRequestMessageInternalAsync(
string method,
Uri uri,
string contentId,
BatchPayloadUriOption payloadUriOption,
IEnumerable<string> dependsOnIds)
{
if (!this.isInChangset)
{
this.InterceptException(this.IncreaseBatchSize);
}
else
{
this.InterceptException(this.IncreaseChangeSetSize);
}

// Add a potential Content-ID header to the URL resolver so that it will be available
// to subsequent operations.
// Note that what we add here is the Content-ID header of the previous operation (if any).
// This also means that the Content-ID of the last operation in a changeset will never get
// added to the cache which is fine since we cannot reference it anywhere.
if (this.currentOperationContentId != null)
{
this.payloadUriConverter.AddContentId(this.currentOperationContentId);
}

this.InterceptException(() =>
uri = ODataBatchUtils.CreateOperationRequestUri(uri, this.outputContext.MessageWriterSettings.BaseUri, this.payloadUriConverter));

this.CurrentOperationRequestMessage = await this.CreateOperationRequestMessageImplementationAsync(
method, uri, contentId, payloadUriOption, dependsOnIds).ConfigureAwait(false);

if (this.isInChangset || this.outputContext.MessageWriterSettings.Version > ODataVersion.V4)
{
// The content Id can be generated if the value passed in is null, therefore here we get the real value of the content Id.
this.RememberContentIdHeader(this.CurrentOperationRequestMessage.ContentId);
}

return this.CurrentOperationRequestMessage;
}

/// <summary>
/// Perform updates after changeset is started.
/// </summary>
Expand Down
Loading

0 comments on commit fca52e2

Please sign in to comment.