Skip to content

Commit

Permalink
spring-projectsGH-2432: Fix Retryable Topic Provisioning
Browse files Browse the repository at this point in the history
Resolves spring-projects#2432

Don't provision an individual retry topic bean, if there is already a `NewTopic`
bean with the same topic name.

**cherry-pick to 2.9.x, 2.8.x**
  • Loading branch information
garyrussell committed Oct 12, 2022
1 parent 6536f3c commit 20c80fe
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -57,6 +59,7 @@
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.support.TopicForRetryable;
import org.springframework.lang.Nullable;

/**
Expand Down Expand Up @@ -181,8 +184,7 @@ public void afterSingletonsInstantiated() {
* @see #setAutoCreate(boolean)
*/
public final boolean initialize() {
Collection<NewTopic> newTopics = new ArrayList<>(
this.applicationContext.getBeansOfType(NewTopic.class, false, false).values());
Collection<NewTopic> newTopics = newTopics();
Collection<NewTopics> wrappers = this.applicationContext.getBeansOfType(NewTopics.class, false, false).values();
wrappers.forEach(wrapper -> newTopics.addAll(wrapper.getNewTopics()));
if (newTopics.size() > 0) {
Expand Down Expand Up @@ -225,6 +227,30 @@ public final boolean initialize() {
return false;
}

/*
* Remove any TopicForRetryable bean if there is also a NewTopic with the same topic name.
*/
private Collection<NewTopic> newTopics() {
Map<String, NewTopic> newTopicsMap = new HashMap<>(
this.applicationContext.getBeansOfType(NewTopic.class, false, false));
Map<String, NewTopic> topicsForRetry = newTopicsMap.entrySet().stream()
.filter(entry -> entry.getValue() instanceof TopicForRetryable)
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
for (Entry<String, NewTopic> entry : topicsForRetry.entrySet()) {
Iterator<Entry<String, NewTopic>> iterator = newTopicsMap.entrySet().iterator();
while (iterator.hasNext()) {
Entry<String, NewTopic> nt = iterator.next();
if (nt.getValue().name().equals(entry.getValue().name())
&& !(entry.getValue() instanceof TopicForRetryable)) {

iterator.remove();
}
}
}
Collection<NewTopic> newTopics = new ArrayList<>(newTopicsMap.values());
return newTopics;
}

@Override
@Nullable
public String clusterId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.function.Consumer;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.beans.BeansException;
Expand All @@ -37,6 +36,7 @@
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
import org.springframework.kafka.support.EndpointHandlerMethod;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.kafka.support.TopicForRetryable;
import org.springframework.lang.Nullable;


Expand Down Expand Up @@ -363,7 +363,7 @@ protected void createNewTopicBeans(Collection<String> topics, RetryTopicConfigur
String beanName = topic + "-topicRegistrationBean";
if (!bf.containsBean(beanName)) {
bf.registerSingleton(beanName,
new NewTopic(topic, config.getNumPartitions(), config.getReplicationFactor()));
new TopicForRetryable(topic, config.getNumPartitions(), config.getReplicationFactor()));
}
}
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.support;

import org.apache.kafka.clients.admin.NewTopic;

/**
* Marker to indicate this {@link NewTopic} is for retryable topics; admin will ignore these if
* a regular {@link NewTopic} exist.
*
* @author Gary Russell
* @since 2.8.10
*
*/
public class TopicForRetryable extends NewTopic {

/**
* Create an instance with the provided properties.
* @param topic the topic.
* @param numPartitions the partitions.
* @param replicationFactor the replication factor.
*/
public TopicForRetryable(String topic, int numPartitions, short replicationFactor) {
super(topic, numPartitions, replicationFactor);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
Expand All @@ -47,6 +49,7 @@
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
Expand Down Expand Up @@ -133,11 +136,14 @@ void shouldRetrySecondTopic() {
}

@Test
void shouldRetryThirdTopicWithTimeout() {
void shouldRetryThirdTopicWithTimeout(@Autowired KafkaAdmin admin) {
logger.debug("Sending message to topic " + THIRD_TOPIC);
kafkaTemplate.send(THIRD_TOPIC, "Testing topic 3");
assertThat(awaitLatch(latchContainer.countDownLatch3)).isTrue();
assertThat(awaitLatch(latchContainer.countDownLatchDltOne)).isTrue();
Map<String, TopicDescription> topics = admin.describeTopics(THIRD_TOPIC, THIRD_TOPIC + "-dlt");
assertThat(topics.get(THIRD_TOPIC).partitions()).hasSize(2);
assertThat(topics.get(THIRD_TOPIC + "-dlt").partitions()).hasSize(3);
}

@Test
Expand Down Expand Up @@ -527,6 +533,11 @@ public KafkaAdmin kafkaAdmin() {
return new KafkaAdmin(configs);
}

@Bean
public NewTopic topic() {
return TopicBuilder.name(THIRD_TOPIC).partitions(2).replicas(1).build();
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
Expand Down

0 comments on commit 20c80fe

Please sign in to comment.