Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(store): postgres transactions #7995

Merged
merged 2 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/core/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,11 +277,12 @@ func initializeResourceStore(cfg kuma_cp.Config, builder *core_runtime.Builder)
return errors.Wrapf(err, "could not retrieve store %s plugin", pluginName)
}

rs, err := plugin.NewResourceStore(builder, pluginConfig)
rs, transactions, err := plugin.NewResourceStore(builder, pluginConfig)
if err != nil {
return err
}
builder.WithResourceStore(core_store.NewCustomizableResourceStore(rs))
builder.WithTransactions(transactions)
eventBus, err := events.NewEventBus(cfg.EventBus.BufferSize, builder.Metrics())
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/plugins/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type (
DbVersion = uint
ResourceStorePlugin interface {
Plugin
NewResourceStore(PluginContext, PluginConfig) (core_store.ResourceStore, error)
NewResourceStore(PluginContext, PluginConfig) (core_store.ResourceStore, core_store.Transactions, error)
Migrate(PluginContext, PluginConfig) (DbVersion, error)
EventListener(PluginContext, events.Emitter) error
}
Expand Down
49 changes: 30 additions & 19 deletions pkg/core/resources/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ type ConflictRetry struct {

type UpsertOpts struct {
ConflictRetry ConflictRetry
Transactions store.Transactions
}

type UpsertFunc func(opts *UpsertOpts)
Expand All @@ -119,8 +120,16 @@ func WithConflictRetry(baseBackoff time.Duration, maxTimes uint, jitterPercent u
}
}

func WithTransactions(transactions store.Transactions) UpsertFunc {
return func(opts *UpsertOpts) {
opts.Transactions = transactions
}
}

func NewUpsertOpts(fs ...UpsertFunc) UpsertOpts {
opts := UpsertOpts{}
opts := UpsertOpts{
Transactions: store.NoTransactions{},
}
for _, f := range fs {
f(&opts)
}
Expand All @@ -130,30 +139,32 @@ func NewUpsertOpts(fs ...UpsertFunc) UpsertOpts {
var ErrSkipUpsert = errors.New("don't do upsert")

func Upsert(ctx context.Context, manager ResourceManager, key model.ResourceKey, resource model.Resource, fn func(resource model.Resource) error, fs ...UpsertFunc) error {
opts := NewUpsertOpts(fs...)
upsert := func(ctx context.Context) error {
create := false
err := manager.Get(ctx, resource, store.GetBy(key), store.GetConsistent())
if err != nil {
if store.IsResourceNotFound(err) {
create = true
} else {
return store.InTx(ctx, opts.Transactions, func(ctx context.Context) error {
create := false
err := manager.Get(ctx, resource, store.GetBy(key), store.GetConsistent())
if err != nil {
if store.IsResourceNotFound(err) {
create = true
} else {
return err
}
}
if err := fn(resource); err != nil {
if err == ErrSkipUpsert { // Way to skip inserts when there are no change
return nil
}
return err
}
}
if err := fn(resource); err != nil {
if err == ErrSkipUpsert { // Way to skip inserts when there are no change
return nil
if create {
return manager.Create(ctx, resource, store.CreateBy(key))
} else {
return manager.Update(ctx, resource)
}
return err
}
if create {
return manager.Create(ctx, resource, store.CreateBy(key))
} else {
return manager.Update(ctx, resource)
}
})
}

opts := NewUpsertOpts(fs...)
if opts.ConflictRetry.BaseBackoff <= 0 || opts.ConflictRetry.MaxTimes == 0 {
return upsert(ctx)
}
Expand Down
73 changes: 73 additions & 0 deletions pkg/core/resources/store/transactions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package store

import (
"context"

"github.com/pkg/errors"
"go.uber.org/multierr"
)

type txCtx struct{}

func CtxWithTx(ctx context.Context, tx Transaction) context.Context {
return context.WithValue(ctx, txCtx{}, tx)
}

func TxFromCtx(ctx context.Context) (Transaction, bool) {
if value, ok := ctx.Value(txCtx{}).(Transaction); ok {
return value, true
}
return nil, false
}

type Transaction interface {
Commit(ctx context.Context) error
Rollback(ctx context.Context) error
}

// Transactions is a producer of transactions if a resource store support transactions.
// Transactions are never required for consistency in Kuma, because there are ResourceStores that does not support transactions.
// However, in a couple of cases executing queries in transaction can improve the performance.
//
// In case of Postgres, you may set hooks when retrieve and release connections for the connection pool.
// In this case, if you create multiple resources, you need to acquire connection and execute hooks for each create.
// If you create transaction for it, you execute hooks once and you hold the connection.
//
// Transaction is propagated implicitly through Context.
type Transactions interface {
Begin(ctx context.Context) (Transaction, error)
lobkovilya marked this conversation as resolved.
Show resolved Hide resolved
}

func InTx(ctx context.Context, transactions Transactions, fn func(ctx context.Context) error) error {
tx, err := transactions.Begin(ctx)
if err != nil {
return err
}
if err := fn(CtxWithTx(ctx, tx)); err != nil {
if rollbackErr := tx.Rollback(ctx); rollbackErr != nil {
return multierr.Append(errors.Wrap(rollbackErr, "could not rollback transaction"), err)
}
return err
}
return tx.Commit(ctx)
}

type NoopTransaction struct{}

func (n NoopTransaction) Commit(context.Context) error {
return nil
}

func (n NoopTransaction) Rollback(context.Context) error {
return nil
}

var _ Transaction = &NoopTransaction{}

type NoTransactions struct{}

func (n NoTransactions) Begin(context.Context) (Transaction, error) {
return NoopTransaction{}, nil
}

var _ Transactions = &NoTransactions{}
15 changes: 15 additions & 0 deletions pkg/core/runtime/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
type BuilderContext interface {
ComponentManager() component.Manager
ResourceStore() core_store.CustomizableResourceStore
Transactions() core_store.Transactions
SecretStore() store.SecretStore
ConfigStore() core_store.ResourceStore
ResourceManager() core_manager.CustomizableResourceManager
Expand Down Expand Up @@ -73,6 +74,7 @@ type Builder struct {
rs core_store.CustomizableResourceStore
ss store.SecretStore
cs core_store.ResourceStore
txs core_store.Transactions
rm core_manager.CustomizableResourceManager
rom core_manager.ReadOnlyResourceManager
gis globalinsight.GlobalInsightService
Expand Down Expand Up @@ -132,6 +134,11 @@ func (b *Builder) WithResourceStore(rs core_store.CustomizableResourceStore) *Bu
return b
}

func (b *Builder) WithTransactions(txs core_store.Transactions) *Builder {
b.txs = txs
return b
}

func (b *Builder) WithSecretStore(ss store.SecretStore) *Builder {
b.ss = ss
return b
Expand Down Expand Up @@ -294,6 +301,9 @@ func (b *Builder) Build() (Runtime, error) {
if b.rs == nil {
return nil, errors.Errorf("ResourceStore has not been configured")
}
if b.txs == nil {
return nil, errors.Errorf("Transactions has not been configured")
}
if b.rm == nil {
return nil, errors.Errorf("ResourceManager has not been configured")
}
Expand Down Expand Up @@ -367,6 +377,7 @@ func (b *Builder) Build() (Runtime, error) {
rm: b.rm,
rom: b.rom,
rs: b.rs,
txs: b.txs,
ss: b.ss,
cam: b.cam,
gis: b.gis,
Expand Down Expand Up @@ -407,6 +418,10 @@ func (b *Builder) ResourceStore() core_store.CustomizableResourceStore {
return b.rs
}

func (b *Builder) Transactions() core_store.Transactions {
return b.txs
}

func (b *Builder) SecretStore() store.SecretStore {
return b.ss
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/core/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type RuntimeContext interface {
DataSourceLoader() datasource.Loader
ResourceManager() core_manager.ResourceManager
ResourceStore() core_store.ResourceStore
Transactions() core_store.Transactions
ReadOnlyResourceManager() core_manager.ReadOnlyResourceManager
SecretStore() store.SecretStore
ConfigStore() core_store.ResourceStore
Expand Down Expand Up @@ -147,6 +148,7 @@ type runtimeContext struct {
cfg kuma_cp.Config
rm core_manager.ResourceManager
rs core_store.ResourceStore
txs core_store.Transactions
ss store.SecretStore
cs core_store.ResourceStore
gis globalinsight.GlobalInsightService
Expand Down Expand Up @@ -206,6 +208,10 @@ func (rc *runtimeContext) ResourceStore() core_store.ResourceStore {
return rc.rs
}

func (rc *runtimeContext) Transactions() core_store.Transactions {
return rc.txs
}

func (rc *runtimeContext) SecretStore() store.SecretStore {
return rc.ss
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kds/global/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func Setup(rt runtime.Runtime) error {
}

resourceSyncer := sync_store.NewResourceSyncer(kdsGlobalLog, rt.ResourceStore())
resourceSyncerV2, err := kds_sync_store_v2.NewResourceSyncer(kdsDeltaGlobalLog, rt.ResourceStore(), rt.Metrics(), rt.Extensions())
resourceSyncerV2, err := kds_sync_store_v2.NewResourceSyncer(kdsDeltaGlobalLog, rt.ResourceStore(), rt.Transactions(), rt.Metrics(), rt.Extensions())
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kds/global/components_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ var _ = Describe("Global Sync", func() {
globalStore = memory.NewStore()
metrics, err := core_metrics.NewMetrics("")
Expect(err).ToNot(HaveOccurred())
globalSyncer, err = sync_store_v2.NewResourceSyncer(core.Log, globalStore, metrics, context.Background())
globalSyncer, err = sync_store_v2.NewResourceSyncer(core.Log, globalStore, store.NoTransactions{}, metrics, context.Background())
Expect(err).ToNot(HaveOccurred())
stopCh := make(chan struct{})
clientStreams := []*grpc.MockDeltaClientStream{}
Expand Down
7 changes: 6 additions & 1 deletion pkg/kds/server/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ func DefaultStatusTracker(rt core_runtime.Runtime, log logr.Logger) StatusTracke
return time.NewTicker(rt.Config().Metrics.Zone.IdleTimeout.Duration / 2)
},
rt.Config().Multizone.Global.KDS.ZoneInsightFlushInterval.Duration/10,
NewZonesInsightStore(rt.ResourceManager(), rt.Config().Store.Upsert, rt.Config().Metrics.Zone.CompactFinishedSubscriptions),
NewZonesInsightStore(
rt.ResourceManager(),
rt.Config().Store.Upsert,
rt.Config().Metrics.Zone.CompactFinishedSubscriptions,
rt.Transactions(),
),
l,
rt.Extensions(),
)
Expand Down
5 changes: 4 additions & 1 deletion pkg/kds/server/status_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,13 @@ func NewZonesInsightStore(
resManager manager.ResourceManager,
upsertCfg config_store.UpsertConfig,
compactFinished bool,
transactions store.Transactions,
) ZoneInsightStore {
return &zoneInsightStore{
resManager: resManager,
upsertCfg: upsertCfg,
compactFinished: compactFinished,
transactions: transactions,
}
}

Expand All @@ -125,6 +127,7 @@ type zoneInsightStore struct {
resManager manager.ResourceManager
upsertCfg config_store.UpsertConfig
compactFinished bool
transactions store.Transactions
}

func (s *zoneInsightStore) Upsert(ctx context.Context, zone string, subscription *system_proto.KDSSubscription) error {
Expand All @@ -142,5 +145,5 @@ func (s *zoneInsightStore) Upsert(ctx context.Context, zone string, subscription
zoneInsight.Spec.CompactFinished()
}
return nil
})
}, manager.WithTransactions(s.transactions))
}
2 changes: 1 addition & 1 deletion pkg/kds/v2/client/zone_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ var _ = Describe("Zone Delta Sync", func() {
zoneStore = memory.NewStore()
metrics, err := core_metrics.NewMetrics("")
Expect(err).ToNot(HaveOccurred())
zoneSyncer, err = sync_store_v2.NewResourceSyncer(core.Log.WithName("kds-syncer"), zoneStore, metrics, context.Background())
zoneSyncer, err = sync_store_v2.NewResourceSyncer(core.Log.WithName("kds-syncer"), zoneStore, store.NoTransactions{}, metrics, context.Background())
Expect(err).ToNot(HaveOccurred())

wg.Add(1)
Expand Down
7 changes: 6 additions & 1 deletion pkg/kds/v2/server/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,12 @@ func DefaultStatusTracker(rt core_runtime.Runtime, log logr.Logger) StatusTracke
return time.NewTicker(rt.Config().Multizone.Global.KDS.ZoneInsightFlushInterval.Duration)
}, func() *time.Ticker {
return time.NewTicker(rt.Config().Metrics.Zone.IdleTimeout.Duration / 2)
}, rt.Config().Multizone.Global.KDS.ZoneInsightFlushInterval.Duration/10, kds_server.NewZonesInsightStore(rt.ResourceManager(), rt.Config().Store.Upsert, rt.Config().Metrics.Zone.CompactFinishedSubscriptions), l, rt.Extensions())
}, rt.Config().Multizone.Global.KDS.ZoneInsightFlushInterval.Duration/10, kds_server.NewZonesInsightStore(
rt.ResourceManager(),
rt.Config().Store.Upsert,
rt.Config().Metrics.Zone.CompactFinishedSubscriptions,
rt.Transactions(),
), l, rt.Extensions())
}, log)
}

Expand Down
Loading