Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry-topics auto creation also creates the main topic #2432

Closed
jgslima opened this issue Oct 12, 2022 · 4 comments · Fixed by #2437
Closed

Retry-topics auto creation also creates the main topic #2432

jgslima opened this issue Oct 12, 2022 · 4 comments · Fixed by #2437

Comments

@jgslima
Copy link
Contributor

jgslima commented Oct 12, 2022

In what version(s) of Spring for Apache Kafka are you seeing this issue?
2.9.1
Actually I already saw this behaviour since 2.8, not sure the oldest version that has this behaviour.

Describe the bug
When the autoCreateTopics feature of the Non-blocking retries main feature is activated, the framework also creates the main topic with the configurations (partitions and replicas) of the retry topics, even if the context has as NewTopic bean for the main topic with different configurations.

To Reproduce

Run the application below:

@SpringBootApplication
@EnableScheduling
@EnableKafkaRetryTopic
@Slf4j
public class SpringKafkaStudies {

	public static void main(String[] args) {
		SpringApplication.run(SpringKafkaStudies.class, args);
	}

	@Bean
	public NewTopic mainTopic() {
		return TopicBuilder
				.name("mytopic")
				.partitions(10)
				.replicas(2)
				.build();
	}

	@RetryableTopic(attempts = "3",
			backoff = @Backoff(delayExpression = "5000"),
			fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC,
			autoCreateTopics = "true",
			numPartitions = "5",
			replicationFactor = "1")
	@KafkaListener(topics = "mytopic", concurrency = "3")
	public void nonBlockingRetriesListen(ConsumerRecord<String, String> record) {
		log.info("Received record: {}", record.value());
	}
	
}

Expected behavior
The main topic "mytopic" to be created with 10 partitions and 2 replicas, and the retry and DLT topic to be created with 5 partitions and 1 replicas.
But actually, the main topic "mytopic" is also created with 5 partitions and 1 replica.

Sample
See the code above.

@jgslima
Copy link
Contributor Author

jgslima commented Oct 12, 2022

Indeed, as the default behaviour of spring-kafka, when sending the record to the retry or DLT topics, is to assume that the partition in those topics should be the same as the original topic, this means that the retry topics should have the same amount of partitions as the main one (although I think that the default behaviour of DeadLetterPublishingRecovererFactory.resolveTopicPartition() should be to not specify/resolve the partition, leaving this to the producer).

However the claim to have a distinct configuration of replication factor should be enough to expect that the framework might have at least some configuration to indicate that, the topics auto-creation feature of the retry mechanism should be done solely for the retry and DLT topics.

@garyrussell
Copy link
Contributor

This is a bit of a hack, but it will serve as a work around until we can get this implemented:

@SpringBootApplication
public class Kgh2432Application {

	public static void main(String[] args) {
		SpringApplication.run(Kgh2432Application.class, args);
	}

	@KafkaListener(id = "kgh2432", topics = "kgh2432")
	@RetryableTopic
	void listen(String in) {
		System.out.println(in);
	}

	@Bean
	NewTopic topic() {
		return TopicBuilder.name("kgh2432").partitions(10).replicas(1).build();
	}

	@Bean
	ApplicationRunner runner(KafkaAdmin admin) {
		return args -> {
			Map<String, TopicDescription> topics = admin.describeTopics("kgh2432", "kgh2432-dlt");
			Assert.state(topics.get("kgh2432").partitions().size() == 10, "Wrong main partitions");
			Assert.state(topics.get("kgh2432-dlt").partitions().size() == 1, "Wrong DLT partitions");
		};
	}

}

@Component
class NewTopicWeeder implements SmartInitializingSingleton, Ordered {

	private final DefaultListableBeanFactory beanFactory;

	NewTopicWeeder (DefaultListableBeanFactory beanFactory) {
		this.beanFactory = beanFactory;
	}

	@Override
	public int getOrder() {
		return 0;
	}

	@Override
	public void afterSingletonsInstantiated() {
		this.beanFactory.destroySingleton("kgh2432-topicRegistrationBean");
	}

}

@garyrussell garyrussell modified the milestones: 3.0.0-RC1, 3.0.0-RC2 Oct 12, 2022
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Oct 12, 2022
Resolves spring-projects#2432

Don't provision an individual retry topic bean, if there is already a `NewTopic`
bean with the same topic name.

**cherry-pick to 2.9.x, 2.8.x**
artembilan pushed a commit that referenced this issue Oct 13, 2022
Resolves #2432

Don't provision an individual retry topic bean, if there is already a `NewTopic`
bean with the same topic name.

**cherry-pick to 2.9.x, 2.8.x**

* Fix `TopicForRetry` removal logic; include `NewTopics` beans in logic.

* Improve test.
artembilan pushed a commit that referenced this issue Oct 13, 2022
Resolves #2432

Don't provision an individual retry topic bean, if there is already a `NewTopic`
bean with the same topic name.

**cherry-pick to 2.9.x, 2.8.x**

* Fix `TopicForRetry` removal logic; include `NewTopics` beans in logic.

* Improve test.
# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java
#	spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java
artembilan pushed a commit that referenced this issue Oct 13, 2022
Resolves #2432

Don't provision an individual retry topic bean, if there is already a `NewTopic`
bean with the same topic name.

**cherry-pick to 2.9.x, 2.8.x**

* Fix `TopicForRetry` removal logic; include `NewTopics` beans in logic.

* Improve test.
# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java
#	spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java

# Conflicts:
#	spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java
@garyrussell garyrussell modified the milestones: 3.0.0-RC2, 3.0.0-RC1 Oct 13, 2022
@garyrussell
Copy link
Contributor

@jgslima #2438

@jgslima
Copy link
Contributor Author

jgslima commented Oct 14, 2022

@jgslima #2438

Wow. Thank you very much for the attention and dedication.

I think 10 times before opening a spring issue, glad that it made sense.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants