Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CLI flag for SQS RPC timeout #3157

Merged
merged 1 commit into from
May 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions app/multitenant/sqs_control_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

var (
longPollTime = aws.Int64(10)
rpcTimeout = time.Minute
sqsRequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "scope",
Name: "sqs_request_duration_seconds",
Expand All @@ -45,6 +44,7 @@ type sqsControlRouter struct {
responseQueueURL *string
userIDer UserIDer
prefix string
rpcTimeout time.Duration

mtx sync.Mutex
responses map[string]chan xfer.Response
Expand All @@ -63,12 +63,13 @@ type sqsResponseMessage struct {
}

// NewSQSControlRouter the harbinger of death
func NewSQSControlRouter(config *aws.Config, userIDer UserIDer, prefix string) app.ControlRouter {
func NewSQSControlRouter(config *aws.Config, userIDer UserIDer, prefix string, rpcTimeout time.Duration) app.ControlRouter {
result := &sqsControlRouter{
service: sqs.New(session.New(config)),
responseQueueURL: nil,
userIDer: userIDer,
prefix: prefix,
rpcTimeout: rpcTimeout,
responses: map[string]chan xfer.Response{},
probeWorkers: map[int64]*probeWorker{},
}
Expand Down Expand Up @@ -257,7 +258,7 @@ func (cr *sqsControlRouter) Handle(ctx context.Context, probeID string, req xfer
select {
case response := <-waiter:
return response, nil
case <-time.After(rpcTimeout):
case <-time.After(cr.rpcTimeout):
return xfer.Response{}, fmt.Errorf("request timed out")
}
}
Expand Down
6 changes: 3 additions & 3 deletions prog/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func emitterFactory(collector app.Collector, clientCfg billing.Config, userIDer
)
}

func controlRouterFactory(userIDer multitenant.UserIDer, controlRouterURL string) (app.ControlRouter, error) {
func controlRouterFactory(userIDer multitenant.UserIDer, controlRouterURL string, controlRPCTimeout time.Duration) (app.ControlRouter, error) {
if controlRouterURL == "local" {
return app.NewLocalControlRouter(), nil
}
Expand All @@ -163,7 +163,7 @@ func controlRouterFactory(userIDer multitenant.UserIDer, controlRouterURL string
if err != nil {
return nil, err
}
return multitenant.NewSQSControlRouter(sqsConfig, userIDer, prefix), nil
return multitenant.NewSQSControlRouter(sqsConfig, userIDer, prefix, controlRPCTimeout), nil
}

return nil, fmt.Errorf("Invalid control router '%s'", controlRouterURL)
Expand Down Expand Up @@ -239,7 +239,7 @@ func appMain(flags appFlags) {
collector = billingEmitter
}

controlRouter, err := controlRouterFactory(userIDer, flags.controlRouterURL)
controlRouter, err := controlRouterFactory(userIDer, flags.controlRouterURL, flags.controlRPCTimeout)
if err != nil {
log.Fatalf("Error creating control router: %v", err)
return
Expand Down
2 changes: 2 additions & 0 deletions prog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ type appFlags struct {
collectorURL string
s3URL string
controlRouterURL string
controlRPCTimeout time.Duration
pipeRouterURL string
natsHostname string
memcachedHostname string
Expand Down Expand Up @@ -348,6 +349,7 @@ func setupFlags(flags *flags) {
flag.StringVar(&flags.app.collectorURL, "app.collector", "local", "Collector to use (local, dynamodb, or file/directory)")
flag.StringVar(&flags.app.s3URL, "app.collector.s3", "local", "S3 URL to use (when collector is dynamodb)")
flag.StringVar(&flags.app.controlRouterURL, "app.control.router", "local", "Control router to use (local or sqs)")
flag.DurationVar(&flags.app.controlRPCTimeout, "app.control.rpctimeout", time.Minute, "Timeout for control RPC")
flag.StringVar(&flags.app.pipeRouterURL, "app.pipe.router", "local", "Pipe router to use (local)")
flag.StringVar(&flags.app.natsHostname, "app.nats", "", "Hostname for NATS service to use for shortcut reports. If empty, shortcut reporting will be disabled.")
flag.StringVar(&flags.app.memcachedHostname, "app.memcached.hostname", "", "Hostname for memcached service to use when caching reports. If empty, no memcached will be used.")
Expand Down