From a9125777c160c4526e1955d2f04a8ca48b2fadcd Mon Sep 17 00:00:00 2001 From: Henrik Johansson Date: Tue, 23 Jul 2019 14:55:42 +0200 Subject: [PATCH] schema: replication strategy is now supported Two default CLI arguments "simple" and "network" gives the simplest possible default values of the respective strategy, SimpleStrategy and NetworkTopologyStrategy respectively. Alternatively the entire CQL specification (JSON-ish) can be sent in and gemini will then try to interpret it directly. --- cmd/gemini/root.go | 19 ++++++++++++ cmd/gemini/schema.go | 55 +++++++++++++++++---------------- replication/replication.go | 27 ++++++++++++++++ replication/replication_test.go | 29 +++++++++++++++++ schema.go | 38 +++++++++++++---------- 5 files changed, 125 insertions(+), 43 deletions(-) create mode 100644 replication/replication.go create mode 100644 replication/replication_test.go diff --git a/cmd/gemini/root.go b/cmd/gemini/root.go index d4c4eab0..89c05746 100644 --- a/cmd/gemini/root.go +++ b/cmd/gemini/root.go @@ -19,6 +19,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/scylladb/gemini" + "github.com/scylladb/gemini/replication" "github.com/scylladb/gemini/store" "github.com/spf13/cobra" "go.uber.org/zap" @@ -43,6 +44,7 @@ var ( bind string warmup time.Duration compactionStrategy string + replicationStrategy string consistency string maxPartitionKeys int minPartitionKeys int @@ -286,6 +288,22 @@ func getCompactionStrategy(cs string, logger *zap.Logger) *gemini.CompactionStra } } +func getReplicationStrategy(rs string, fallback *replication.Replication, logger *zap.Logger) *replication.Replication { + switch rs { + case "network": + return replication.NewNetworkTopologyStrategy() + case "simple": + return replication.NewSimpleStrategy() + default: + replicationStrategy := &replication.Replication{} + if err := json.Unmarshal([]byte(strings.ReplaceAll(rs, "'", "\"")), rs); err != nil { + logger.Error("unable to parse replication strategy", zap.String("strategy", rs), zap.Error(err)) + return fallback + } + return replicationStrategy + } +} + func getCQLFeature(feature string) gemini.CQLFeature { switch strings.ToLower(feature) { case "all": @@ -324,6 +342,7 @@ func init() { rootCmd.Flags().StringVarP(&bind, "bind", "b", ":2112", "Specify the interface and port which to bind prometheus metrics on. Default is ':2112'") rootCmd.Flags().DurationVarP(&warmup, "warmup", "", 30*time.Second, "Specify the warmup perid as a duration for example 30s or 10h") rootCmd.Flags().StringVarP(&compactionStrategy, "compaction-strategy", "", "", "Specify the desired CS as either the coded short hand stcs|twcs|lcs to get the default for each type or provide the entire specification in the form {'class':'....'}") + rootCmd.Flags().StringVarP(&replicationStrategy, "replication-strategy", "", "", "Specify the desired replication strategy as either the coded short hand simple|network to get the default for each type or provide the entire specification in the form {'class':'....'}") rootCmd.Flags().StringVarP(&consistency, "consistency", "", "QUORUM", "Specify the desired consistency as ANY|ONE|TWO|THREE|QUORUM|LOCAL_QUORUM|EACH_QUORUM|LOCAL_ONE") rootCmd.Flags().IntVarP(&maxPartitionKeys, "max-partition-keys", "", 6, "Maximum number of generated partition keys") rootCmd.Flags().IntVarP(&minPartitionKeys, "min-partition-keys", "", 2, "Minimum number of generated partition keys") diff --git a/cmd/gemini/schema.go b/cmd/gemini/schema.go index 466bc416..14120454 100644 --- a/cmd/gemini/schema.go +++ b/cmd/gemini/schema.go @@ -4,6 +4,7 @@ import ( "strings" "github.com/scylladb/gemini" + "github.com/scylladb/gemini/replication" "go.uber.org/zap" ) @@ -12,18 +13,19 @@ func createSchemaConfig(logger *zap.Logger) gemini.SchemaConfig { switch strings.ToLower(datasetSize) { case "small": return gemini.SchemaConfig{ - CompactionStrategy: defaultConfig.CompactionStrategy, - MaxPartitionKeys: defaultConfig.MaxPartitionKeys, - MinPartitionKeys: defaultConfig.MinPartitionKeys, - MaxClusteringKeys: defaultConfig.MaxClusteringKeys, - MinClusteringKeys: defaultConfig.MinClusteringKeys, - MaxColumns: defaultConfig.MaxColumns, - MinColumns: defaultConfig.MinColumns, - MaxUDTParts: 2, - MaxTupleParts: 2, - MaxBlobLength: 20, - MaxStringLength: 20, - CQLFeature: defaultConfig.CQLFeature, + CompactionStrategy: defaultConfig.CompactionStrategy, + ReplicationStrategy: defaultConfig.ReplicationStrategy, + MaxPartitionKeys: defaultConfig.MaxPartitionKeys, + MinPartitionKeys: defaultConfig.MinPartitionKeys, + MaxClusteringKeys: defaultConfig.MaxClusteringKeys, + MinClusteringKeys: defaultConfig.MinClusteringKeys, + MaxColumns: defaultConfig.MaxColumns, + MinColumns: defaultConfig.MinColumns, + MaxUDTParts: 2, + MaxTupleParts: 2, + MaxBlobLength: 20, + MaxStringLength: 20, + CQLFeature: defaultConfig.CQLFeature, } default: return defaultConfig @@ -40,19 +42,20 @@ func createDefaultSchemaConfig(logger *zap.Logger) gemini.SchemaConfig { MaxUDTParts = 20 ) return gemini.SchemaConfig{ - CompactionStrategy: getCompactionStrategy(compactionStrategy, logger), - MaxPartitionKeys: maxPartitionKeys, - MinPartitionKeys: minPartitionKeys, - MaxClusteringKeys: maxClusteringKeys, - MinClusteringKeys: minClusteringKeys, - MaxColumns: maxColumns, - MinColumns: minColumns, - MaxUDTParts: MaxUDTParts, - MaxTupleParts: MaxTupleParts, - MaxBlobLength: MaxBlobLength, - MinBlobLength: MinBlobLength, - MaxStringLength: MaxStringLength, - MinStringLength: MinStringLength, - CQLFeature: getCQLFeature(cqlFeatures), + CompactionStrategy: getCompactionStrategy(compactionStrategy, logger), + ReplicationStrategy: getReplicationStrategy(replicationStrategy, replication.NewSimpleStrategy(), logger), + MaxPartitionKeys: maxPartitionKeys, + MinPartitionKeys: minPartitionKeys, + MaxClusteringKeys: maxClusteringKeys, + MinClusteringKeys: minClusteringKeys, + MaxColumns: maxColumns, + MinColumns: minColumns, + MaxUDTParts: MaxUDTParts, + MaxTupleParts: MaxTupleParts, + MaxBlobLength: MaxBlobLength, + MinBlobLength: MinBlobLength, + MaxStringLength: MaxStringLength, + MinStringLength: MinStringLength, + CQLFeature: getCQLFeature(cqlFeatures), } } diff --git a/replication/replication.go b/replication/replication.go new file mode 100644 index 00000000..aebc92ea --- /dev/null +++ b/replication/replication.go @@ -0,0 +1,27 @@ +package replication + +import ( + "encoding/json" + "strings" +) + +type Replication map[string]interface{} + +func (r *Replication) ToCQL() string { + b, _ := json.Marshal(r) + return strings.ReplaceAll(string(b), "\"", "'") +} + +func NewSimpleStrategy() *Replication { + return &Replication{ + "class": "SimpleStrategy", + "replication_factor": 1, + } +} + +func NewNetworkTopologyStrategy() *Replication { + return &Replication{ + "class": "NetworkTopologyStrategy", + "datacenter1": 1, + } +} diff --git a/replication/replication_test.go b/replication/replication_test.go new file mode 100644 index 00000000..8a7da2e0 --- /dev/null +++ b/replication/replication_test.go @@ -0,0 +1,29 @@ +package replication + +import ( + "testing" +) + +func TestToCQL(t *testing.T) { + tests := map[string]struct { + rs *Replication + want string + }{ + "simple": { + rs: NewSimpleStrategy(), + want: "{'class':'SimpleStrategy','replication_factor':1}", + }, + "network": { + rs: NewNetworkTopologyStrategy(), + want: "{'class':'NetworkTopologyStrategy','datacenter1':1}", + }, + } + for name, test := range tests { + t.Run(name, func(t *testing.T) { + got := test.rs.ToCQL() + if got != test.want { + t.Fatalf("expected '%s', got '%s'", test.want, got) + } + }) + } +} diff --git a/schema.go b/schema.go index 1e764c75..e9d60d64 100644 --- a/schema.go +++ b/schema.go @@ -9,6 +9,7 @@ import ( "github.com/gocql/gocql" "github.com/pkg/errors" + "github.com/scylladb/gemini/replication" "github.com/scylladb/gocqlx/qb" "golang.org/x/exp/rand" ) @@ -26,20 +27,21 @@ const ( type Value []interface{} type SchemaConfig struct { - CompactionStrategy *CompactionStrategy - MaxPartitionKeys int - MinPartitionKeys int - MaxClusteringKeys int - MinClusteringKeys int - MaxColumns int - MinColumns int - MaxUDTParts int - MaxTupleParts int - MaxBlobLength int - MaxStringLength int - MinBlobLength int - MinStringLength int - CQLFeature CQLFeature + CompactionStrategy *CompactionStrategy + ReplicationStrategy *replication.Replication + MaxPartitionKeys int + MinPartitionKeys int + MaxClusteringKeys int + MinClusteringKeys int + MaxColumns int + MinColumns int + MaxUDTParts int + MaxTupleParts int + MaxBlobLength int + MaxStringLength int + MinBlobLength int + MinStringLength int + CQLFeature CQLFeature } var ( @@ -86,7 +88,8 @@ func (sc *SchemaConfig) GetMinColumns() int { } type Keyspace struct { - Name string `json:"name"` + Name string `json:"name"` + Replication *replication.Replication `json:"replication"` } type ColumnDef struct { @@ -364,7 +367,8 @@ func (s *Schema) GetDropSchema() []string { func GenSchema(sc SchemaConfig) *Schema { builder := NewSchemaBuilder() keyspace := Keyspace{ - Name: "ks1", + Name: "ks1", + Replication: sc.ReplicationStrategy, } builder.Keyspace(keyspace) var partitionKeys []ColumnDef @@ -490,7 +494,7 @@ func randomCompactionStrategy() *CompactionStrategy { } func (s *Schema) GetCreateSchema() []string { - createKeyspace := fmt.Sprintf("CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}", s.Keyspace.Name) + createKeyspace := fmt.Sprintf("CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = %s", s.Keyspace.Name, s.Keyspace.Replication.ToCQL()) stmts := []string{createKeyspace}