我试图实现Kafka主题分区的并发处理使用反应堆Kafka与自动确认。这里的文档使它看起来像是可能的:
http://projectreactor.io/docs/kafka/militare/reference/#并发订购
这与我尝试的唯一不同之处在于我使用的是自动确认。
public class KafkaFluxFactory<K, V> {
private final Map<String, Object> properties;
public KafkaFluxFactory(Map<String, Object> properties) {
this.properties = properties;
}
public Flux<ConsumerRecord<K, V>> receiveAuto(Collection<String> topics, Scheduler scheduler) {
return KafkaReceiver.create(ReceiverOptions.create(properties).subscription(topics))
.receiveAutoAck()
.flatMap(flux -> flux.groupBy(this::extractTopicPartition))
.flatMap(topicPartitionFlux -> topicPartitionFlux.publishOn(scheduler));
}
private TopicPartition extractTopicPartition(ConsumerRecord<K, V> record) {
return new TopicPartition(record.topic(), record.partition());
}
}
对我做错了什么有什么想法吗?
经过多次反复试验,再加上重新考虑我想要完成的任务,我意识到我试图在一段代码中解决两个问题。
我需要的两件事是:
在试图用这段代码解决这两个问题时,我限制了下游用户配置并行化级别的能力。因此,我更改了该方法,以返回一个GroupedFluxes通量,它为下游用户提供正确的粒度,以确定什么是可并行化的:
public Flux<GroupedFlux<TopicPartition, ConsumerRecord<K, V>>> receiveAuto(Collection<String> topics) {
return KafkaReceiver.create(createReceiverOptions(topics))
.receiveAutoAck()
.flatMap(flux -> flux.groupBy(this::extractTopicPartition));
}
在下游,用户可以使用他们希望的任何调度程序来并行化每个发出的GroupedFlux:
public <V> void work(Flux<GroupedFlux<TopicPartition, V>> flux) {
flux.doOnNext(groupPublisher -> groupPublisher
.publishOn(Schedulers.elastic())
.subscribe(this::doWork))
.subscribe();
}
这具有所需的行为,按顺序并与其他GroupedFlux并行处理每个TopicPartition-GroupedFlux。
下面是从kafka主题(8分区)接收消息并对其进行处理的消费者代码。 如果处理逻辑中没有返回错误,则所有工作都按预期进行。 但是,如果我抛出一个错误来模拟特定消息的处理逻辑中的异常行为,那么我就没有处理导致异常的消息。流将移动到下一条消息。 我想要实现的是,处理当前消息并提交偏移量,如果它成功,然后移动到下一个记录。 问候, 维诺特
null 谢谢你的澄清。
我开始学习Kafka用于企业解决方案。 在我阅读的过程中,我脑海中浮现出一些问题: > 当一个生产者正在生成一个消息--它会指定它想要将消息发送到的主题,是这样吗?它关心分区吗? 当订阅服务器运行时-它是否指定其组id,以便它可以是同一主题的使用者集群的一部分,或者是该组使用者感兴趣的几个主题的一部分? 每个消费者组在代理上有一个对应的分区还是每个消费者都有一个? 分区是由代理创建的,因此不是消费
我想知道Kafka流是如何分配到主题的分区进行阅读的。据我所知,每个Kafka流线程都是一个消费者(该流有一个消费者组)。所以我猜消费者是随机分配到分区的。 话题P包含人称。它有两个分区。消息的关键是person ID,因此每个属于person的消息最终都位于同一个分区中。 主题O包含订单。它有两个分区。假设密钥也是(订购某样东西的人的)person-id。因此,在这里,属于一个人的每个订单消息总
我有一个主题T,它有4个分区TP1、TP2、TP4和TP4。 假设我有8条消息M1到M8。现在当我的制作人将这些消息发送到主题T时,在以下场景下,Kafka经纪人将如何接收它们: 场景1:只有一个kafka broker实例具有前面提到的分区的主题T。 现在假设kafka broker实例1宕机,消费者会作何反应?我假设我的使用者正在读取broker实例1。
问题内容: 我想派生一个go进程并获取新进程的ID,但是我在或库中看到的只是启动一个新进程。 问题答案: 您应该从包装中获取。 请注意,这是在根本不使用任何线程的情况下发明的,并且一个进程中始终只有一个执行线程,因此分叉是安全的。使用Go,情况完全不同,因为它大量使用OS级线程来为其goroutine调度提供动力。 现在,在Linux上未经修饰的子进程将在所有活动线程中只有一个线程(在父进程中调用