Skip to content

Commit

Permalink
Revert "delta-xds: perform resource state tracking after resource ing… (
Browse files Browse the repository at this point in the history
#23335)

Revert "delta-xds: perform resource state tracking after resource ingestion (#23214)"

This reverts commit 6e6a3a1.

Coverage is failing on main.

Signed-off-by: Matt Klein <[email protected]>
  • Loading branch information
mattklein123 authored Oct 1, 2022
1 parent ff98f8e commit f09ed36
Show file tree
Hide file tree
Showing 5 changed files with 4 additions and 93 deletions.
5 changes: 0 additions & 5 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@ behavior_changes:
limit the duration of the upstream connections (the default value is 1h, and the recommended value is 5min). This
behavior change can be reverted by setting runtime guard
``envoy.reloadable_features.original_dst_rely_on_idle_timeout``.
- area: config
change: |
Fixed resource tracking when using the Incremental (Delta-xDS) protocol. The protocol state will be updated after
the resources are successfully ingested and an ACK is sent. This behavior change can be reverted by setting the
``envoy.reloadable_features.delta_xds_subscription_state_tracking_fix`` runtime flag to false.
minor_behavior_changes:
# *Changes that may cause incompatibilities for some users, but should not for most*
Expand Down
23 changes: 1 addition & 22 deletions source/common/config/new_delta_subscription_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,7 @@ void NewDeltaSubscriptionState::handleGoodResponse(
}
}

if (!Runtime::runtimeFeatureEnabled(
"envoy.reloadable_features.delta_xds_subscription_state_tracking_fix")) {
{
const auto scoped_update = ttl_.scopedTtlUpdate();
if (requested_resource_state_.contains(Wildcard)) {
for (const auto& resource : message.resources()) {
Expand All @@ -232,26 +231,6 @@ void NewDeltaSubscriptionState::handleGoodResponse(
watch_map_.onConfigUpdate(non_heartbeat_resources, message.removed_resources(),
message.system_version_info());

if (Runtime::runtimeFeatureEnabled(
"envoy.reloadable_features.delta_xds_subscription_state_tracking_fix")) {
// The resources were successfully update internally, update the resource
// tracking state.
const auto scoped_update = ttl_.scopedTtlUpdate();
if (requested_resource_state_.contains(Wildcard)) {
for (const auto& resource : message.resources()) {
addResourceStateFromServer(resource);
}
} else {
// We are not subscribed to wildcard, so we only take resources that we explicitly requested
// and ignore the others.
for (const auto& resource : message.resources()) {
if (requested_resource_state_.contains(resource.name())) {
addResourceStateFromServer(resource);
}
}
}
}

// If a resource is gone, there is no longer a meaningful version for it that makes sense to
// provide to the server upon stream reconnect: either it will continue to not exist, in which
// case saying nothing is fine, or the server will bring back something new, which we should
Expand Down
22 changes: 1 addition & 21 deletions source/common/config/xds_mux/delta_subscription_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,7 @@ void DeltaSubscriptionState::handleGoodResponse(
}
}

if (!Runtime::runtimeFeatureEnabled(
"envoy.reloadable_features.delta_xds_subscription_state_tracking_fix")) {
{
const auto scoped_update = ttl_.scopedTtlUpdate();
if (requested_resource_state_.contains(Wildcard)) {
for (const auto& resource : message.resources()) {
Expand All @@ -204,25 +203,6 @@ void DeltaSubscriptionState::handleGoodResponse(
callbacks().onConfigUpdate(non_heartbeat_resources, message.removed_resources(),
message.system_version_info());

if (Runtime::runtimeFeatureEnabled(
"envoy.reloadable_features.delta_xds_subscription_state_tracking_fix")) {
const auto scoped_update = ttl_.scopedTtlUpdate();
if (requested_resource_state_.contains(Wildcard)) {
for (const auto& resource : message.resources()) {
addResourceStateFromServer(resource);
}
} else {
// We are not subscribed to wildcard, so we only take resources that we explicitly requested
// and ignore the others.
// NOTE: This is not gonna work for xdstp resources with glob resource matching.
for (const auto& resource : message.resources()) {
if (requested_resource_state_.contains(resource.name())) {
addResourceStateFromServer(resource);
}
}
}
}

// If a resource is gone, there is no longer a meaningful version for it that makes sense to
// provide to the server upon stream reconnect: either it will continue to not exist, in which
// case saying nothing is fine, or the server will bring back something new, which we should
Expand Down
1 change: 0 additions & 1 deletion source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ RUNTIME_GUARD(envoy_reloadable_features_combine_sds_requests);
RUNTIME_GUARD(envoy_reloadable_features_conn_pool_delete_when_idle);
RUNTIME_GUARD(envoy_reloadable_features_conn_pool_new_stream_with_early_data_and_http3);
RUNTIME_GUARD(envoy_reloadable_features_correct_remote_address);
RUNTIME_GUARD(envoy_reloadable_features_delta_xds_subscription_state_tracking_fix);
RUNTIME_GUARD(envoy_reloadable_features_deprecate_global_ints);
RUNTIME_GUARD(envoy_reloadable_features_do_not_await_headers_on_upstream_timeout_to_emit_stats);
RUNTIME_GUARD(envoy_reloadable_features_do_not_count_mapped_pages_as_free);
Expand Down
46 changes: 2 additions & 44 deletions test/common/config/delta_subscription_state_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -681,10 +681,7 @@ TEST_P(DeltaSubscriptionStateTest, AckGenerated) {
Protobuf::RepeatedPtrField<envoy::service::discovery::v3::Resource> added_resources =
populateRepeatedResource(
{{"name1", "version1C"}, {"name2", "version2C"}, {"name3", "version3B"}});
if (!Runtime::runtimeFeatureEnabled(
"envoy.reloadable_features.delta_xds_subscription_state_tracking_fix")) {
EXPECT_CALL(*ttl_timer_, disableTimer());
}
EXPECT_CALL(*ttl_timer_, disableTimer());
UpdateAck ack = deliverBadDiscoveryResponse(added_resources, {}, "debug3", "nonce3", "oh no");
EXPECT_EQ("nonce3", ack.nonce_);
EXPECT_NE(Grpc::Status::WellKnownGrpcStatus::Ok, ack.error_detail_.code());
Expand All @@ -705,10 +702,7 @@ TEST_P(DeltaSubscriptionStateTest, AckGenerated) {
Protobuf::RepeatedPtrField<envoy::service::discovery::v3::Resource> added_resources =
populateRepeatedResource(
{{"name1", "version1D"}, {"name2", "version2D"}, {"name3", "version3D"}});
if (!Runtime::runtimeFeatureEnabled(
"envoy.reloadable_features.delta_xds_subscription_state_tracking_fix")) {
EXPECT_CALL(*ttl_timer_, disableTimer());
}
EXPECT_CALL(*ttl_timer_, disableTimer());
UpdateAck ack = deliverBadDiscoveryResponse(added_resources, {}, "debug5", "nonce5",
very_large_error_message);
EXPECT_EQ("nonce5", ack.nonce_);
Expand Down Expand Up @@ -1233,42 +1227,6 @@ TEST_P(DeltaSubscriptionStateTest, TypeUrlMismatch) {
handleResponse(message);
}

// Verifies that an update that is NACKed doesn't update the tracked
// versions of the registered resources.
TEST_P(DeltaSubscriptionStateTest, NoVersionUpdateOnNack) {
// The xDS server's first response includes items for name1 and 2, but not 3.
{
Protobuf::RepeatedPtrField<envoy::service::discovery::v3::Resource> added_resources =
populateRepeatedResource({{"name1", "version1A"}, {"name2", "version2A"}});
EXPECT_CALL(*ttl_timer_, disableTimer());
UpdateAck ack = deliverDiscoveryResponse(added_resources, {}, "debug1", "nonce1");
EXPECT_EQ("nonce1", ack.nonce_);
EXPECT_EQ(Grpc::Status::WellKnownGrpcStatus::Ok, ack.error_detail_.code());
}
// The next response tries but fails to update the 2 and add name3, and so should produce a NACK.
{
Protobuf::RepeatedPtrField<envoy::service::discovery::v3::Resource> added_resources =
populateRepeatedResource(
{{"name1", "version1B"}, {"name2", "version2B"}, {"name3", "version3A"}});
if (!Runtime::runtimeFeatureEnabled(
"envoy.reloadable_features.delta_xds_subscription_state_tracking_fix")) {
EXPECT_CALL(*ttl_timer_, disableTimer());
}
UpdateAck ack = deliverBadDiscoveryResponse(added_resources, {}, "debug2", "nonce2", "oh no");
EXPECT_EQ("nonce2", ack.nonce_);
EXPECT_NE(Grpc::Status::WellKnownGrpcStatus::Ok, ack.error_detail_.code());
}
// Verify that a reconnect keeps the old versions.
markStreamFresh();
{
auto req = getNextRequestAckless();
EXPECT_THAT(req->resource_names_subscribe(), UnorderedElementsAre("name1", "name2", "name3"));
EXPECT_TRUE(req->resource_names_unsubscribe().empty());
EXPECT_THAT(req->initial_resource_versions(),
UnorderedElementsAre(Pair("name1", "version1A"), Pair("name2", "version2A")));
}
}

class VhdsDeltaSubscriptionStateTest : public DeltaSubscriptionStateTestWithResources {
public:
VhdsDeltaSubscriptionStateTest()
Expand Down

0 comments on commit f09ed36

Please sign in to comment.