当前位置: 首页 > 知识库问答 >
问题:

斯普林斯·Kafka--试图理解幕后的事情是如何运作的

柳珂
2023-03-14

考虑以下代码-

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
      bootstrapAddress);
    props.put(
      ConsumerConfig.GROUP_ID_CONFIG, 
      groupId);
    props.put(
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
      StringDeserializer.class);
    props.put(
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
      StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> 
  kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory
      = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

我已经创建了一个消费者工厂和一个concurrentKafkaListenercontainer工厂。我还没有为侦听器工厂设置并发性。我有一个用@kafkalistener注释的方法

@KafkaListener(topics = "topicName")
public void listen(String message) {
    System.out.println("Received Message: " + message);

此外,根据并发性的不同,假设我们现在只监听一个主题,我们将有3个用@kafkalistener注释的方法,如果没有指定分区,所有3个方法都监听不同的分区(由kafka以循环方式提供)。?

我对Kafka是新手,想澄清一下我的理解。

共有1个答案

拓拔弘亮
2023-03-14

当我不设置并发属性时,Spring会创建1个消费者实例、1个属于消费者工厂中指定的组的kafka侦听器容器吗?

您将有一个使用者从该主题的所有分区获取事件。

如果我将并发性更改为3,那么spring是否会创建3个使用者实例,即在配置使用者工厂和3个侦听器容器时指定的同一使用者组中的3个使用者?

@KafkaListener(
  topicPartitions = @TopicPartition(topic = "topicName",
  partitionOffsets = {
    @PartitionOffset(partition = "0", initialOffset = "0"), 
    @PartitionOffset(partition = "3", initialOffset = "0")
}))
public void listenToParition(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      System.out.println(
        "Received Messasge: " + message"
        + "from partition: " + partition);
}

这毫无意义。首先,KafkaListeners是Spring Kafka的高级抽象,Kafka根本没有循环(从消费者的角度来看,它与生产者不同),如果您有3个消费者(相同的消费者组+监听相同的主题),并且主题中有3个分区,Kafka将重新平衡并将一个分区分配给一个消费者,每个消费者将只从Kafka分配的分区中获取事件。Spring Kafka在每个消费者中接收事件后,将在KafkaListener实例中交付事件。

 类似资料:
  • 我是一个网络人,试图打造我的第一部Kafka-- 我还应该说,我的Kafka节点运行在我的主机Ubuntu机器上(不是在Docker容器中)。当我运行Kafka时,我也运行了一个JMX导出器的实例。下面是我在Ubuntu命令行上启动Kafka的方法: 好的。我的Prometheus(也是一个主机进程,不是Docker容器版本)可以成功地从我的Kafka中提取许多指标。所以我只需要想办法让普罗米修斯

  • 我正在按照教程创建跟踪应用程序zipkin和sleuth,但我遇到了一些问题。我无法创建跨度。问题是该方法不存在。此外,我找不到跟踪器的导入。 这就是我想要做的: 为什么上面的实现不起作用?

  • 安装VS 2015后,从命令行运行csc.exe会导致此消息显示到控制台: 此编译器是作为Microsoft(R).NET Framework的一部分提供的,但仅支持C#5之前的语言版本,该版本已不再是最新版本。有关支持C#编程语言更新版本的编译器,请参见http://go.microsoft.com/fwlink/?LinkID=533240 该链接重定向到Roslyn在GitHub的存储库。<

  • 我试图通过扩展< code > Cassandra auto configuration 在我的应用程序中配置Cassandra 我使用spring < code > Cassandra repository 进行数据库访问,并使用带有< code > o . s . d . Cassandra . core . mapping . table 注释的类来定义我的表。 我还配置了以下属性以及集群所

  • 本文向大家介绍什么是赛普拉斯的自动化测试?,包括了什么是赛普拉斯的自动化测试?的使用技巧和注意事项,需要的朋友参考一下 赛普拉斯是用于测试前端现代Web应用程序的未来工具。它旨在克服工程师和开发人员在测试基于React和AngularJS的Web应用程序时面临的障碍。它是一种快速,轻松,可靠的工具,用于测试在浏览器上运行的任何应用程序。 赛普拉斯通常与硒进行比较。但是赛普拉斯和Selenium在架

  • 10mins左右 线上 1.自我介绍 2.项目介绍和负责的工作 3.简历细节确认 4.公众号更多还是短视频更多 5.数据分析和涨粉的关系 6.是否投豆荚 7.发一下作品集 8.反问 面试体验有点迷迷,可能和企业性质有关,面试业务部门的专业水平未知。 同时,面试官也迟到了一会,好像这个面试有一些微妙的尴尬。