Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kds): pass resource keys to resourceStore for delta kds #7654

Merged
merged 7 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/generated/kuma-cp.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ store:
# MaxIdleConnections (applied only when driverName=postgres) is the maximum number of connections in the idle connection pool
# <0 value means no idle connections and 0 means default max idle connections
maxIdleConnections: 50 # ENV: KUMA_STORE_POSTGRES_MAX_IDLE_CONNECTIONS
# MaxListQueryElements defines maximum number of changed elements before requesting full list of elements from the store.
maxListQueryElements: 0 # ENV: KUMA_STORE_POSTGRES_MAX_LIST_QUERY_ELEMENTS
# TLS settings
tls:
# Mode of TLS connection. Available values are: "disable", "verifyNone", "verifyCa", "verifyFull"
Expand Down
2 changes: 2 additions & 0 deletions docs/generated/raw/kuma-cp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ store:
# MaxIdleConnections (applied only when driverName=postgres) is the maximum number of connections in the idle connection pool
# <0 value means no idle connections and 0 means default max idle connections
maxIdleConnections: 50 # ENV: KUMA_STORE_POSTGRES_MAX_IDLE_CONNECTIONS
# MaxListQueryElements defines maximum number of changed elements before requesting full list of elements from the store.
maxListQueryElements: 0 # ENV: KUMA_STORE_POSTGRES_MAX_LIST_QUERY_ELEMENTS
# TLS settings
tls:
# Mode of TLS connection. Available values are: "disable", "verifyNone", "verifyCa", "verifyFull"
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/app/kuma-cp/kuma-cp.defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ store:
# MaxIdleConnections (applied only when driverName=postgres) is the maximum number of connections in the idle connection pool
# <0 value means no idle connections and 0 means default max idle connections
maxIdleConnections: 50 # ENV: KUMA_STORE_POSTGRES_MAX_IDLE_CONNECTIONS
# MaxListQueryElements defines maximum number of changed elements before requesting full list of elements from the store.
maxListQueryElements: 0 # ENV: KUMA_STORE_POSTGRES_MAX_LIST_QUERY_ELEMENTS
# TLS settings
tls:
# Mode of TLS connection. Available values are: "disable", "verifyNone", "verifyCa", "verifyFull"
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ var _ = Describe("Config loader", func() {
Expect(cfg.Store.Postgres.MaxIdleConnections).To(Equal(300))
Expect(cfg.Store.Postgres.MinReconnectInterval.Duration).To(Equal(44 * time.Second))
Expect(cfg.Store.Postgres.MaxReconnectInterval.Duration).To(Equal(55 * time.Second))
Expect(cfg.Store.Postgres.MaxListQueryElements).To(Equal(uint32(111)))

Expect(cfg.Store.Kubernetes.SystemNamespace).To(Equal("test-namespace"))

Expand Down Expand Up @@ -367,6 +368,7 @@ store:
maxIdleConnections: 300
minReconnectInterval: 44s
maxReconnectInterval: 55s
maxListQueryElements: 111
tls:
mode: verifyFull
certPath: /path/to/cert
Expand Down Expand Up @@ -707,6 +709,7 @@ proxy:
"KUMA_STORE_POSTGRES_TLS_DISABLE_SSLSNI": "true",
"KUMA_STORE_POSTGRES_MIN_RECONNECT_INTERVAL": "44s",
"KUMA_STORE_POSTGRES_MAX_RECONNECT_INTERVAL": "55s",
"KUMA_STORE_POSTGRES_MAX_LIST_QUERY_ELEMENTS": "111",
"KUMA_STORE_KUBERNETES_SYSTEM_NAMESPACE": "test-namespace",
"KUMA_STORE_CACHE_ENABLED": "false",
"KUMA_STORE_CACHE_EXPIRATION_TIME": "3s",
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/plugins/resources/postgres/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ type PostgresStoreConfig struct {
// MaxReconnectInterval (applied only when driverName=postgres) controls the maximum possible duration to wait before trying
// to re-establish the database connection after connection loss.
MaxReconnectInterval config_types.Duration `json:"maxReconnectInterval" envconfig:"kuma_store_postgres_max_reconnect_interval"`
// MaxListQueryElements defines maximum number of changed elements before requesting full list of elements from the store.
MaxListQueryElements uint32 `json:"maxListQueryElements" envconfig:"kuma_store_postgres_max_list_query_elements"`
}

func (cfg PostgresStoreConfig) ConnectionString() (string, error) {
Expand Down Expand Up @@ -274,6 +276,7 @@ func DefaultPostgresStoreConfig() *PostgresStoreConfig {
MaxConnectionLifetime: DefaultMaxConnectionLifetime,
MaxConnectionLifetimeJitter: DefaultMaxConnectionLifetimeJitter,
HealthCheckInterval: DefaultHealthCheckInterval,
MaxListQueryElements: 0,
}
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/core/resources/model/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,14 @@ func MetaToResourceKey(meta ResourceMeta) ResourceKey {
}
}

func ResourceListToResourceKeys(rl ResourceList) []ResourceKey {
rkey := []ResourceKey{}
for _, r := range rl.GetItems() {
rkey = append(rkey, MetaToResourceKey(r.GetMeta()))
}
return rkey
}

type ResourceList interface {
GetItemType() ResourceType
GetItems() []Resource
Expand Down
11 changes: 11 additions & 0 deletions pkg/core/resources/store/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ type ListOptions struct {
FilterFunc ListFilterFunc
NameContains string
Ordered bool
ResourceKeys map[core_model.ResourceKey]struct{}
}

type ListOptionsFunc func(*ListOptions)
Expand Down Expand Up @@ -213,6 +214,16 @@ func ListOrdered() ListOptionsFunc {
}
}

func ListByResourceKeys(rk []core_model.ResourceKey) ListOptionsFunc {
return func(opts *ListOptions) {
resourcesKeys := map[core_model.ResourceKey]struct{}{}
for _, val := range rk {
resourcesKeys[val] = struct{}{}
}
opts.ResourceKeys = resourcesKeys
}
}

func (l *ListOptions) IsCacheable() bool {
return l.FilterFunc == nil
}
Expand Down
16 changes: 12 additions & 4 deletions pkg/core/resources/store/pagination_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (p *paginationStore) List(ctx context.Context, list model.ResourceList, opt

// At least one of the following options is required to trigger the paginationStore to do work.
// Otherwise, it delegates the request and returns early.
if opts.FilterFunc == nil && opts.PageSize == 0 && opts.PageOffset == "" && !opts.Ordered {
if opts.FilterFunc == nil && opts.PageSize == 0 && opts.PageOffset == "" && !opts.Ordered && len(opts.ResourceKeys) == 0 {
return p.delegate.List(ctx, list, optionsFunc...)
}

Expand All @@ -66,9 +66,17 @@ func (p *paginationStore) List(ctx context.Context, list model.ResourceList, opt
return err
}

for _, item := range fullList.GetItems() {
if opts.Filter(item) {
_ = filteredList.AddItem(item)
if len(opts.ResourceKeys) > 0 {
for _, item := range fullList.GetItems() {
if _, exists := opts.ResourceKeys[model.MetaToResourceKey(item.GetMeta())]; exists {
_ = filteredList.AddItem(item)
}
}
} else {
lukidzi marked this conversation as resolved.
Show resolved Hide resolved
for _, item := range fullList.GetItems() {
if opts.Filter(item) {
_ = filteredList.AddItem(item)
}
}
}

Expand Down
11 changes: 9 additions & 2 deletions pkg/kds/v2/store/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,15 @@ func (s *syncResourceStore) Sync(syncCtx context.Context, upstreamResponse clien
if err != nil {
return err
}
if err := s.resourceStore.List(ctx, downstream); err != nil {
return err
if upstreamResponse.IsInitialRequest {
if err := s.resourceStore.List(ctx, downstream); err != nil {
return err
}
} else {
upstreamChangeKeys := append(core_model.ResourceListToResourceKeys(upstream), upstreamResponse.RemovedResourcesKey...)
if err := s.resourceStore.List(ctx, downstream, store.ListByResourceKeys(upstreamChangeKeys)); err != nil {
return err
}
}
log.V(1).Info("before filtering", "downstream", downstream, "upstream", upstream)

Expand Down
46 changes: 44 additions & 2 deletions pkg/plugins/resources/postgres/pgx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ import (
)

type pgxResourceStore struct {
pool *pgxpool.Pool
pool *pgxpool.Pool
maxListQueryElements uint32
}

type ResourceNamesByMesh map[string][]string

var _ store.ResourceStore = &pgxResourceStore{}

func NewPgxStore(metrics core_metrics.Metrics, config config.PostgresStoreConfig, customizer pgx_config.PgxConfigCustomization) (store.ResourceStore, error) {
Expand All @@ -38,7 +41,8 @@ func NewPgxStore(metrics core_metrics.Metrics, config config.PostgresStoreConfig
}

return &pgxResourceStore{
pool: pool,
pool: pool,
maxListQueryElements: config.MaxListQueryElements,
}, nil
}

Expand Down Expand Up @@ -183,6 +187,32 @@ func (r *pgxResourceStore) List(ctx context.Context, resources core_model.Resour
var statementArgs []interface{}
statementArgs = append(statementArgs, resources.GetItemType())
argsIndex := 1
rkSize := len(opts.ResourceKeys)
if rkSize > 0 && rkSize < int(r.maxListQueryElements) {
statement += " AND ("
res := resourceNamesByMesh(opts.ResourceKeys)
iter := 0
for mesh, names := range res {
if iter > 0 {
statement += " OR "
}
argsIndex++
statement += fmt.Sprintf("(mesh=$%d AND", argsIndex)
statementArgs = append(statementArgs, mesh)
for idx, name := range names {
argsIndex++
if idx == 0 {
statement += fmt.Sprintf(" name IN ($%d", argsIndex)
} else {
statement += fmt.Sprintf(",$%d", argsIndex)
}
statementArgs = append(statementArgs, name)
}
statement += "))"
iter++
}
statement += ")"
}
if opts.Mesh != "" {
argsIndex++
statement += fmt.Sprintf(" AND mesh=$%d", argsIndex)
Expand Down Expand Up @@ -217,6 +247,18 @@ func (r *pgxResourceStore) List(ctx context.Context, resources core_model.Resour
return nil
}

func resourceNamesByMesh(resourceKeys map[core_model.ResourceKey]struct{}) ResourceNamesByMesh {
resourceNamesByMesh := ResourceNamesByMesh{}
for key := range resourceKeys {
if val, exists := resourceNamesByMesh[key.Mesh]; exists {
resourceNamesByMesh[key.Mesh] = append(val, key.Name)
} else {
resourceNamesByMesh[key.Mesh] = []string{key.Name}
}
}
return resourceNamesByMesh
}

func rowToItem(resources core_model.ResourceList, rows pgx.Rows) (core_model.Resource, error) {
var name, mesh, spec string
var version int
Expand Down
32 changes: 30 additions & 2 deletions pkg/plugins/resources/postgres/pq_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import (
)

type postgresResourceStore struct {
db *sql.DB
db *sql.DB
maxListQueryElements uint32
}

var _ store.ResourceStore = &postgresResourceStore{}
Expand All @@ -36,7 +37,8 @@ func NewPqStore(metrics core_metrics.Metrics, config config.PostgresStoreConfig)
}

return &postgresResourceStore{
db: db,
db: db,
maxListQueryElements: config.MaxListQueryElements,
}, nil
}

Expand Down Expand Up @@ -180,6 +182,32 @@ func (r *postgresResourceStore) List(_ context.Context, resources core_model.Res
var statementArgs []interface{}
statementArgs = append(statementArgs, resources.GetItemType())
argsIndex := 1
rkSize := len(opts.ResourceKeys)
if rkSize > 0 && rkSize < int(r.maxListQueryElements) {
statement += " AND ("
res := resourceNamesByMesh(opts.ResourceKeys)
iter := 0
for mesh, names := range res {
if iter > 0 {
statement += " OR "
}
argsIndex++
statement += fmt.Sprintf("(mesh=$%d AND", argsIndex)
statementArgs = append(statementArgs, mesh)
for idx, name := range names {
argsIndex++
if idx == 0 {
statement += fmt.Sprintf(" name IN ($%d", argsIndex)
} else {
statement += fmt.Sprintf(",$%d", argsIndex)
}
statementArgs = append(statementArgs, name)
}
statement += "))"
iter++
}
statement += ")"
}
if opts.Mesh != "" {
argsIndex++
statement += fmt.Sprintf(" AND mesh=$%d", argsIndex)
Expand Down
13 changes: 8 additions & 5 deletions pkg/plugins/resources/postgres/store_template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ import (
)

var _ = Describe("PostgresStore template", func() {
createStore := func(storeName string) func() store.ResourceStore {
createStore := func(storeName string, maxListQueryElements int) func() store.ResourceStore {
return func() store.ResourceStore {
cfg, err := c.Config(test_postgres.WithRandomDb)
Expect(err).ToNot(HaveOccurred())
cfg.MaxListQueryElements = uint32(maxListQueryElements)
cfg.MaxOpenConnections = 2

pqMetrics, err := core_metrics.NewMetrics("Standalone")
Expand All @@ -41,8 +42,10 @@ var _ = Describe("PostgresStore template", func() {
}
}

test_store.ExecuteStoreTests(createStore("pgx"), "pgx")
test_store.ExecuteOwnerTests(createStore("pgx"), "pgx")
test_store.ExecuteStoreTests(createStore("pq"), "pq")
test_store.ExecuteOwnerTests(createStore("pq"), "pq")
test_store.ExecuteStoreTests(createStore("pgx", 0), "pgx")
test_store.ExecuteStoreTests(createStore("pgx", 4), "pgx")
test_store.ExecuteOwnerTests(createStore("pgx", 0), "pgx")
test_store.ExecuteStoreTests(createStore("pq", 0), "pq")
test_store.ExecuteStoreTests(createStore("pq", 4), "pq")
test_store.ExecuteOwnerTests(createStore("pq", 0), "pq")
})
28 changes: 28 additions & 0 deletions pkg/test/store/store_test_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/kumahq/kuma/api/mesh/v1alpha1"
core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/core/resources/store"
resources_k8s "github.com/kumahq/kuma/pkg/plugins/resources/k8s"
. "github.com/kumahq/kuma/pkg/test/matchers"
Expand Down Expand Up @@ -391,6 +392,33 @@ func ExecuteStoreTests(
}, Equal([]string{"list-res-1.demo", "list-res-2.demo"})))
})

It("should return a list of 2 resources by resource key", func() {
// given two resources
createResource("list-res-1.demo")
createResource("list-res-2.demo")
rs3 := createResource("list-mes-1.demo")
rs4 := createResource("list-mes-1.default")

list := core_mesh.TrafficRouteResourceList{}
rk := []core_model.ResourceKey{core_model.MetaToResourceKey(rs3.GetMeta()), core_model.MetaToResourceKey(rs4.GetMeta())}

// when
err := s.List(context.Background(), &list, store.ListByResourceKeys(rk))

// then
Expect(err).ToNot(HaveOccurred())
// and
Expect(list.Pagination.Total).To(Equal(uint32(2)))
// and
Expect(list.Items).To(WithTransform(func(itms []*core_mesh.TrafficRouteResource) []string {
var res []string
for _, v := range itms {
res = append(res, v.GetMeta().GetName())
}
return res
}, Equal([]string{"list-mes-1.default", "list-mes-1.demo"})))
})

Describe("Pagination", func() {
It("should list all resources using pagination", func() {
// given
Expand Down