当前位置: 首页 > 知识库问答 >
问题:

嵌入式Kafka联调-消费者永远不会完成

傅阳
2023-03-14

我正在为一个简单的Spring Boot应用程序编写Kafka联调。该应用程序简单地发布到Kafka主题。

我正在使用一个嵌入式Kafka实例进行测试。当通过Intellij运行时,测试运行得非常好,但当我通过gradle运行时,测试失败。看起来好像锁存倒计时从未达到0,测试最终超时。

生产者配置:

public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrap-address}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, String> articleProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        configProps.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        configProps.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> articleKafkaTemplate() {
        return new KafkaTemplate<>(articleProducerFactory());
    }
}

制片人:

public class KafkaProducer {

    @Value(value = "kafka.topic-name")
    String topicName;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message, String topic) throws KafkaPublishException {
        try {
            ListenableFuture<SendResult<String, String>> future =
                    kafkaTemplate.send(topic, message);
           future.get();
        } catch (Exception e) {
            throw new KafkaPublishException(e.getMessage());
        }

    }

    public String getTopicName() {
        return topicName;
    }

消费者:

@Component
public class KafkaConsumerHelper {
    private CountDownLatch latch = new CountDownLatch(1);
    private String payload = null;

    @KafkaListener(topics = "${test.topic}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {

        setPayload(consumerRecord.toString());
        latch.countDown();
    }

    public CountDownLatch getLatch() {
        return latch;
    }

    public String getPayload() {
        return payload;
    }

    private void setPayload(String payload) {
        this.payload = payload;
    }
}

测试:

@SpringBootTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
public class KafkaProducerTest {

    @Autowired
    private KafkaProducer producer;

    @Autowired
    private KafkaConsumerHelper consumer;

    @Value("${test.topic}")
    private String topic;


    @Test
    public void shouldSuccessfullyPublishAnArticleMessageToEmbeddedKafka()
            throws Exception {

        String message = createArticle();

        producer.sendMessage(message, topic);
        consumer.getLatch().await();

        assertThat(consumer.getLatch().getCount(), equalTo(0L));
        assertThat(consumer.getPayload(), containsString(message));
    }

application.yml:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      auto-offset-reset: earliest
      group-id: my-id
test:
  topic: embedded-test-topic
  partitions-number: 1
  replication-factor: 1

知道问题出在哪里吗?

共有1个答案

微生毅
2023-03-14

对于将来研究这个问题的人来说,我的问题是我没有正确使用@EmbeddedKafka

修复方法是将bootstrapServersProperty="spring.kafka.bootstrap-server"属性添加到@EmbeddedKafka注释中。

@EmbeddedKafka(partitions = 1, bootstrapServersProperty = "spring.kafka.bootstrap-servers")

更多信息请参见Kafka文档。

 类似资料:
  • 我正在做一个Kafka的消费者计划。最近我们在PROD环境下进行了部署。在那里,我们面临以下问题: 我的理解是,当组协调器不可用并被重新发现时,心跳间隔(根据文档为3秒)过期,消费者被踢出组。这是正确的吗?。如果是这样的话,应该为这个工作做些什么呢?。如果我错了,请帮助我理解这个问题,并建议您有任何想法,以解决这个问题。如果需要,我可以分享代码。

  • 我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者

  • 我有一个带注释的kafka消费者方法@kafkalistener。我已经在容器上设置了重试模板,并且重试配置是这样的,如果在处理消息时发生了一些异常,它将始终重试。我已将最大轮询记录设置为1。如果这种情况实时发生,并且消费者一直在重试消息,经纪人会认为该消费者已经死亡并触发重新平衡吗?或者,在重试时,消费者是否会对未能处理的相同消息进行投票?如果这是真的,因为民意调查正在进行,我的假设是不会有任何

  • 关于KafkaConsumer(>=0.9),我在尝试实现满足自己需求的解决方案时遇到了一些严重的问题。 假设我有一个函数,它只能从一个Kafka主题中读取n条消息。 例如:-->获取主题中的下5条kafka消息。 所以,我有一个类似这样的循环。用实际正确的参数编辑。在本例中,使用者的max参数设置为1,因此实际循环只循环一次。不同的消费者(他们中的一些人迭代了许多消息)共享一个抽象的父亲(这一个

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 这是创建ListenerContainerFactory的类 这是我用@KafKalistener注释的Listener类 这是KafkaListenerConfig类,它接受引导服务器、主题名称等。