@KafkaListener(topics = {"topic_country1", "topic_country2"}, groupId = KafkaUtils.MESSAGE_GROUP)
public void onCustomerMessage(String message, Acknowledgment ack) throws Exception {
log.info("Message : {} is received", message);
ack.acknowledge();
}
您可以设置concurrency
属性来运行更多线程;但是每个分区只能由一个线程处理。要提高并发性,必须增加每个主题中的分区数。在同一个侦听器中侦听多个主题时,如果这些主题只有一个分区,则可能无法获得所需的并发性,除非更改kafka使用者分区分配器。
参见https://docs.spring.io/spring-kafka/docs/2.5.0.release/reference/html/#using-concurrentmessageListenercontainer
在监听多个主题时,默认的分区分布可能不是您所期望的。例如,如果您有三个主题,每个主题有五个分区,并且您希望使用concurrency=15,那么您只看到五个活动使用者,每个使用者从每个主题分配了一个分区,而其他10个使用者处于空闲状态。这是因为默认的Kafka PartitionAssignor是RangeAssignor(参见其Javadoc)。对于这种情况,您可能希望考虑使用RoundRobinAssignor,它将分区分布在所有使用者之间。然后,为每个使用者分配一个主题或分区。...
那么有没有?或者javadoc有误导性(或者“动态”是什么意思)?如果没有,有没有这个功能的计划?
当我使用Spring批处理管理运行长时间运行的批处理作业的多个实例时,它会在达到jobLauncher线程池任务执行程序池大小后阻止其他作业运行。但是从cron中提取多个工作似乎效果不错。下面是作业启动器配置。 Spring批处理管理员Restful API是否使用不同于xml配置中指定的作业启动器?
我的Spring批处理作业每3分钟运行一次。 步骤应为 每个用户的记录应该并行执行。每个用户最多可以有150k条记录。 每个用户都可以有更新和删除记录。更新记录应在删除之前运行。 更新/删除集应该自己并行运行。但严格来说,所有更新都应该在删除之前完成。 有谁能提出在多个级别实现并行性的最佳方法,并遵循更新和删除级别的顺序吗。我正在研究Spring异步执行器服务、并行流和其他Spring库。Rx,仅
我正在使用spring批处理读取CSV文件并使用controller触发器将其写入DB。在启动应用程序时,在我从浏览器url中点击之前,我会在启动时看到来自阅读器的打印语句。虽然它不为我的处理器或写入器打印它,它们是在单独的类中,我已经自动连线。是因为读者是豆子吗?
我目前正在构建一个spring批处理应用程序,其中执行了几个步骤。除了一个,所有的步骤都是简单的tasklet(没有读取器或写入器),它们负责各种任务,如复制文件、发送请求、启动批处理(*.bat)文件等。 大多数步骤应该是串行执行的。在一个特定的步骤中,我希望启动X文件,这些文件最多可以有Y个实例。 null 如果:)我想我必须使用taskExecutor,下面我有一个示例,在这里我开始第一步(
考虑一个阶跃豆: 要求:在Reader中,它从文件中读取(Entity1的)记录。在处理器中,它进行处理,在Writer中,它写入数据库。 在TaskExecutor之前,只创建了一个线程,它将在读取器和处理器中循环1000次,如上面的块设置中所定义的。然后它将移动到writer并写入所有1000条记录。它将再次从记录编号1001开始,然后在读取器和处理器中处理另外1000条记录。这是一个同步执行