Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated connection setters to include connWriteTimeout #21

Merged
merged 1 commit into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 30 additions & 24 deletions cmd/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ var loadCmd = &cobra.Command{
concurrency, _ := pflags.GetInt("concurrency")
requests, _ := pflags.GetInt("requests")
debugLevel, _ := pflags.GetInt("debug")
cmdTimeout, _ := pflags.GetInt64(redis.REDIS_COMMAND_TIMEOUT)
connWriteTimeout := time.Duration(cmdTimeout) * time.Millisecond

validateDB(db)
log.Printf("Using %d concurrent workers to ingest datapoints", concurrency)
Expand Down Expand Up @@ -110,10 +112,10 @@ var loadCmd = &cobra.Command{
uri, _ := pflags.GetString(redis.REDIS_URI_PROPERTY)
password, _ := pflags.GetString(redis.REDIS_PASSWORD_PROPERTY)
if strings.Compare(inputType, INPUT_TYPE_GEOSHAPE) == 0 {
setupStageGeoShape(uri, password, db, indexSearch, indexSearchName, INDEX_FIELDNAME_GEOSHAPE)
setupStageGeoShape(uri, password, db, indexSearch, indexSearchName, INDEX_FIELDNAME_GEOSHAPE, connWriteTimeout)
// geopoint
} else {
setupStageGeoPoint(uri, password, db, indexSearch, indexSearchName, INDEX_FIELDNAME_GEOPOINT)
setupStageGeoPoint(uri, password, db, indexSearch, indexSearchName, INDEX_FIELDNAME_GEOPOINT, connWriteTimeout)
}
}

Expand All @@ -139,12 +141,12 @@ var loadCmd = &cobra.Command{
go loadWorkerGeoshapeElastic(elasticWrapper, workQueue, complete, &issuedCommands, &finishedCommands, &activeConns, datapointsChan, uint64(nDatapoints), INDEX_FIELDNAME_GEOSHAPE)
} else {
uri, _ := pflags.GetString(redis.REDIS_URI_PROPERTY)
go loadWorkerGeoshape(uri, password, workQueue, complete, &finishedCommands, datapointsChan, uint64(nDatapoints), db, INDEX_FIELDNAME_GEOSHAPE, debugLevel)
go loadWorkerGeoshape(uri, password, workQueue, complete, &finishedCommands, datapointsChan, uint64(nDatapoints), db, INDEX_FIELDNAME_GEOSHAPE, debugLevel, connWriteTimeout)
}
// geopoint
} else {
uri, _ := pflags.GetString(redis.REDIS_URI_PROPERTY)
go loadWorkerGeopoint(uri, password, workQueue, complete, &finishedCommands, datapointsChan, uint64(nDatapoints), db, redisGeoKeyname, INDEX_FIELDNAME_GEOPOINT)
go loadWorkerGeopoint(uri, password, workQueue, complete, &finishedCommands, datapointsChan, uint64(nDatapoints), db, redisGeoKeyname, INDEX_FIELDNAME_GEOPOINT, connWriteTimeout)
}

// delay the creation 1ms for each additional client
Expand Down Expand Up @@ -189,12 +191,13 @@ func validateDB(db string) {
}
}

func setupStageGeoShape(uri, password, db string, indexSearch bool, indexName, fieldName string) {
func setupStageGeoShape(uri, password, db string, indexSearch bool, indexName, fieldName string, connWriteTimeout time.Duration) {
c, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: []string{uri},
DisableCache: true,
AlwaysRESP2: true,
Password: password,
InitAddress: []string{uri},
DisableCache: true,
AlwaysRESP2: true,
Password: password,
ConnWriteTimeout: connWriteTimeout,
})
defer c.Close()
ctx := context.Background()
Expand Down Expand Up @@ -226,12 +229,13 @@ func setupStageGeoShape(uri, password, db string, indexSearch bool, indexName, f
log.Printf("Finished setup stage for %s DB\n", db)
}

func setupStageGeoPoint(uri, password, db string, indexSearch bool, indexName, fieldname string) {
func setupStageGeoPoint(uri, password, db string, indexSearch bool, indexName, fieldname string, connWriteTimeout time.Duration) {
c, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: []string{uri},
DisableCache: true,
AlwaysRESP2: true,
Password: password,
InitAddress: []string{uri},
DisableCache: true,
AlwaysRESP2: true,
Password: password,
ConnWriteTimeout: connWriteTimeout,
})
defer c.Close()
ctx := context.Background()
Expand Down Expand Up @@ -384,12 +388,13 @@ func LineCounter(r io.Reader) (int, error) {
return count, nil
}

func loadWorkerGeopoint(uri, password string, queue chan string, complete chan bool, ops *uint64, datapointsChan chan datapoint, totalDatapoints uint64, db string, redisGeoKeyname string, fieldName string) {
func loadWorkerGeopoint(uri, password string, queue chan string, complete chan bool, ops *uint64, datapointsChan chan datapoint, totalDatapoints uint64, db string, redisGeoKeyname string, fieldName string, connWriteTimeout time.Duration) {
c, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: []string{uri},
DisableCache: true,
AlwaysRESP2: true,
Password: password,
InitAddress: []string{uri},
DisableCache: true,
AlwaysRESP2: true,
Password: password,
ConnWriteTimeout: connWriteTimeout,
})
if err != nil {
panic(err)
Expand Down Expand Up @@ -470,12 +475,13 @@ func queryWorkerGeoshapeElastic(ec *elastic.ElasticWrapper, queue chan string, c
complete <- true
}

func loadWorkerGeoshape(uri, password string, queue chan string, complete chan bool, ops *uint64, datapointsChan chan datapoint, totalDatapoints uint64, db string, fieldName string, debugLevel int) {
func loadWorkerGeoshape(uri, password string, queue chan string, complete chan bool, ops *uint64, datapointsChan chan datapoint, totalDatapoints uint64, db string, fieldName string, debugLevel int, connWriteTimeout time.Duration) {
c, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: []string{uri},
DisableCache: true,
AlwaysRESP2: true,
Password: password,
InitAddress: []string{uri},
DisableCache: true,
AlwaysRESP2: true,
Password: password,
ConnWriteTimeout: connWriteTimeout,
})
if err != nil {
panic(err)
Expand Down
27 changes: 15 additions & 12 deletions cmd/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var queryCmd = &cobra.Command{
seed, _ := pflags.GetInt("random.seed")
debugLevel, _ := pflags.GetInt("debug")
queryTimeout, _ := pflags.GetInt64(redis.REDIS_COMMAND_TIMEOUT)
connWriteTimeout := time.Duration(queryTimeout) * time.Millisecond
redisGeoKeyname, _ := pflags.GetString(REDIS_GEO_KEYNAME_PROPERTY)
indexSearchName, _ := pflags.GetString(redis.REDIS_IDX_NAME_PROPERTY)
password, _ := pflags.GetString(redis.REDIS_PASSWORD_PROPERTY)
Expand Down Expand Up @@ -126,11 +127,11 @@ var queryCmd = &cobra.Command{
}
go queryWorkerGeoshapeElastic(elasticWrapper, workQueue, complete, &issuedCommands, &finishedCommands, &activeConns, datapointsChan, uint64(nDatapoints), queryType, INDEX_FIELDNAME_GEOSHAPE, debugLevel)
} else {
go queryWorkerGeoShape(uri, password, workQueue, complete, &issuedCommands, datapointsChan, uint64(nDatapoints), db, indexSearchName, INDEX_FIELDNAME_GEOSHAPE, queryType, testTime, queryTimeout, debugLevel)
go queryWorkerGeoShape(uri, password, workQueue, complete, &issuedCommands, datapointsChan, uint64(nDatapoints), db, indexSearchName, INDEX_FIELDNAME_GEOSHAPE, queryType, testTime, queryTimeout, debugLevel, connWriteTimeout)
}
// geopoint
} else {
go queryWorkerGeoPoint(uri, password, workQueue, complete, &issuedCommands, datapointsChan, uint64(nDatapoints), db, mu, r, redisGeoKeyname, indexSearchName, INDEX_FIELDNAME_GEOPOINT, testTime)
go queryWorkerGeoPoint(uri, password, workQueue, complete, &issuedCommands, datapointsChan, uint64(nDatapoints), db, mu, r, redisGeoKeyname, indexSearchName, INDEX_FIELDNAME_GEOPOINT, testTime, connWriteTimeout)
}
// delay the creation 1ms for each additional client
time.Sleep(time.Millisecond * 1)
Expand Down Expand Up @@ -197,12 +198,13 @@ func init() {
elastic.RegisterElasticRunFlags(pflags)
}

func queryWorkerGeoShape(uri, password string, queue chan string, complete chan bool, ops *uint64, datapointsChan chan datapoint, totalDatapoints uint64, db string, indexSearchName, fieldName, queryType string, testDuration int, queryTimeoutMillis int64, debugLevel int) {
func queryWorkerGeoShape(uri, password string, queue chan string, complete chan bool, ops *uint64, datapointsChan chan datapoint, totalDatapoints uint64, db string, indexSearchName, fieldName, queryType string, testDuration int, queryTimeoutMillis int64, debugLevel int, connWriteTimeout time.Duration) {
c, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: []string{uri},
DisableCache: true,
AlwaysRESP2: true,
Password: password,
InitAddress: []string{uri},
DisableCache: true,
AlwaysRESP2: true,
Password: password,
ConnWriteTimeout: connWriteTimeout,
})
if err != nil {
panic(err)
Expand Down Expand Up @@ -291,13 +293,14 @@ func verbosePrintRediSearchReply(querySearch string, polygon string, resultSetSi
}
}

func queryWorkerGeoPoint(uri, password string, queue chan string, complete chan bool, ops *uint64, datapointsChan chan datapoint, totalDatapoints uint64, db string, mu sync.Mutex, r *rand.Rand, redisGeoKeyname string, indexSearchName string, fieldName string, testDuration int) {
func queryWorkerGeoPoint(uri, password string, queue chan string, complete chan bool, ops *uint64, datapointsChan chan datapoint, totalDatapoints uint64, db string, mu sync.Mutex, r *rand.Rand, redisGeoKeyname string, indexSearchName string, fieldName string, testDuration int, connWriteTimeout time.Duration) {

c, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: []string{uri},
DisableCache: true,
AlwaysRESP2: true,
Password: password,
InitAddress: []string{uri},
DisableCache: true,
AlwaysRESP2: true,
Password: password,
ConnWriteTimeout: connWriteTimeout,
})
if err != nil {
panic(err)
Expand Down
8 changes: 4 additions & 4 deletions cmd/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const QUERY_TYPE_GEODIST_BBOX = "geodist-bbox"
const QUERY_TYPE_GEOSHAPE_WITHIN = "geoshape-within"
const QUERY_TYPE_GEOSHAPE_CONTAINS = "geoshape-contains"
const DEFAULT_QUERY_TYPE = QUERY_TYPE_GEOSHAPE_CONTAINS
const DEFAULT_QUERY_TIMEOUT = 10000
const DEFAULT_COMMAND_TIMEOUT = 300000
const REDIS_IDX_NAME_PROPERTY = "redisearch.index.name"
const REDIS_URI_PROPERTY = "redis.uri"
const REDIS_PASSWORD_PROPERTY = "redis.password"
Expand All @@ -43,7 +43,7 @@ func RegisterRedisLoadFlags(flags *pflag.FlagSet) {
flags.BoolP(REDIS_IDX_PROPERTY, "", true, "Enable redisearch secondary index on HASH and JSON datatypes")
flags.StringP(REDIS_IDX_NAME_PROPERTY, "", REDIS_DEFAULT_IDX_NAME, "redisearch secondary index name")
flags.StringP(REDIS_GEO_KEYNAME_PROPERTY, "", REDIS_GEO_DEFAULT_KEYNAME, "redis GEO keyname")
flags.Int64P(REDIS_COMMAND_TIMEOUT, "", DEFAULT_QUERY_TIMEOUT, "Command timeout in millis.")
flags.Int64P(REDIS_COMMAND_TIMEOUT, "", DEFAULT_COMMAND_TIMEOUT, "Command timeout in millis.")
flags.IntP("debug", "", 0, "debug level. O no debug.")

}
Expand All @@ -52,15 +52,15 @@ func PrepareRedisQueryCommandFlags(pflags *pflag.FlagSet) {
pflags.StringP("input", "i", "documents.json", "Input json file")
pflags.StringP("input-type", "", DEFAULT_INPUT_TYPE, "Input type. One of 'geopoint' or 'geoshape'")
pflags.StringP("query-type", "", DEFAULT_QUERY_TYPE, "Query type. Only used for 'geoshape' inputs. One of 'geoshape-contains' or 'geoshape-within'")
pflags.Int64P(REDIS_COMMAND_TIMEOUT, "", DEFAULT_QUERY_TIMEOUT, "Query timeout in millis.")
pflags.Int64P(REDIS_COMMAND_TIMEOUT, "", DEFAULT_COMMAND_TIMEOUT, "Command timeout in millis.")
pflags.IntP("concurrency", "c", 50, "Concurrency")
pflags.IntP("debug", "", 0, "debug level. O no debug.")
pflags.IntP("random.seed", "", 12345, "Random seed")
pflags.IntP("test.time", "", -1, "Number of seconds to run the test. . If -1 then it will use requests property")
pflags.IntP("requests", "n", -1, "Requests. If -1 then it will use all input datapoints")
pflags.StringP(REDIS_URI_PROPERTY, "u", REDIS_URI_PROPERTY_DEFAULT, "Server URI")
pflags.StringP(REDIS_PASSWORD_PROPERTY, "", REDIS_PASSWORD_PROPERTY_DEFAULT, "Server Password")
pflags.BoolP("cluster", "", false, "Enable cluster mode")
pflags.BoolP(REDIS_CLUSTER_PROPERTY, "", false, "Enable cluster mode")
pflags.StringP(REDIS_IDX_NAME_PROPERTY, "", REDIS_DEFAULT_IDX_NAME, "redisearch secondary index name")
pflags.StringP(REDIS_GEO_KEYNAME_PROPERTY, "", REDIS_GEO_DEFAULT_KEYNAME, "redis GEO keyname")
}
Expand Down