From d70e07aac57201917d9ddef041738dc52b7eaf1a Mon Sep 17 00:00:00 2001 From: Lukasz Dziedziak Date: Tue, 5 Sep 2023 15:30:04 +0200 Subject: [PATCH 1/7] feat(kuma-cp): pass resource keys to postgres query for delta kds Signed-off-by: Lukasz Dziedziak --- pkg/core/resources/store/options.go | 7 +++++ pkg/core/resources/store/pagination_store.go | 12 +++++++- pkg/kds/v2/store/sync.go | 19 ++++++++++-- pkg/plugins/resources/postgres/pgx_store.go | 31 ++++++++++++++------ pkg/plugins/resources/postgres/pq_store.go | 31 ++++++++++++++------ 5 files changed, 79 insertions(+), 21 deletions(-) diff --git a/pkg/core/resources/store/options.go b/pkg/core/resources/store/options.go index dd7a5b1a16b1..ada5787c48b3 100644 --- a/pkg/core/resources/store/options.go +++ b/pkg/core/resources/store/options.go @@ -161,6 +161,7 @@ type ListOptions struct { FilterFunc ListFilterFunc NameContains string Ordered bool + ResourceKeys []core_model.ResourceKey } type ListOptionsFunc func(*ListOptions) @@ -213,6 +214,12 @@ func ListOrdered() ListOptionsFunc { } } +func ListByResourceKeys(rk []core_model.ResourceKey) ListOptionsFunc { + return func(opts *ListOptions) { + opts.ResourceKeys = rk + } +} + func (l *ListOptions) IsCacheable() bool { return l.FilterFunc == nil } diff --git a/pkg/core/resources/store/pagination_store.go b/pkg/core/resources/store/pagination_store.go index d4a973b68409..0caf8c7a2a06 100644 --- a/pkg/core/resources/store/pagination_store.go +++ b/pkg/core/resources/store/pagination_store.go @@ -47,7 +47,7 @@ func (p *paginationStore) List(ctx context.Context, list model.ResourceList, opt // At least one of the following options is required to trigger the paginationStore to do work. // Otherwise, it delegates the request and returns early. - if opts.FilterFunc == nil && opts.PageSize == 0 && opts.PageOffset == "" && !opts.Ordered { + if opts.FilterFunc == nil && opts.PageSize == 0 && opts.PageOffset == "" && !opts.Ordered && len(opts.ResourceKeys) == 0 { return p.delegate.List(ctx, list, optionsFunc...) } @@ -66,6 +66,16 @@ func (p *paginationStore) List(ctx context.Context, list model.ResourceList, opt return err } + if len(opts.ResourceKeys) > 0 { + for _, rk := range opts.ResourceKeys { + for _, item := range fullList.GetItems() { + if item.GetMeta().GetMesh() == rk.Mesh && item.GetMeta().GetName() == rk.Name { + _ = filteredList.AddItem(item) + } + } + } + } + for _, item := range fullList.GetItems() { if opts.Filter(item) { _ = filteredList.AddItem(item) diff --git a/pkg/kds/v2/store/sync.go b/pkg/kds/v2/store/sync.go index 874f9e06a51a..c91aad044b12 100644 --- a/pkg/kds/v2/store/sync.go +++ b/pkg/kds/v2/store/sync.go @@ -86,8 +86,15 @@ func (s *syncResourceStore) Sync(syncCtx context.Context, upstreamResponse clien if err != nil { return err } - if err := s.resourceStore.List(ctx, downstream); err != nil { - return err + if upstreamResponse.IsInitialRequest { + if err := s.resourceStore.List(ctx, downstream); err != nil { + return err + } + } else { + upstreamChangeKeys := append(ToResourceKeys(upstream), upstreamResponse.RemovedResourcesKey...) + if err := s.resourceStore.List(ctx, downstream, store.ListByResourceKeys(upstreamChangeKeys)); err != nil { + return err + } } log.V(1).Info("before filtering", "downstream", downstream, "upstream", upstream) @@ -310,3 +317,11 @@ func GlobalSyncCallback( }, } } + +func ToResourceKeys(rs core_model.ResourceList) []core_model.ResourceKey { + rkey := []core_model.ResourceKey{} + for _, r := range rs.GetItems() { + rkey = append(rkey, core_model.MetaToResourceKey(r.GetMeta())) + } + return rkey +} diff --git a/pkg/plugins/resources/postgres/pgx_store.go b/pkg/plugins/resources/postgres/pgx_store.go index 31a018b7dda5..2390137fa4b5 100644 --- a/pkg/plugins/resources/postgres/pgx_store.go +++ b/pkg/plugins/resources/postgres/pgx_store.go @@ -183,15 +183,28 @@ func (r *pgxResourceStore) List(ctx context.Context, resources core_model.Resour var statementArgs []interface{} statementArgs = append(statementArgs, resources.GetItemType()) argsIndex := 1 - if opts.Mesh != "" { - argsIndex++ - statement += fmt.Sprintf(" AND mesh=$%d", argsIndex) - statementArgs = append(statementArgs, opts.Mesh) - } - if opts.NameContains != "" { - argsIndex++ - statement += fmt.Sprintf(" AND name LIKE $%d", argsIndex) - statementArgs = append(statementArgs, "%"+opts.NameContains+"%") + if len(opts.ResourceKeys) > 0 { + statement += " (" + for idx, rk := range opts.ResourceKeys { + if idx > 0 { + statement += " OR " + } + statement += fmt.Sprintf("(mesh=$%d AND name=$%d))", argsIndex+1, argsIndex+2) + argsIndex += 2 + statementArgs = append(statementArgs, rk.Mesh, rk.Name) + } + statement += ")" + } else { + if opts.Mesh != "" { + argsIndex++ + statement += fmt.Sprintf(" AND mesh=$%d", argsIndex) + statementArgs = append(statementArgs, opts.Mesh) + } + if opts.NameContains != "" { + argsIndex++ + statement += fmt.Sprintf(" AND name LIKE $%d", argsIndex) + statementArgs = append(statementArgs, "%"+opts.NameContains+"%") + } } statement += " ORDER BY name, mesh" diff --git a/pkg/plugins/resources/postgres/pq_store.go b/pkg/plugins/resources/postgres/pq_store.go index ee25cc1c022a..08a2ec60600a 100644 --- a/pkg/plugins/resources/postgres/pq_store.go +++ b/pkg/plugins/resources/postgres/pq_store.go @@ -180,15 +180,28 @@ func (r *postgresResourceStore) List(_ context.Context, resources core_model.Res var statementArgs []interface{} statementArgs = append(statementArgs, resources.GetItemType()) argsIndex := 1 - if opts.Mesh != "" { - argsIndex++ - statement += fmt.Sprintf(" AND mesh=$%d", argsIndex) - statementArgs = append(statementArgs, opts.Mesh) - } - if opts.NameContains != "" { - argsIndex++ - statement += fmt.Sprintf(" AND name LIKE $%d", argsIndex) - statementArgs = append(statementArgs, "%"+opts.NameContains+"%") + if len(opts.ResourceKeys) > 0 { + statement += " (" + for idx, rk := range opts.ResourceKeys { + if idx > 0 { + statement += " OR " + } + statement += fmt.Sprintf("(mesh=$%d AND name=$%d))", argsIndex+1, argsIndex+2) + argsIndex += 2 + statementArgs = append(statementArgs, rk.Mesh, rk.Name) + } + statement += ")" + } else { + if opts.Mesh != "" { + argsIndex++ + statement += fmt.Sprintf(" AND mesh=$%d", argsIndex) + statementArgs = append(statementArgs, opts.Mesh) + } + if opts.NameContains != "" { + argsIndex++ + statement += fmt.Sprintf(" AND name LIKE $%d", argsIndex) + statementArgs = append(statementArgs, "%"+opts.NameContains+"%") + } } statement += " ORDER BY name, mesh" From a2f66451d31677d96ab5b26f70f8e77b27d4dcce Mon Sep 17 00:00:00 2001 From: Lukasz Dziedziak Date: Tue, 5 Sep 2023 18:52:56 +0200 Subject: [PATCH 2/7] feat(kuma-cp): added configuration option, default disabled Signed-off-by: Lukasz Dziedziak --- docs/generated/kuma-cp.md | 4 ++++ docs/generated/raw/kuma-cp.yaml | 4 ++++ pkg/config/app/kuma-cp/kuma-cp.defaults.yaml | 4 ++++ pkg/config/loader_test.go | 6 ++++++ pkg/config/multizone/kds.go | 4 ++++ pkg/config/multizone/multicluster.go | 10 ++++++---- pkg/kds/global/components.go | 6 +++++- pkg/kds/global/components_test.go | 2 +- pkg/kds/v2/client/zone_sync_test.go | 2 +- pkg/kds/v2/store/sync.go | 14 ++++++++------ pkg/kds/v2/store/sync_test.go | 2 +- pkg/kds/zone/components.go | 6 +++++- pkg/kds/zone/components_test.go | 2 +- 13 files changed, 50 insertions(+), 16 deletions(-) diff --git a/docs/generated/kuma-cp.md b/docs/generated/kuma-cp.md index a599b03b64cb..0771e1dc9ad8 100644 --- a/docs/generated/kuma-cp.md +++ b/docs/generated/kuma-cp.md @@ -478,6 +478,8 @@ multizone: msgSendTimeout: 60s # ENV: KUMA_MULTIZONE_GLOBAL_KDS_MSG_SEND_TIMEOUT # Backoff that is executed when the global control plane is sending the response that was previously rejected by zone control plane nackBackoff: 5s # ENV: KUMA_MULTIZONE_GLOBAL_KDS_NACK_BACKOFF + # MaxListQueryElements defines maximum number of changed elements before requesting full list of elements from the store. + maxListQueryElements: 0 # ENV: KUMA_MULTIZONE_GLOBAL_KDS_MAX_LIST_QUERY_ELEMENTS zone: # Kuma Zone name used to mark the zone dataplane resources name: "" # ENV: KUMA_MULTIZONE_ZONE_NAME @@ -498,6 +500,8 @@ multizone: msgSendTimeout: 60s # ENV: KUMA_MULTIZONE_ZONE_KDS_MSG_SEND_TIMEOUT # Backoff that is executed when the zone control plane is sending the response that was previously rejected by global control plane nackBackoff: 5s # ENV: KUMA_MULTIZONE_ZONE_KDS_NACK_BACKOFF + # MaxListQueryElements defines maximum number of changed elements before requesting full list of elements from the store. + maxListQueryElements: 0 # ENV: KUMA_MULTIZONE_ZONE_KDS_MAX_LIST_QUERY_ELEMENTS # Diagnostics configuration diagnostics: diff --git a/docs/generated/raw/kuma-cp.yaml b/docs/generated/raw/kuma-cp.yaml index e248aef7d3da..3b1091b5d118 100644 --- a/docs/generated/raw/kuma-cp.yaml +++ b/docs/generated/raw/kuma-cp.yaml @@ -475,6 +475,8 @@ multizone: msgSendTimeout: 60s # ENV: KUMA_MULTIZONE_GLOBAL_KDS_MSG_SEND_TIMEOUT # Backoff that is executed when the global control plane is sending the response that was previously rejected by zone control plane nackBackoff: 5s # ENV: KUMA_MULTIZONE_GLOBAL_KDS_NACK_BACKOFF + # MaxListQueryElements defines maximum number of changed elements before requesting full list of elements from the store. + maxListQueryElements: 0 # ENV: KUMA_MULTIZONE_GLOBAL_KDS_MAX_LIST_QUERY_ELEMENTS zone: # Kuma Zone name used to mark the zone dataplane resources name: "" # ENV: KUMA_MULTIZONE_ZONE_NAME @@ -495,6 +497,8 @@ multizone: msgSendTimeout: 60s # ENV: KUMA_MULTIZONE_ZONE_KDS_MSG_SEND_TIMEOUT # Backoff that is executed when the zone control plane is sending the response that was previously rejected by global control plane nackBackoff: 5s # ENV: KUMA_MULTIZONE_ZONE_KDS_NACK_BACKOFF + # MaxListQueryElements defines maximum number of changed elements before requesting full list of elements from the store. + maxListQueryElements: 0 # ENV: KUMA_MULTIZONE_ZONE_KDS_MAX_LIST_QUERY_ELEMENTS # Diagnostics configuration diagnostics: diff --git a/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml b/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml index e248aef7d3da..3b1091b5d118 100644 --- a/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml +++ b/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml @@ -475,6 +475,8 @@ multizone: msgSendTimeout: 60s # ENV: KUMA_MULTIZONE_GLOBAL_KDS_MSG_SEND_TIMEOUT # Backoff that is executed when the global control plane is sending the response that was previously rejected by zone control plane nackBackoff: 5s # ENV: KUMA_MULTIZONE_GLOBAL_KDS_NACK_BACKOFF + # MaxListQueryElements defines maximum number of changed elements before requesting full list of elements from the store. + maxListQueryElements: 0 # ENV: KUMA_MULTIZONE_GLOBAL_KDS_MAX_LIST_QUERY_ELEMENTS zone: # Kuma Zone name used to mark the zone dataplane resources name: "" # ENV: KUMA_MULTIZONE_ZONE_NAME @@ -495,6 +497,8 @@ multizone: msgSendTimeout: 60s # ENV: KUMA_MULTIZONE_ZONE_KDS_MSG_SEND_TIMEOUT # Backoff that is executed when the zone control plane is sending the response that was previously rejected by global control plane nackBackoff: 5s # ENV: KUMA_MULTIZONE_ZONE_KDS_NACK_BACKOFF + # MaxListQueryElements defines maximum number of changed elements before requesting full list of elements from the store. + maxListQueryElements: 0 # ENV: KUMA_MULTIZONE_ZONE_KDS_MAX_LIST_QUERY_ELEMENTS # Diagnostics configuration diagnostics: diff --git a/pkg/config/loader_test.go b/pkg/config/loader_test.go index e8a165b43ad2..e2f5058709e7 100644 --- a/pkg/config/loader_test.go +++ b/pkg/config/loader_test.go @@ -245,6 +245,7 @@ var _ = Describe("Config loader", func() { Expect(cfg.Multizone.Global.KDS.MaxMsgSize).To(Equal(uint32(1))) Expect(cfg.Multizone.Global.KDS.MsgSendTimeout.Duration).To(Equal(10 * time.Second)) Expect(cfg.Multizone.Global.KDS.NackBackoff.Duration).To(Equal(11 * time.Second)) + Expect(cfg.Multizone.Global.KDS.MaxListQueryElements).To(Equal(uint32(111))) Expect(cfg.Multizone.Zone.GlobalAddress).To(Equal("grpc://1.1.1.1:5685")) Expect(cfg.Multizone.Zone.Name).To(Equal("zone-1")) Expect(cfg.Multizone.Zone.KDS.RootCAFile).To(Equal("/rootCa")) @@ -253,6 +254,7 @@ var _ = Describe("Config loader", func() { Expect(cfg.Multizone.Zone.KDS.MsgSendTimeout.Duration).To(Equal(20 * time.Second)) Expect(cfg.Multizone.Zone.KDS.NackBackoff.Duration).To(Equal(21 * time.Second)) Expect(cfg.Multizone.Zone.KDS.TlsSkipVerify).To(BeTrue()) + Expect(cfg.Multizone.Zone.KDS.MaxListQueryElements).To(Equal(uint32(112))) Expect(cfg.Defaults.SkipMeshCreation).To(BeTrue()) Expect(cfg.Defaults.SkipTenantResources).To(BeTrue()) @@ -547,6 +549,7 @@ multizone: maxMsgSize: 1 msgSendTimeout: 10s nackBackoff: 11s + maxListQueryElements: 111 zone: globalAddress: "grpc://1.1.1.1:5685" name: "zone-1" @@ -557,6 +560,7 @@ multizone: msgSendTimeout: 20s nackBackoff: 21s tlsSkipVerify: true + maxListQueryElements: 112 dnsServer: domain: test-domain CIDR: 127.1.0.0/16 @@ -829,6 +833,7 @@ proxy: "KUMA_MULTIZONE_GLOBAL_KDS_MAX_MSG_SIZE": "1", "KUMA_MULTIZONE_GLOBAL_KDS_MSG_SEND_TIMEOUT": "10s", "KUMA_MULTIZONE_GLOBAL_KDS_NACK_BACKOFF": "11s", + "KUMA_MULTIZONE_GLOBAL_KDS_MAX_LIST_QUERY_ELEMENTS": "111", "KUMA_MULTIZONE_ZONE_GLOBAL_ADDRESS": "grpc://1.1.1.1:5685", "KUMA_MULTIZONE_ZONE_NAME": "zone-1", "KUMA_MULTIZONE_ZONE_KDS_ROOT_CA_FILE": "/rootCa", @@ -837,6 +842,7 @@ proxy: "KUMA_MULTIZONE_ZONE_KDS_MSG_SEND_TIMEOUT": "20s", "KUMA_MULTIZONE_ZONE_KDS_NACK_BACKOFF": "21s", "KUMA_MULTIZONE_ZONE_KDS_TLS_SKIP_VERIFY": "true", + "KUMA_MULTIZONE_ZONE_KDS_MAX_LIST_QUERY_ELEMENTS": "112", "KUMA_EXPERIMENTAL_KDS_DELTA_ENABLED": "true", "KUMA_MULTIZONE_GLOBAL_KDS_ZONE_INSIGHT_FLUSH_INTERVAL": "5s", "KUMA_DEFAULTS_SKIP_MESH_CREATION": "true", diff --git a/pkg/config/multizone/kds.go b/pkg/config/multizone/kds.go index 34c53cd88973..7f902f6415f9 100644 --- a/pkg/config/multizone/kds.go +++ b/pkg/config/multizone/kds.go @@ -35,6 +35,8 @@ type KdsServerConfig struct { MsgSendTimeout config_types.Duration `json:"msgSendTimeout" envconfig:"kuma_multizone_global_kds_msg_send_timeout"` // Backoff that is executed when the global control plane is sending the response that was previously rejected by zone control plane. NackBackoff config_types.Duration `json:"nackBackoff" envconfig:"kuma_multizone_global_kds_nack_backoff"` + // MaxListQueryElements defines maximum number of changed elements before requesting full list of elements from the store. + MaxListQueryElements uint32 `json:"maxListQueryElements" envconfig:"kuma_multizone_global_kds_max_list_query_elements"` } var _ config.Config = &KdsServerConfig{} @@ -86,6 +88,8 @@ type KdsClientConfig struct { MsgSendTimeout config_types.Duration `json:"msgSendTimeout" envconfig:"kuma_multizone_zone_kds_msg_send_timeout"` // Backoff that is executed when the zone control plane is sending the response that was previously rejected by global control plane. NackBackoff config_types.Duration `json:"nackBackoff" envconfig:"kuma_multizone_zone_kds_nack_backoff"` + // MaxListQueryElements defines maximum number of changed elements before requesting full list of elements from the store. + MaxListQueryElements uint32 `json:"maxListQueryElements" envconfig:"kuma_multizone_zone_kds_max_list_query_elements"` } var _ config.Config = &KdsClientConfig{} diff --git a/pkg/config/multizone/multicluster.go b/pkg/config/multizone/multicluster.go index 6966c1dcf4c4..9007d4e63371 100644 --- a/pkg/config/multizone/multicluster.go +++ b/pkg/config/multizone/multicluster.go @@ -41,6 +41,7 @@ func DefaultGlobalConfig() *GlobalConfig { TlsMinVersion: "TLSv1_2", TlsCipherSuites: []string{}, NackBackoff: config_types.Duration{Duration: 5 * time.Second}, + MaxListQueryElements: 0, }, } } @@ -98,10 +99,11 @@ func DefaultZoneConfig() *ZoneConfig { GlobalAddress: "", Name: "", KDS: &KdsClientConfig{ - RefreshInterval: config_types.Duration{Duration: 1 * time.Second}, - MaxMsgSize: 10 * 1024 * 1024, - MsgSendTimeout: config_types.Duration{Duration: 60 * time.Second}, - NackBackoff: config_types.Duration{Duration: 5 * time.Second}, + RefreshInterval: config_types.Duration{Duration: 1 * time.Second}, + MaxMsgSize: 10 * 1024 * 1024, + MsgSendTimeout: config_types.Duration{Duration: 60 * time.Second}, + NackBackoff: config_types.Duration{Duration: 5 * time.Second}, + MaxListQueryElements: 0, }, } } diff --git a/pkg/kds/global/components.go b/pkg/kds/global/components.go index 0125bba4b0a2..0d2bed7115bf 100644 --- a/pkg/kds/global/components.go +++ b/pkg/kds/global/components.go @@ -79,7 +79,11 @@ func Setup(rt runtime.Runtime) error { } resourceSyncer := sync_store.NewResourceSyncer(kdsGlobalLog, rt.ResourceStore()) - resourceSyncerV2 := kds_sync_store_v2.NewResourceSyncer(kdsDeltaGlobalLog, rt.ResourceStore()) + resourceSyncerV2 := kds_sync_store_v2.NewResourceSyncer( + kdsDeltaGlobalLog, + rt.ResourceStore(), + rt.Config().Multizone.Global.KDS.MaxListQueryElements, + ) kubeFactory := resources_k8s.NewSimpleKubeFactory() onSessionStarted := mux.OnSessionStartedFunc(func(session mux.Session) error { log := kdsGlobalLog.WithValues("peer-id", session.PeerID()) diff --git a/pkg/kds/global/components_test.go b/pkg/kds/global/components_test.go index 39f08be15410..467499822a93 100644 --- a/pkg/kds/global/components_test.go +++ b/pkg/kds/global/components_test.go @@ -247,7 +247,7 @@ var _ = Describe("Global Sync", func() { // Start 1 Kuma CP Global globalStore = memory.NewStore() - globalSyncer = sync_store_v2.NewResourceSyncer(core.Log, globalStore) + globalSyncer = sync_store_v2.NewResourceSyncer(core.Log, globalStore, 10) stopCh := make(chan struct{}) clientStreams := []*grpc.MockDeltaClientStream{} for _, ss := range serverStreams { diff --git a/pkg/kds/v2/client/zone_sync_test.go b/pkg/kds/v2/client/zone_sync_test.go index 00c2a83f6c9e..5ad0e84a8563 100644 --- a/pkg/kds/v2/client/zone_sync_test.go +++ b/pkg/kds/v2/client/zone_sync_test.go @@ -86,7 +86,7 @@ var _ = Describe("Zone Delta Sync", func() { clientStream := serverStream.ClientStream(stop) zoneStore = memory.NewStore() - zoneSyncer = sync_store_v2.NewResourceSyncer(core.Log.WithName("kds-syncer"), zoneStore) + zoneSyncer = sync_store_v2.NewResourceSyncer(core.Log.WithName("kds-syncer"), zoneStore, 10) wg.Add(1) go func() { diff --git a/pkg/kds/v2/store/sync.go b/pkg/kds/v2/store/sync.go index c91aad044b12..e1d0f8cce844 100644 --- a/pkg/kds/v2/store/sync.go +++ b/pkg/kds/v2/store/sync.go @@ -66,14 +66,16 @@ func PrefilterBy(predicate func(r core_model.Resource) bool) SyncOptionFunc { } type syncResourceStore struct { - log logr.Logger - resourceStore store.ResourceStore + log logr.Logger + resourceStore store.ResourceStore + maxListQueryElements int } -func NewResourceSyncer(log logr.Logger, resourceStore store.ResourceStore) ResourceSyncer { +func NewResourceSyncer(log logr.Logger, resourceStore store.ResourceStore, maxListQueryElements uint32) ResourceSyncer { return &syncResourceStore{ - log: log, - resourceStore: resourceStore, + log: log, + resourceStore: resourceStore, + maxListQueryElements: int(maxListQueryElements), } } @@ -86,7 +88,7 @@ func (s *syncResourceStore) Sync(syncCtx context.Context, upstreamResponse clien if err != nil { return err } - if upstreamResponse.IsInitialRequest { + if len(upstream.GetItems()) >= s.maxListQueryElements || upstreamResponse.IsInitialRequest { if err := s.resourceStore.List(ctx, downstream); err != nil { return err } diff --git a/pkg/kds/v2/store/sync_test.go b/pkg/kds/v2/store/sync_test.go index 0c8aed869eb6..4a7507f4c71d 100644 --- a/pkg/kds/v2/store/sync_test.go +++ b/pkg/kds/v2/store/sync_test.go @@ -46,7 +46,7 @@ var _ = Describe("SyncResourceStoreDelta", func() { BeforeEach(func() { resourceStore = memory.NewStore() - syncer = sync_store.NewResourceSyncer(core.Log, resourceStore) + syncer = sync_store.NewResourceSyncer(core.Log, resourceStore, 10) }) It("should create new resources in empty store", func() { diff --git a/pkg/kds/zone/components.go b/pkg/kds/zone/components.go index f962e0334f38..a3fdb8f02743 100644 --- a/pkg/kds/zone/components.go +++ b/pkg/kds/zone/components.go @@ -72,7 +72,11 @@ func Setup(rt core_runtime.Runtime) error { return err } resourceSyncer := sync_store.NewResourceSyncer(kdsZoneLog, rt.ResourceStore()) - resourceSyncerV2 := kds_sync_store_v2.NewResourceSyncer(kdsDeltaZoneLog, rt.ResourceStore()) + resourceSyncerV2 := kds_sync_store_v2.NewResourceSyncer( + kdsDeltaZoneLog, + rt.ResourceStore(), + rt.Config().Multizone.Zone.KDS.MaxListQueryElements, + ) kubeFactory := resources_k8s.NewSimpleKubeFactory() cfg := rt.Config() cfgForDisplay, err := config.ConfigForDisplay(&cfg) diff --git a/pkg/kds/zone/components_test.go b/pkg/kds/zone/components_test.go index 4d0bc4a5a5c9..a5eeb9494e48 100644 --- a/pkg/kds/zone/components_test.go +++ b/pkg/kds/zone/components_test.go @@ -264,7 +264,7 @@ var _ = Describe("Zone Sync", func() { clientStream := serverStream.ClientStream(stop) zoneStore = memory.NewStore() - zoneSyncer = sync_store_v2.NewResourceSyncer(core.Log.WithName("kds-syncer"), zoneStore) + zoneSyncer = sync_store_v2.NewResourceSyncer(core.Log.WithName("kds-syncer"), zoneStore, 10) wg.Add(1) go func() { From f41709d46d912f6a24e9494211d43f494725ca40 Mon Sep 17 00:00:00 2001 From: Lukasz Dziedziak Date: Tue, 5 Sep 2023 20:37:09 +0200 Subject: [PATCH 3/7] fix(kuma-cp): added test and fixed logic Signed-off-by: Lukasz Dziedziak --- pkg/core/resources/store/pagination_store.go | 10 +++---- pkg/plugins/resources/postgres/pgx_store.go | 4 +-- pkg/plugins/resources/postgres/pq_store.go | 4 +-- pkg/test/store/store_test_template.go | 28 ++++++++++++++++++++ 4 files changed, 37 insertions(+), 9 deletions(-) diff --git a/pkg/core/resources/store/pagination_store.go b/pkg/core/resources/store/pagination_store.go index 0caf8c7a2a06..b5e47d4afebe 100644 --- a/pkg/core/resources/store/pagination_store.go +++ b/pkg/core/resources/store/pagination_store.go @@ -74,11 +74,11 @@ func (p *paginationStore) List(ctx context.Context, list model.ResourceList, opt } } } - } - - for _, item := range fullList.GetItems() { - if opts.Filter(item) { - _ = filteredList.AddItem(item) + } else { + for _, item := range fullList.GetItems() { + if opts.Filter(item) { + _ = filteredList.AddItem(item) + } } } diff --git a/pkg/plugins/resources/postgres/pgx_store.go b/pkg/plugins/resources/postgres/pgx_store.go index 2390137fa4b5..a566f163fa9e 100644 --- a/pkg/plugins/resources/postgres/pgx_store.go +++ b/pkg/plugins/resources/postgres/pgx_store.go @@ -184,12 +184,12 @@ func (r *pgxResourceStore) List(ctx context.Context, resources core_model.Resour statementArgs = append(statementArgs, resources.GetItemType()) argsIndex := 1 if len(opts.ResourceKeys) > 0 { - statement += " (" + statement += " AND (" for idx, rk := range opts.ResourceKeys { if idx > 0 { statement += " OR " } - statement += fmt.Sprintf("(mesh=$%d AND name=$%d))", argsIndex+1, argsIndex+2) + statement += fmt.Sprintf("(mesh=$%d AND name=$%d)", argsIndex+1, argsIndex+2) argsIndex += 2 statementArgs = append(statementArgs, rk.Mesh, rk.Name) } diff --git a/pkg/plugins/resources/postgres/pq_store.go b/pkg/plugins/resources/postgres/pq_store.go index 08a2ec60600a..dcd9232b8524 100644 --- a/pkg/plugins/resources/postgres/pq_store.go +++ b/pkg/plugins/resources/postgres/pq_store.go @@ -181,12 +181,12 @@ func (r *postgresResourceStore) List(_ context.Context, resources core_model.Res statementArgs = append(statementArgs, resources.GetItemType()) argsIndex := 1 if len(opts.ResourceKeys) > 0 { - statement += " (" + statement += " AND (" for idx, rk := range opts.ResourceKeys { if idx > 0 { statement += " OR " } - statement += fmt.Sprintf("(mesh=$%d AND name=$%d))", argsIndex+1, argsIndex+2) + statement += fmt.Sprintf("(mesh=$%d AND name=$%d)", argsIndex+1, argsIndex+2) argsIndex += 2 statementArgs = append(statementArgs, rk.Mesh, rk.Name) } diff --git a/pkg/test/store/store_test_template.go b/pkg/test/store/store_test_template.go index c1ba8a8a6582..60165b12a828 100644 --- a/pkg/test/store/store_test_template.go +++ b/pkg/test/store/store_test_template.go @@ -11,6 +11,7 @@ import ( "github.com/kumahq/kuma/api/mesh/v1alpha1" core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh" + core_model "github.com/kumahq/kuma/pkg/core/resources/model" "github.com/kumahq/kuma/pkg/core/resources/store" resources_k8s "github.com/kumahq/kuma/pkg/plugins/resources/k8s" . "github.com/kumahq/kuma/pkg/test/matchers" @@ -391,6 +392,33 @@ func ExecuteStoreTests( }, Equal([]string{"list-res-1.demo", "list-res-2.demo"}))) }) + It("should return a list of 2 resources by resource key", func() { + // given two resources + createResource("list-res-1.demo") + createResource("list-res-2.demo") + rs3 := createResource("list-mes-1.demo") + rs4 := createResource("list-mes-1.default") + + list := core_mesh.TrafficRouteResourceList{} + rk := []core_model.ResourceKey{core_model.MetaToResourceKey(rs3.GetMeta()), core_model.MetaToResourceKey(rs4.GetMeta())} + + // when + err := s.List(context.Background(), &list, store.ListByResourceKeys(rk)) + + // then + Expect(err).ToNot(HaveOccurred()) + // and + Expect(list.Pagination.Total).To(Equal(uint32(2))) + // and + Expect(list.Items).To(WithTransform(func(itms []*core_mesh.TrafficRouteResource) []string { + var res []string + for _, v := range itms { + res = append(res, v.GetMeta().GetName()) + } + return res + }, Equal([]string{"list-mes-1.default", "list-mes-1.demo"}))) + }) + Describe("Pagination", func() { It("should list all resources using pagination", func() { // given From c484c9c235a5558c89963b9e437976b18eded3d2 Mon Sep 17 00:00:00 2001 From: Lukasz Dziedziak Date: Wed, 6 Sep 2023 11:43:32 +0200 Subject: [PATCH 4/7] feat(kuma-cp): code review changes Signed-off-by: Lukasz Dziedziak --- docs/generated/kuma-cp.md | 6 +-- docs/generated/raw/kuma-cp.yaml | 6 +-- pkg/config/app/kuma-cp/kuma-cp.defaults.yaml | 6 +-- pkg/config/loader_test.go | 9 ++-- pkg/config/multizone/kds.go | 4 -- pkg/config/multizone/multicluster.go | 10 ++-- .../plugins/resources/postgres/config.go | 3 ++ pkg/core/resources/model/resource.go | 8 ++++ pkg/kds/global/components.go | 6 +-- pkg/kds/global/components_test.go | 2 +- pkg/kds/v2/client/zone_sync_test.go | 2 +- pkg/kds/v2/store/sync.go | 24 +++------- pkg/kds/v2/store/sync_test.go | 2 +- pkg/kds/zone/components.go | 6 +-- pkg/kds/zone/components_test.go | 2 +- pkg/plugins/resources/postgres/pgx_store.go | 46 +++++++++++++++---- pkg/plugins/resources/postgres/pq_store.go | 32 +++++++++---- 17 files changed, 99 insertions(+), 75 deletions(-) diff --git a/docs/generated/kuma-cp.md b/docs/generated/kuma-cp.md index 0771e1dc9ad8..3c6dfdf8b7de 100644 --- a/docs/generated/kuma-cp.md +++ b/docs/generated/kuma-cp.md @@ -47,6 +47,8 @@ store: # MaxIdleConnections (applied only when driverName=postgres) is the maximum number of connections in the idle connection pool # <0 value means no idle connections and 0 means default max idle connections maxIdleConnections: 50 # ENV: KUMA_STORE_POSTGRES_MAX_IDLE_CONNECTIONS + # MaxListQueryElements defines maximum number of changed elements before requesting full list of elements from the store. + maxListQueryElements: 0 # ENV: KUMA_STORE_POSTGRES_MAX_LIST_QUERY_ELEMENTS # TLS settings tls: # Mode of TLS connection. Available values are: "disable", "verifyNone", "verifyCa", "verifyFull" @@ -478,8 +480,6 @@ multizone: msgSendTimeout: 60s # ENV: KUMA_MULTIZONE_GLOBAL_KDS_MSG_SEND_TIMEOUT # Backoff that is executed when the global control plane is sending the response that was previously rejected by zone control plane nackBackoff: 5s # ENV: KUMA_MULTIZONE_GLOBAL_KDS_NACK_BACKOFF - # MaxListQueryElements defines maximum number of changed elements before requesting full list of elements from the store. - maxListQueryElements: 0 # ENV: KUMA_MULTIZONE_GLOBAL_KDS_MAX_LIST_QUERY_ELEMENTS zone: # Kuma Zone name used to mark the zone dataplane resources name: "" # ENV: KUMA_MULTIZONE_ZONE_NAME @@ -500,8 +500,6 @@ multizone: msgSendTimeout: 60s # ENV: KUMA_MULTIZONE_ZONE_KDS_MSG_SEND_TIMEOUT # Backoff that is executed when the zone control plane is sending the response that was previously rejected by global control plane nackBackoff: 5s # ENV: KUMA_MULTIZONE_ZONE_KDS_NACK_BACKOFF - # MaxListQueryElements defines maximum number of changed elements before requesting full list of elements from the store. - maxListQueryElements: 0 # ENV: KUMA_MULTIZONE_ZONE_KDS_MAX_LIST_QUERY_ELEMENTS # Diagnostics configuration diagnostics: diff --git a/docs/generated/raw/kuma-cp.yaml b/docs/generated/raw/kuma-cp.yaml index 3b1091b5d118..d634eee63af7 100644 --- a/docs/generated/raw/kuma-cp.yaml +++ b/docs/generated/raw/kuma-cp.yaml @@ -44,6 +44,8 @@ store: # MaxIdleConnections (applied only when driverName=postgres) is the maximum number of connections in the idle connection pool # <0 value means no idle connections and 0 means default max idle connections maxIdleConnections: 50 # ENV: KUMA_STORE_POSTGRES_MAX_IDLE_CONNECTIONS + # MaxListQueryElements defines maximum number of changed elements before requesting full list of elements from the store. + maxListQueryElements: 0 # ENV: KUMA_STORE_POSTGRES_MAX_LIST_QUERY_ELEMENTS # TLS settings tls: # Mode of TLS connection. Available values are: "disable", "verifyNone", "verifyCa", "verifyFull" @@ -475,8 +477,6 @@ multizone: msgSendTimeout: 60s # ENV: KUMA_MULTIZONE_GLOBAL_KDS_MSG_SEND_TIMEOUT # Backoff that is executed when the global control plane is sending the response that was previously rejected by zone control plane nackBackoff: 5s # ENV: KUMA_MULTIZONE_GLOBAL_KDS_NACK_BACKOFF - # MaxListQueryElements defines maximum number of changed elements before requesting full list of elements from the store. - maxListQueryElements: 0 # ENV: KUMA_MULTIZONE_GLOBAL_KDS_MAX_LIST_QUERY_ELEMENTS zone: # Kuma Zone name used to mark the zone dataplane resources name: "" # ENV: KUMA_MULTIZONE_ZONE_NAME @@ -497,8 +497,6 @@ multizone: msgSendTimeout: 60s # ENV: KUMA_MULTIZONE_ZONE_KDS_MSG_SEND_TIMEOUT # Backoff that is executed when the zone control plane is sending the response that was previously rejected by global control plane nackBackoff: 5s # ENV: KUMA_MULTIZONE_ZONE_KDS_NACK_BACKOFF - # MaxListQueryElements defines maximum number of changed elements before requesting full list of elements from the store. - maxListQueryElements: 0 # ENV: KUMA_MULTIZONE_ZONE_KDS_MAX_LIST_QUERY_ELEMENTS # Diagnostics configuration diagnostics: diff --git a/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml b/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml index 3b1091b5d118..d634eee63af7 100644 --- a/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml +++ b/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml @@ -44,6 +44,8 @@ store: # MaxIdleConnections (applied only when driverName=postgres) is the maximum number of connections in the idle connection pool # <0 value means no idle connections and 0 means default max idle connections maxIdleConnections: 50 # ENV: KUMA_STORE_POSTGRES_MAX_IDLE_CONNECTIONS + # MaxListQueryElements defines maximum number of changed elements before requesting full list of elements from the store. + maxListQueryElements: 0 # ENV: KUMA_STORE_POSTGRES_MAX_LIST_QUERY_ELEMENTS # TLS settings tls: # Mode of TLS connection. Available values are: "disable", "verifyNone", "verifyCa", "verifyFull" @@ -475,8 +477,6 @@ multizone: msgSendTimeout: 60s # ENV: KUMA_MULTIZONE_GLOBAL_KDS_MSG_SEND_TIMEOUT # Backoff that is executed when the global control plane is sending the response that was previously rejected by zone control plane nackBackoff: 5s # ENV: KUMA_MULTIZONE_GLOBAL_KDS_NACK_BACKOFF - # MaxListQueryElements defines maximum number of changed elements before requesting full list of elements from the store. - maxListQueryElements: 0 # ENV: KUMA_MULTIZONE_GLOBAL_KDS_MAX_LIST_QUERY_ELEMENTS zone: # Kuma Zone name used to mark the zone dataplane resources name: "" # ENV: KUMA_MULTIZONE_ZONE_NAME @@ -497,8 +497,6 @@ multizone: msgSendTimeout: 60s # ENV: KUMA_MULTIZONE_ZONE_KDS_MSG_SEND_TIMEOUT # Backoff that is executed when the zone control plane is sending the response that was previously rejected by global control plane nackBackoff: 5s # ENV: KUMA_MULTIZONE_ZONE_KDS_NACK_BACKOFF - # MaxListQueryElements defines maximum number of changed elements before requesting full list of elements from the store. - maxListQueryElements: 0 # ENV: KUMA_MULTIZONE_ZONE_KDS_MAX_LIST_QUERY_ELEMENTS # Diagnostics configuration diagnostics: diff --git a/pkg/config/loader_test.go b/pkg/config/loader_test.go index e2f5058709e7..ba003d0272c6 100644 --- a/pkg/config/loader_test.go +++ b/pkg/config/loader_test.go @@ -106,6 +106,7 @@ var _ = Describe("Config loader", func() { Expect(cfg.Store.Postgres.MaxIdleConnections).To(Equal(300)) Expect(cfg.Store.Postgres.MinReconnectInterval.Duration).To(Equal(44 * time.Second)) Expect(cfg.Store.Postgres.MaxReconnectInterval.Duration).To(Equal(55 * time.Second)) + Expect(cfg.Store.Postgres.MaxListQueryElements).To(Equal(uint32(111))) Expect(cfg.Store.Kubernetes.SystemNamespace).To(Equal("test-namespace")) @@ -245,7 +246,6 @@ var _ = Describe("Config loader", func() { Expect(cfg.Multizone.Global.KDS.MaxMsgSize).To(Equal(uint32(1))) Expect(cfg.Multizone.Global.KDS.MsgSendTimeout.Duration).To(Equal(10 * time.Second)) Expect(cfg.Multizone.Global.KDS.NackBackoff.Duration).To(Equal(11 * time.Second)) - Expect(cfg.Multizone.Global.KDS.MaxListQueryElements).To(Equal(uint32(111))) Expect(cfg.Multizone.Zone.GlobalAddress).To(Equal("grpc://1.1.1.1:5685")) Expect(cfg.Multizone.Zone.Name).To(Equal("zone-1")) Expect(cfg.Multizone.Zone.KDS.RootCAFile).To(Equal("/rootCa")) @@ -254,7 +254,6 @@ var _ = Describe("Config loader", func() { Expect(cfg.Multizone.Zone.KDS.MsgSendTimeout.Duration).To(Equal(20 * time.Second)) Expect(cfg.Multizone.Zone.KDS.NackBackoff.Duration).To(Equal(21 * time.Second)) Expect(cfg.Multizone.Zone.KDS.TlsSkipVerify).To(BeTrue()) - Expect(cfg.Multizone.Zone.KDS.MaxListQueryElements).To(Equal(uint32(112))) Expect(cfg.Defaults.SkipMeshCreation).To(BeTrue()) Expect(cfg.Defaults.SkipTenantResources).To(BeTrue()) @@ -369,6 +368,7 @@ store: maxIdleConnections: 300 minReconnectInterval: 44s maxReconnectInterval: 55s + maxListQueryElements: 111 tls: mode: verifyFull certPath: /path/to/cert @@ -549,7 +549,6 @@ multizone: maxMsgSize: 1 msgSendTimeout: 10s nackBackoff: 11s - maxListQueryElements: 111 zone: globalAddress: "grpc://1.1.1.1:5685" name: "zone-1" @@ -560,7 +559,6 @@ multizone: msgSendTimeout: 20s nackBackoff: 21s tlsSkipVerify: true - maxListQueryElements: 112 dnsServer: domain: test-domain CIDR: 127.1.0.0/16 @@ -711,6 +709,7 @@ proxy: "KUMA_STORE_POSTGRES_TLS_DISABLE_SSLSNI": "true", "KUMA_STORE_POSTGRES_MIN_RECONNECT_INTERVAL": "44s", "KUMA_STORE_POSTGRES_MAX_RECONNECT_INTERVAL": "55s", + "KUMA_STORE_POSTGRES_MAX_LIST_QUERY_ELEMENTS": "111", "KUMA_STORE_KUBERNETES_SYSTEM_NAMESPACE": "test-namespace", "KUMA_STORE_CACHE_ENABLED": "false", "KUMA_STORE_CACHE_EXPIRATION_TIME": "3s", @@ -833,7 +832,6 @@ proxy: "KUMA_MULTIZONE_GLOBAL_KDS_MAX_MSG_SIZE": "1", "KUMA_MULTIZONE_GLOBAL_KDS_MSG_SEND_TIMEOUT": "10s", "KUMA_MULTIZONE_GLOBAL_KDS_NACK_BACKOFF": "11s", - "KUMA_MULTIZONE_GLOBAL_KDS_MAX_LIST_QUERY_ELEMENTS": "111", "KUMA_MULTIZONE_ZONE_GLOBAL_ADDRESS": "grpc://1.1.1.1:5685", "KUMA_MULTIZONE_ZONE_NAME": "zone-1", "KUMA_MULTIZONE_ZONE_KDS_ROOT_CA_FILE": "/rootCa", @@ -842,7 +840,6 @@ proxy: "KUMA_MULTIZONE_ZONE_KDS_MSG_SEND_TIMEOUT": "20s", "KUMA_MULTIZONE_ZONE_KDS_NACK_BACKOFF": "21s", "KUMA_MULTIZONE_ZONE_KDS_TLS_SKIP_VERIFY": "true", - "KUMA_MULTIZONE_ZONE_KDS_MAX_LIST_QUERY_ELEMENTS": "112", "KUMA_EXPERIMENTAL_KDS_DELTA_ENABLED": "true", "KUMA_MULTIZONE_GLOBAL_KDS_ZONE_INSIGHT_FLUSH_INTERVAL": "5s", "KUMA_DEFAULTS_SKIP_MESH_CREATION": "true", diff --git a/pkg/config/multizone/kds.go b/pkg/config/multizone/kds.go index 7f902f6415f9..34c53cd88973 100644 --- a/pkg/config/multizone/kds.go +++ b/pkg/config/multizone/kds.go @@ -35,8 +35,6 @@ type KdsServerConfig struct { MsgSendTimeout config_types.Duration `json:"msgSendTimeout" envconfig:"kuma_multizone_global_kds_msg_send_timeout"` // Backoff that is executed when the global control plane is sending the response that was previously rejected by zone control plane. NackBackoff config_types.Duration `json:"nackBackoff" envconfig:"kuma_multizone_global_kds_nack_backoff"` - // MaxListQueryElements defines maximum number of changed elements before requesting full list of elements from the store. - MaxListQueryElements uint32 `json:"maxListQueryElements" envconfig:"kuma_multizone_global_kds_max_list_query_elements"` } var _ config.Config = &KdsServerConfig{} @@ -88,8 +86,6 @@ type KdsClientConfig struct { MsgSendTimeout config_types.Duration `json:"msgSendTimeout" envconfig:"kuma_multizone_zone_kds_msg_send_timeout"` // Backoff that is executed when the zone control plane is sending the response that was previously rejected by global control plane. NackBackoff config_types.Duration `json:"nackBackoff" envconfig:"kuma_multizone_zone_kds_nack_backoff"` - // MaxListQueryElements defines maximum number of changed elements before requesting full list of elements from the store. - MaxListQueryElements uint32 `json:"maxListQueryElements" envconfig:"kuma_multizone_zone_kds_max_list_query_elements"` } var _ config.Config = &KdsClientConfig{} diff --git a/pkg/config/multizone/multicluster.go b/pkg/config/multizone/multicluster.go index 9007d4e63371..6966c1dcf4c4 100644 --- a/pkg/config/multizone/multicluster.go +++ b/pkg/config/multizone/multicluster.go @@ -41,7 +41,6 @@ func DefaultGlobalConfig() *GlobalConfig { TlsMinVersion: "TLSv1_2", TlsCipherSuites: []string{}, NackBackoff: config_types.Duration{Duration: 5 * time.Second}, - MaxListQueryElements: 0, }, } } @@ -99,11 +98,10 @@ func DefaultZoneConfig() *ZoneConfig { GlobalAddress: "", Name: "", KDS: &KdsClientConfig{ - RefreshInterval: config_types.Duration{Duration: 1 * time.Second}, - MaxMsgSize: 10 * 1024 * 1024, - MsgSendTimeout: config_types.Duration{Duration: 60 * time.Second}, - NackBackoff: config_types.Duration{Duration: 5 * time.Second}, - MaxListQueryElements: 0, + RefreshInterval: config_types.Duration{Duration: 1 * time.Second}, + MaxMsgSize: 10 * 1024 * 1024, + MsgSendTimeout: config_types.Duration{Duration: 60 * time.Second}, + NackBackoff: config_types.Duration{Duration: 5 * time.Second}, }, } } diff --git a/pkg/config/plugins/resources/postgres/config.go b/pkg/config/plugins/resources/postgres/config.go index 24719b0d766e..c61e5c1bb4f4 100644 --- a/pkg/config/plugins/resources/postgres/config.go +++ b/pkg/config/plugins/resources/postgres/config.go @@ -75,6 +75,8 @@ type PostgresStoreConfig struct { // MaxReconnectInterval (applied only when driverName=postgres) controls the maximum possible duration to wait before trying // to re-establish the database connection after connection loss. MaxReconnectInterval config_types.Duration `json:"maxReconnectInterval" envconfig:"kuma_store_postgres_max_reconnect_interval"` + // MaxListQueryElements defines maximum number of changed elements before requesting full list of elements from the store. + MaxListQueryElements uint32 `json:"maxListQueryElements" envconfig:"kuma_store_postgres_max_list_query_elements"` } func (cfg PostgresStoreConfig) ConnectionString() (string, error) { @@ -274,6 +276,7 @@ func DefaultPostgresStoreConfig() *PostgresStoreConfig { MaxConnectionLifetime: DefaultMaxConnectionLifetime, MaxConnectionLifetimeJitter: DefaultMaxConnectionLifetimeJitter, HealthCheckInterval: DefaultHealthCheckInterval, + MaxListQueryElements: 0, } } diff --git a/pkg/core/resources/model/resource.go b/pkg/core/resources/model/resource.go index 80bfc8cf4088..41b01e138f8b 100644 --- a/pkg/core/resources/model/resource.go +++ b/pkg/core/resources/model/resource.go @@ -267,6 +267,14 @@ func MetaToResourceKey(meta ResourceMeta) ResourceKey { } } +func ResourceListToResourceKeys(rl ResourceList) []ResourceKey { + rkey := []ResourceKey{} + for _, r := range rl.GetItems() { + rkey = append(rkey, MetaToResourceKey(r.GetMeta())) + } + return rkey +} + type ResourceList interface { GetItemType() ResourceType GetItems() []Resource diff --git a/pkg/kds/global/components.go b/pkg/kds/global/components.go index 0d2bed7115bf..0125bba4b0a2 100644 --- a/pkg/kds/global/components.go +++ b/pkg/kds/global/components.go @@ -79,11 +79,7 @@ func Setup(rt runtime.Runtime) error { } resourceSyncer := sync_store.NewResourceSyncer(kdsGlobalLog, rt.ResourceStore()) - resourceSyncerV2 := kds_sync_store_v2.NewResourceSyncer( - kdsDeltaGlobalLog, - rt.ResourceStore(), - rt.Config().Multizone.Global.KDS.MaxListQueryElements, - ) + resourceSyncerV2 := kds_sync_store_v2.NewResourceSyncer(kdsDeltaGlobalLog, rt.ResourceStore()) kubeFactory := resources_k8s.NewSimpleKubeFactory() onSessionStarted := mux.OnSessionStartedFunc(func(session mux.Session) error { log := kdsGlobalLog.WithValues("peer-id", session.PeerID()) diff --git a/pkg/kds/global/components_test.go b/pkg/kds/global/components_test.go index 467499822a93..39f08be15410 100644 --- a/pkg/kds/global/components_test.go +++ b/pkg/kds/global/components_test.go @@ -247,7 +247,7 @@ var _ = Describe("Global Sync", func() { // Start 1 Kuma CP Global globalStore = memory.NewStore() - globalSyncer = sync_store_v2.NewResourceSyncer(core.Log, globalStore, 10) + globalSyncer = sync_store_v2.NewResourceSyncer(core.Log, globalStore) stopCh := make(chan struct{}) clientStreams := []*grpc.MockDeltaClientStream{} for _, ss := range serverStreams { diff --git a/pkg/kds/v2/client/zone_sync_test.go b/pkg/kds/v2/client/zone_sync_test.go index 5ad0e84a8563..00c2a83f6c9e 100644 --- a/pkg/kds/v2/client/zone_sync_test.go +++ b/pkg/kds/v2/client/zone_sync_test.go @@ -86,7 +86,7 @@ var _ = Describe("Zone Delta Sync", func() { clientStream := serverStream.ClientStream(stop) zoneStore = memory.NewStore() - zoneSyncer = sync_store_v2.NewResourceSyncer(core.Log.WithName("kds-syncer"), zoneStore, 10) + zoneSyncer = sync_store_v2.NewResourceSyncer(core.Log.WithName("kds-syncer"), zoneStore) wg.Add(1) go func() { diff --git a/pkg/kds/v2/store/sync.go b/pkg/kds/v2/store/sync.go index e1d0f8cce844..a76845af1fa8 100644 --- a/pkg/kds/v2/store/sync.go +++ b/pkg/kds/v2/store/sync.go @@ -66,16 +66,14 @@ func PrefilterBy(predicate func(r core_model.Resource) bool) SyncOptionFunc { } type syncResourceStore struct { - log logr.Logger - resourceStore store.ResourceStore - maxListQueryElements int + log logr.Logger + resourceStore store.ResourceStore } -func NewResourceSyncer(log logr.Logger, resourceStore store.ResourceStore, maxListQueryElements uint32) ResourceSyncer { +func NewResourceSyncer(log logr.Logger, resourceStore store.ResourceStore) ResourceSyncer { return &syncResourceStore{ - log: log, - resourceStore: resourceStore, - maxListQueryElements: int(maxListQueryElements), + log: log, + resourceStore: resourceStore, } } @@ -88,12 +86,12 @@ func (s *syncResourceStore) Sync(syncCtx context.Context, upstreamResponse clien if err != nil { return err } - if len(upstream.GetItems()) >= s.maxListQueryElements || upstreamResponse.IsInitialRequest { + if upstreamResponse.IsInitialRequest { if err := s.resourceStore.List(ctx, downstream); err != nil { return err } } else { - upstreamChangeKeys := append(ToResourceKeys(upstream), upstreamResponse.RemovedResourcesKey...) + upstreamChangeKeys := append(core_model.ResourceListToResourceKeys(upstream), upstreamResponse.RemovedResourcesKey...) if err := s.resourceStore.List(ctx, downstream, store.ListByResourceKeys(upstreamChangeKeys)); err != nil { return err } @@ -319,11 +317,3 @@ func GlobalSyncCallback( }, } } - -func ToResourceKeys(rs core_model.ResourceList) []core_model.ResourceKey { - rkey := []core_model.ResourceKey{} - for _, r := range rs.GetItems() { - rkey = append(rkey, core_model.MetaToResourceKey(r.GetMeta())) - } - return rkey -} diff --git a/pkg/kds/v2/store/sync_test.go b/pkg/kds/v2/store/sync_test.go index 4a7507f4c71d..0c8aed869eb6 100644 --- a/pkg/kds/v2/store/sync_test.go +++ b/pkg/kds/v2/store/sync_test.go @@ -46,7 +46,7 @@ var _ = Describe("SyncResourceStoreDelta", func() { BeforeEach(func() { resourceStore = memory.NewStore() - syncer = sync_store.NewResourceSyncer(core.Log, resourceStore, 10) + syncer = sync_store.NewResourceSyncer(core.Log, resourceStore) }) It("should create new resources in empty store", func() { diff --git a/pkg/kds/zone/components.go b/pkg/kds/zone/components.go index a3fdb8f02743..f962e0334f38 100644 --- a/pkg/kds/zone/components.go +++ b/pkg/kds/zone/components.go @@ -72,11 +72,7 @@ func Setup(rt core_runtime.Runtime) error { return err } resourceSyncer := sync_store.NewResourceSyncer(kdsZoneLog, rt.ResourceStore()) - resourceSyncerV2 := kds_sync_store_v2.NewResourceSyncer( - kdsDeltaZoneLog, - rt.ResourceStore(), - rt.Config().Multizone.Zone.KDS.MaxListQueryElements, - ) + resourceSyncerV2 := kds_sync_store_v2.NewResourceSyncer(kdsDeltaZoneLog, rt.ResourceStore()) kubeFactory := resources_k8s.NewSimpleKubeFactory() cfg := rt.Config() cfgForDisplay, err := config.ConfigForDisplay(&cfg) diff --git a/pkg/kds/zone/components_test.go b/pkg/kds/zone/components_test.go index a5eeb9494e48..4d0bc4a5a5c9 100644 --- a/pkg/kds/zone/components_test.go +++ b/pkg/kds/zone/components_test.go @@ -264,7 +264,7 @@ var _ = Describe("Zone Sync", func() { clientStream := serverStream.ClientStream(stop) zoneStore = memory.NewStore() - zoneSyncer = sync_store_v2.NewResourceSyncer(core.Log.WithName("kds-syncer"), zoneStore, 10) + zoneSyncer = sync_store_v2.NewResourceSyncer(core.Log.WithName("kds-syncer"), zoneStore) wg.Add(1) go func() { diff --git a/pkg/plugins/resources/postgres/pgx_store.go b/pkg/plugins/resources/postgres/pgx_store.go index a566f163fa9e..a6f0e546549b 100644 --- a/pkg/plugins/resources/postgres/pgx_store.go +++ b/pkg/plugins/resources/postgres/pgx_store.go @@ -22,9 +22,12 @@ import ( ) type pgxResourceStore struct { - pool *pgxpool.Pool + pool *pgxpool.Pool + maxListQueryElements uint32 } +type ResourceNamesByMesh map[string][]string + var _ store.ResourceStore = &pgxResourceStore{} func NewPgxStore(metrics core_metrics.Metrics, config config.PostgresStoreConfig, customizer pgx_config.PgxConfigCustomization) (store.ResourceStore, error) { @@ -38,7 +41,8 @@ func NewPgxStore(metrics core_metrics.Metrics, config config.PostgresStoreConfig } return &pgxResourceStore{ - pool: pool, + pool: pool, + maxListQueryElements: config.MaxListQueryElements, }, nil } @@ -183,15 +187,29 @@ func (r *pgxResourceStore) List(ctx context.Context, resources core_model.Resour var statementArgs []interface{} statementArgs = append(statementArgs, resources.GetItemType()) argsIndex := 1 - if len(opts.ResourceKeys) > 0 { + rkSize := len(opts.ResourceKeys) + if rkSize > 0 && rkSize < int(r.maxListQueryElements) { statement += " AND (" - for idx, rk := range opts.ResourceKeys { - if idx > 0 { + res := resourceNamesByMesh(opts.ResourceKeys) + iter := 0 + for mesh, names := range res { + if iter > 0 { statement += " OR " } - statement += fmt.Sprintf("(mesh=$%d AND name=$%d)", argsIndex+1, argsIndex+2) - argsIndex += 2 - statementArgs = append(statementArgs, rk.Mesh, rk.Name) + argsIndex++ + statement += fmt.Sprintf("(mesh=$%d AND", argsIndex) + statementArgs = append(statementArgs, mesh) + for idx, name := range names { + argsIndex++ + if idx == 0 { + statement += fmt.Sprintf(" name IN ($%d", argsIndex) + } else { + statement += fmt.Sprintf(",$%d", argsIndex) + } + statementArgs = append(statementArgs, name) + } + statement += "))" + iter++ } statement += ")" } else { @@ -230,6 +248,18 @@ func (r *pgxResourceStore) List(ctx context.Context, resources core_model.Resour return nil } +func resourceNamesByMesh(resourceKeys []core_model.ResourceKey) ResourceNamesByMesh { + resourceNamesByMesh := ResourceNamesByMesh{} + for _, rk := range resourceKeys { + if val, exists := resourceNamesByMesh[rk.Mesh]; exists { + resourceNamesByMesh[rk.Mesh] = append(val, rk.Name) + } else { + resourceNamesByMesh[rk.Mesh] = []string{rk.Name} + } + } + return resourceNamesByMesh +} + func rowToItem(resources core_model.ResourceList, rows pgx.Rows) (core_model.Resource, error) { var name, mesh, spec string var version int diff --git a/pkg/plugins/resources/postgres/pq_store.go b/pkg/plugins/resources/postgres/pq_store.go index dcd9232b8524..85a01a83ff46 100644 --- a/pkg/plugins/resources/postgres/pq_store.go +++ b/pkg/plugins/resources/postgres/pq_store.go @@ -20,7 +20,8 @@ import ( ) type postgresResourceStore struct { - db *sql.DB + db *sql.DB + maxListQueryElements uint32 } var _ store.ResourceStore = &postgresResourceStore{} @@ -36,7 +37,8 @@ func NewPqStore(metrics core_metrics.Metrics, config config.PostgresStoreConfig) } return &postgresResourceStore{ - db: db, + db: db, + maxListQueryElements: config.MaxListQueryElements, }, nil } @@ -180,15 +182,29 @@ func (r *postgresResourceStore) List(_ context.Context, resources core_model.Res var statementArgs []interface{} statementArgs = append(statementArgs, resources.GetItemType()) argsIndex := 1 - if len(opts.ResourceKeys) > 0 { + rkSize := len(opts.ResourceKeys) + if rkSize > 0 && rkSize < int(r.maxListQueryElements) { statement += " AND (" - for idx, rk := range opts.ResourceKeys { - if idx > 0 { + res := resourceNamesByMesh(opts.ResourceKeys) + iter := 0 + for mesh, names := range res { + if iter > 0 { statement += " OR " } - statement += fmt.Sprintf("(mesh=$%d AND name=$%d)", argsIndex+1, argsIndex+2) - argsIndex += 2 - statementArgs = append(statementArgs, rk.Mesh, rk.Name) + argsIndex++ + statement += fmt.Sprintf("(mesh=$%d AND", argsIndex) + statementArgs = append(statementArgs, mesh) + for idx, name := range names { + argsIndex++ + if idx == 0 { + statement += fmt.Sprintf(" name IN ($%d", argsIndex) + } else { + statement += fmt.Sprintf(",$%d", argsIndex) + } + statementArgs = append(statementArgs, name) + } + statement += "))" + iter++ } statement += ")" } else { From 537eae9fcf0855643f70aaa4c5685cccefde6c03 Mon Sep 17 00:00:00 2001 From: Lukasz Dziedziak Date: Wed, 6 Sep 2023 14:26:34 +0200 Subject: [PATCH 5/7] feat(kuma-cp): added test case with max query configuration Signed-off-by: Lukasz Dziedziak --- .../resources/postgres/store_template_test.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pkg/plugins/resources/postgres/store_template_test.go b/pkg/plugins/resources/postgres/store_template_test.go index 420b87b0849d..b517541a06d1 100644 --- a/pkg/plugins/resources/postgres/store_template_test.go +++ b/pkg/plugins/resources/postgres/store_template_test.go @@ -13,10 +13,11 @@ import ( ) var _ = Describe("PostgresStore template", func() { - createStore := func(storeName string) func() store.ResourceStore { + createStore := func(storeName string, maxListQueryElements int) func() store.ResourceStore { return func() store.ResourceStore { cfg, err := c.Config(test_postgres.WithRandomDb) Expect(err).ToNot(HaveOccurred()) + cfg.MaxListQueryElements = uint32(maxListQueryElements) cfg.MaxOpenConnections = 2 pqMetrics, err := core_metrics.NewMetrics("Standalone") @@ -41,8 +42,10 @@ var _ = Describe("PostgresStore template", func() { } } - test_store.ExecuteStoreTests(createStore("pgx"), "pgx") - test_store.ExecuteOwnerTests(createStore("pgx"), "pgx") - test_store.ExecuteStoreTests(createStore("pq"), "pq") - test_store.ExecuteOwnerTests(createStore("pq"), "pq") + test_store.ExecuteStoreTests(createStore("pgx", 0), "pgx") + test_store.ExecuteStoreTests(createStore("pgx", 4), "pgx") + test_store.ExecuteOwnerTests(createStore("pgx", 0), "pgx") + test_store.ExecuteStoreTests(createStore("pq", 0), "pq") + test_store.ExecuteStoreTests(createStore("pq", 4), "pq") + test_store.ExecuteOwnerTests(createStore("pq", 0), "pq") }) From 3372fe06fe018cb4a7ce7df00a960a55e28b6041 Mon Sep 17 00:00:00 2001 From: Lukasz Dziedziak Date: Wed, 6 Sep 2023 16:20:50 +0200 Subject: [PATCH 6/7] feat(kuma-cp): changed to map, and removed filter from query Signed-off-by: Lukasz Dziedziak --- pkg/core/resources/store/options.go | 8 +++-- pkg/core/resources/store/pagination_store.go | 8 ++--- pkg/plugins/resources/postgres/pgx_store.go | 31 ++++++++++---------- pkg/plugins/resources/postgres/pq_store.go | 21 +++++++------ 4 files changed, 34 insertions(+), 34 deletions(-) diff --git a/pkg/core/resources/store/options.go b/pkg/core/resources/store/options.go index ada5787c48b3..95154dd83102 100644 --- a/pkg/core/resources/store/options.go +++ b/pkg/core/resources/store/options.go @@ -161,7 +161,7 @@ type ListOptions struct { FilterFunc ListFilterFunc NameContains string Ordered bool - ResourceKeys []core_model.ResourceKey + ResourceKeys map[core_model.ResourceKey]struct{} } type ListOptionsFunc func(*ListOptions) @@ -216,7 +216,11 @@ func ListOrdered() ListOptionsFunc { func ListByResourceKeys(rk []core_model.ResourceKey) ListOptionsFunc { return func(opts *ListOptions) { - opts.ResourceKeys = rk + resourcesKeys := map[core_model.ResourceKey]struct{}{} + for _, val := range rk { + resourcesKeys[val] = struct{}{} + } + opts.ResourceKeys = resourcesKeys } } diff --git a/pkg/core/resources/store/pagination_store.go b/pkg/core/resources/store/pagination_store.go index b5e47d4afebe..aff25a0f5737 100644 --- a/pkg/core/resources/store/pagination_store.go +++ b/pkg/core/resources/store/pagination_store.go @@ -67,11 +67,9 @@ func (p *paginationStore) List(ctx context.Context, list model.ResourceList, opt } if len(opts.ResourceKeys) > 0 { - for _, rk := range opts.ResourceKeys { - for _, item := range fullList.GetItems() { - if item.GetMeta().GetMesh() == rk.Mesh && item.GetMeta().GetName() == rk.Name { - _ = filteredList.AddItem(item) - } + for _, item := range fullList.GetItems() { + if _, exists := opts.ResourceKeys[model.MetaToResourceKey(item.GetMeta())]; exists { + _ = filteredList.AddItem(item) } } } else { diff --git a/pkg/plugins/resources/postgres/pgx_store.go b/pkg/plugins/resources/postgres/pgx_store.go index a6f0e546549b..b9adc37ed0c3 100644 --- a/pkg/plugins/resources/postgres/pgx_store.go +++ b/pkg/plugins/resources/postgres/pgx_store.go @@ -212,17 +212,16 @@ func (r *pgxResourceStore) List(ctx context.Context, resources core_model.Resour iter++ } statement += ")" - } else { - if opts.Mesh != "" { - argsIndex++ - statement += fmt.Sprintf(" AND mesh=$%d", argsIndex) - statementArgs = append(statementArgs, opts.Mesh) - } - if opts.NameContains != "" { - argsIndex++ - statement += fmt.Sprintf(" AND name LIKE $%d", argsIndex) - statementArgs = append(statementArgs, "%"+opts.NameContains+"%") - } + } + if opts.Mesh != "" { + argsIndex++ + statement += fmt.Sprintf(" AND mesh=$%d", argsIndex) + statementArgs = append(statementArgs, opts.Mesh) + } + if opts.NameContains != "" { + argsIndex++ + statement += fmt.Sprintf(" AND name LIKE $%d", argsIndex) + statementArgs = append(statementArgs, "%"+opts.NameContains+"%") } statement += " ORDER BY name, mesh" @@ -248,13 +247,13 @@ func (r *pgxResourceStore) List(ctx context.Context, resources core_model.Resour return nil } -func resourceNamesByMesh(resourceKeys []core_model.ResourceKey) ResourceNamesByMesh { +func resourceNamesByMesh(resourceKeys map[core_model.ResourceKey]struct{}) ResourceNamesByMesh { resourceNamesByMesh := ResourceNamesByMesh{} - for _, rk := range resourceKeys { - if val, exists := resourceNamesByMesh[rk.Mesh]; exists { - resourceNamesByMesh[rk.Mesh] = append(val, rk.Name) + for key := range resourceKeys { + if val, exists := resourceNamesByMesh[key.Mesh]; exists { + resourceNamesByMesh[key.Mesh] = append(val, key.Name) } else { - resourceNamesByMesh[rk.Mesh] = []string{rk.Name} + resourceNamesByMesh[key.Mesh] = []string{key.Name} } } return resourceNamesByMesh diff --git a/pkg/plugins/resources/postgres/pq_store.go b/pkg/plugins/resources/postgres/pq_store.go index 85a01a83ff46..5465bb34a5f7 100644 --- a/pkg/plugins/resources/postgres/pq_store.go +++ b/pkg/plugins/resources/postgres/pq_store.go @@ -207,17 +207,16 @@ func (r *postgresResourceStore) List(_ context.Context, resources core_model.Res iter++ } statement += ")" - } else { - if opts.Mesh != "" { - argsIndex++ - statement += fmt.Sprintf(" AND mesh=$%d", argsIndex) - statementArgs = append(statementArgs, opts.Mesh) - } - if opts.NameContains != "" { - argsIndex++ - statement += fmt.Sprintf(" AND name LIKE $%d", argsIndex) - statementArgs = append(statementArgs, "%"+opts.NameContains+"%") - } + } + if opts.Mesh != "" { + argsIndex++ + statement += fmt.Sprintf(" AND mesh=$%d", argsIndex) + statementArgs = append(statementArgs, opts.Mesh) + } + if opts.NameContains != "" { + argsIndex++ + statement += fmt.Sprintf(" AND name LIKE $%d", argsIndex) + statementArgs = append(statementArgs, "%"+opts.NameContains+"%") } statement += " ORDER BY name, mesh" From 176440ffacd7ca4fbb65bcf1428ced4a6cf35bd2 Mon Sep 17 00:00:00 2001 From: Lukasz Dziedziak Date: Thu, 7 Sep 2023 10:48:02 +0200 Subject: [PATCH 7/7] feat(kuma-cp): refactor method based on review Signed-off-by: Lukasz Dziedziak --- pkg/core/resources/store/pagination_store.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/pkg/core/resources/store/pagination_store.go b/pkg/core/resources/store/pagination_store.go index aff25a0f5737..6324fc873067 100644 --- a/pkg/core/resources/store/pagination_store.go +++ b/pkg/core/resources/store/pagination_store.go @@ -66,18 +66,15 @@ func (p *paginationStore) List(ctx context.Context, list model.ResourceList, opt return err } - if len(opts.ResourceKeys) > 0 { - for _, item := range fullList.GetItems() { - if _, exists := opts.ResourceKeys[model.MetaToResourceKey(item.GetMeta())]; exists { - _ = filteredList.AddItem(item) - } + for _, item := range fullList.GetItems() { + _, exists := opts.ResourceKeys[model.MetaToResourceKey(item.GetMeta())] + if len(opts.ResourceKeys) > 0 && !exists { + continue } - } else { - for _, item := range fullList.GetItems() { - if opts.Filter(item) { - _ = filteredList.AddItem(item) - } + if !opts.Filter(item) { + continue } + _ = filteredList.AddItem(item) } filteredItems := filteredList.GetItems()