From 0e6d796473475e99b8c20c0a29012a780871e641 Mon Sep 17 00:00:00 2001 From: Neha Rao Date: Mon, 2 Dec 2024 10:42:30 -0800 Subject: [PATCH 1/9] start --- .../RxThinclientStoreModel.java | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxThinclientStoreModel.java diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxThinclientStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxThinclientStoreModel.java new file mode 100644 index 000000000000..83d7a791fa74 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxThinclientStoreModel.java @@ -0,0 +1,50 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.implementation; + +import com.azure.cosmos.CosmosContainerProactiveInitConfig; +import com.azure.cosmos.implementation.faultinjection.IFaultInjectorProvider; +import com.azure.cosmos.implementation.throughputControl.ThroughputControlStore; +import com.azure.cosmos.models.CosmosContainerIdentity; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.List; + +/** + * While this class is public, but it is not part of our published public APIs. + * This is meant to be internally used only by our sdk. + * + * Used internally to provide functionality to communicate and process response from THINCLIENT in the Azure Cosmos DB database service. + */ +public class RxThinclientStoreModel implements RxStoreModel { + @Override + public Mono processMessage(RxDocumentServiceRequest request) { + return null; + } + + @Override + public void enableThroughputControl(ThroughputControlStore throughputControlStore) { + + } + + @Override + public Flux submitOpenConnectionTasksAndInitCaches(CosmosContainerProactiveInitConfig proactiveContainerInitConfig) { + return null; + } + + @Override + public void configureFaultInjectorProvider(IFaultInjectorProvider injectorProvider, Configs configs) { + + } + + @Override + public void recordOpenConnectionsAndInitCachesCompleted(List cosmosContainerIdentities) { + + } + + @Override + public void recordOpenConnectionsAndInitCachesStarted(List cosmosContainerIdentities) { + + } +} From ace2685ae0c899ed216b5ebbc6ee352d6de4e6e1 Mon Sep 17 00:00:00 2001 From: Neha Rao Date: Mon, 16 Dec 2024 15:17:22 -0800 Subject: [PATCH 2/9] push progress --- .../cosmos/implementation/HttpConstants.java | 4 + .../RxDocumentServiceRequest.java | 5 + ...reModel.java => ThinClientStoreModel.java} | 17 +- .../directconnectivity/ProxyStoreClient.java | 278 ++++++++++++++++++ 4 files changed, 302 insertions(+), 2 deletions(-) rename sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/{RxThinclientStoreModel.java => ThinClientStoreModel.java} (70%) create mode 100644 sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ProxyStoreClient.java diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java index a04313a3d34c..c6d167e87778 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java @@ -282,6 +282,10 @@ public static class HttpHeaders { // Priority Level for throttling public static final String PRIORITY_LEVEL = "x-ms-cosmos-priority-level"; + + // Thinclient headers + public static final String THINCLIENT_PROXY_OPERATION_TYPE = "x-ms-thinclient-proxy-operation-type"; + public static final String THINCLIENT_PROXY_RESOURCE_TYPE = "x-ms-thinclient-proxy-resource-type"; } public static class A_IMHeaderValues { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java index a0b7daf4b351..3cd7ec512b00 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java @@ -1233,4 +1233,9 @@ public String getEffectivePartitionKey() { public void setEffectivePartitionKey(String effectivePartitionKey) { this.effectivePartitionKey = effectivePartitionKey; } + + public void setThinclientHeaders(String operationType, String resourceType) { + this.headers.put(HttpConstants.HttpHeaders.THINCLIENT_PROXY_OPERATION_TYPE, operationType); + this.headers.put(HttpConstants.HttpHeaders.THINCLIENT_PROXY_RESOURCE_TYPE, resourceType); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxThinclientStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java similarity index 70% rename from sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxThinclientStoreModel.java rename to sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java index 83d7a791fa74..4ab773509542 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxThinclientStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java @@ -3,6 +3,8 @@ package com.azure.cosmos.implementation; import com.azure.cosmos.CosmosContainerProactiveInitConfig; +import com.azure.cosmos.implementation.directconnectivity.ProxyStoreClient; +import com.azure.cosmos.implementation.directconnectivity.StoreClient; import com.azure.cosmos.implementation.faultinjection.IFaultInjectorProvider; import com.azure.cosmos.implementation.throughputControl.ThroughputControlStore; import com.azure.cosmos.models.CosmosContainerIdentity; @@ -17,10 +19,21 @@ * * Used internally to provide functionality to communicate and process response from THINCLIENT in the Azure Cosmos DB database service. */ -public class RxThinclientStoreModel implements RxStoreModel { +public class ThinClientStoreModel implements RxStoreModel { + + private final ProxyStoreClient storeClient; + + public ThinClientStoreModel(ProxyStoreClient storeClient) { + this.storeClient = storeClient; + } + @Override public Mono processMessage(RxDocumentServiceRequest request) { - return null; + // direct/gateway mode validations? session token, bad consistency level header + + // set headers here? .NET sets in client + request.setThinclientHeaders(request.getOperationType().toString(), request.getResourceType().toString()); + return this.storeClient.processMessageAsync(request); } @Override diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ProxyStoreClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ProxyStoreClient.java new file mode 100644 index 000000000000..751d8363b8dd --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ProxyStoreClient.java @@ -0,0 +1,278 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.directconnectivity; + +import com.azure.cosmos.BridgeInternal; +import com.azure.cosmos.ConsistencyLevel; +import com.azure.cosmos.CosmosContainerProactiveInitConfig; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.SessionRetryOptions; +import com.azure.cosmos.implementation.BackoffRetryUtility; +import com.azure.cosmos.implementation.Configs; +import com.azure.cosmos.implementation.DiagnosticsClientContext; +import com.azure.cosmos.implementation.Exceptions; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.IAuthorizationTokenProvider; +import com.azure.cosmos.implementation.IRetryPolicy; +import com.azure.cosmos.implementation.ISessionContainer; +import com.azure.cosmos.implementation.ISessionToken; +import com.azure.cosmos.implementation.InternalServerErrorException; +import com.azure.cosmos.implementation.OperationType; +import com.azure.cosmos.implementation.RMResources; +import com.azure.cosmos.implementation.ResourceType; +import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.RxDocumentServiceResponse; +import com.azure.cosmos.implementation.SessionTokenHelper; +import com.azure.cosmos.implementation.Strings; +import com.azure.cosmos.implementation.Utils; +import com.azure.cosmos.implementation.apachecommons.lang.math.NumberUtils; +import com.azure.cosmos.implementation.faultinjection.IFaultInjectorProvider; +import com.azure.cosmos.implementation.throughputControl.ThroughputControlStore; +import com.azure.cosmos.models.CosmosContainerIdentity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.function.Function; + +/** + * Instantiated to issue direct connectivity requests to the backend on THINCLIENT for thinclient users. + * StoreClient uses the ReplicatedResourceClient to make requests to the backend. + */ +public class ProxyStoreClient implements IStoreClient { + private final DiagnosticsClientContext diagnosticsClientContext; + private final Logger logger = LoggerFactory.getLogger(ProxyStoreClient.class); + private final GatewayServiceConfigurationReader serviceConfigurationReader; + private final ISessionContainer sessionContainer; + private final ReplicatedResourceClient replicatedResourceClient; + private final TransportClient transportClient; + private final String ZERO_PARTITION_KEY_RANGE = "0"; + + public ProxyStoreClient( + DiagnosticsClientContext diagnosticsClientContext, + Configs configs, + IAddressResolver addressResolver, + ISessionContainer sessionContainer, + GatewayServiceConfigurationReader serviceConfigurationReader, IAuthorizationTokenProvider userTokenProvider, + TransportClient transportClient, + boolean useMultipleWriteLocations, + SessionRetryOptions sessionRetryOptions) { + this.diagnosticsClientContext = diagnosticsClientContext; + this.transportClient = transportClient; + this.sessionContainer = sessionContainer; + this.serviceConfigurationReader = serviceConfigurationReader; + this.replicatedResourceClient = new ReplicatedResourceClient( + diagnosticsClientContext, + configs, + new AddressSelector(addressResolver, configs.getProtocol()), + sessionContainer, + this.transportClient, + serviceConfigurationReader, + userTokenProvider, + useMultipleWriteLocations, + sessionRetryOptions); + + addressResolver.setOpenConnectionsProcessor(this.transportClient.getProactiveOpenConnectionsProcessor()); + } + + public void enableThroughputControl(ThroughputControlStore throughputControlStore) { + this.replicatedResourceClient.enableThroughputControl(throughputControlStore); + } + + private Mono InvokeClientAsync( + DocumentServiceRequest request, + ResourceType resourceType, + Uri physicalAddress, + CancellationToken cancellationToken) { + + } + + @Override + public Mono processMessageAsync(RxDocumentServiceRequest request, IRetryPolicy retryPolicy, Function> prepareRequestAsyncDelegate) { + if (request == null) { + throw new NullPointerException("request"); + } + + // HTTP2 transport + // serialize payload to RNTBD + + /*using (HttpResponseMessage responseMessage = await this.InvokeClientAsync(request, resourceOperation.resourceType, physicalAddress, default)) + { + return await HttpTransportClient.ProcessHttpResponse(request.ResourceAddress, string.Empty, responseMessage, physicalAddress, request); + }*/ + + Callable> storeResponseDelegate = () -> this.replicatedResourceClient.invokeAsync(request, prepareRequestAsyncDelegate); + + Mono storeResponse; + try { + storeResponse = retryPolicy != null + ? BackoffRetryUtility.executeRetry(storeResponseDelegate, retryPolicy) + : storeResponseDelegate.call(); + } catch (Exception e) { + return Mono.error(e); + } + + storeResponse = storeResponse.doOnError(e -> { + try { + Throwable unwrappedException = reactor.core.Exceptions.unwrap(e); + CosmosException exception = Utils.as(unwrappedException, CosmosException.class); + + if (exception == null) { + return; + } + + BridgeInternal.recordRetryContextEndTime(request.requestContext.cosmosDiagnostics); + exception = BridgeInternal.setCosmosDiagnostics(exception, request.requestContext.cosmosDiagnostics); + + handleUnsuccessfulStoreResponse(request, exception); + } catch (Throwable throwable) { + logger.error("Unexpected failure in handling orig [{}]", e.getMessage(), e); + logger.error("Unexpected failure in handling orig [{}] : new [{}]", e.getMessage(), throwable.getMessage(), throwable); + if (throwable instanceof Error) { + throw (Error) throwable; + } + } + } + ); + + return storeResponse.flatMap(sr -> { + try { + return Mono.just(this.completeResponse(sr, request)); + } catch (Exception e) { + return Mono.error(e); + } + }); + } + + @Override + public Flux submitOpenConnectionTasksAndInitCaches( + CosmosContainerProactiveInitConfig proactiveContainerInitConfig) { + return this.replicatedResourceClient.submitOpenConnectionTasksAndInitCaches(proactiveContainerInitConfig); + } + + public void configureFaultInjectorProvider(IFaultInjectorProvider injectorProvider) { + this.replicatedResourceClient.configureFaultInjectorProvider(injectorProvider); + } + + public void recordOpenConnectionsAndInitCachesCompleted(List cosmosContainerIdentities) { + this.replicatedResourceClient.recordOpenConnectionsAndInitCachesCompleted(cosmosContainerIdentities); + } + + public void recordOpenConnectionsAndInitCachesStarted(List cosmosContainerIdentities) { + this.replicatedResourceClient.recordOpenConnectionsAndInitCachesStarted(cosmosContainerIdentities); + } + + private void handleUnsuccessfulStoreResponse(RxDocumentServiceRequest request, CosmosException exception) { + this.updateResponseHeader(request, exception.getResponseHeaders()); + if ((!ReplicatedResourceClient.isMasterResource(request.getResourceType())) && + (Exceptions.isStatusCode(exception, HttpConstants.StatusCodes.PRECONDITION_FAILED) || Exceptions.isStatusCode(exception, HttpConstants.StatusCodes.CONFLICT) || + (Exceptions.isStatusCode(exception, HttpConstants.StatusCodes.NOTFOUND) && + !Exceptions.isSubStatusCode(exception, HttpConstants.SubStatusCodes.READ_SESSION_NOT_AVAILABLE)))) { + this.captureSessionToken(request, exception.getResponseHeaders()); + } + } + + private RxDocumentServiceResponse completeResponse( + StoreResponse storeResponse, + RxDocumentServiceRequest request) throws InternalServerErrorException { + + if (storeResponse.getResponseHeaderNames().length != storeResponse.getResponseHeaderValues().length) { + throw new InternalServerErrorException( + Exceptions.getInternalServerErrorMessage(RMResources.InvalidBackendResponse), + HttpConstants.SubStatusCodes.INVALID_BACKEND_RESPONSE); + } + + Map headers = new HashMap<>(storeResponse.getResponseHeaderNames().length); + for (int idx = 0; idx < storeResponse.getResponseHeaderNames().length; idx++) { + String name = storeResponse.getResponseHeaderNames()[idx]; + String value = storeResponse.getResponseHeaderValues()[idx]; + + headers.put(name, value); + } + + this.updateResponseHeader(request, headers); + this.captureSessionToken(request, headers); + BridgeInternal.recordRetryContextEndTime(request.requestContext.cosmosDiagnostics); + RxDocumentServiceResponse rxDocumentServiceResponse = + new RxDocumentServiceResponse(this.diagnosticsClientContext, storeResponse); + rxDocumentServiceResponse.setCosmosDiagnostics(request.requestContext.cosmosDiagnostics); + + return rxDocumentServiceResponse; + } + + private long getLSN(Map headers) { + long defaultValue = -1; + String value = headers.get(WFConstants.BackendHeaders.LSN); + + if (!Strings.isNullOrEmpty(value)) { + return NumberUtils.toLong(value, defaultValue); + + } + + return defaultValue; + } + + private void updateResponseHeader(RxDocumentServiceRequest request, Map headers) { + String requestConsistencyLevel = request.getHeaders().get(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL); + + boolean sessionConsistency = + this.serviceConfigurationReader.getDefaultConsistencyLevel() == ConsistencyLevel.SESSION || + (!Strings.isNullOrEmpty(requestConsistencyLevel) + && Strings.areEqualIgnoreCase(requestConsistencyLevel, ConsistencyLevel.SESSION.toString())); + + long storeLSN = this.getLSN(headers); + if (storeLSN == -1) { + return; + } + + String partitionKeyRangeId = headers.get(WFConstants.BackendHeaders.PARTITION_KEY_RANGE_ID); + + if (Strings.isNullOrEmpty(partitionKeyRangeId)) { + String inputSession = request.getHeaders().get(HttpConstants.HttpHeaders.SESSION_TOKEN); + if (!Strings.isNullOrEmpty(inputSession) + && inputSession.indexOf(ISessionToken.PARTITION_KEY_RANGE_SESSION_SEPARATOR) >= 1) { + partitionKeyRangeId = inputSession.substring(0, + inputSession.indexOf(ISessionToken.PARTITION_KEY_RANGE_SESSION_SEPARATOR)); + } else { + partitionKeyRangeId = ZERO_PARTITION_KEY_RANGE; + } + } + + ISessionToken sessionToken = null; + String sessionTokenResponseHeader = headers.get(HttpConstants.HttpHeaders.SESSION_TOKEN); + if (!Strings.isNullOrEmpty(sessionTokenResponseHeader)) { + sessionToken = SessionTokenHelper.parse(sessionTokenResponseHeader); + } + + if (sessionToken != null) { + headers.put(HttpConstants.HttpHeaders.SESSION_TOKEN, + SessionTokenHelper.concatPartitionKeyRangeIdWithSessionToken(partitionKeyRangeId, sessionToken.convertToString())); + } + + headers.remove(WFConstants.BackendHeaders.PARTITION_KEY_RANGE_ID); + } + + private void captureSessionToken(RxDocumentServiceRequest request, Map headers) { + if (request.getResourceType() == ResourceType.DocumentCollection + && request.getOperationType() == OperationType.Delete) { + String resourceId; + if (request.getIsNameBased()) { + resourceId = headers.get(HttpConstants.HttpHeaders.OWNER_ID); + } else { + resourceId = request.getResourceId(); + } + this.sessionContainer.clearTokenByResourceId(resourceId); + } else { + this.sessionContainer.setSessionToken(request, headers); + } + } + + // TODO RNTBD support + // https://msdata.visualstudio.com/CosmosDB/SDK/_workitems/edit/262496 +} From a570c8396dca08b88a40e93915abcdf631667366 Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Mon, 16 Dec 2024 23:33:57 +0000 Subject: [PATCH 3/9] Update ThinClientStoreModel.java --- .../com/azure/cosmos/implementation/ThinClientStoreModel.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java index 4ab773509542..f291196f1360 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java @@ -15,7 +15,7 @@ /** * While this class is public, but it is not part of our published public APIs. - * This is meant to be internally used only by our sdk. + * This is meant to be internally used only by our sdk. * * Used internally to provide functionality to communicate and process response from THINCLIENT in the Azure Cosmos DB database service. */ From 27318e553d8a75b27081436acfcad3ef3c4821a9 Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Tue, 17 Dec 2024 23:38:39 +0000 Subject: [PATCH 4/9] Skeleton for ThinClientStoreModel and RNTBD serialization --- .../java/com/azure/cosmos/CosmosItemTest.java | 6 + .../RxGatewayStoreModelTest.java | 2 +- .../RxDocumentServiceRequest.java | 25 ++ .../implementation/RxGatewayStoreModel.java | 206 +++++++------ .../implementation/ThinClientStoreModel.java | 80 +++-- .../directconnectivity/ProxyStoreClient.java | 278 ------------------ .../http/HttpTransportSerializer.java | 17 ++ 7 files changed, 215 insertions(+), 399 deletions(-) delete mode 100644 sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ProxyStoreClient.java create mode 100644 sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpTransportSerializer.java diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosItemTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosItemTest.java index bbc7e26a3104..1cc8c9676115 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosItemTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosItemTest.java @@ -104,6 +104,12 @@ public void afterClass() { @Test(groups = { "fast" }, timeOut = TIMEOUT) public void createItem() throws Exception { + // TODO @nehrao/@fabianm REMOVE BEFORE CHECK-IN + if (client.asyncClient().getConnectionPolicy().getConnectionMode() != ConnectionMode.DIRECT || + client.asyncClient().getEffectiveConsistencyLevel(OperationType.Read, null) != ConsistencyLevel.STRONG) { + + throw new SkipException("Disabled for debugging"); + } InternalObjectNode properties = getDocumentDefinition(UUID.randomUUID().toString()); CosmosItemResponse itemResponse = container.createItem(properties); assertThat(itemResponse.getRequestCharge()).isGreaterThan(0); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxGatewayStoreModelTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxGatewayStoreModelTest.java index 0ea494c8dbc6..bd26bb53a0b5 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxGatewayStoreModelTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxGatewayStoreModelTest.java @@ -274,7 +274,7 @@ public void validateApiType() throws Exception { ResourceType.Document); try { - storeModel.performRequest(dsr, HttpMethod.POST).block(); + storeModel.performRequest(dsr).block(); fail("Request should fail"); } catch (Exception e) { //no-op diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java index 3cd7ec512b00..6e19d5770ef5 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java @@ -9,6 +9,7 @@ import com.azure.cosmos.implementation.directconnectivity.WFConstants; import com.azure.cosmos.implementation.faultinjection.FaultInjectionRequestContext; import com.azure.cosmos.implementation.feedranges.FeedRangeInternal; +import com.azure.cosmos.implementation.http.HttpTransportSerializer; import com.azure.cosmos.implementation.routing.PartitionKeyInternal; import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity; import com.azure.cosmos.implementation.routing.Range; @@ -29,6 +30,9 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; + +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; /** * This is core Transport/Connection agnostic request to the Azure Cosmos DB database service. @@ -89,6 +93,8 @@ public class RxDocumentServiceRequest implements Cloneable { private volatile boolean hasFeedRangeFilteringBeenApplied = false; + private final AtomicReference httpTransportSerializer = new AtomicReference<>(null); + public boolean isReadOnlyRequest() { return this.operationType.isReadOnlyOperation(); } @@ -1238,4 +1244,23 @@ public void setThinclientHeaders(String operationType, String resourceType) { this.headers.put(HttpConstants.HttpHeaders.THINCLIENT_PROXY_OPERATION_TYPE, operationType); this.headers.put(HttpConstants.HttpHeaders.THINCLIENT_PROXY_RESOURCE_TYPE, resourceType); } + + public RxDocumentServiceRequest setHttpTransportSerializer(HttpTransportSerializer transportSerializer) { + this.httpTransportSerializer.set(transportSerializer); + + return this; + } + + public HttpTransportSerializer getEffectiveHttpTransportSerializer( + HttpTransportSerializer defaultTransportSerializer) { + + checkNotNull(defaultTransportSerializer, "Argument 'defaultTransportSerializer' must not be null."); + + HttpTransportSerializer snapshot = this.httpTransportSerializer.get(); + if (snapshot != null) { + return snapshot; + } + + return defaultTransportSerializer; + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java index 4ff506b71165..31d8271276d5 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java @@ -12,7 +12,9 @@ import com.azure.cosmos.implementation.directconnectivity.GatewayServiceConfigurationReader; import com.azure.cosmos.implementation.directconnectivity.HttpUtils; import com.azure.cosmos.implementation.directconnectivity.RequestHelper; +import com.azure.cosmos.implementation.directconnectivity.ResourceOperation; import com.azure.cosmos.implementation.directconnectivity.StoreResponse; +import com.azure.cosmos.implementation.directconnectivity.Uri; import com.azure.cosmos.implementation.directconnectivity.WebExceptionUtility; import com.azure.cosmos.implementation.faultinjection.GatewayServerErrorInjector; import com.azure.cosmos.implementation.faultinjection.IFaultInjectorProvider; @@ -20,6 +22,7 @@ import com.azure.cosmos.implementation.http.HttpHeaders; import com.azure.cosmos.implementation.http.HttpRequest; import com.azure.cosmos.implementation.http.HttpResponse; +import com.azure.cosmos.implementation.http.HttpTransportSerializer; import com.azure.cosmos.implementation.http.ReactorNettyRequestRecord; import com.azure.cosmos.implementation.routing.PartitionKeyInternal; import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper; @@ -47,6 +50,7 @@ import java.util.concurrent.Callable; import static com.azure.cosmos.implementation.HttpConstants.HttpHeaders.INTENDED_COLLECTION_RID_HEADER; +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; /** * While this class is public, but it is not part of our published public APIs. @@ -54,7 +58,7 @@ * * Used internally to provide functionality to communicate and process response from GATEWAY in the Azure Cosmos DB database service. */ -public class RxGatewayStoreModel implements RxStoreModel { +public class RxGatewayStoreModel implements RxStoreModel, HttpTransportSerializer { private static final boolean HTTP_CONNECTION_WITHOUT_TLS_ALLOWED = Configs.isHttpConnectionWithoutTLSAllowed(); private final DiagnosticsClientContext clientContext; @@ -83,29 +87,12 @@ public RxGatewayStoreModel( ApiType apiType) { this.clientContext = clientContext; - this.defaultHeaders = new HashMap<>(); - this.defaultHeaders.put(HttpConstants.HttpHeaders.CACHE_CONTROL, - "no-cache"); - this.defaultHeaders.put(HttpConstants.HttpHeaders.VERSION, - HttpConstants.Versions.CURRENT_VERSION); - this.defaultHeaders.put( - HttpConstants.HttpHeaders.SDK_SUPPORTED_CAPABILITIES, - HttpConstants.SDKSupportedCapabilities.SUPPORTED_CAPABILITIES); - - if (apiType != null) { - this.defaultHeaders.put(HttpConstants.HttpHeaders.API_TYPE, apiType.toString()); - } if (userAgentContainer == null) { userAgentContainer = new UserAgentContainer(); } - this.defaultHeaders.put(HttpConstants.HttpHeaders.USER_AGENT, userAgentContainer.getUserAgent()); - - if (defaultConsistencyLevel != null) { - this.defaultHeaders.put(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL, - defaultConsistencyLevel.toString()); - } + this.defaultHeaders = this.getDefaultHeaders(apiType, userAgentContainer, defaultConsistencyLevel); this.defaultConsistencyLevel = defaultConsistencyLevel; this.globalEndpointManager = globalEndpointManager; @@ -126,6 +113,40 @@ public RxGatewayStoreModel(RxGatewayStoreModel inner) { this.sessionContainer = inner.sessionContainer; } + protected Map getDefaultHeaders( + ApiType apiType, + UserAgentContainer userAgentContainer, + ConsistencyLevel clientDefaultConsistencyLevel) { + + checkNotNull(userAgentContainer, "Argument 'userAGentContainer' must not be null."); + + Map defaultHeaders = new HashMap<>(); + defaultHeaders.put(HttpConstants.HttpHeaders.CACHE_CONTROL, + "no-cache"); + defaultHeaders.put(HttpConstants.HttpHeaders.VERSION, + HttpConstants.Versions.CURRENT_VERSION); + defaultHeaders.put( + HttpConstants.HttpHeaders.SDK_SUPPORTED_CAPABILITIES, + HttpConstants.SDKSupportedCapabilities.SUPPORTED_CAPABILITIES); + + if (apiType != null) { + defaultHeaders.put(HttpConstants.HttpHeaders.API_TYPE, apiType.toString()); + } + + if (userAgentContainer == null) { + userAgentContainer = new UserAgentContainer(); + } + + defaultHeaders.put(HttpConstants.HttpHeaders.USER_AGENT, userAgentContainer.getUserAgent()); + + if (clientDefaultConsistencyLevel != null) { + defaultHeaders.put(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL, + clientDefaultConsistencyLevel.toString()); + } + + return defaultHeaders; + } + void setGatewayServiceConfigurationReader(GatewayServiceConfigurationReader gatewayServiceConfigurationReader) { this.gatewayServiceConfigurationReader = gatewayServiceConfigurationReader; } @@ -162,40 +183,46 @@ public void setCollectionCache(RxClientCollectionCache collectionCache) { this.collectionCache = collectionCache; } - private Mono create(RxDocumentServiceRequest request) { - return this.performRequest(request, HttpMethod.POST); - } - - private Mono patch(RxDocumentServiceRequest request) { - return this.performRequest(request, HttpMethod.PATCH); - } - - private Mono upsert(RxDocumentServiceRequest request) { - return this.performRequest(request, HttpMethod.POST); - } - - private Mono read(RxDocumentServiceRequest request) { - return this.performRequest(request, HttpMethod.GET); - } - - private Mono replace(RxDocumentServiceRequest request) { - return this.performRequest(request, HttpMethod.PUT); - } - - private Mono delete(RxDocumentServiceRequest request) { - return this.performRequest(request, HttpMethod.DELETE); - } + @Override + public HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, URI requestUri) throws Exception { + HttpMethod method = getHttpMethod(request); + HttpHeaders httpHeaders = this.getHttpRequestHeaders(request.getHeaders()); - private Mono deleteByPartitionKey(RxDocumentServiceRequest request) { - return this.performRequest(request, HttpMethod.POST); + Flux contentAsByteArray = request.getContentAsByteArrayFlux(); + return new HttpRequest(method, + requestUri, + requestUri.getPort(), + httpHeaders, + contentAsByteArray); } - private Mono execute(RxDocumentServiceRequest request) { - return this.performRequest(request, HttpMethod.POST); - } + @Override + public StoreResponse unwrapToStoreResponse( + RxDocumentServiceRequest request, + int statusCode, + HttpHeaders headers, + ByteBuf content) { + + checkNotNull(headers, "Argument 'headers' must not be null."); + checkNotNull( + content, + "Argument 'content' must not be null - use empty ByteBuf when theres is no payload."); + + // If there is any error in the header response this throws exception + validateOrThrow(request, HttpResponseStatus.valueOf(statusCode), headers, content); + + int size; + if ((size = content.readableBytes()) > 0) { + return new StoreResponse(statusCode, + HttpUtils.unescape(headers.toMap()), + new ByteBufInputStream(content, true), + size); + } - private Mono readFeed(RxDocumentServiceRequest request) { - return this.performRequest(request, HttpMethod.GET); + return new StoreResponse(statusCode, + HttpUtils.unescape(headers.toMap()), + null, + 0); } private Mono query(RxDocumentServiceRequest request) { @@ -215,10 +242,10 @@ private Mono query(RxDocumentServiceRequest request) RuntimeConstants.MediaTypes.QUERY_JSON); break; } - return this.performRequest(request, HttpMethod.POST); + return this.performRequest(request); } - public Mono performRequest(RxDocumentServiceRequest request, HttpMethod method) { + public Mono performRequest(RxDocumentServiceRequest request) { try { if (request.requestContext.cosmosDiagnostics == null) { request.requestContext.cosmosDiagnostics = clientContext.createDiagnostics(); @@ -228,10 +255,10 @@ public Mono performRequest(RxDocumentServiceRequest r request.requestContext.resourcePhysicalAddress = uri.toString(); if (this.throughputControlStore != null) { - return this.throughputControlStore.processRequest(request, Mono.defer(() -> this.performRequestInternal(request, method, uri))); + return this.throughputControlStore.processRequest(request, Mono.defer(() -> this.performRequestInternal(request, uri))); } - return this.performRequestInternal(request, method, uri); + return this.performRequestInternal(request, uri); } catch (Exception e) { return Mono.error(e); } @@ -241,23 +268,21 @@ public Mono performRequest(RxDocumentServiceRequest r * Given the request it creates an flux which upon subscription issues HTTP call and emits one RxDocumentServiceResponse. * * @param request - * @param method * @param requestUri * @return Flux */ - public Mono performRequestInternal(RxDocumentServiceRequest request, HttpMethod method, URI requestUri) { + public Mono performRequestInternal(RxDocumentServiceRequest request, URI requestUri) { try { + HttpMethod method = getHttpMethod(request); HttpHeaders httpHeaders = this.getHttpRequestHeaders(request.getHeaders()); Flux contentAsByteArray = request.getContentAsByteArrayFlux(); - HttpRequest httpRequest = new HttpRequest(method, - requestUri, - requestUri.getPort(), - httpHeaders, - contentAsByteArray); + HttpRequest httpRequest = request + .getEffectiveHttpTransportSerializer(this) + .wrapInHttpRequest(request, requestUri); Mono httpResponseMono = this.httpClient.send(httpRequest, request.getResponseTimeout()); @@ -371,23 +396,9 @@ private Mono toDocumentServiceResponse(Mono 0) { - rsp = new StoreResponse(httpResponseStatus, - HttpUtils.unescape(httpResponseHeaders.toMap()), - new ByteBufInputStream(content, true), - size); - } else { - rsp = new StoreResponse(httpResponseStatus, - HttpUtils.unescape(httpResponseHeaders.toMap()), - null, - 0); - } + StoreResponse rsp = request + .getEffectiveHttpTransportSerializer(this) + .unwrapToStoreResponse(request, httpResponseStatus, httpResponseHeaders, content); if (reactorNettyRequestRecord != null) { rsp.setRequestTimeline(reactorNettyRequestRecord.takeTimelineSnapshot()); @@ -518,28 +529,47 @@ private void validateOrThrow(RxDocumentServiceRequest request, } } - private Mono invokeAsyncInternal(RxDocumentServiceRequest request) { + private static HttpMethod getHttpMethod(RxDocumentServiceRequest request) { switch (request.getOperationType()) { case Create: case Batch: - return this.create(request); - case Patch: - return this.patch(request); case Upsert: - return this.upsert(request); + case ExecuteJavaScript: + case SqlQuery: + case Query: + case QueryPlan: + return HttpMethod.POST; + case Patch: + return HttpMethod.PATCH; case Delete: if (request.getResourceType() == ResourceType.PartitionKey) { - return this.deleteByPartitionKey(request); + return HttpMethod.POST; } - return this.delete(request); + return HttpMethod.DELETE; + case Read: + case ReadFeed: + return HttpMethod.GET; + case Replace: + return HttpMethod.PUT; + default: + throw new IllegalStateException( + "Operation type " + request.getOperationType() + " cannot be processed in RxGatewayStoreModel."); + } + } + + private Mono invokeAsyncInternal(RxDocumentServiceRequest request) { + switch (request.getOperationType()) { + case Create: + case Batch: + case Patch: + case Upsert: + case Delete: case ExecuteJavaScript: - return this.execute(request); case Read: - return this.read(request); case ReadFeed: - return this.readFeed(request); case Replace: - return this.replace(request); + return this.performRequest(request); + case SqlQuery: case Query: case QueryPlan: diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java index f291196f1360..ad0bf54a5411 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java @@ -2,16 +2,20 @@ // Licensed under the MIT License. package com.azure.cosmos.implementation; +import com.azure.cosmos.ConsistencyLevel; import com.azure.cosmos.CosmosContainerProactiveInitConfig; -import com.azure.cosmos.implementation.directconnectivity.ProxyStoreClient; -import com.azure.cosmos.implementation.directconnectivity.StoreClient; import com.azure.cosmos.implementation.faultinjection.IFaultInjectorProvider; +import com.azure.cosmos.implementation.http.HttpClient; import com.azure.cosmos.implementation.throughputControl.ThroughputControlStore; import com.azure.cosmos.models.CosmosContainerIdentity; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.util.HashMap; import java.util.List; +import java.util.Map; + +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; /** * While this class is public, but it is not part of our published public APIs. @@ -19,45 +23,57 @@ * * Used internally to provide functionality to communicate and process response from THINCLIENT in the Azure Cosmos DB database service. */ -public class ThinClientStoreModel implements RxStoreModel { - - private final ProxyStoreClient storeClient; +public class ThinClientStoreModel extends RxGatewayStoreModel { + + public ThinClientStoreModel( + DiagnosticsClientContext clientContext, + ISessionContainer sessionContainer, + ConsistencyLevel defaultConsistencyLevel, + UserAgentContainer userAgentContainer, + GlobalEndpointManager globalEndpointManager, + HttpClient httpClient) { + super( + clientContext, + sessionContainer, + defaultConsistencyLevel, + QueryCompatibilityMode.Default, + userAgentContainer, + globalEndpointManager, + httpClient, + ApiType.SQL); + } - public ThinClientStoreModel(ProxyStoreClient storeClient) { - this.storeClient = storeClient; + public ThinClientStoreModel(ThinClientStoreModel inner) { + super(inner); } @Override public Mono processMessage(RxDocumentServiceRequest request) { // direct/gateway mode validations? session token, bad consistency level header - // set headers here? .NET sets in client - request.setThinclientHeaders(request.getOperationType().toString(), request.getResourceType().toString()); - return this.storeClient.processMessageAsync(request); - } - - @Override - public void enableThroughputControl(ThroughputControlStore throughputControlStore) { - - } - - @Override - public Flux submitOpenConnectionTasksAndInitCaches(CosmosContainerProactiveInitConfig proactiveContainerInitConfig) { - return null; + // TODO @nehrao/@fabianm FIX BEFORE CHECK-IN + // conditionally set RntbdTransportSerializer and physicalAddress here + // RntbdHttpTransportSerializer would need to create rntbdRequestArgs, then RntbdRequest from it and call encode + return super.processMessage(request); } @Override - public void configureFaultInjectorProvider(IFaultInjectorProvider injectorProvider, Configs configs) { - - } - - @Override - public void recordOpenConnectionsAndInitCachesCompleted(List cosmosContainerIdentities) { - - } - - @Override - public void recordOpenConnectionsAndInitCachesStarted(List cosmosContainerIdentities) { - + protected Map getDefaultHeaders( + ApiType apiType, + UserAgentContainer userAgentContainer, + ConsistencyLevel clientDefaultConsistencyLevel) { + + checkNotNull(userAgentContainer, "Argument 'userAGentContainer' must not be null."); + + Map defaultHeaders = new HashMap<>(); + // For ThinClient http/2 used for framing only + // All operation-level headers are only added to the rntbd-encoded message + // the thin client proxy wil parse the rntbd headers (not the content!) and substitute any + // missing headers for routing (like partitionId or replicaId) + // Since the Thin client proxy also needs to set the user-agent header to a different value + // it is not added to the rntbd headers - just http-headers in the SDK + defaultHeaders.put(HttpConstants.HttpHeaders.USER_AGENT, userAgentContainer.getUserAgent()); + + return defaultHeaders; } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ProxyStoreClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ProxyStoreClient.java deleted file mode 100644 index 751d8363b8dd..000000000000 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ProxyStoreClient.java +++ /dev/null @@ -1,278 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.cosmos.implementation.directconnectivity; - -import com.azure.cosmos.BridgeInternal; -import com.azure.cosmos.ConsistencyLevel; -import com.azure.cosmos.CosmosContainerProactiveInitConfig; -import com.azure.cosmos.CosmosException; -import com.azure.cosmos.SessionRetryOptions; -import com.azure.cosmos.implementation.BackoffRetryUtility; -import com.azure.cosmos.implementation.Configs; -import com.azure.cosmos.implementation.DiagnosticsClientContext; -import com.azure.cosmos.implementation.Exceptions; -import com.azure.cosmos.implementation.HttpConstants; -import com.azure.cosmos.implementation.IAuthorizationTokenProvider; -import com.azure.cosmos.implementation.IRetryPolicy; -import com.azure.cosmos.implementation.ISessionContainer; -import com.azure.cosmos.implementation.ISessionToken; -import com.azure.cosmos.implementation.InternalServerErrorException; -import com.azure.cosmos.implementation.OperationType; -import com.azure.cosmos.implementation.RMResources; -import com.azure.cosmos.implementation.ResourceType; -import com.azure.cosmos.implementation.RxDocumentServiceRequest; -import com.azure.cosmos.implementation.RxDocumentServiceResponse; -import com.azure.cosmos.implementation.SessionTokenHelper; -import com.azure.cosmos.implementation.Strings; -import com.azure.cosmos.implementation.Utils; -import com.azure.cosmos.implementation.apachecommons.lang.math.NumberUtils; -import com.azure.cosmos.implementation.faultinjection.IFaultInjectorProvider; -import com.azure.cosmos.implementation.throughputControl.ThroughputControlStore; -import com.azure.cosmos.models.CosmosContainerIdentity; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.function.Function; - -/** - * Instantiated to issue direct connectivity requests to the backend on THINCLIENT for thinclient users. - * StoreClient uses the ReplicatedResourceClient to make requests to the backend. - */ -public class ProxyStoreClient implements IStoreClient { - private final DiagnosticsClientContext diagnosticsClientContext; - private final Logger logger = LoggerFactory.getLogger(ProxyStoreClient.class); - private final GatewayServiceConfigurationReader serviceConfigurationReader; - private final ISessionContainer sessionContainer; - private final ReplicatedResourceClient replicatedResourceClient; - private final TransportClient transportClient; - private final String ZERO_PARTITION_KEY_RANGE = "0"; - - public ProxyStoreClient( - DiagnosticsClientContext diagnosticsClientContext, - Configs configs, - IAddressResolver addressResolver, - ISessionContainer sessionContainer, - GatewayServiceConfigurationReader serviceConfigurationReader, IAuthorizationTokenProvider userTokenProvider, - TransportClient transportClient, - boolean useMultipleWriteLocations, - SessionRetryOptions sessionRetryOptions) { - this.diagnosticsClientContext = diagnosticsClientContext; - this.transportClient = transportClient; - this.sessionContainer = sessionContainer; - this.serviceConfigurationReader = serviceConfigurationReader; - this.replicatedResourceClient = new ReplicatedResourceClient( - diagnosticsClientContext, - configs, - new AddressSelector(addressResolver, configs.getProtocol()), - sessionContainer, - this.transportClient, - serviceConfigurationReader, - userTokenProvider, - useMultipleWriteLocations, - sessionRetryOptions); - - addressResolver.setOpenConnectionsProcessor(this.transportClient.getProactiveOpenConnectionsProcessor()); - } - - public void enableThroughputControl(ThroughputControlStore throughputControlStore) { - this.replicatedResourceClient.enableThroughputControl(throughputControlStore); - } - - private Mono InvokeClientAsync( - DocumentServiceRequest request, - ResourceType resourceType, - Uri physicalAddress, - CancellationToken cancellationToken) { - - } - - @Override - public Mono processMessageAsync(RxDocumentServiceRequest request, IRetryPolicy retryPolicy, Function> prepareRequestAsyncDelegate) { - if (request == null) { - throw new NullPointerException("request"); - } - - // HTTP2 transport - // serialize payload to RNTBD - - /*using (HttpResponseMessage responseMessage = await this.InvokeClientAsync(request, resourceOperation.resourceType, physicalAddress, default)) - { - return await HttpTransportClient.ProcessHttpResponse(request.ResourceAddress, string.Empty, responseMessage, physicalAddress, request); - }*/ - - Callable> storeResponseDelegate = () -> this.replicatedResourceClient.invokeAsync(request, prepareRequestAsyncDelegate); - - Mono storeResponse; - try { - storeResponse = retryPolicy != null - ? BackoffRetryUtility.executeRetry(storeResponseDelegate, retryPolicy) - : storeResponseDelegate.call(); - } catch (Exception e) { - return Mono.error(e); - } - - storeResponse = storeResponse.doOnError(e -> { - try { - Throwable unwrappedException = reactor.core.Exceptions.unwrap(e); - CosmosException exception = Utils.as(unwrappedException, CosmosException.class); - - if (exception == null) { - return; - } - - BridgeInternal.recordRetryContextEndTime(request.requestContext.cosmosDiagnostics); - exception = BridgeInternal.setCosmosDiagnostics(exception, request.requestContext.cosmosDiagnostics); - - handleUnsuccessfulStoreResponse(request, exception); - } catch (Throwable throwable) { - logger.error("Unexpected failure in handling orig [{}]", e.getMessage(), e); - logger.error("Unexpected failure in handling orig [{}] : new [{}]", e.getMessage(), throwable.getMessage(), throwable); - if (throwable instanceof Error) { - throw (Error) throwable; - } - } - } - ); - - return storeResponse.flatMap(sr -> { - try { - return Mono.just(this.completeResponse(sr, request)); - } catch (Exception e) { - return Mono.error(e); - } - }); - } - - @Override - public Flux submitOpenConnectionTasksAndInitCaches( - CosmosContainerProactiveInitConfig proactiveContainerInitConfig) { - return this.replicatedResourceClient.submitOpenConnectionTasksAndInitCaches(proactiveContainerInitConfig); - } - - public void configureFaultInjectorProvider(IFaultInjectorProvider injectorProvider) { - this.replicatedResourceClient.configureFaultInjectorProvider(injectorProvider); - } - - public void recordOpenConnectionsAndInitCachesCompleted(List cosmosContainerIdentities) { - this.replicatedResourceClient.recordOpenConnectionsAndInitCachesCompleted(cosmosContainerIdentities); - } - - public void recordOpenConnectionsAndInitCachesStarted(List cosmosContainerIdentities) { - this.replicatedResourceClient.recordOpenConnectionsAndInitCachesStarted(cosmosContainerIdentities); - } - - private void handleUnsuccessfulStoreResponse(RxDocumentServiceRequest request, CosmosException exception) { - this.updateResponseHeader(request, exception.getResponseHeaders()); - if ((!ReplicatedResourceClient.isMasterResource(request.getResourceType())) && - (Exceptions.isStatusCode(exception, HttpConstants.StatusCodes.PRECONDITION_FAILED) || Exceptions.isStatusCode(exception, HttpConstants.StatusCodes.CONFLICT) || - (Exceptions.isStatusCode(exception, HttpConstants.StatusCodes.NOTFOUND) && - !Exceptions.isSubStatusCode(exception, HttpConstants.SubStatusCodes.READ_SESSION_NOT_AVAILABLE)))) { - this.captureSessionToken(request, exception.getResponseHeaders()); - } - } - - private RxDocumentServiceResponse completeResponse( - StoreResponse storeResponse, - RxDocumentServiceRequest request) throws InternalServerErrorException { - - if (storeResponse.getResponseHeaderNames().length != storeResponse.getResponseHeaderValues().length) { - throw new InternalServerErrorException( - Exceptions.getInternalServerErrorMessage(RMResources.InvalidBackendResponse), - HttpConstants.SubStatusCodes.INVALID_BACKEND_RESPONSE); - } - - Map headers = new HashMap<>(storeResponse.getResponseHeaderNames().length); - for (int idx = 0; idx < storeResponse.getResponseHeaderNames().length; idx++) { - String name = storeResponse.getResponseHeaderNames()[idx]; - String value = storeResponse.getResponseHeaderValues()[idx]; - - headers.put(name, value); - } - - this.updateResponseHeader(request, headers); - this.captureSessionToken(request, headers); - BridgeInternal.recordRetryContextEndTime(request.requestContext.cosmosDiagnostics); - RxDocumentServiceResponse rxDocumentServiceResponse = - new RxDocumentServiceResponse(this.diagnosticsClientContext, storeResponse); - rxDocumentServiceResponse.setCosmosDiagnostics(request.requestContext.cosmosDiagnostics); - - return rxDocumentServiceResponse; - } - - private long getLSN(Map headers) { - long defaultValue = -1; - String value = headers.get(WFConstants.BackendHeaders.LSN); - - if (!Strings.isNullOrEmpty(value)) { - return NumberUtils.toLong(value, defaultValue); - - } - - return defaultValue; - } - - private void updateResponseHeader(RxDocumentServiceRequest request, Map headers) { - String requestConsistencyLevel = request.getHeaders().get(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL); - - boolean sessionConsistency = - this.serviceConfigurationReader.getDefaultConsistencyLevel() == ConsistencyLevel.SESSION || - (!Strings.isNullOrEmpty(requestConsistencyLevel) - && Strings.areEqualIgnoreCase(requestConsistencyLevel, ConsistencyLevel.SESSION.toString())); - - long storeLSN = this.getLSN(headers); - if (storeLSN == -1) { - return; - } - - String partitionKeyRangeId = headers.get(WFConstants.BackendHeaders.PARTITION_KEY_RANGE_ID); - - if (Strings.isNullOrEmpty(partitionKeyRangeId)) { - String inputSession = request.getHeaders().get(HttpConstants.HttpHeaders.SESSION_TOKEN); - if (!Strings.isNullOrEmpty(inputSession) - && inputSession.indexOf(ISessionToken.PARTITION_KEY_RANGE_SESSION_SEPARATOR) >= 1) { - partitionKeyRangeId = inputSession.substring(0, - inputSession.indexOf(ISessionToken.PARTITION_KEY_RANGE_SESSION_SEPARATOR)); - } else { - partitionKeyRangeId = ZERO_PARTITION_KEY_RANGE; - } - } - - ISessionToken sessionToken = null; - String sessionTokenResponseHeader = headers.get(HttpConstants.HttpHeaders.SESSION_TOKEN); - if (!Strings.isNullOrEmpty(sessionTokenResponseHeader)) { - sessionToken = SessionTokenHelper.parse(sessionTokenResponseHeader); - } - - if (sessionToken != null) { - headers.put(HttpConstants.HttpHeaders.SESSION_TOKEN, - SessionTokenHelper.concatPartitionKeyRangeIdWithSessionToken(partitionKeyRangeId, sessionToken.convertToString())); - } - - headers.remove(WFConstants.BackendHeaders.PARTITION_KEY_RANGE_ID); - } - - private void captureSessionToken(RxDocumentServiceRequest request, Map headers) { - if (request.getResourceType() == ResourceType.DocumentCollection - && request.getOperationType() == OperationType.Delete) { - String resourceId; - if (request.getIsNameBased()) { - resourceId = headers.get(HttpConstants.HttpHeaders.OWNER_ID); - } else { - resourceId = request.getResourceId(); - } - this.sessionContainer.clearTokenByResourceId(resourceId); - } else { - this.sessionContainer.setSessionToken(request, headers); - } - } - - // TODO RNTBD support - // https://msdata.visualstudio.com/CosmosDB/SDK/_workitems/edit/262496 -} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpTransportSerializer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpTransportSerializer.java new file mode 100644 index 000000000000..975ae4d4ca77 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpTransportSerializer.java @@ -0,0 +1,17 @@ +package com.azure.cosmos.implementation.http; + +import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.directconnectivity.StoreResponse; +import io.netty.buffer.ByteBuf; + +import java.net.URI; + +public interface HttpTransportSerializer { + HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, URI requestUri) throws Exception; + + StoreResponse unwrapToStoreResponse( + RxDocumentServiceRequest request, + int statusCode, + HttpHeaders headers, + ByteBuf content); +} From 2e0672d7a151c9632188d416dc2f484645db3171 Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Sat, 21 Dec 2024 18:55:30 -0500 Subject: [PATCH 5/9] Adding serialization skeleton in ThinClientStoreModel --- .../implementation/RxGatewayStoreModel.java | 6 +- .../implementation/ThinClientStoreModel.java | 64 +++++++++++++++++-- .../rntbd/RntbdRequest.java | 2 +- .../rntbd/RntbdRequestArgs.java | 4 ++ 4 files changed, 68 insertions(+), 8 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java index 31d8271276d5..85ba4be9f501 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java @@ -275,9 +275,9 @@ public Mono performRequestInternal(RxDocumentServiceR try { + // todo: neharao1 - see if the below three statements can be removed since these are part of wrapInHttpRequest HttpMethod method = getHttpMethod(request); HttpHeaders httpHeaders = this.getHttpRequestHeaders(request.getHeaders()); - Flux contentAsByteArray = request.getContentAsByteArrayFlux(); HttpRequest httpRequest = request @@ -654,6 +654,10 @@ public void recordOpenConnectionsAndInitCachesStarted(List getDefaultHeaders() { + return this.defaultHeaders; + } + private void captureSessionToken(RxDocumentServiceRequest request, Map responseHeaders) { if (request.getResourceType() == ResourceType.DocumentCollection && request.getOperationType() == OperationType.Delete) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java index ad0bf54a5411..7112884b5f9c 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java @@ -3,23 +3,26 @@ package com.azure.cosmos.implementation; import com.azure.cosmos.ConsistencyLevel; -import com.azure.cosmos.CosmosContainerProactiveInitConfig; -import com.azure.cosmos.implementation.faultinjection.IFaultInjectorProvider; +import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequest; +import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestArgs; import com.azure.cosmos.implementation.http.HttpClient; -import com.azure.cosmos.implementation.throughputControl.ThroughputControlStore; -import com.azure.cosmos.models.CosmosContainerIdentity; +import com.azure.cosmos.implementation.http.HttpHeaders; +import com.azure.cosmos.implementation.http.HttpRequest; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.HttpMethod; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.net.URI; import java.util.HashMap; -import java.util.List; import java.util.Map; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; /** * While this class is public, but it is not part of our published public APIs. - * This is meant to be internally used only by our sdk. + * This is meant to be internally used only by our sdk. * * Used internally to provide functionality to communicate and process response from THINCLIENT in the Azure Cosmos DB database service. */ @@ -76,4 +79,53 @@ protected Map getDefaultHeaders( return defaultHeaders; } + + @Override + public HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, URI requestUri) throws Exception { + + // todo - neharao1 - validate b/w name() v/s toString() + request.setThinclientHeaders(request.getOperationType().name(), request.getResourceType().name()); + + // todo - neharao1: no concept of a replica / service endpoint that can be passed + RntbdRequestArgs rntbdRequestArgs = new RntbdRequestArgs(request); + + // todo - neharao1: validate what HTTP headers are needed - for now have put default ThinClient HTTP headers + // todo - based on fabianm comment - thinClient also takes op type and resource type headers as HTTP headers + HttpHeaders headers = this.getHttpHeaders(); + + RntbdRequest rntbdRequest = RntbdRequest.from(rntbdRequestArgs); + + // todo: neharao1 - validate whether Java heap buffer is okay v/s Direct buffer + ByteBuf byteBuf = Unpooled.buffer(); + + // todo: comment can be removed - RntbdRequestEncoder does the same - a type of ChannelHandler in ChannelPipeline (a Netty concept) + // todo: lifting the logic from there to encode the RntbdRequest instance into a ByteBuf (ByteBuf is a network compatible format) + // todo: double-check with fabianm to see if RntbdRequest across RNTBD over TCP (Direct connectivity mode) is same as that when using ThinClient proxy + rntbdRequest.encode(byteBuf); + + return new HttpRequest( + // todo: HttpMethod when using ThinClient is presumably always an HttpMethod.POST - validate this + HttpMethod.POST, + requestUri, + requestUri.getPort(), + headers, + Flux.just(byteBuf.array())); + } + + // todo: neharao1 - validate if RxGatewayStoreModel#unwrapToStoreResponse can be reused +// @Override +// public StoreResponse unwrapToStoreResponse(RxDocumentServiceRequest request, int statusCode, HttpHeaders headers, ByteBuf content) { +// return null; +// } + + private HttpHeaders getHttpHeaders() { + HttpHeaders httpHeaders = new HttpHeaders(); + Map defaultHeaders = this.getDefaultHeaders(); + + for (Map.Entry header : defaultHeaders.entrySet()) { + httpHeaders.set(header.getKey(), header.getValue()); + } + + return httpHeaders; + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequest.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequest.java index daaa93ea82fc..ef01f5d15106 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequest.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequest.java @@ -76,7 +76,7 @@ public static RntbdRequest decode(final ByteBuf in) { return new RntbdRequest(header, metadata, payload); } - void encode(final ByteBuf out) { + public void encode(final ByteBuf out) { final int expectedLength = RntbdRequestFrame.LENGTH + this.headers.computeLength(); final int start = out.writerIndex(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestArgs.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestArgs.java index 32869a5f6460..0112debe344c 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestArgs.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestArgs.java @@ -56,6 +56,10 @@ public RntbdRequestArgs(final RxDocumentServiceRequest serviceRequest, final Uri this.transportRequestId = instanceCount.incrementAndGet(); } + public RntbdRequestArgs(final RxDocumentServiceRequest serviceRequest) { + this(serviceRequest, null); + } + // region Accessors @JsonProperty From e5dc25b16173c2c48deb7ae78ac058944bbbdbea Mon Sep 17 00:00:00 2001 From: Neha Rao Date: Mon, 6 Jan 2025 13:41:02 -0800 Subject: [PATCH 6/9] test --- .../ThinClientStoreModelTest.java | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientStoreModelTest.java diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientStoreModelTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientStoreModelTest.java new file mode 100644 index 000000000000..7e8950d5a5b9 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientStoreModelTest.java @@ -0,0 +1,63 @@ +package com.azure.cosmos.implementation; + +import com.azure.cosmos.ConsistencyLevel; +import com.azure.cosmos.implementation.circuitBreaker.GlobalPartitionEndpointManagerForCircuitBreaker; +import com.azure.cosmos.implementation.http.HttpClient; +import com.azure.cosmos.implementation.http.HttpRequest; +import io.netty.channel.ConnectTimeoutException; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.testng.annotations.Test; +import reactor.core.publisher.Mono; + +import java.net.URI; + +import static org.assertj.core.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; + +public class ThinClientStoreModelTest { + @Test(groups = "unit") + public void testThinClientStoreModel() throws Exception { + DiagnosticsClientContext clientContext = Mockito.mock(DiagnosticsClientContext.class); + Mockito.doReturn(new DiagnosticsClientContext.DiagnosticsClientConfig()).when(clientContext).getConfig(); + Mockito + .doReturn(ImplementationBridgeHelpers + .CosmosDiagnosticsHelper + .getCosmosDiagnosticsAccessor() + .create(clientContext, 1d)) + .when(clientContext).createDiagnostics(); + + String sdkGlobalSessionToken = "1#100#1=20#2=5#3=30"; + ISessionContainer sessionContainer = Mockito.mock(ISessionContainer.class); + Mockito.doReturn(sdkGlobalSessionToken).when(sessionContainer).resolveGlobalSessionToken(any()); + + GlobalEndpointManager globalEndpointManager = Mockito.mock(GlobalEndpointManager.class); + GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManager = Mockito.mock(GlobalPartitionEndpointManagerForCircuitBreaker.class); + + Mockito.doReturn(new URI("https://localhost")) + .when(globalEndpointManager).resolveServiceEndpoint(any()); + + HttpClient httpClient = Mockito.mock(HttpClient.class); + Mockito.when(httpClient.send(any(), any())).thenReturn(Mono.error(new ConnectTimeoutException())); + + ThinClientStoreModel storeModel = new ThinClientStoreModel( + clientContext, + sessionContainer, + ConsistencyLevel.SESSION, + new UserAgentContainer(), + globalEndpointManager, + httpClient); + + RxDocumentServiceRequest dsr = RxDocumentServiceRequest.createFromName( + clientContext, + OperationType.Read, + "/fakeResourceFullName", + ResourceType.Document); + + try { + storeModel.performRequest(dsr).block(); + } catch (Exception e) { + //no-op + } + } +} From 4f0dba88c69127695d4f49252e82632be746170f Mon Sep 17 00:00:00 2001 From: Neha Rao Date: Sun, 12 Jan 2025 22:58:26 -0800 Subject: [PATCH 7/9] fix --- .../implementation/ThinClientStoreModelTest.java | 8 +++++++- .../cosmos/implementation/RxGatewayStoreModel.java | 6 ------ .../cosmos/implementation/ThinClientStoreModel.java | 11 ----------- .../directconnectivity/rntbd/RntbdRequestArgs.java | 2 +- 4 files changed, 8 insertions(+), 19 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientStoreModelTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientStoreModelTest.java index 7e8950d5a5b9..772be371b0c8 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientStoreModelTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientStoreModelTest.java @@ -2,8 +2,13 @@ import com.azure.cosmos.ConsistencyLevel; import com.azure.cosmos.implementation.circuitBreaker.GlobalPartitionEndpointManagerForCircuitBreaker; +import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils; +import com.azure.cosmos.implementation.http.Http2ConnectionConfig; import com.azure.cosmos.implementation.http.HttpClient; +import com.azure.cosmos.implementation.http.HttpClientConfig; +import com.azure.cosmos.implementation.http.HttpHeaders; import com.azure.cosmos.implementation.http.HttpRequest; +import com.azure.cosmos.implementation.http.ReactorNettyClient; import io.netty.channel.ConnectTimeoutException; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; @@ -32,11 +37,12 @@ public void testThinClientStoreModel() throws Exception { Mockito.doReturn(sdkGlobalSessionToken).when(sessionContainer).resolveGlobalSessionToken(any()); GlobalEndpointManager globalEndpointManager = Mockito.mock(GlobalEndpointManager.class); - GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManager = Mockito.mock(GlobalPartitionEndpointManagerForCircuitBreaker.class); Mockito.doReturn(new URI("https://localhost")) .when(globalEndpointManager).resolveServiceEndpoint(any()); + // mocking with HTTP/1.1 client, just using this test as basic store model validation. e2e request flow + // with HTTP/2 will be tested in future PR once the wiring is all connected HttpClient httpClient = Mockito.mock(HttpClient.class); Mockito.when(httpClient.send(any(), any())).thenReturn(Mono.error(new ConnectTimeoutException())); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java index 85ba4be9f501..0ef03f72a061 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java @@ -274,12 +274,6 @@ public Mono performRequest(RxDocumentServiceRequest r public Mono performRequestInternal(RxDocumentServiceRequest request, URI requestUri) { try { - - // todo: neharao1 - see if the below three statements can be removed since these are part of wrapInHttpRequest - HttpMethod method = getHttpMethod(request); - HttpHeaders httpHeaders = this.getHttpRequestHeaders(request.getHeaders()); - Flux contentAsByteArray = request.getContentAsByteArrayFlux(); - HttpRequest httpRequest = request .getEffectiveHttpTransportSerializer(this) .wrapInHttpRequest(request, requestUri); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java index 7112884b5f9c..137b25a0edf5 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java @@ -52,11 +52,6 @@ public ThinClientStoreModel(ThinClientStoreModel inner) { @Override public Mono processMessage(RxDocumentServiceRequest request) { - // direct/gateway mode validations? session token, bad consistency level header - - // TODO @nehrao/@fabianm FIX BEFORE CHECK-IN - // conditionally set RntbdTransportSerializer and physicalAddress here - // RntbdHttpTransportSerializer would need to create rntbdRequestArgs, then RntbdRequest from it and call encode return super.processMessage(request); } @@ -112,12 +107,6 @@ public HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, URI reque Flux.just(byteBuf.array())); } - // todo: neharao1 - validate if RxGatewayStoreModel#unwrapToStoreResponse can be reused -// @Override -// public StoreResponse unwrapToStoreResponse(RxDocumentServiceRequest request, int statusCode, HttpHeaders headers, ByteBuf content) { -// return null; -// } - private HttpHeaders getHttpHeaders() { HttpHeaders httpHeaders = new HttpHeaders(); Map defaultHeaders = this.getDefaultHeaders(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestArgs.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestArgs.java index 0112debe344c..e8d9069d1ed2 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestArgs.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestArgs.java @@ -57,7 +57,7 @@ public RntbdRequestArgs(final RxDocumentServiceRequest serviceRequest, final Uri } public RntbdRequestArgs(final RxDocumentServiceRequest serviceRequest) { - this(serviceRequest, null); + this(serviceRequest, Uri.create("")); } // region Accessors From e9adb39a548b0191997df2d82f81c51e60cc5999 Mon Sep 17 00:00:00 2001 From: Neha Rao Date: Tue, 14 Jan 2025 20:15:42 -0800 Subject: [PATCH 8/9] todo --- .../src/test/java/com/azure/cosmos/CosmosItemTest.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosItemTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosItemTest.java index 1cc8c9676115..bbc7e26a3104 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosItemTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosItemTest.java @@ -104,12 +104,6 @@ public void afterClass() { @Test(groups = { "fast" }, timeOut = TIMEOUT) public void createItem() throws Exception { - // TODO @nehrao/@fabianm REMOVE BEFORE CHECK-IN - if (client.asyncClient().getConnectionPolicy().getConnectionMode() != ConnectionMode.DIRECT || - client.asyncClient().getEffectiveConsistencyLevel(OperationType.Read, null) != ConsistencyLevel.STRONG) { - - throw new SkipException("Disabled for debugging"); - } InternalObjectNode properties = getDocumentDefinition(UUID.randomUUID().toString()); CosmosItemResponse itemResponse = container.createItem(properties); assertThat(itemResponse.getRequestCharge()).isGreaterThan(0); From 04f9508e10728f96b3199042266f6d69d2c3566b Mon Sep 17 00:00:00 2001 From: Neha Rao Date: Mon, 20 Jan 2025 00:11:18 -0800 Subject: [PATCH 9/9] pr comments --- .../azure/cosmos/implementation/ThinClientStoreModel.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java index 137b25a0edf5..747faa9effb5 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java @@ -66,7 +66,7 @@ protected Map getDefaultHeaders( Map defaultHeaders = new HashMap<>(); // For ThinClient http/2 used for framing only // All operation-level headers are only added to the rntbd-encoded message - // the thin client proxy wil parse the rntbd headers (not the content!) and substitute any + // the thin client proxy will parse the rntbd headers (not the content!) and substitute any // missing headers for routing (like partitionId or replicaId) // Since the Thin client proxy also needs to set the user-agent header to a different value // it is not added to the rntbd headers - just http-headers in the SDK @@ -91,15 +91,16 @@ public HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, URI reque RntbdRequest rntbdRequest = RntbdRequest.from(rntbdRequestArgs); // todo: neharao1 - validate whether Java heap buffer is okay v/s Direct buffer + // todo: eventually need to use pooled buffer ByteBuf byteBuf = Unpooled.buffer(); // todo: comment can be removed - RntbdRequestEncoder does the same - a type of ChannelHandler in ChannelPipeline (a Netty concept) // todo: lifting the logic from there to encode the RntbdRequest instance into a ByteBuf (ByteBuf is a network compatible format) // todo: double-check with fabianm to see if RntbdRequest across RNTBD over TCP (Direct connectivity mode) is same as that when using ThinClient proxy + // todo: need to conditionally add some headers (userAgent, replicaId/endpoint, etc) rntbdRequest.encode(byteBuf); return new HttpRequest( - // todo: HttpMethod when using ThinClient is presumably always an HttpMethod.POST - validate this HttpMethod.POST, requestUri, requestUri.getPort(), @@ -109,12 +110,14 @@ public HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, URI reque private HttpHeaders getHttpHeaders() { HttpHeaders httpHeaders = new HttpHeaders(); + // todo: select only required headers from defaults Map defaultHeaders = this.getDefaultHeaders(); for (Map.Entry header : defaultHeaders.entrySet()) { httpHeaders.set(header.getKey(), header.getValue()); } + // todo: add thin client resourcetype/operationtype headers return httpHeaders; } }