diff --git a/go.mod b/go.mod index 061b0052e..f35ac42c5 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/dop251/goja v0.0.0-20230531210528-d7324b2d74f7 github.com/dop251/goja_nodejs v0.0.0-20230602164024-804a84515562 github.com/gammazero/deque v0.2.1 + github.com/goccy/go-json v0.10.2 github.com/golang/mock v1.6.0 github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.3.1 diff --git a/go.sum b/go.sum index 30f143b75..2fb5c8aa6 100644 --- a/go.sum +++ b/go.sum @@ -262,6 +262,8 @@ github.com/go-sourcemap/sourcemap v2.1.3+incompatible h1:W1iEw64niKVGogNgBN3ePyL github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= +github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= diff --git a/pkg/connector/persister.go b/pkg/connector/persister.go index 75b989c8a..2db7c9e3a 100644 --- a/pkg/connector/persister.go +++ b/pkg/connector/persister.go @@ -26,7 +26,7 @@ import ( const ( DefaultPersisterDelayThreshold = time.Second - DefaultPersisterBundleCountThreshold = 100 + DefaultPersisterBundleCountThreshold = 10000 ) // Persister is responsible for persisting connectors and their state when diff --git a/pkg/connector/store.go b/pkg/connector/store.go index 3b88d27da..d7b5781ad 100644 --- a/pkg/connector/store.go +++ b/pkg/connector/store.go @@ -16,13 +16,13 @@ package connector import ( "context" - "encoding/json" "strings" "time" "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/database" "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/goccy/go-json" ) const ( @@ -148,13 +148,30 @@ func (s *Store) PrepareSet(id string, instance *Instance) (func(context.Context) return nil, cerrors.Errorf("can't store connector: %w", cerrors.ErrEmptyID) } - raw, err := s.encode(instance) - if err != nil { - return nil, err + icopy := Instance{ + ID: instance.ID, + Type: instance.Type, + Config: Config{ + Name: instance.Config.Name, + Settings: instance.Config.Settings, + }, + PipelineID: instance.PipelineID, + Plugin: instance.Plugin, + ProcessorIDs: instance.ProcessorIDs, + ProvisionedBy: instance.ProvisionedBy, + State: instance.State, + CreatedAt: instance.CreatedAt, + UpdatedAt: instance.UpdatedAt, + LastActiveConfig: instance.LastActiveConfig, } - key := s.addKeyPrefix(id) return func(ctx context.Context) error { + raw, err := s.encode(&icopy) + if err != nil { + return err + } + key := s.addKeyPrefix(id) + err = s.db.Set(ctx, key, raw) if err != nil { return cerrors.Errorf("failed to store connector with ID %q: %w", id, err)