I'm trying to write reliable tests for Kafka listeners. It's worth mentioning that I use an external Kafka container rather than @EmbeddedKafka.
The biggest problem I'm struggling with is how to ensure that a Kafka listener is assigned to a partition and ready to consume messages before the tests start. I found the ContainerTestUtils.waitForAssignment method, but it doesn't work as I expect.
Let's say there is a Kafka listener component:
    @Component
    public class SampleKafkaConsumer {
        @KafkaListener(
            topics = "${kafka.listener.some-event.topic}",
            groupId = "${kafka.listener.some-event.group-id}"
        )
        void consume(final SomeEvent event) {
            // do sth
        }
    }
And there is a base class for integration tests:
    @ActiveProfiles("test")
    @SpringBootTest
    abstract class BaseKafkaIntegrationSpec extends Specification {
        @Autowired(required = true)
        private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry

        void setup() {
            kafkaListenerEndpointRegistry.getAllListenerContainers()
                .stream()
                .forEach { ContainerTestUtils.waitForAssignment(it, 1) }
        }
    }
The listener is assigned a fixed group ID:
kafka.listener.some-event.group-id=test.some-event
Assuming there are a few integration test classes extending BaseKafkaIntegrationSpec, the first test class works, but the second attempt to execute waitForAssignment() ends with an error:
java.lang.IllegalStateException: Expected 1 but got 0 partitions.
I tried guarding the waitForAssignment call in many different ways, with no luck. What should this look like so that it works? There is no @DirtiesContext, so the Spring context is cached, but I feel like something goes wrong with the container when a new test class runs.
Another thing is that when I set a random group ID:
kafka.listener.some-event.group-id=test.some-event-${random.uuid}
Another problem appears. Although Spring caches its context, a new instance of SampleKafkaConsumer is created for each subsequent test class (because of the random group ID). Looking at the logs, I can see that for, e.g., three test classes in a test run, three consumers with three different group IDs are running and consuming events at the end.
If I go with the random group ID approach, is it possible to configure Spring to replace the existing bean/containers for a specific topic instead of creating new ones?
Asked Nov 18, 2024 at 20:22 by BartekN; edited Nov 19, 2024 at 13:47.
1 Answer
I think @DirtiesContext is your friend. The point is that when a Spring application context is cached, all of its beans stay active, including the listener containers. So when you run a new test, there is a high chance that its partitions are held by a container in another cached context, simply because all of them are connected to the same Kafka broker.
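To illustrate why a listener in a second cached context can end up with zero partitions, here is a minimal sketch in plain Java. It is a toy model, not Kafka's real assignor: the class name, the member names, and the round-robin rule are all assumptions made for illustration, and the topic is assumed to have a single partition. With a real broker, which member ends up owning the partition depends on the configured assignor, not on start order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of partition assignment inside ONE consumer group (not Kafka's real assignor). */
public class PartitionAssignmentSketch {

    /** Hands out partitions 0..partitionCount-1 round-robin to the members of the group. */
    public static Map<String, List<Integer>> assign(int partitionCount, List<String> members) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }
        if (members.isEmpty()) {
            return assignment; // nobody to assign to
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = members.get(partition % members.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Hypothetical members: two cached Spring contexts, each with a live
        // listener container in the same group "test.some-event".
        List<String> members = List.of("listener-in-context-1", "listener-in-context-2");
        // Assumed single-partition topic: only one member can own partition 0.
        System.out.println(assign(1, members));
    }
}
```

In this model one member owns partition 0 and the other owns nothing, which is the situation in which waitForAssignment(it, 1) would report "Expected 1 but got 0 partitions". Following the suggestion above, annotating the base spec with @DirtiesContext (for example with classMode = DirtiesContext.ClassMode.AFTER_CLASS) should close the context, and with it the listener containers, after each test class, at the cost of a slower test run.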