diff --git a/docs/generated/kuma-cp.md b/docs/generated/kuma-cp.md index ef3cdcbe2e55..6dfa0625e2d1 100644 --- a/docs/generated/kuma-cp.md +++ b/docs/generated/kuma-cp.md @@ -496,6 +496,9 @@ multizone: nackBackoff: 5s # ENV: KUMA_MULTIZONE_GLOBAL_KDS_NACK_BACKOFF # DisableSOTW if true doesn't expose SOTW version of KDS. Default: false disableSOTW: false # ENV: KUMA_MULTIZONE_GLOBAL_KDS_DISABLE_SOTW + # Response backoff is a time Global CP waits before sending ACK/NACK. + # This is a way to slow down Zone CP from sending resources too often. + responseBackoff: 0s # ENV: KUMA_MULTIZONE_GLOBAL_KDS_RESPONSE_BACKOFF zone: # Kuma Zone name used to mark the zone dataplane resources name: "" # ENV: KUMA_MULTIZONE_ZONE_NAME @@ -516,6 +519,9 @@ multizone: msgSendTimeout: 60s # ENV: KUMA_MULTIZONE_ZONE_KDS_MSG_SEND_TIMEOUT # Backoff that is executed when the zone control plane is sending the response that was previously rejected by global control plane nackBackoff: 5s # ENV: KUMA_MULTIZONE_ZONE_KDS_NACK_BACKOFF + # Response backoff is a time Zone CP waits before sending ACK/NACK. + # This is a way to slow down Global CP from sending resources too often. + responseBackoff: 0s # ENV: KUMA_MULTIZONE_ZONE_KDS_RESPONSE_BACKOFF # Diagnostics configuration diagnostics: diff --git a/docs/generated/raw/kuma-cp.yaml b/docs/generated/raw/kuma-cp.yaml index b418c33d4256..29257f174c29 100644 --- a/docs/generated/raw/kuma-cp.yaml +++ b/docs/generated/raw/kuma-cp.yaml @@ -493,6 +493,9 @@ multizone: nackBackoff: 5s # ENV: KUMA_MULTIZONE_GLOBAL_KDS_NACK_BACKOFF # DisableSOTW if true doesn't expose SOTW version of KDS. Default: false disableSOTW: false # ENV: KUMA_MULTIZONE_GLOBAL_KDS_DISABLE_SOTW + # Response backoff is a time Global CP waits before sending ACK/NACK. + # This is a way to slow down Zone CP from sending resources too often. + responseBackoff: 0s # ENV: KUMA_MULTIZONE_GLOBAL_KDS_RESPONSE_BACKOFF zone: # Kuma Zone name used to mark the zone dataplane resources name: "" # ENV: KUMA_MULTIZONE_ZONE_NAME @@ -513,6 +516,9 @@ multizone: msgSendTimeout: 60s # ENV: KUMA_MULTIZONE_ZONE_KDS_MSG_SEND_TIMEOUT # Backoff that is executed when the zone control plane is sending the response that was previously rejected by global control plane nackBackoff: 5s # ENV: KUMA_MULTIZONE_ZONE_KDS_NACK_BACKOFF + # Response backoff is a time Zone CP waits before sending ACK/NACK. + # This is a way to slow down Global CP from sending resources too often. + responseBackoff: 0s # ENV: KUMA_MULTIZONE_ZONE_KDS_RESPONSE_BACKOFF # Diagnostics configuration diagnostics: diff --git a/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml b/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml index b418c33d4256..29257f174c29 100644 --- a/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml +++ b/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml @@ -493,6 +493,9 @@ multizone: nackBackoff: 5s # ENV: KUMA_MULTIZONE_GLOBAL_KDS_NACK_BACKOFF # DisableSOTW if true doesn't expose SOTW version of KDS. Default: false disableSOTW: false # ENV: KUMA_MULTIZONE_GLOBAL_KDS_DISABLE_SOTW + # Response backoff is a time Global CP waits before sending ACK/NACK. + # This is a way to slow down Zone CP from sending resources too often. + responseBackoff: 0s # ENV: KUMA_MULTIZONE_GLOBAL_KDS_RESPONSE_BACKOFF zone: # Kuma Zone name used to mark the zone dataplane resources name: "" # ENV: KUMA_MULTIZONE_ZONE_NAME @@ -513,6 +516,9 @@ multizone: msgSendTimeout: 60s # ENV: KUMA_MULTIZONE_ZONE_KDS_MSG_SEND_TIMEOUT # Backoff that is executed when the zone control plane is sending the response that was previously rejected by global control plane nackBackoff: 5s # ENV: KUMA_MULTIZONE_ZONE_KDS_NACK_BACKOFF + # Response backoff is a time Zone CP waits before sending ACK/NACK. + # This is a way to slow down Global CP from sending resources too often. + responseBackoff: 0s # ENV: KUMA_MULTIZONE_ZONE_KDS_RESPONSE_BACKOFF # Diagnostics configuration diagnostics: diff --git a/pkg/config/loader_test.go b/pkg/config/loader_test.go index 959da25e8ab6..65d1d7fb358e 100644 --- a/pkg/config/loader_test.go +++ b/pkg/config/loader_test.go @@ -252,6 +252,7 @@ var _ = Describe("Config loader", func() { Expect(cfg.Multizone.Global.KDS.MsgSendTimeout.Duration).To(Equal(10 * time.Second)) Expect(cfg.Multizone.Global.KDS.NackBackoff.Duration).To(Equal(11 * time.Second)) Expect(cfg.Multizone.Global.KDS.DisableSOTW).To(BeTrue()) + Expect(cfg.Multizone.Global.KDS.ResponseBackoff.Duration).To(Equal(time.Second)) Expect(cfg.Multizone.Zone.GlobalAddress).To(Equal("grpc://1.1.1.1:5685")) Expect(cfg.Multizone.Zone.Name).To(Equal("zone-1")) Expect(cfg.Multizone.Zone.KDS.RootCAFile).To(Equal("/rootCa")) @@ -259,6 +260,7 @@ var _ = Describe("Config loader", func() { Expect(cfg.Multizone.Zone.KDS.MaxMsgSize).To(Equal(uint32(2))) Expect(cfg.Multizone.Zone.KDS.MsgSendTimeout.Duration).To(Equal(20 * time.Second)) Expect(cfg.Multizone.Zone.KDS.NackBackoff.Duration).To(Equal(21 * time.Second)) + Expect(cfg.Multizone.Zone.KDS.ResponseBackoff.Duration).To(Equal(2 * time.Second)) Expect(cfg.Multizone.Zone.KDS.TlsSkipVerify).To(BeTrue()) Expect(cfg.Defaults.SkipMeshCreation).To(BeTrue()) @@ -564,6 +566,7 @@ multizone: maxMsgSize: 1 msgSendTimeout: 10s nackBackoff: 11s + responseBackoff: 1s disableSOTW: true zone: globalAddress: "grpc://1.1.1.1:5685" @@ -574,6 +577,7 @@ multizone: maxMsgSize: 2 msgSendTimeout: 20s nackBackoff: 21s + responseBackoff: 2s tlsSkipVerify: true dnsServer: domain: test-domain @@ -861,6 +865,7 @@ tracing: "KUMA_MULTIZONE_GLOBAL_KDS_MAX_MSG_SIZE": "1", "KUMA_MULTIZONE_GLOBAL_KDS_MSG_SEND_TIMEOUT": "10s", "KUMA_MULTIZONE_GLOBAL_KDS_NACK_BACKOFF": "11s", + "KUMA_MULTIZONE_GLOBAL_KDS_RESPONSE_BACKOFF": "1s", "KUMA_MULTIZONE_GLOBAL_KDS_DISABLE_SOTW": "true", "KUMA_MULTIZONE_ZONE_GLOBAL_ADDRESS": "grpc://1.1.1.1:5685", "KUMA_MULTIZONE_ZONE_NAME": "zone-1", @@ -869,6 +874,7 @@ tracing: "KUMA_MULTIZONE_ZONE_KDS_MAX_MSG_SIZE": "2", "KUMA_MULTIZONE_ZONE_KDS_MSG_SEND_TIMEOUT": "20s", "KUMA_MULTIZONE_ZONE_KDS_NACK_BACKOFF": "21s", + "KUMA_MULTIZONE_ZONE_KDS_RESPONSE_BACKOFF": "2s", "KUMA_MULTIZONE_ZONE_KDS_TLS_SKIP_VERIFY": "true", "KUMA_EXPERIMENTAL_KDS_DELTA_ENABLED": "true", "KUMA_MULTIZONE_GLOBAL_KDS_ZONE_INSIGHT_FLUSH_INTERVAL": "5s", diff --git a/pkg/config/multizone/kds.go b/pkg/config/multizone/kds.go index ad8f2d769c7a..ec39e823bb2b 100644 --- a/pkg/config/multizone/kds.go +++ b/pkg/config/multizone/kds.go @@ -37,6 +37,9 @@ type KdsServerConfig struct { NackBackoff config_types.Duration `json:"nackBackoff" envconfig:"kuma_multizone_global_kds_nack_backoff"` // DisableSOTW if true doesn't expose SOTW version of KDS. Default: false DisableSOTW bool `json:"disableSOTW" envconfig:"kuma_multizone_global_kds_disable_sotw"` + // ResponseBackoff is a time Global CP waits before sending ACK/NACK. + // This is a way to slow down Zone CP from sending resources too often. + ResponseBackoff config_types.Duration `json:"responseBackoff" envconfig:"kuma_multizone_global_kds_response_backoff"` } var _ config.Config = &KdsServerConfig{} @@ -88,6 +91,9 @@ type KdsClientConfig struct { MsgSendTimeout config_types.Duration `json:"msgSendTimeout" envconfig:"kuma_multizone_zone_kds_msg_send_timeout"` // Backoff that is executed when the zone control plane is sending the response that was previously rejected by global control plane. NackBackoff config_types.Duration `json:"nackBackoff" envconfig:"kuma_multizone_zone_kds_nack_backoff"` + // ResponseBackoff is a time Zone CP waits before sending ACK/NACK. + // This is a way to slow down Global CP from sending resources too often. + ResponseBackoff config_types.Duration `json:"responseBackoff" envconfig:"kuma_multizone_zone_kds_response_backoff"` } var _ config.Config = &KdsClientConfig{} diff --git a/pkg/kds/global/components.go b/pkg/kds/global/components.go index c82a25cfcd1c..a659163f15fa 100644 --- a/pkg/kds/global/components.go +++ b/pkg/kds/global/components.go @@ -140,8 +140,13 @@ func Setup(rt runtime.Runtime) error { log := kdsDeltaGlobalLog.WithValues("peer-id", clientId) log = kuma_log.AddFieldsFromCtx(log, stream.Context(), rt.Extensions()) kdsStream := kds_client_v2.NewDeltaKDSStream(stream, clientId, "") - sink := kds_client_v2.NewKDSSyncClient(log, reg.ObjectTypes(model.HasKDSFlag(model.ConsumedByGlobal)), kdsStream, - kds_sync_store_v2.GlobalSyncCallback(stream.Context(), resourceSyncerV2, rt.Config().Store.Type == store_config.KubernetesStore, kubeFactory, rt.Config().Store.Kubernetes.SystemNamespace)) + sink := kds_client_v2.NewKDSSyncClient( + log, + reg.ObjectTypes(model.HasKDSFlag(model.ConsumedByGlobal)), + kdsStream, + kds_sync_store_v2.GlobalSyncCallback(stream.Context(), resourceSyncerV2, rt.Config().Store.Type == store_config.KubernetesStore, kubeFactory, rt.Config().Store.Kubernetes.SystemNamespace), + rt.Config().Multizone.Global.KDS.ResponseBackoff.Duration, + ) go func() { if err := sink.Receive(); err != nil { errChan <- errors.Wrap(err, "KDSSyncClient finished with an error") diff --git a/pkg/kds/v2/client/kds_client.go b/pkg/kds/v2/client/kds_client.go index 3e0db4b20a07..22bb82cdd4a7 100644 --- a/pkg/kds/v2/client/kds_client.go +++ b/pkg/kds/v2/client/kds_client.go @@ -2,6 +2,7 @@ package client import ( "io" + "time" "github.com/go-logr/logr" "github.com/pkg/errors" @@ -35,18 +36,26 @@ type KDSSyncClient interface { } type kdsSyncClient struct { - log logr.Logger - resourceTypes []core_model.ResourceType - callbacks *Callbacks - kdsStream DeltaKDSStream + log logr.Logger + resourceTypes []core_model.ResourceType + callbacks *Callbacks + kdsStream DeltaKDSStream + responseBackoff time.Duration } -func NewKDSSyncClient(log logr.Logger, rt []core_model.ResourceType, kdsStream DeltaKDSStream, cb *Callbacks) KDSSyncClient { +func NewKDSSyncClient( + log logr.Logger, + rt []core_model.ResourceType, + kdsStream DeltaKDSStream, + cb *Callbacks, + responseBackoff time.Duration, +) KDSSyncClient { return &kdsSyncClient{ - log: log, - resourceTypes: rt, - kdsStream: kdsStream, - callbacks: cb, + log: log, + resourceTypes: rt, + kdsStream: kdsStream, + callbacks: cb, + responseBackoff: responseBackoff, } } @@ -78,7 +87,13 @@ func (s *kdsSyncClient) Receive() error { } continue } - if err := s.callbacks.OnResourcesReceived(received); err != nil { + err = s.callbacks.OnResourcesReceived(received) + if !received.IsInitialRequest { + // Execute backoff only on subsequent request. + // When client first connects, the server sends empty DeltaDiscoveryResponse for every resource type. + time.Sleep(s.responseBackoff) + } + if err != nil { s.log.Info("error during callback received, sending NACK", "err", err) if err := s.kdsStream.NACK(received.Type, err); err != nil { if err == io.EOF { diff --git a/pkg/kds/v2/client/zone_sync_test.go b/pkg/kds/v2/client/zone_sync_test.go index fbcfa6e920bd..35f64621e07c 100644 --- a/pkg/kds/v2/client/zone_sync_test.go +++ b/pkg/kds/v2/client/zone_sync_test.go @@ -37,7 +37,7 @@ var _ = Describe("Zone Delta Sync", func() { core.Log.WithName("kds-sink"), registry.Global().ObjectTypes(model.HasKDSFlag(model.ConsumedByZone)), client_v2.NewDeltaKDSStream(cs, zoneName, ""), - sync_store_v2.ZoneSyncCallback(context.Background(), configs, resourceSyncer, false, zoneName, nil, "kuma-system"), + sync_store_v2.ZoneSyncCallback(context.Background(), configs, resourceSyncer, false, zoneName, nil, "kuma-system"), 0, ) } ingressFunc := func(zone string) *mesh_proto.ZoneIngress { diff --git a/pkg/kds/zone/components.go b/pkg/kds/zone/components.go index 71792d49a749..ab9c303515d4 100644 --- a/pkg/kds/zone/components.go +++ b/pkg/kds/zone/components.go @@ -118,7 +118,10 @@ func Setup(rt core_runtime.Runtime) error { onGlobalToZoneSyncStarted := mux.OnGlobalToZoneSyncStartedFunc(func(stream mesh_proto.KDSSyncService_GlobalToZoneSyncClient, errChan chan error) { log := kdsDeltaZoneLog.WithValues("kds-version", "v2") - syncClient := kds_client_v2.NewKDSSyncClient(log, reg.ObjectTypes(model.HasKDSFlag(model.ConsumedByZone)), kds_client_v2.NewDeltaKDSStream(stream, zone, string(cfgJson)), + syncClient := kds_client_v2.NewKDSSyncClient( + log, + reg.ObjectTypes(model.HasKDSFlag(model.ConsumedByZone)), + kds_client_v2.NewDeltaKDSStream(stream, zone, string(cfgJson)), kds_sync_store_v2.ZoneSyncCallback( stream.Context(), rt.KDSContext().Configs, @@ -128,6 +131,7 @@ func Setup(rt core_runtime.Runtime) error { kubeFactory, rt.Config().Store.Kubernetes.SystemNamespace, ), + rt.Config().Multizone.Zone.KDS.ResponseBackoff.Duration, ) go func() { if err := syncClient.Receive(); err != nil { diff --git a/pkg/kds/zone/components_test.go b/pkg/kds/zone/components_test.go index 9253c219701c..530345506dfe 100644 --- a/pkg/kds/zone/components_test.go +++ b/pkg/kds/zone/components_test.go @@ -238,6 +238,7 @@ var _ = Describe("Zone Sync", func() { registry.Global().ObjectTypes(model.HasKDSFlag(model.ConsumedByZone)), kds_client_v2.NewDeltaKDSStream(cs, zoneName, ""), sync_store_v2.ZoneSyncCallback(context.Background(), configs, resourceSyncer, false, zoneName, nil, "kuma-system"), + 0, ) } diff --git a/pkg/test/kds/setup/client.go b/pkg/test/kds/setup/client.go index 4c6a272fe2ae..b9c778b6475b 100644 --- a/pkg/test/kds/setup/client.go +++ b/pkg/test/kds/setup/client.go @@ -26,7 +26,7 @@ func StartDeltaClient(clientStreams []*grpc.MockDeltaClientStream, resourceTypes for i := 0; i < len(clientStreams); i++ { clientID := fmt.Sprintf("client-%d", i) item := clientStreams[i] - comp := kds_client_v2.NewKDSSyncClient(core.Log.WithName("kds").WithName(clientID), resourceTypes, kds_client_v2.NewDeltaKDSStream(item, clientID, ""), cb) + comp := kds_client_v2.NewKDSSyncClient(core.Log.WithName("kds").WithName(clientID), resourceTypes, kds_client_v2.NewDeltaKDSStream(item, clientID, ""), cb, 0) go func() { _ = comp.Receive() _ = item.CloseSend()