Skip to content

Commit

Permalink
Merge pull request #26 from pfnet-research/get-task-in-chunks
Browse files Browse the repository at this point in the history
get tasks in chunks in get-task command
  • Loading branch information
everpeace authored Sep 2, 2020
2 parents bfb8681 + 6e9ae18 commit c1dfb99
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 42 deletions.
10 changes: 7 additions & 3 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ func init() {
)
viperBindPFlag("Redis.IdleCheckFrequency", cmdOpts.Redis.IdleCheckFrequency.String(), flag.Lookup("redis-idle-check-frequency"))

flag.Int("redis-chunk-size-in-get", cmdOpts.Redis.ChunkSizeInGet, "chunk size when gettings tasks from redis")
viperBindPFlag("Redis.ChunkSizeInGet", strconv.Itoa(cmdOpts.Redis.ChunkSizeInGet), flag.Lookup("redis-chunk-size-in-get"))

// BackoffConfig for redis
flag.Duration("redis-backoff-initial-interval", cmdOpts.Redis.Backoff.InitialInterval, "initial interval of exponential backoff used in redis operation")
viperBindPFlag("Redis.Backoff.InitialInterval", cmdOpts.Redis.Backoff.InitialInterval.String(), flag.Lookup("redis-backoff-initial-interval"))
Expand Down Expand Up @@ -294,9 +297,10 @@ func mustInitializeQueueBackend() {
queueBackend, err = backendfactory.NewBackend(logger, backendconfig.Config{
BackendType: cmdOpts.Backend,
Redis: &backendconfig.RedisConfig{
KeyPrefix: cmdOpts.Redis.KeyPrefix,
Client: cmdOpts.Redis.NewClient(),
Backoff: cmdOpts.Redis.Backoff,
KeyPrefix: cmdOpts.Redis.KeyPrefix,
Client: cmdOpts.Redis.NewClient(),
Backoff: cmdOpts.Redis.Backoff,
ChunkSizeInGet: cmdOpts.Redis.ChunkSizeInGet,
},
})

Expand Down
16 changes: 9 additions & 7 deletions pkg/backend/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ type Config struct {
}

type RedisConfig struct {
KeyPrefix string
Client *redis.Client
Backoff BackoffConfig
KeyPrefix string
Client *redis.Client
Backoff BackoffConfig
ChunkSizeInGet int
}

// TODO: support UniversalOptions
Expand All @@ -41,16 +42,17 @@ type RedisClientConfig struct {
Password string `json:"password" yaml:"password" default:""`
DB int `json:"db" yaml:"db" default:"0"`

DialTimeout time.Duration `json:"dialTimeout" yaml:"dialTimeout" default:"30s"`
ReadTimeout time.Duration `json:"readTimeout" yaml:"readTimeout" default:"10m"`
WriteTimeout time.Duration `json:"writeTimeout" yaml:"writeTimeout" default:"10m"`

DialTimeout time.Duration `json:"dialTimeout" yaml:"dialTimeout" default:"30s"`
ReadTimeout time.Duration `json:"readTimeout" yaml:"readTimeout" default:"10m"`
WriteTimeout time.Duration `json:"writeTimeout" yaml:"writeTimeout" default:"10m"`
PoolSize int `json:"poolSize" yaml:"poolSize" default:"-"`
MinIdleConns int `json:"minIdleConns" yaml:"minIdleConns" default:"-"`
MaxConnAge time.Duration `json:"maxConnAge" yaml:"maxConnAge" default:"-"`
PoolTimeout time.Duration `json:"poolTimeout" yaml:"poolTimeout" default:"-"`
IdleTimeout time.Duration `json:"idleTimeout" yaml:"idleTimeout" default:"5m"`
IdleCheckFrequency time.Duration `json:"idleCheckFrequency" yaml:"idleCheckFrequency" default:"1m"`

ChunkSizeInGet int `json:"chunkSizeInGet" yaml:"chunkSizeInGet" default:"10000"`
}

func (c RedisClientConfig) NewClient() *redis.Client {
Expand Down
7 changes: 4 additions & 3 deletions pkg/backend/redis/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,10 @@ var _ = Describe("Backend", func() {
ibackend, err := NewBackend(logger, backendconfig.Config{
BackendType: "redis",
Redis: &backendconfig.RedisConfig{
KeyPrefix: "test",
Client: client,
Backoff: backoffConfig,
KeyPrefix: "test",
Client: client,
Backoff: backoffConfig,
ChunkSizeInGet: 1000,
},
})
Expect(err).NotTo(HaveOccurred())
Expand Down
68 changes: 41 additions & 27 deletions pkg/backend/redis/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,39 +111,53 @@ func (b *Backend) getTasksByUIDs(queueUID string, taskUIDs []string, filter func
if len(taskUIDs) == 0 {
return []*task.Task{}, nil
}
taskKeys := []string{}
for _, uid := range taskUIDs {
taskKeys = append(taskKeys, b.taskKey(queueUID, uid))
}
rawTasks, err := b.Client.MGet(taskKeys...).Result()
if err != nil {
return nil, err

chunks := [][]string{}
chunkSize := b.RedisConfig.ChunkSizeInGet
for i := 0; i < len(taskUIDs); i += chunkSize {
until := len(taskUIDs)
if i+chunkSize <= len(taskUIDs) {
until = i + chunkSize
}
chunks = append(chunks, taskUIDs[i:until])
}

tasks := []*task.Task{}
for _, rawTask := range rawTasks {
if rawTask == nil {
lggr.Error().
Interface("rawData", nil).
Msg("Internal error. rawData is null. Skipped.")
continue
for _, chunk := range chunks {
taskKeys := []string{}
for _, uid := range chunk {
taskKeys = append(taskKeys, b.taskKey(queueUID, uid))
}
var t task.Task
rawTaskStr, ok := rawTask.(string)
if !ok {
lggr.Error().
Interface("rawData", rawTask).
Str("type", reflect.TypeOf(rawTask).String()).
Msg("Internal error. rawData should be string. Skipped.")
continue
}
if err := json.Unmarshal([]byte(rawTaskStr), &t); err != nil {
lggr.Error().Str("data", rawTaskStr).Msg("Failed to unmarshal to Task. Skipped.")
continue
rawTasks, err := b.Client.MGet(taskKeys...).Result()
if err != nil {
return nil, err
}
if filter(&t) {
tasks = append(tasks, &t)
for _, rawTask := range rawTasks {
if rawTask == nil {
lggr.Error().
Interface("rawData", nil).
Msg("Internal error. rawData is null. Skipped.")
continue
}
var t task.Task
rawTaskStr, ok := rawTask.(string)
if !ok {
lggr.Error().
Interface("rawData", rawTask).
Str("type", reflect.TypeOf(rawTask).String()).
Msg("Internal error. rawData should be string. Skipped.")
continue
}
if err := json.Unmarshal([]byte(rawTaskStr), &t); err != nil {
lggr.Error().Str("data", rawTaskStr).Msg("Failed to unmarshal to Task. Skipped.")
continue
}
if filter(&t) {
tasks = append(tasks, &t)
}
}
}

return tasks, nil
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ var _ = Describe("Worker", func() {
bcknd, err = backendfactory.NewBackend(logger, backendconfig.Config{
BackendType: "redis",
Redis: &backendconfig.RedisConfig{
Client: client,
Backoff: backendConfig,
Client: client,
Backoff: backendConfig,
ChunkSizeInGet: 1000,
},
})
Expect(err).NotTo(HaveOccurred())
Expand Down

0 comments on commit c1dfb99

Please sign in to comment.