当前位置: 首页 > 知识库问答 >
问题:

Spring批处理Kafka消费者对于多线程访问不安全

乐修远
2023-03-14

我是一个新手。我有需要读取Kafka流和过滤数据的要求

这是Spring批处理配置。

    @Autowired
    TaskExecutor taskExecutor;

    @Autowired
    JobRepository jobRepository;

    @Bean
    KafkaItemReader<Long, Event> kafkaItemReader() {
        Properties props = new Properties();
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        props.putAll(this.properties.buildConsumerProperties());
        return new KafkaItemReaderBuilder<Long, Event>()
                .partitions(0)
                .consumerProperties(props)
                .name("event-reader")
                .saveState(true)
                .topic(topicName)
                .build();
    }

    @Bean
    public TaskExecutor taskExecutor(){
        SimpleAsyncTaskExecutor asyncTaskExecutor=new SimpleAsyncTaskExecutor("spring_batch");
        asyncTaskExecutor.setConcurrencyLimit(5);
        return asyncTaskExecutor;
    }

    @Bean(name = "JobLauncher")
    public JobLauncher simpleJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(taskExecutor);
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

并且存在开始新作业的控制器endpoint。这是我必须使用的方式:开始新工作

    @Autowired
    @Qualifier("JobLauncher")
    private JobLauncher jobLauncher;


    Map<String, JobParameter> items = new HashMap<>();
    items.put("userId", new JobParameter("UserInputId"));
    JobParameters paramaters = new JobParameters(items);
    try {
        jobLauncher.run(job, paramaters);
    } catch (Exception e) {
        e.printStackTrace();
    }

我已经看到KafkaItemReader不是线程安全的。我想知道这种方法是正确的,还是有任何方法可以在多线程spring批处理环境中读取kafka流。谢谢

共有2个答案

杜俭
2023-03-14

根据spring文档,它使用KafkaConsumer;根据其详细文档,其本身不是线程安全的。

请查看您是否可以使用该文档中提到的任何方法(即解耦或每个线程使用一个消费者)。在您的示例中,您可能需要为taskexecutor使用单独的处理程序(如果您遵循解耦方法)。

韦叶秋
2023-03-14

< code>KafkaItemReader被证明是非线程安全的,以下是其Javadoc的摘录:

Since KafkaConsumer is not thread-safe, this reader is not thread-safe.

所以在多线程环境中使用它是不正确的,并且不符合留档。您可以做的是每个分区使用一个读取器。

 类似资料:
  • 我正在使用Spring Kafka消费者。我已将并发设置为10,并创建了5个消费者(用于5个主题)。所以有50个Spring Kafka消费者线程。 Kafka消费者可以使用的最大线程数是多少?如何增加此线程池的大小?我查阅了spring文档,但没有发现任何相关内容。

  • 问题是Spring Kafka侦听器只配置了主题名。 我似乎可以让Kafka产生100个消费者来处理来自“队列”(日志)的消息。怎么能做到呢?

  • 我正在使用Spring boot2.1.7。RELEASE和spring-kafka 2.2.7。RELEASE。我正在使用@KafkaListener注释来创建一个消费者,我正在使用消费者的所有默认设置。 这是我的消费者配置: 由于某些原因,我在同一个应用程序中有多个使用者,如下所示。 尽管如此,根据关于“消费者线程安全”的合流文件 一个线程中不能有多个属于同一组的使用者,也不能有多个线程安全地

  • null 我更新了我的步骤并添加了一个ThreadPoolTaskExecutor,如下所示 在此之后,我的处理器将被多个线程调用,但使用相同的源数据。我还有什么需要做的吗?

  • 我正在尝试使用多个处理器类在处理器步骤中处理记录。这些类可以并行工作。目前我已经编写了一个多线程步骤,其中我 设置处理器类的输入和输出行 提交给遗嘱执行人服务 获取所有未来对象并收集最终输出

  • 想知道Kafka使用者(Java客户端)是否可以并行读取和处理多条消息...我的意思是使用多个线程...我应该使用rxJava吗?? 1)这样做是一个好的方法吗???2)而且根据我的理解,Kafka甚至把每一个线程都当作消费者...如果我错了,请纠正我... 3)并且还想让Java客户端作为守护进程服务在Linux中运行,这样它就可以连续运行,并且轮询Kafka的消息,读取和处理都是一样的...这