spring kafka - How to use ContainerTestUtils.waitForAssignment when this method is called multiple times for the same container?

I'm trying to write reliable tests for Kafka listeners. It's worth mentioning that I use an external Kafka container rather than @EmbeddedKafka.

The biggest problem I'm struggling with is how to ensure that the Kafka listener is assigned to a partition and ready to consume messages before the tests start. I've found the ContainerTestUtils.waitForAssignment method, but it doesn't work as I expect.

Let's say there is a Kafka listener component:

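import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class SampleKafkaConsumer {

    // Topic and group ID are resolved from the test properties.
    @KafkaListener(
        topics = "${kafka.listener.some-event.topic}",
        groupId = "${kafka.listener.some-event.group-id}"
    )
    void consume(final SomeEvent event) {
        // do sth
    }
}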

And there is a base class for integration tests:

@ActiveProfiles("test")
@SpringBootTest
abstract class BaseKafkaIntegrationSpec extends Specification {

    @Autowired(required = true)
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry

    void setup() {
        kafkaListenerEndpointRegistry.getAllListenerContainers()
            .stream()
            .forEach { ContainerTestUtils.waitForAssignment(it, 1) }
    }
}

The listener is assigned a fixed group ID:

kafka.listener.some-event.group-id=test.some-event

Assuming there are a few integration test classes extending BaseKafkaIntegrationSpec, the first test class works, but the second call to waitForAssignment() fails with an error:

java.lang.IllegalStateException: Expected 1 but got 0 partitions.

I have tried guarding the waitForAssignment call in many different ways, with no luck. What should this look like so that it works? There is no @DirtiesContext, so the Spring context is cached, but I feel like something goes wrong with the container when a new test class runs.

Alternatively, when I set a random group ID:

kafka.listener.some-event.group-id=test.some-event-${random.uuid}

Another problem appears. Although Spring caches its context, a new instance of SampleKafkaConsumer is created for each subsequent test class (because of the random group ID). Looking at the logs, I can see that with, e.g., three test classes in a test run, three consumers with three different group IDs end up running and consuming events.

If I go with the random group ID approach, is it possible to configure Spring to override the existing bean/containers for a specific topic instead of creating new ones?

Asked Nov 18, 2024 at 20:22 by BartekN; edited Nov 19, 2024 at 13:47.

1 Answer

I think @DirtiesContext is your friend. When a Spring application context is cached, all of its beans stay active, including its listener containers. So when a new test class starts, there is a high chance that your partitions have been taken by the container in another cached context, simply because all of them are connected to the same Kafka broker.
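
To make that reasoning concrete, here is a toy model in plain Java. It is only a sketch: it does not use Kafka or Spring, the class name PartitionStealingDemo and the member names "context-1" and "context-2" are made up for illustration, and the simple round-robin split is an assumption standing in for whatever assignor the real consumer group uses. It only shows that once two members share one group on a single-partition topic, one of them ends up with no partitions, which is the situation waitForAssignment(it, 1) reports as "Expected 1 but got 0 partitions".

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Toy model only: this is NOT Kafka's assignor, just a round-robin split
// of partitions across the sorted members of one consumer group.
class PartitionStealingDemo {

    // Returns member -> assigned partitions for a topic with partitionCount partitions.
    static Map<String, List<Integer>> assign(List<String> members, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        List<String> sorted = new ArrayList<>(new TreeSet<>(members));
        for (String member : sorted) {
            assignment.put(member, new ArrayList<>());
        }
        if (sorted.isEmpty()) {
            return assignment; // no members, nothing to assign
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = sorted.get(partition % sorted.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // One cached context: its listener container owns the only partition.
        System.out.println(assign(List.of("context-1"), 1));
        // prints {context-1=[0]}

        // A second cached context joins the same group: one member gets nothing.
        System.out.println(assign(List.of("context-1", "context-2"), 1));
        // prints {context-1=[0], context-2=[]}
    }
}
```

In a real run, which member keeps the partition depends on the broker-side rebalance, so the container that comes up empty-handed is not necessarily the newer one. Closing the older context (for example via @DirtiesContext, as suggested above) removes the competing member from the group.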
