当前位置: 首页 > 知识库问答 >
问题:

spring-kafka中的并行处理和自动缩放

商振
2023-03-14
    @KafkaListener(topics = {"topic_country1", "topic_country2"}, groupId = KafkaUtils.MESSAGE_GROUP)
    public void onCustomerMessage(String message, Acknowledgment ack) throws Exception {
        log.info("Message : {}  is received", message);
        ack.acknowledge();
    }
    null

共有1个答案

乌杰
2023-03-14

您可以设置concurrency属性来运行更多线程;但是每个分区只能由一个线程处理。要提高并发性,必须增加每个主题中的分区数。在同一个侦听器中侦听多个主题时,如果这些主题只有一个分区,则可能无法获得所需的并发性,除非更改kafka使用者分区分配器。

参见https://docs.spring.io/spring-kafka/docs/2.5.0.release/reference/html/#using-concurrentmessageListenercontainer

在监听多个主题时,默认的分区分布可能不是您所期望的。例如,如果您有三个主题,每个主题有五个分区,并且您希望使用concurrency=15,那么您只看到五个活动使用者,每个使用者从每个主题分配了一个分区,而其他10个使用者处于空闲状态。这是因为默认的Kafka PartitionAssignor是RangeAssignor(参见其Javadoc)。对于这种情况,您可能希望考虑使用RoundRobinAssignor,它将分区分布在所有使用者之间。然后,为每个使用者分配一个主题或分区。...

 类似资料:
  • 那么有没有?或者javadoc有误导性(或者“动态”是什么意思)?如果没有,有没有这个功能的计划?

  • 当我使用Spring批处理管理运行长时间运行的批处理作业的多个实例时,它会在达到jobLauncher线程池任务执行程序池大小后阻止其他作业运行。但是从cron中提取多个工作似乎效果不错。下面是作业启动器配置。 Spring批处理管理员Restful API是否使用不同于xml配置中指定的作业启动器?

  • 我的Spring批处理作业每3分钟运行一次。 步骤应为 每个用户的记录应该并行执行。每个用户最多可以有150k条记录。 每个用户都可以有更新和删除记录。更新记录应在删除之前运行。 更新/删除集应该自己并行运行。但严格来说,所有更新都应该在删除之前完成。 有谁能提出在多个级别实现并行性的最佳方法,并遵循更新和删除级别的顺序吗。我正在研究Spring异步执行器服务、并行流和其他Spring库。Rx,仅

  • 我正在使用spring批处理读取CSV文件并使用controller触发器将其写入DB。在启动应用程序时,在我从浏览器url中点击之前,我会在启动时看到来自阅读器的打印语句。虽然它不为我的处理器或写入器打印它,它们是在单独的类中,我已经自动连线。是因为读者是豆子吗?

  • 我目前正在构建一个spring批处理应用程序,其中执行了几个步骤。除了一个,所有的步骤都是简单的tasklet(没有读取器或写入器),它们负责各种任务,如复制文件、发送请求、启动批处理(*.bat)文件等。 大多数步骤应该是串行执行的。在一个特定的步骤中,我希望启动X文件,这些文件最多可以有Y个实例。 null 如果:)我想我必须使用taskExecutor,下面我有一个示例,在这里我开始第一步(

  • 但是,这个解决方案的问题是它是不可伸缩的。我必须从一开始就知道我必须有多少节点和线程,因为以后不可能更改它:如果我增加并重新启动处理器,什么也不会发生。更糟糕的是,如果我减少了段计数:对于“删除”的段来说,事件永远不会被处理! 理想情况下,它应该能够指定每个节点应该使用的线程数。之后,当新的节点添加到处理器中时,段的数量应该相应地扩大。类似地,如果我移除节点,段的数量应该会减少。这可能与轴突,或它