Skip to content

Commit b8d43f0

Browse files
committed
Add property to control shutdown timeout for processors
As a replacement for `KAFKA_STREAMS_DRAIN_TIMEOUT_DURATION`. Signed-off-by: nscuro <[email protected]>
1 parent 875ff30 commit b8d43f0

File tree

3 files changed

+14
-0
lines changed

3 files changed

+14
-0
lines changed

src/main/java/org/dependencytrack/event/kafka/processor/api/ProcessorManager.java

+7
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@
7878
import static org.dependencytrack.event.kafka.processor.api.ProcessorProperties.PROPERTY_RETRY_MULTIPLIER_DEFAULT;
7979
import static org.dependencytrack.event.kafka.processor.api.ProcessorProperties.PROPERTY_RETRY_RANDOMIZATION_FACTOR;
8080
import static org.dependencytrack.event.kafka.processor.api.ProcessorProperties.PROPERTY_RETRY_RANDOMIZATION_FACTOR_DEFAULT;
81+
import static org.dependencytrack.event.kafka.processor.api.ProcessorProperties.PROPERTY_SHUTDOWN_TIMEOUT_MS;
82+
import static org.dependencytrack.event.kafka.processor.api.ProcessorProperties.PROPERTY_SHUTDOWN_TIMEOUT_MS_DEFAULT;
8183

8284
public class ProcessorManager implements AutoCloseable {
8385

@@ -289,6 +291,11 @@ private ParallelStreamProcessor<byte[], byte[]> createParallelConsumer(final Str
289291
return Duration.ofMillis(delayMillis);
290292
});
291293

294+
final long shutdownTimeoutMs = Optional.ofNullable(properties.get(PROPERTY_SHUTDOWN_TIMEOUT_MS))
295+
.map(Long::parseLong)
296+
.orElse(PROPERTY_SHUTDOWN_TIMEOUT_MS_DEFAULT);
297+
optionsBuilder.shutdownTimeout(Duration.ofMillis(shutdownTimeoutMs));
298+
292299
if (Config.getInstance().getPropertyAsBoolean(Config.AlpineKey.METRICS_ENABLED)) {
293300
optionsBuilder
294301
.meterRegistry(Metrics.getRegistry())

src/main/java/org/dependencytrack/event/kafka/processor/api/ProcessorProperties.java

+2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ final class ProcessorProperties {
3636
static final double PROPERTY_RETRY_RANDOMIZATION_FACTOR_DEFAULT = 0.3;
3737
static final String PROPERTY_RETRY_MAX_DELAY_MS = "retry.max.delay.ms";
3838
static final long PROPERTY_RETRY_MAX_DELAY_MS_DEFAULT = 60 * 1000; // 60s
39+
static final String PROPERTY_SHUTDOWN_TIMEOUT_MS = "shutdown.timeout.ms";
40+
static final long PROPERTY_SHUTDOWN_TIMEOUT_MS_DEFAULT = 10 * 1000; // 10s
3941

4042
private ProcessorProperties() {
4143
}

src/main/resources/application.properties

+5
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,11 @@ application.id=dtrack-apiserver
453453
# alpine.kafka.processor.<name>.retry.randomization.factor=0.3
454454
# alpine.kafka.processor.<name>.retry.max.delay.ms=60000
455455

456+
# Optional
457+
# Defines the timeout to wait for the processor to finish any pending work
458+
# prior to being shut down.
459+
# alpine.kafka.processor.<name>.shutdown.timeout.ms=10000
460+
456461
# Optional
457462
# Allows for customization of the underlying Kafka consumer.
458463
# Refer to https://kafka.apache.org/documentation/#consumerconfigs for available options.

0 commit comments

Comments
 (0)