Skip to content

Commit

Permalink
chore: Reuse the MQ parameters for operation queue
Browse files Browse the repository at this point in the history
There is no need to have a separate set of parameters for the operation queue that dictate retry behavior since the parameters are already defined for the pub-sub subsystem.

Signed-off-by: Bob Stasyszyn <[email protected]>
  • Loading branch information
bstasyszyn committed Jun 30, 2022
1 parent 224d8bc commit 29a0c48
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 93 deletions.
47 changes: 5 additions & 42 deletions cmd/orb-server/startcmd/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,6 @@ const (
opQueueDefaultTaskMonitorInterval = 10 * time.Second
opQueueDefaultTaskExpiration = 30 * time.Second
opQueueDefaultMaxReposts = 20
opQueueDefaultRepostInitialDelay = 5 * time.Second
opQueueDefaultRepostMaxDelay = 1 * time.Minute
opQueueDefaultRepostMultiplier = 1.5
opQueueOperationExpirationGracePeriod = 10 * time.Minute
splitRequestTokenLength = 2
vctReadTokenKey = "vct-read"
Expand Down Expand Up @@ -363,22 +360,6 @@ const (
opQueueMaxRepostsFlagUsage = "The maximum number of times an operation may be reposted to the queue " +
"after having failed (default is 20). " + commonEnvVarUsageText + opQueueMaxRepostsEnvKey

opQueueRepostInitialDelayFlagName = "op-queue-repost-initial-delay"
opQueueRepostInitialDelayEnvKey = "OP_QUEUE_REPOST_INITIAL_DELAY"
opQueueRepostInitialDelayFlagUsage = "The delay for the initial retry attempt after having failed to process " +
"the operation in the queue (default is 5s). " + commonEnvVarUsageText + opQueueRepostInitialDelayEnvKey

opQueueRepostMaxDelayFlagName = "op-queue-repost-max-delay"
opQueueRepostMaxDelayEnvKey = "OP_QUEUE_REPOST_MAX_DELAY"
opQueueRepostMaxDelayFlagUsage = "The maximum delay of a retry attempt after having failed to process " +
"the operation in the queue (default is 2m). " + commonEnvVarUsageText + opQueueMaxRepostsEnvKey

opQueueRepostMultiplierFlagName = "op-queue-repost-multiplier"
opQueueRepostMultiplierEnvKey = "OP_QUEUE_REPOST_MULTIPLIER"
opQueueRepostMultiplierFlagUsage = "The multiplier for a retry attempt after having failed to process " +
"the operation in the queue. For example, if set to 1.5 and the previous retry interval was 2s then " +
"the next retry interval is set 3s (default is 1.5). " + commonEnvVarUsageText + opQueueMaxRepostsEnvKey

cidVersionFlagName = "cid-version"
cidVersionEnvKey = "CID_VERSION"
cidVersionFlagUsage = "The version of the CID format to use for generating CIDs. " +
Expand Down Expand Up @@ -975,7 +956,7 @@ func getOrbParameters(cmd *cobra.Command) (*orbParameters, error) {
batchWriterTimeout = time.Duration(timeout) * time.Millisecond
}

opQueueParams, err := getOpQueueParameters(cmd, batchWriterTimeout)
opQueueParams, err := getOpQueueParameters(cmd, batchWriterTimeout, mqParams)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1856,7 +1837,7 @@ func getMQParameters(cmd *cobra.Command) (*mqParams, error) {
}, nil
}

func getOpQueueParameters(cmd *cobra.Command, batchTimeout time.Duration) (*opqueue.Config, error) {
func getOpQueueParameters(cmd *cobra.Command, batchTimeout time.Duration, mqParams *mqParams) (*opqueue.Config, error) {
poolSize, err := getInt(cmd, opQueuePoolFlagName, opQueuePoolEnvKey, opQueueDefaultPoolSize)
if err != nil {
return nil, fmt.Errorf("%s: %w", opQueuePoolFlagName, err)
Expand All @@ -1879,21 +1860,6 @@ func getOpQueueParameters(cmd *cobra.Command, batchTimeout time.Duration) (*opqu
return nil, fmt.Errorf("%s: %w", opQueueMaxRepostsFlagName, err)
}

retriesInitialDelay, err := getDuration(cmd, opQueueRepostInitialDelayFlagName, opQueueRepostInitialDelayEnvKey, opQueueDefaultRepostInitialDelay)
if err != nil {
return nil, fmt.Errorf("%s: %w", opQueueRepostInitialDelayFlagName, err)
}

retriesMaxDelay, err := getDuration(cmd, opQueueRepostMaxDelayFlagName, opQueueRepostMaxDelayEnvKey, opQueueDefaultRepostMaxDelay)
if err != nil {
return nil, fmt.Errorf("%s: %w", opQueueRepostMaxDelayFlagName, err)
}

retriesMultiplier, err := getFloat(cmd, opQueueRepostMultiplierFlagName, opQueueRepostMultiplierEnvKey, opQueueDefaultRepostMultiplier)
if err != nil {
return nil, fmt.Errorf("%s: %w", opQueueRepostMultiplierFlagName, err)
}

// The operation expiration is set to the batch timeout plus a grace period. The operation should
// exist in the database until a batch times out, after which it is assumed that those operations are no
// longer valid and may be deleted.
Expand All @@ -1905,9 +1871,9 @@ func getOpQueueParameters(cmd *cobra.Command, batchTimeout time.Duration) (*opqu
TaskExpiration: taskExpiration,
OpExpiration: operationExpiration,
MaxRetries: maxRetries,
RetriesInitialDelay: retriesInitialDelay,
RetriesMaxDelay: retriesMaxDelay,
RetriesMultiplier: retriesMultiplier,
RetriesInitialDelay: mqParams.redeliveryInitialInterval,
RetriesMaxDelay: mqParams.maxRedeliveryInterval,
RetriesMultiplier: mqParams.redeliveryMultiplier,
}, nil
}

Expand Down Expand Up @@ -2099,9 +2065,6 @@ func createFlags(startCmd *cobra.Command) {
startCmd.Flags().StringP(opQueueTaskMonitorIntervalFlagName, "", "", opQueueTaskMonitorIntervalFlagUsage)
startCmd.Flags().StringP(opQueueTaskExpirationFlagName, "", "", opQueueTaskExpirationFlagUsage)
startCmd.Flags().StringP(opQueueMaxRepostsFlagName, "", "", opQueueMaxRepostsFlagUsage)
startCmd.Flags().StringP(opQueueRepostInitialDelayFlagName, "", "", opQueueRepostInitialDelayFlagUsage)
startCmd.Flags().StringP(opQueueRepostMaxDelayFlagName, "", "", opQueueRepostMaxDelayFlagUsage)
startCmd.Flags().StringP(opQueueRepostMultiplierFlagName, "", "", opQueueRepostMultiplierFlagUsage)
startCmd.Flags().String(cidVersionFlagName, "1", cidVersionFlagUsage)
startCmd.Flags().StringP(didNamespaceFlagName, didNamespaceFlagShorthand, "", didNamespaceFlagUsage)
startCmd.Flags().StringArrayP(didAliasesFlagName, didAliasesFlagShorthand, []string{}, didAliasesFlagUsage)
Expand Down
62 changes: 13 additions & 49 deletions cmd/orb-server/startcmd/params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1335,23 +1335,23 @@ func TestGetOpQueueParameters(t *testing.T) {
restoreTaskMonitorIntervalEnv := setEnv(t, opQueueTaskMonitorIntervalEnvKey, "17s")
restoreTaskExpirationEnv := setEnv(t, opQueueTaskExpirationEnvKey, "33s")
restoreMaxRepostsEnv := setEnv(t, opQueueMaxRepostsEnvKey, "23")
restoreRepostInitialDelayEnv := setEnv(t, opQueueRepostInitialDelayEnvKey, "4s")
restoreRepostMaxDelay := setEnv(t, opQueueRepostMaxDelayEnvKey, "3m")
restoreRepostMultiplierEnv := setEnv(t, opQueueRepostMultiplierEnvKey, "2.5")

defer func() {
restorePoolEnv()
restoreTaskExpirationEnv()
restoreTaskMonitorIntervalEnv()
restoreMaxRepostsEnv()
restoreRepostInitialDelayEnv()
restoreRepostMaxDelay()
restoreRepostMultiplierEnv()
}()

cmd := getTestCmd(t)

opQueueParams, err := getOpQueueParameters(cmd, time.Minute)
opQueueParams, err := getOpQueueParameters(cmd, time.Minute,
&mqParams{
redeliveryMultiplier: 2.5,
redeliveryInitialInterval: 4 * time.Second,
maxRedeliveryInterval: 3 * time.Minute,
},
)
require.NoError(t, err)
require.Equal(t, 221, opQueueParams.PoolSize)
require.Equal(t, 17*time.Second, opQueueParams.TaskMonitorInterval)
Expand All @@ -1366,15 +1366,12 @@ func TestGetOpQueueParameters(t *testing.T) {
t.Run("Not specified -> default value", func(t *testing.T) {
cmd := getTestCmd(t)

opQueueParams, err := getOpQueueParameters(cmd, time.Minute)
opQueueParams, err := getOpQueueParameters(cmd, time.Minute, &mqParams{})
require.NoError(t, err)
require.Equal(t, opQueueDefaultPoolSize, opQueueParams.PoolSize)
require.Equal(t, opQueueDefaultTaskMonitorInterval, opQueueParams.TaskMonitorInterval)
require.Equal(t, opQueueDefaultTaskExpiration, opQueueParams.TaskExpiration)
require.Equal(t, opQueueDefaultMaxReposts, opQueueParams.MaxRetries)
require.Equal(t, opQueueDefaultRepostInitialDelay, opQueueParams.RetriesInitialDelay)
require.Equal(t, opQueueDefaultRepostMaxDelay, opQueueParams.RetriesMaxDelay)
require.Equal(t, opQueueDefaultRepostMultiplier, opQueueParams.RetriesMultiplier)
require.Equal(t, defaultBatchWriterTimeout+opQueueOperationExpirationGracePeriod, opQueueParams.OpExpiration)
})

Expand All @@ -1387,12 +1384,12 @@ func TestGetOpQueueParameters(t *testing.T) {

cmd := getTestCmd(t)

_, err := getOpQueueParameters(cmd, time.Minute)
_, err := getOpQueueParameters(cmd, time.Minute, &mqParams{})
require.Error(t, err)
require.Contains(t, err.Error(), "invalid value")
})

t.Run("Invalid pool size value -> error", func(t *testing.T) {
t.Run("Invalid task monitor interval value -> error", func(t *testing.T) {
restoreTaskMonitorIntervalEnv := setEnv(t, opQueueTaskMonitorIntervalEnvKey, "17")

defer func() {
Expand All @@ -1401,7 +1398,7 @@ func TestGetOpQueueParameters(t *testing.T) {

cmd := getTestCmd(t)

_, err := getOpQueueParameters(cmd, time.Minute)
_, err := getOpQueueParameters(cmd, time.Minute, &mqParams{})
require.Error(t, err)
require.Contains(t, err.Error(), "invalid value")
})
Expand All @@ -1415,7 +1412,7 @@ func TestGetOpQueueParameters(t *testing.T) {

cmd := getTestCmd(t)

_, err := getOpQueueParameters(cmd, time.Minute)
_, err := getOpQueueParameters(cmd, time.Minute, &mqParams{})
require.Error(t, err)
require.Contains(t, err.Error(), "invalid value")
})
Expand All @@ -1429,40 +1426,7 @@ func TestGetOpQueueParameters(t *testing.T) {

cmd := getTestCmd(t)

_, err := getOpQueueParameters(cmd, time.Minute)
require.Error(t, err)
require.Contains(t, err.Error(), "invalid value")
})

t.Run("Invalid repost initial delay value -> error", func(t *testing.T) {
restore := setEnv(t, opQueueRepostInitialDelayEnvKey, "xxx")
defer restore()

cmd := getTestCmd(t)

_, err := getOpQueueParameters(cmd, time.Minute)
require.Error(t, err)
require.Contains(t, err.Error(), "invalid value")
})

t.Run("Invalid repost max delay value -> error", func(t *testing.T) {
restore := setEnv(t, opQueueRepostMaxDelayEnvKey, "xxx")
defer restore()

cmd := getTestCmd(t)

_, err := getOpQueueParameters(cmd, time.Minute)
require.Error(t, err)
require.Contains(t, err.Error(), "invalid value")
})

t.Run("Invalid repost multiplier value -> error", func(t *testing.T) {
restore := setEnv(t, opQueueRepostMultiplierEnvKey, "xxx")
defer restore()

cmd := getTestCmd(t)

_, err := getOpQueueParameters(cmd, time.Minute)
_, err := getOpQueueParameters(cmd, time.Minute, &mqParams{})
require.Error(t, err)
require.Contains(t, err.Error(), "invalid value")
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/context/opqueue/opqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ const (
defaultTaskExpirationFactor = 2
defaultOpCleanupFactor = 5
defaultMaxRetries = 10
defaultRetryInitialDelay = 5 * time.Second
defaultMaxRetryDelay = 1 * time.Minute
defaultRetryInitialDelay = 2 * time.Second
defaultMaxRetryDelay = 30 * time.Second
defaultRetryMultiplier = 1.5
)

Expand Down

0 comments on commit 29a0c48

Please sign in to comment.