diff --git a/plugins/inputs/amqp_consumer/amqp_consumer.go b/plugins/inputs/amqp_consumer/amqp_consumer.go index cf216e8501d6c..41588ecd67f36 100644 --- a/plugins/inputs/amqp_consumer/amqp_consumer.go +++ b/plugins/inputs/amqp_consumer/amqp_consumer.go @@ -18,6 +18,7 @@ type AMQPConsumer struct { AMQPHost string AMQPPort string Queue string + Prefetch int sync.Mutex @@ -43,6 +44,7 @@ func (rmq *AMQPConsumer) SampleConfig() string { amqp_port = "5672" # name of the queue to consume from queue = "task_queue" + prefetch = 1000 data_format = "influx" ` @@ -84,7 +86,7 @@ func (rmq *AMQPConsumer) Start(acc telegraf.Accumulator) error { rmq.q = q // Declare QoS on queue - err = ch.Qos(1, 0, false) + err = ch.Qos(rmq.Prefetch, 0, false) if err != nil { return fmt.Errorf("%v: failed to set Qos", err) }