Skip to content

Commit

Permalink
perf(store): postgres transactions (#7995)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dyszkiewicz <[email protected]>
  • Loading branch information
jakubdyszkiewicz authored Oct 11, 2023
1 parent 386ab53 commit 31fdd6e
Show file tree
Hide file tree
Showing 21 changed files with 279 additions and 92 deletions.
3 changes: 2 additions & 1 deletion pkg/core/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,11 +277,12 @@ func initializeResourceStore(cfg kuma_cp.Config, builder *core_runtime.Builder)
return errors.Wrapf(err, "could not retrieve store %s plugin", pluginName)
}

rs, err := plugin.NewResourceStore(builder, pluginConfig)
rs, transactions, err := plugin.NewResourceStore(builder, pluginConfig)
if err != nil {
return err
}
builder.WithResourceStore(core_store.NewCustomizableResourceStore(rs))
builder.WithTransactions(transactions)
eventBus, err := events.NewEventBus(cfg.EventBus.BufferSize, builder.Metrics())
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/plugins/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type (
DbVersion = uint
ResourceStorePlugin interface {
Plugin
NewResourceStore(PluginContext, PluginConfig) (core_store.ResourceStore, error)
NewResourceStore(PluginContext, PluginConfig) (core_store.ResourceStore, core_store.Transactions, error)
Migrate(PluginContext, PluginConfig) (DbVersion, error)
EventListener(PluginContext, events.Emitter) error
}
Expand Down
49 changes: 30 additions & 19 deletions pkg/core/resources/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ type ConflictRetry struct {

type UpsertOpts struct {
ConflictRetry ConflictRetry
Transactions store.Transactions
}

type UpsertFunc func(opts *UpsertOpts)
Expand All @@ -119,8 +120,16 @@ func WithConflictRetry(baseBackoff time.Duration, maxTimes uint, jitterPercent u
}
}

func WithTransactions(transactions store.Transactions) UpsertFunc {
return func(opts *UpsertOpts) {
opts.Transactions = transactions
}
}

func NewUpsertOpts(fs ...UpsertFunc) UpsertOpts {
opts := UpsertOpts{}
opts := UpsertOpts{
Transactions: store.NoTransactions{},
}
for _, f := range fs {
f(&opts)
}
Expand All @@ -130,30 +139,32 @@ func NewUpsertOpts(fs ...UpsertFunc) UpsertOpts {
var ErrSkipUpsert = errors.New("don't do upsert")

func Upsert(ctx context.Context, manager ResourceManager, key model.ResourceKey, resource model.Resource, fn func(resource model.Resource) error, fs ...UpsertFunc) error {
opts := NewUpsertOpts(fs...)
upsert := func(ctx context.Context) error {
create := false
err := manager.Get(ctx, resource, store.GetBy(key), store.GetConsistent())
if err != nil {
if store.IsResourceNotFound(err) {
create = true
} else {
return store.InTx(ctx, opts.Transactions, func(ctx context.Context) error {
create := false
err := manager.Get(ctx, resource, store.GetBy(key), store.GetConsistent())
if err != nil {
if store.IsResourceNotFound(err) {
create = true
} else {
return err
}
}
if err := fn(resource); err != nil {
if err == ErrSkipUpsert { // Way to skip inserts when there are no change
return nil
}
return err
}
}
if err := fn(resource); err != nil {
if err == ErrSkipUpsert { // Way to skip inserts when there are no change
return nil
if create {
return manager.Create(ctx, resource, store.CreateBy(key))
} else {
return manager.Update(ctx, resource)
}
return err
}
if create {
return manager.Create(ctx, resource, store.CreateBy(key))
} else {
return manager.Update(ctx, resource)
}
})
}

opts := NewUpsertOpts(fs...)
if opts.ConflictRetry.BaseBackoff <= 0 || opts.ConflictRetry.MaxTimes == 0 {
return upsert(ctx)
}
Expand Down
73 changes: 73 additions & 0 deletions pkg/core/resources/store/transactions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package store

import (
"context"

"github.com/pkg/errors"
"go.uber.org/multierr"
)

type txCtx struct{}

func CtxWithTx(ctx context.Context, tx Transaction) context.Context {
return context.WithValue(ctx, txCtx{}, tx)
}

func TxFromCtx(ctx context.Context) (Transaction, bool) {
if value, ok := ctx.Value(txCtx{}).(Transaction); ok {
return value, true
}
return nil, false
}

type Transaction interface {
Commit(ctx context.Context) error
Rollback(ctx context.Context) error
}

// Transactions is a producer of transactions if a resource store support transactions.
// Transactions are never required for consistency in Kuma, because there are ResourceStores that does not support transactions.
// However, in a couple of cases executing queries in transaction can improve the performance.
//
// In case of Postgres, you may set hooks when retrieve and release connections for the connection pool.
// In this case, if you create multiple resources, you need to acquire connection and execute hooks for each create.
// If you create transaction for it, you execute hooks once and you hold the connection.
//
// Transaction is propagated implicitly through Context.
type Transactions interface {
Begin(ctx context.Context) (Transaction, error)
}

func InTx(ctx context.Context, transactions Transactions, fn func(ctx context.Context) error) error {
tx, err := transactions.Begin(ctx)
if err != nil {
return err
}
if err := fn(CtxWithTx(ctx, tx)); err != nil {
if rollbackErr := tx.Rollback(ctx); rollbackErr != nil {
return multierr.Append(errors.Wrap(rollbackErr, "could not rollback transaction"), err)
}
return err
}
return tx.Commit(ctx)
}

type NoopTransaction struct{}

func (n NoopTransaction) Commit(context.Context) error {
return nil
}

func (n NoopTransaction) Rollback(context.Context) error {
return nil
}

var _ Transaction = &NoopTransaction{}

type NoTransactions struct{}

func (n NoTransactions) Begin(context.Context) (Transaction, error) {
return NoopTransaction{}, nil
}

var _ Transactions = &NoTransactions{}
15 changes: 15 additions & 0 deletions pkg/core/runtime/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
type BuilderContext interface {
ComponentManager() component.Manager
ResourceStore() core_store.CustomizableResourceStore
Transactions() core_store.Transactions
SecretStore() store.SecretStore
ConfigStore() core_store.ResourceStore
ResourceManager() core_manager.CustomizableResourceManager
Expand Down Expand Up @@ -73,6 +74,7 @@ type Builder struct {
rs core_store.CustomizableResourceStore
ss store.SecretStore
cs core_store.ResourceStore
txs core_store.Transactions
rm core_manager.CustomizableResourceManager
rom core_manager.ReadOnlyResourceManager
gis globalinsight.GlobalInsightService
Expand Down Expand Up @@ -132,6 +134,11 @@ func (b *Builder) WithResourceStore(rs core_store.CustomizableResourceStore) *Bu
return b
}

func (b *Builder) WithTransactions(txs core_store.Transactions) *Builder {
b.txs = txs
return b
}

func (b *Builder) WithSecretStore(ss store.SecretStore) *Builder {
b.ss = ss
return b
Expand Down Expand Up @@ -294,6 +301,9 @@ func (b *Builder) Build() (Runtime, error) {
if b.rs == nil {
return nil, errors.Errorf("ResourceStore has not been configured")
}
if b.txs == nil {
return nil, errors.Errorf("Transactions has not been configured")
}
if b.rm == nil {
return nil, errors.Errorf("ResourceManager has not been configured")
}
Expand Down Expand Up @@ -367,6 +377,7 @@ func (b *Builder) Build() (Runtime, error) {
rm: b.rm,
rom: b.rom,
rs: b.rs,
txs: b.txs,
ss: b.ss,
cam: b.cam,
gis: b.gis,
Expand Down Expand Up @@ -407,6 +418,10 @@ func (b *Builder) ResourceStore() core_store.CustomizableResourceStore {
return b.rs
}

func (b *Builder) Transactions() core_store.Transactions {
return b.txs
}

func (b *Builder) SecretStore() store.SecretStore {
return b.ss
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/core/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type RuntimeContext interface {
DataSourceLoader() datasource.Loader
ResourceManager() core_manager.ResourceManager
ResourceStore() core_store.ResourceStore
Transactions() core_store.Transactions
ReadOnlyResourceManager() core_manager.ReadOnlyResourceManager
SecretStore() store.SecretStore
ConfigStore() core_store.ResourceStore
Expand Down Expand Up @@ -147,6 +148,7 @@ type runtimeContext struct {
cfg kuma_cp.Config
rm core_manager.ResourceManager
rs core_store.ResourceStore
txs core_store.Transactions
ss store.SecretStore
cs core_store.ResourceStore
gis globalinsight.GlobalInsightService
Expand Down Expand Up @@ -206,6 +208,10 @@ func (rc *runtimeContext) ResourceStore() core_store.ResourceStore {
return rc.rs
}

func (rc *runtimeContext) Transactions() core_store.Transactions {
return rc.txs
}

func (rc *runtimeContext) SecretStore() store.SecretStore {
return rc.ss
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kds/global/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func Setup(rt runtime.Runtime) error {
}

resourceSyncer := sync_store.NewResourceSyncer(kdsGlobalLog, rt.ResourceStore())
resourceSyncerV2, err := kds_sync_store_v2.NewResourceSyncer(kdsDeltaGlobalLog, rt.ResourceStore(), rt.Metrics(), rt.Extensions())
resourceSyncerV2, err := kds_sync_store_v2.NewResourceSyncer(kdsDeltaGlobalLog, rt.ResourceStore(), rt.Transactions(), rt.Metrics(), rt.Extensions())
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kds/global/components_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ var _ = Describe("Global Sync", func() {
globalStore = memory.NewStore()
metrics, err := core_metrics.NewMetrics("")
Expect(err).ToNot(HaveOccurred())
globalSyncer, err = sync_store_v2.NewResourceSyncer(core.Log, globalStore, metrics, context.Background())
globalSyncer, err = sync_store_v2.NewResourceSyncer(core.Log, globalStore, store.NoTransactions{}, metrics, context.Background())
Expect(err).ToNot(HaveOccurred())
stopCh := make(chan struct{})
clientStreams := []*grpc.MockDeltaClientStream{}
Expand Down
7 changes: 6 additions & 1 deletion pkg/kds/server/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ func DefaultStatusTracker(rt core_runtime.Runtime, log logr.Logger) StatusTracke
return time.NewTicker(rt.Config().Metrics.Zone.IdleTimeout.Duration / 2)
},
rt.Config().Multizone.Global.KDS.ZoneInsightFlushInterval.Duration/10,
NewZonesInsightStore(rt.ResourceManager(), rt.Config().Store.Upsert, rt.Config().Metrics.Zone.CompactFinishedSubscriptions),
NewZonesInsightStore(
rt.ResourceManager(),
rt.Config().Store.Upsert,
rt.Config().Metrics.Zone.CompactFinishedSubscriptions,
rt.Transactions(),
),
l,
rt.Extensions(),
)
Expand Down
5 changes: 4 additions & 1 deletion pkg/kds/server/status_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,13 @@ func NewZonesInsightStore(
resManager manager.ResourceManager,
upsertCfg config_store.UpsertConfig,
compactFinished bool,
transactions store.Transactions,
) ZoneInsightStore {
return &zoneInsightStore{
resManager: resManager,
upsertCfg: upsertCfg,
compactFinished: compactFinished,
transactions: transactions,
}
}

Expand All @@ -125,6 +127,7 @@ type zoneInsightStore struct {
resManager manager.ResourceManager
upsertCfg config_store.UpsertConfig
compactFinished bool
transactions store.Transactions
}

func (s *zoneInsightStore) Upsert(ctx context.Context, zone string, subscription *system_proto.KDSSubscription) error {
Expand All @@ -142,5 +145,5 @@ func (s *zoneInsightStore) Upsert(ctx context.Context, zone string, subscription
zoneInsight.Spec.CompactFinished()
}
return nil
})
}, manager.WithTransactions(s.transactions))
}
2 changes: 1 addition & 1 deletion pkg/kds/v2/client/zone_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ var _ = Describe("Zone Delta Sync", func() {
zoneStore = memory.NewStore()
metrics, err := core_metrics.NewMetrics("")
Expect(err).ToNot(HaveOccurred())
zoneSyncer, err = sync_store_v2.NewResourceSyncer(core.Log.WithName("kds-syncer"), zoneStore, metrics, context.Background())
zoneSyncer, err = sync_store_v2.NewResourceSyncer(core.Log.WithName("kds-syncer"), zoneStore, store.NoTransactions{}, metrics, context.Background())
Expect(err).ToNot(HaveOccurred())

wg.Add(1)
Expand Down
7 changes: 6 additions & 1 deletion pkg/kds/v2/server/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,12 @@ func DefaultStatusTracker(rt core_runtime.Runtime, log logr.Logger) StatusTracke
return time.NewTicker(rt.Config().Multizone.Global.KDS.ZoneInsightFlushInterval.Duration)
}, func() *time.Ticker {
return time.NewTicker(rt.Config().Metrics.Zone.IdleTimeout.Duration / 2)
}, rt.Config().Multizone.Global.KDS.ZoneInsightFlushInterval.Duration/10, kds_server.NewZonesInsightStore(rt.ResourceManager(), rt.Config().Store.Upsert, rt.Config().Metrics.Zone.CompactFinishedSubscriptions), l, rt.Extensions())
}, rt.Config().Multizone.Global.KDS.ZoneInsightFlushInterval.Duration/10, kds_server.NewZonesInsightStore(
rt.ResourceManager(),
rt.Config().Store.Upsert,
rt.Config().Metrics.Zone.CompactFinishedSubscriptions,
rt.Transactions(),
), l, rt.Extensions())
}, log)
}

Expand Down
Loading

0 comments on commit 31fdd6e

Please sign in to comment.