From a0b37cc3138161b8c05f135dea84cb44c0022ce1 Mon Sep 17 00:00:00 2001 From: Adi Suissa-Peleg Date: Thu, 22 Sep 2022 19:40:17 +0000 Subject: [PATCH] delta-xds: perform resource state tracking after resource ingestion Signed-off-by: Adi Suissa-Peleg --- changelogs/current.yaml | 5 ++ .../config/new_delta_subscription_state.cc | 23 +++++++++- .../xds_mux/delta_subscription_state.cc | 22 ++++++++- source/common/runtime/runtime_features.cc | 1 + .../config/delta_subscription_state_test.cc | 46 ++++++++++++++++++- 5 files changed, 93 insertions(+), 4 deletions(-) diff --git a/changelogs/current.yaml b/changelogs/current.yaml index d92295c78a99..41bdceb3af27 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -35,6 +35,11 @@ behavior_changes: limit the duration of the upstream connections (the default value is 1h, and the recommended value is 5min). This behavior change can be reverted by setting runtime guard ``envoy.reloadable_features.original_dst_rely_on_idle_timeout``. +- area: config + change: | + Fixed resource tracking when using the Incremental (Delta-xDS) protocol. The protocol state will be updated after + the resources are successfully ingested and an ACK is sent. This behavior change can be reverted by setting the + ``envoy.reloadable_features.delta_xds_subscription_state_tracking_fix`` runtime flag to false. minor_behavior_changes: # *Changes that may cause incompatibilities for some users, but should not for most* diff --git a/source/common/config/new_delta_subscription_state.cc b/source/common/config/new_delta_subscription_state.cc index 09ab88b809aa..b87a0c32227d 100644 --- a/source/common/config/new_delta_subscription_state.cc +++ b/source/common/config/new_delta_subscription_state.cc @@ -211,7 +211,8 @@ void NewDeltaSubscriptionState::handleGoodResponse( } } - { + if (!Runtime::runtimeFeatureEnabled( + "envoy.reloadable_features.delta_xds_subscription_state_tracking_fix")) { const auto scoped_update = ttl_.scopedTtlUpdate(); if (requested_resource_state_.contains(Wildcard)) { for (const auto& resource : message.resources()) { @@ -231,6 +232,26 @@ void NewDeltaSubscriptionState::handleGoodResponse( watch_map_.onConfigUpdate(non_heartbeat_resources, message.removed_resources(), message.system_version_info()); + if (Runtime::runtimeFeatureEnabled( + "envoy.reloadable_features.delta_xds_subscription_state_tracking_fix")) { + // The resources were successfully update internally, update the resource + // tracking state. + const auto scoped_update = ttl_.scopedTtlUpdate(); + if (requested_resource_state_.contains(Wildcard)) { + for (const auto& resource : message.resources()) { + addResourceStateFromServer(resource); + } + } else { + // We are not subscribed to wildcard, so we only take resources that we explicitly requested + // and ignore the others. + for (const auto& resource : message.resources()) { + if (requested_resource_state_.contains(resource.name())) { + addResourceStateFromServer(resource); + } + } + } + } + // If a resource is gone, there is no longer a meaningful version for it that makes sense to // provide to the server upon stream reconnect: either it will continue to not exist, in which // case saying nothing is fine, or the server will bring back something new, which we should diff --git a/source/common/config/xds_mux/delta_subscription_state.cc b/source/common/config/xds_mux/delta_subscription_state.cc index 0b44f0b55417..70693ca63c62 100644 --- a/source/common/config/xds_mux/delta_subscription_state.cc +++ b/source/common/config/xds_mux/delta_subscription_state.cc @@ -182,7 +182,8 @@ void DeltaSubscriptionState::handleGoodResponse( } } - { + if (!Runtime::runtimeFeatureEnabled( + "envoy.reloadable_features.delta_xds_subscription_state_tracking_fix")) { const auto scoped_update = ttl_.scopedTtlUpdate(); if (requested_resource_state_.contains(Wildcard)) { for (const auto& resource : message.resources()) { @@ -203,6 +204,25 @@ void DeltaSubscriptionState::handleGoodResponse( callbacks().onConfigUpdate(non_heartbeat_resources, message.removed_resources(), message.system_version_info()); + if (Runtime::runtimeFeatureEnabled( + "envoy.reloadable_features.delta_xds_subscription_state_tracking_fix")) { + const auto scoped_update = ttl_.scopedTtlUpdate(); + if (requested_resource_state_.contains(Wildcard)) { + for (const auto& resource : message.resources()) { + addResourceStateFromServer(resource); + } + } else { + // We are not subscribed to wildcard, so we only take resources that we explicitly requested + // and ignore the others. + // NOTE: This is not gonna work for xdstp resources with glob resource matching. + for (const auto& resource : message.resources()) { + if (requested_resource_state_.contains(resource.name())) { + addResourceStateFromServer(resource); + } + } + } + } + // If a resource is gone, there is no longer a meaningful version for it that makes sense to // provide to the server upon stream reconnect: either it will continue to not exist, in which // case saying nothing is fine, or the server will bring back something new, which we should diff --git a/source/common/runtime/runtime_features.cc b/source/common/runtime/runtime_features.cc index ddea8ff3f4d9..8179f813d626 100644 --- a/source/common/runtime/runtime_features.cc +++ b/source/common/runtime/runtime_features.cc @@ -42,6 +42,7 @@ RUNTIME_GUARD(envoy_reloadable_features_combine_sds_requests); RUNTIME_GUARD(envoy_reloadable_features_conn_pool_delete_when_idle); RUNTIME_GUARD(envoy_reloadable_features_conn_pool_new_stream_with_early_data_and_http3); RUNTIME_GUARD(envoy_reloadable_features_correct_remote_address); +RUNTIME_GUARD(envoy_reloadable_features_delta_xds_subscription_state_tracking_fix); RUNTIME_GUARD(envoy_reloadable_features_deprecate_global_ints); RUNTIME_GUARD(envoy_reloadable_features_do_not_await_headers_on_upstream_timeout_to_emit_stats); RUNTIME_GUARD(envoy_reloadable_features_do_not_count_mapped_pages_as_free); diff --git a/test/common/config/delta_subscription_state_test.cc b/test/common/config/delta_subscription_state_test.cc index 79476abc8a76..6bf70e3b211b 100644 --- a/test/common/config/delta_subscription_state_test.cc +++ b/test/common/config/delta_subscription_state_test.cc @@ -681,7 +681,10 @@ TEST_P(DeltaSubscriptionStateTest, AckGenerated) { Protobuf::RepeatedPtrField added_resources = populateRepeatedResource( {{"name1", "version1C"}, {"name2", "version2C"}, {"name3", "version3B"}}); - EXPECT_CALL(*ttl_timer_, disableTimer()); + if (!Runtime::runtimeFeatureEnabled( + "envoy.reloadable_features.delta_xds_subscription_state_tracking_fix")) { + EXPECT_CALL(*ttl_timer_, disableTimer()); + } UpdateAck ack = deliverBadDiscoveryResponse(added_resources, {}, "debug3", "nonce3", "oh no"); EXPECT_EQ("nonce3", ack.nonce_); EXPECT_NE(Grpc::Status::WellKnownGrpcStatus::Ok, ack.error_detail_.code()); @@ -702,7 +705,10 @@ TEST_P(DeltaSubscriptionStateTest, AckGenerated) { Protobuf::RepeatedPtrField added_resources = populateRepeatedResource( {{"name1", "version1D"}, {"name2", "version2D"}, {"name3", "version3D"}}); - EXPECT_CALL(*ttl_timer_, disableTimer()); + if (!Runtime::runtimeFeatureEnabled( + "envoy.reloadable_features.delta_xds_subscription_state_tracking_fix")) { + EXPECT_CALL(*ttl_timer_, disableTimer()); + } UpdateAck ack = deliverBadDiscoveryResponse(added_resources, {}, "debug5", "nonce5", very_large_error_message); EXPECT_EQ("nonce5", ack.nonce_); @@ -1227,6 +1233,42 @@ TEST_P(DeltaSubscriptionStateTest, TypeUrlMismatch) { handleResponse(message); } +// Verifies that an update that is NACKed doesn't update the tracked +// versions of the registered resources. +TEST_P(DeltaSubscriptionStateTest, NoVersionUpdateOnNack) { + // The xDS server's first response includes items for name1 and 2, but not 3. + { + Protobuf::RepeatedPtrField added_resources = + populateRepeatedResource({{"name1", "version1A"}, {"name2", "version2A"}}); + EXPECT_CALL(*ttl_timer_, disableTimer()); + UpdateAck ack = deliverDiscoveryResponse(added_resources, {}, "debug1", "nonce1"); + EXPECT_EQ("nonce1", ack.nonce_); + EXPECT_EQ(Grpc::Status::WellKnownGrpcStatus::Ok, ack.error_detail_.code()); + } + // The next response tries but fails to update the 2 and add name3, and so should produce a NACK. + { + Protobuf::RepeatedPtrField added_resources = + populateRepeatedResource( + {{"name1", "version1B"}, {"name2", "version2B"}, {"name3", "version3A"}}); + if (!Runtime::runtimeFeatureEnabled( + "envoy.reloadable_features.delta_xds_subscription_state_tracking_fix")) { + EXPECT_CALL(*ttl_timer_, disableTimer()); + } + UpdateAck ack = deliverBadDiscoveryResponse(added_resources, {}, "debug2", "nonce2", "oh no"); + EXPECT_EQ("nonce2", ack.nonce_); + EXPECT_NE(Grpc::Status::WellKnownGrpcStatus::Ok, ack.error_detail_.code()); + } + // Verify that a reconnect keeps the old versions. + markStreamFresh(); + { + auto req = getNextRequestAckless(); + EXPECT_THAT(req->resource_names_subscribe(), UnorderedElementsAre("name1", "name2", "name3")); + EXPECT_TRUE(req->resource_names_unsubscribe().empty()); + EXPECT_THAT(req->initial_resource_versions(), + UnorderedElementsAre(Pair("name1", "version1A"), Pair("name2", "version2A"))); + } +} + class VhdsDeltaSubscriptionStateTest : public DeltaSubscriptionStateTestWithResources { public: VhdsDeltaSubscriptionStateTest()