当前位置: 首页 > 知识库问答 >
问题:

批量KafkaListener的消费者接收器

程和蔼
2023-03-14

我的消费者配置有kafka批处理侦听器配置和@KafkaListener消耗消息列表。我有一个消费者拦截器,我想为每条记录设置唯一ID,并将其值存储在映射诊断上下文(MDC)中。如果我的kafka侦听器消耗单个消息,则唯一ID是正确的。但是我的kafka侦听器消耗消息列表,因此MDC. get(“id”)只获取最后一个值。如何处理它?我的拦截器;

public class KafkaConsumerInterceptor implements ConsumerInterceptor<String, String>{

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> consumerRecords) {
        ConsumerRecord<String, String> record = consumerRecords.iterator().next();
        setId(record.headers().headers("id"));
        return consumerRecords;
    }

    private void setId(Iterable<Header> idHeader) {
        String id = UUID.randomUUID().toString();
        if (idHeader.iterator().hasNext()) {
            Header header = idHeader.iterator().next();
            id = new String(header.value(), StandardCharsets.UTF_8);
        }
        MDC.put("id", id);
    }
  }

共有1个答案

暨成双
2023-03-14

您需要移动MDC。将()。

 类似资料:
  • 我是Kafka的新手。我在网上读了很多关于Kafka制作人和Kafka消费者的说明。我成功地实现了前者,它可以向Kafka集群发送消息。然而,我没有完成后一个。请帮我解决这个问题。我看到我的问题像StackOverflow上的一些帖子,但我想更清楚地描述一下。我在虚拟盒子的Ubuntu服务器上运行Kafka和Zookeeper。使用1个Kafka集群和1个Zookeeper集群的最简单配置(几乎是

  • 高级使用者 API 似乎一次读取一条消息。 如果消费者想要处理这些消息并提交给其他下游消费者(如Solr或Elastic-Search ),这可能会给他们带来很大的问题,因为他们更喜欢批量接收消息,而不是一次接收一条。 在内存中批处理这些消息也并非易事,因为只有当批处理已经提交时,Kafka中的偏移量也需要同步,否则具有未提交下游消息的崩溃的 kafka 使用者(如在Solr或ES中)将已经更新其

  • 我不知道是怎么回事,我的java客户机消费者用@KafkaListener注释后没有收到任何消息。当我通过命令行创建消费者时,它可以工作。同样,Producer也能按预期工作(同样在java中)。有人能帮我理解这种行为吗? application.yml 生产者配置: 消费者配置: 制作人 Spring控制器: 这是我的控制台输出,正如您所看到的,它发送一条消息,但该方法不接收任何内容。如果我没有

  • 我是Kafka的新手,我有一个使用Java Apache Camel库实现的Kafka消费者。我发现的问题是-消费者花了很长的时间(>15分钟)来处理很少的消息-这对于我们的用例来说是很好的。 需要一些配置帮助,因为相同的消息会在15分钟后重新发送,如果在15分钟内没有处理(我相信线程控制不会返回)。我想这可能是默认间隔,不确定这是哪一个属性。 那么,我必须在哪里修复配置 生产者级别,以便它不重新

  • Kafka消费者不接收在消费者开始之前产生的消息。 ConsumerRecords始终为空 虽然,如果我启动我的消费者比生产者比它接收消息。(Kafka-客户端版本2.4.1)

  • 我是Kafka的新手,运行一个简单的Kafka消费者/生产者的例子,就像在Kafka消费者和KafkaProducer上给出的那样。当我从终端运行消费者时,消费者正在接收消息,但我不能使用Java代码监听。我也在StackoverFlow上搜索了类似的问题(链接: Link1,Link2),并尝试了解决方案,但似乎没有什么对我有用。kafka版本:和相应的maven依赖在pom中使用。 Java生