Skip to content

Commit

Permalink
Schema Tool option to configure request timeout for cql client (#900)
Browse files Browse the repository at this point in the history
fixes #897
  • Loading branch information
samarabbas authored Jun 27, 2018
1 parent f11fff3 commit d3e8f10
Show file tree
Hide file tree
Showing 12 changed files with 34 additions and 17 deletions.
2 changes: 1 addition & 1 deletion common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func CreateFrontendServiceRetryPolicy() backoff.RetryPolicy {
return policy
}

// CreatePersistanceRetryPolicy creates a retry policy for persistence layer operations
// CreateMatchingRetryPolicy creates a retry policy for calls to matching service
func CreateMatchingRetryPolicy() backoff.RetryPolicy {
policy := backoff.NewExponentialRetryPolicy(matchingServiceOperationInitialInterval)
policy.SetMaximumInterval(matchingServiceOperationMaxInterval)
Expand Down
3 changes: 3 additions & 0 deletions tools/cassandra/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type (
CassUser string
CassPassword string
CassKeyspace string
CassTimeout int
}

// UpdateSchemaConfig holds the config
Expand Down Expand Up @@ -76,6 +77,7 @@ const (
cliOptPort = "port"
cliOptUser = "user"
cliOptPassword = "password"
cliOptTimeout = "timeout"
cliOptKeyspace = "keyspace"
cliOptVersion = "version"
cliOptSchemaFile = "schema-file"
Expand All @@ -91,6 +93,7 @@ const (
cliFlagPort = cliOptPort + ", p"
cliFlagUser = cliOptUser + ", u"
cliFlagPassword = cliOptPassword + ", pw"
cliFlagTimeout = cliOptTimeout + ", t"
cliFlagKeyspace = cliOptKeyspace + ", k"
cliFlagVersion = cliOptVersion + ", v"
cliFlagSchemaFile = cliOptSchemaFile + ", f"
Expand Down
8 changes: 5 additions & 3 deletions tools/cassandra/cqlclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ var errGetSchemaVersion = errors.New("Failed to get current schema version from

const (
newLineDelim = '\n'
defaultTimeout = 30 * time.Second
defaultTimeout = 30 // timeout in seconds
cqlProtoVersion = 4 // default CQL protocol version
defaultConsistency = "QUORUM" // schema updates must always be QUORUM
defaultCassandraPort = 9042
Expand Down Expand Up @@ -107,7 +107,8 @@ const (
)

// newCQLClient returns a new instance of CQLClient
func newCQLClient(hostsCsv string, port int, user, password, keyspace string) (CQLClient, error) {
func newCQLClient(hostsCsv string, port int, user, password, keyspace string, timeoutSeconds int) (CQLClient,
error) {
hosts := parseHosts(hostsCsv)
if len(hosts) == 0 {
return nil, errNoHosts
Expand All @@ -122,8 +123,9 @@ func newCQLClient(hostsCsv string, port int, user, password, keyspace string) (C
Password: password,
}
}
timeout := time.Duration(timeoutSeconds) * time.Second
clusterCfg.Keyspace = keyspace
clusterCfg.Timeout = defaultTimeout
clusterCfg.Timeout = timeout
clusterCfg.ProtoVersion = cqlProtoVersion
clusterCfg.Consistency = gocql.ParseConsistency(defaultConsistency)
cqlClient := new(cqlClient)
Expand Down
4 changes: 2 additions & 2 deletions tools/cassandra/cqlclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (s *CQLClientTestSuite) SetupSuite() {
rand := rand.New(rand.NewSource(time.Now().UnixNano()))
s.keyspace = fmt.Sprintf("cql_client_test_%v", rand.Int63())

client, err := newCQLClient("127.0.0.1", defaultCassandraPort, "", "", "system")
client, err := newCQLClient("127.0.0.1", defaultCassandraPort, "", "", "system", defaultTimeout)
if err != nil {
log.Fatalf("error creating CQLClient, err=%v", err)
}
Expand Down Expand Up @@ -191,7 +191,7 @@ func (s *CQLClientTestSuite) testCreate(client CQLClient) {
}

func (s *CQLClientTestSuite) TestCQLClient() {
client, err := newCQLClient("127.0.0.1", defaultCassandraPort, "", "", s.keyspace)
client, err := newCQLClient("127.0.0.1", defaultCassandraPort, "", "", s.keyspace, defaultTimeout)
s.Nil(err)
s.testCreate(client)
s.testUpdate(client)
Expand Down
6 changes: 5 additions & 1 deletion tools/cassandra/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ func createKeyspace(cli *cli.Context) error {
if err != nil {
return handleErr(err)
}
client, err := newCQLClient(config.CassHosts, config.CassPort, config.CassUser, config.CassPassword, "system")
client, err := newCQLClient(config.CassHosts, config.CassPort, config.CassUser, config.CassPassword, "system",
config.CassTimeout)
if err != nil {
return handleErr(fmt.Errorf("error creating cql client:%v", err))
}
Expand Down Expand Up @@ -127,6 +128,7 @@ func newSetupSchemaConfig(cli *cli.Context) (*SetupSchemaConfig, error) {
config.CassPort = cli.GlobalInt(cliOptPort)
config.CassUser = cli.GlobalString(cliOptUser)
config.CassPassword = cli.GlobalString(cliOptPassword)
config.CassTimeout = cli.GlobalInt(cliOptTimeout)
config.CassKeyspace = cli.GlobalString(cliOptKeyspace)
config.SchemaFilePath = cli.String(cliOptSchemaFile)
config.InitialVersion = cli.String(cliOptVersion)
Expand Down Expand Up @@ -171,6 +173,7 @@ func newUpdateSchemaConfig(cli *cli.Context) (*UpdateSchemaConfig, error) {
config.CassPort = cli.GlobalInt(cliOptPort)
config.CassUser = cli.GlobalString(cliOptUser)
config.CassPassword = cli.GlobalString(cliOptPassword)
config.CassTimeout = cli.GlobalInt(cliOptTimeout)
config.CassKeyspace = cli.GlobalString(cliOptKeyspace)
config.SchemaDir = cli.String(cliOptSchemaDir)
config.IsDryRun = cli.Bool(cliOptDryrun)
Expand All @@ -189,6 +192,7 @@ func newCreateKeyspaceConfig(cli *cli.Context) (*CreateKeyspaceConfig, error) {
config.CassPort = cli.GlobalInt(cliOptPort)
config.CassUser = cli.GlobalString(cliOptUser)
config.CassPassword = cli.GlobalString(cliOptPassword)
config.CassTimeout = cli.GlobalInt(cliOptTimeout)
config.CassKeyspace = cli.String(cliOptKeyspace)
config.ReplicationFactor = cli.Int(cliOptReplicationFactor)

Expand Down
6 changes: 6 additions & 0 deletions tools/cassandra/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ func buildCLIOptions() *cli.App {
Usage: "password used for authentication for connecting to cassandra host",
EnvVar: "CASSANDRA_PASSWORD",
},
cli.IntFlag{
Name: cliFlagTimeout,
Value: defaultTimeout,
Usage: "request timeout in seconds used for cql client",
EnvVar: "CASSANDRA_TIMEOUT",
},
cli.StringFlag{
Name: cliFlagKeyspace,
Value: "cadence",
Expand Down
2 changes: 1 addition & 1 deletion tools/cassandra/setupTask.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type SetupSchemaTask struct {

func newSetupSchemaTask(config *SetupSchemaConfig) (*SetupSchemaTask, error) {
client, err := newCQLClient(config.CassHosts, config.CassPort, config.CassUser, config.CassPassword,
config.CassKeyspace)
config.CassKeyspace, config.CassTimeout)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions tools/cassandra/setupTask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (s *SetupSchemaTestSuite) SetupSuite() {
s.rand = rand.New(rand.NewSource(time.Now().UnixNano()))
s.keyspace = fmt.Sprintf("setup_schema_test_%v", s.rand.Int63())

client, err := newCQLClient("127.0.0.1", defaultCassandraPort, "", "", "system")
client, err := newCQLClient("127.0.0.1", defaultCassandraPort, "", "", "system", defaultTimeout)
if err != nil {
s.log.Fatal("Error creating CQLClient")
}
Expand All @@ -87,7 +87,7 @@ func (s *SetupSchemaTestSuite) TestCreateKeyspace() {

func (s *SetupSchemaTestSuite) TestSetupSchema() {

client, err := newCQLClient("127.0.0.1", defaultCassandraPort, "", "", s.keyspace)
client, err := newCQLClient("127.0.0.1", defaultCassandraPort, "", "", s.keyspace, defaultTimeout)
s.Nil(err)

// test command fails without required arguments
Expand Down
6 changes: 4 additions & 2 deletions tools/cassandra/updateTask.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ func NewUpdateSchemaTask(config *UpdateSchemaConfig) (*UpdateSchemaTask, error)
}
}

client, err := newCQLClient(config.CassHosts, config.CassPort, config.CassUser, config.CassPassword, keyspace)
client, err := newCQLClient(config.CassHosts, config.CassPort, config.CassUser, config.CassPassword, keyspace,
config.CassTimeout)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -357,7 +358,8 @@ func readSchemaDir(dir string, startVer string, endVer string) ([]string, error)
// sets up a temporary dryrun keyspace for
// executing the cassandra schema update
func setupDryrunKeyspace(config *UpdateSchemaConfig) error {
client, err := newCQLClient(config.CassHosts, config.CassPort, config.CassUser, config.CassPassword, systemKeyspace)
client, err := newCQLClient(config.CassHosts, config.CassPort, config.CassUser, config.CassPassword, systemKeyspace,
config.CassTimeout)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions tools/cassandra/updateTask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (s *UpdateSchemaTestSuite) SetupSuite() {
s.rand = rand.New(rand.NewSource(time.Now().UnixNano()))
s.keyspace = fmt.Sprintf("update_schema_test_%v", s.rand.Int63())

client, err := newCQLClient("127.0.0.1", defaultCassandraPort, "", "", "system")
client, err := newCQLClient("127.0.0.1", defaultCassandraPort, "", "", "system", defaultTimeout)
if err != nil {
s.log.Fatal("Error creating CQLClient")
}
Expand All @@ -81,7 +81,7 @@ func (s *UpdateSchemaTestSuite) TearDownSuite() {

func (s *UpdateSchemaTestSuite) TestUpdateSchema() {

client, err := newCQLClient("127.0.0.1", defaultCassandraPort, "", "", s.keyspace)
client, err := newCQLClient("127.0.0.1", defaultCassandraPort, "", "", s.keyspace, defaultTimeout)
s.Nil(err)
defer client.Close()

Expand Down Expand Up @@ -118,7 +118,7 @@ func (s *UpdateSchemaTestSuite) TestUpdateSchema() {

func (s *UpdateSchemaTestSuite) TestDryrun() {

client, err := newCQLClient("127.0.0.1", defaultCassandraPort, "", "", s.keyspace)
client, err := newCQLClient("127.0.0.1", defaultCassandraPort, "", "", s.keyspace, defaultTimeout)
s.Nil(err)
defer client.Close()

Expand Down
2 changes: 1 addition & 1 deletion tools/cassandra/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func VerifyCompatibleVersion(cfg config.Cassandra, rootPath string) error {

// checkCompatibleVersion check the version compatibility
func checkCompatibleVersion(cfg config.Cassandra, keyspace string, dirPath string) error {
cqlClient, err := newCQLClient(cfg.Hosts, cfg.Port, cfg.User, cfg.Password, keyspace)
cqlClient, err := newCQLClient(cfg.Hosts, cfg.Port, cfg.User, cfg.Password, keyspace, defaultTimeout)
if err != nil {
return fmt.Errorf("unable to create CQL Client: %v", err.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion tools/cassandra/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (s *VersionTestSuite) TestCheckCompatibleVersion() {
}

func (s *VersionTestSuite) createKeyspace(keyspace string) func() {
client, err := newCQLClient("127.0.0.1", defaultCassandraPort, "", "", "system")
client, err := newCQLClient("127.0.0.1", defaultCassandraPort, "", "", "system", defaultTimeout)
s.NoError(err)

err = client.CreateKeyspace(keyspace, 1)
Expand Down

0 comments on commit d3e8f10

Please sign in to comment.