我是一个新手。我有需要读取Kafka流和过滤数据的要求
这是Spring批处理配置。
@Autowired
TaskExecutor taskExecutor;
@Autowired
JobRepository jobRepository;
@Bean
KafkaItemReader<Long, Event> kafkaItemReader() {
Properties props = new Properties();
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.putAll(this.properties.buildConsumerProperties());
return new KafkaItemReaderBuilder<Long, Event>()
.partitions(0)
.consumerProperties(props)
.name("event-reader")
.saveState(true)
.topic(topicName)
.build();
}
@Bean
public TaskExecutor taskExecutor(){
SimpleAsyncTaskExecutor asyncTaskExecutor=new SimpleAsyncTaskExecutor("spring_batch");
asyncTaskExecutor.setConcurrencyLimit(5);
return asyncTaskExecutor;
}
@Bean(name = "JobLauncher")
public JobLauncher simpleJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(taskExecutor);
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
并且存在开始新作业的控制器endpoint。这是我必须使用的方式:开始新工作
@Autowired
@Qualifier("JobLauncher")
private JobLauncher jobLauncher;
Map<String, JobParameter> items = new HashMap<>();
items.put("userId", new JobParameter("UserInputId"));
JobParameters paramaters = new JobParameters(items);
try {
jobLauncher.run(job, paramaters);
} catch (Exception e) {
e.printStackTrace();
}
我已经看到KafkaItemReader不是线程安全的。我想知道这种方法是正确的,还是有任何方法可以在多线程spring批处理环境中读取kafka流。谢谢
根据spring文档,它使用KafkaConsumer;根据其详细文档,其本身不是线程安全的。
请查看您是否可以使用该文档中提到的任何方法(即解耦或每个线程使用一个消费者)。在您的示例中,您可能需要为taskexecutor使用单独的处理程序(如果您遵循解耦方法)。
< code>KafkaItemReader被证明是非线程安全的,以下是其Javadoc的摘录:
Since KafkaConsumer is not thread-safe, this reader is not thread-safe.
所以在多线程环境中使用它是不正确的,并且不符合留档。您可以做的是每个分区使用一个读取器。
我正在使用Spring Kafka消费者。我已将并发设置为10,并创建了5个消费者(用于5个主题)。所以有50个Spring Kafka消费者线程。 Kafka消费者可以使用的最大线程数是多少?如何增加此线程池的大小?我查阅了spring文档,但没有发现任何相关内容。
问题是Spring Kafka侦听器只配置了主题名。 我似乎可以让Kafka产生100个消费者来处理来自“队列”(日志)的消息。怎么能做到呢?
我正在使用Spring boot2.1.7。RELEASE和spring-kafka 2.2.7。RELEASE。我正在使用@KafkaListener注释来创建一个消费者,我正在使用消费者的所有默认设置。 这是我的消费者配置: 由于某些原因,我在同一个应用程序中有多个使用者,如下所示。 尽管如此,根据关于“消费者线程安全”的合流文件 一个线程中不能有多个属于同一组的使用者,也不能有多个线程安全地
null 我更新了我的步骤并添加了一个ThreadPoolTaskExecutor,如下所示 在此之后,我的处理器将被多个线程调用,但使用相同的源数据。我还有什么需要做的吗?
我正在尝试使用多个处理器类在处理器步骤中处理记录。这些类可以并行工作。目前我已经编写了一个多线程步骤,其中我 设置处理器类的输入和输出行 提交给遗嘱执行人服务 获取所有未来对象并收集最终输出
想知道Kafka使用者(Java客户端)是否可以并行读取和处理多条消息...我的意思是使用多个线程...我应该使用rxJava吗?? 1)这样做是一个好的方法吗???2)而且根据我的理解,Kafka甚至把每一个线程都当作消费者...如果我错了,请纠正我... 3)并且还想让Java客户端作为守护进程服务在Linux中运行,这样它就可以连续运行,并且轮询Kafka的消息,读取和处理都是一样的...这