当前位置: 首页 > 知识库问答 >
问题:

我如何同时处理ReactorKafka流的主题和分区与自动确认?

宰父正真
2023-03-14

我试图实现Kafka主题分区的并发处理使用反应堆Kafka与自动确认。这里的文档使它看起来像是可能的:

http://projectreactor.io/docs/kafka/militare/reference/#并发订购

这与我尝试的唯一不同之处在于我使用的是自动确认。

public class KafkaFluxFactory<K, V> {

    private final Map<String, Object> properties;

    public KafkaFluxFactory(Map<String, Object> properties) {
        this.properties = properties;
    }

    public Flux<ConsumerRecord<K, V>> receiveAuto(Collection<String> topics, Scheduler scheduler) {
        return KafkaReceiver.create(ReceiverOptions.create(properties).subscription(topics))
            .receiveAutoAck()
            .flatMap(flux -> flux.groupBy(this::extractTopicPartition))
            .flatMap(topicPartitionFlux -> topicPartitionFlux.publishOn(scheduler));
    }

    private TopicPartition extractTopicPartition(ConsumerRecord<K, V> record) {
        return new TopicPartition(record.topic(), record.partition());
    }
}

对我做错了什么有什么想法吗?

共有1个答案

陆子石
2023-03-14

经过多次反复试验,再加上重新考虑我想要完成的任务,我意识到我试图在一段代码中解决两个问题。

我需要的两件事是:

  1. Kafka分区的顺序处理
  2. 并行处理每个分区的能力

在试图用这段代码解决这两个问题时,我限制了下游用户配置并行化级别的能力。因此,我更改了该方法,以返回一个GroupedFluxes通量,它为下游用户提供正确的粒度,以确定什么是可并行化的:

public Flux<GroupedFlux<TopicPartition, ConsumerRecord<K, V>>> receiveAuto(Collection<String> topics) {
    return KafkaReceiver.create(createReceiverOptions(topics))
        .receiveAutoAck()
        .flatMap(flux -> flux.groupBy(this::extractTopicPartition));
}

在下游,用户可以使用他们希望的任何调度程序来并行化每个发出的GroupedFlux:

public <V> void work(Flux<GroupedFlux<TopicPartition, V>> flux) {
    flux.doOnNext(groupPublisher -> groupPublisher
            .publishOn(Schedulers.elastic())
            .subscribe(this::doWork))
        .subscribe();
}

这具有所需的行为,按顺序并与其他GroupedFlux并行处理每个TopicPartition-GroupedFlux。

 类似资料:
  • 下面是从kafka主题(8分区)接收消息并对其进行处理的消费者代码。 如果处理逻辑中没有返回错误,则所有工作都按预期进行。 但是,如果我抛出一个错误来模拟特定消息的处理逻辑中的异常行为,那么我就没有处理导致异常的消息。流将移动到下一条消息。 我想要实现的是,处理当前消息并提交偏移量,如果它成功,然后移动到下一个记录。 问候, 维诺特

  • 我开始学习Kafka用于企业解决方案。 在我阅读的过程中,我脑海中浮现出一些问题: > 当一个生产者正在生成一个消息--它会指定它想要将消息发送到的主题,是这样吗?它关心分区吗? 当订阅服务器运行时-它是否指定其组id,以便它可以是同一主题的使用者集群的一部分,或者是该组使用者感兴趣的几个主题的一部分? 每个消费者组在代理上有一个对应的分区还是每个消费者都有一个? 分区是由代理创建的,因此不是消费

  • 我想知道Kafka流是如何分配到主题的分区进行阅读的。据我所知,每个Kafka流线程都是一个消费者(该流有一个消费者组)。所以我猜消费者是随机分配到分区的。 话题P包含人称。它有两个分区。消息的关键是person ID,因此每个属于person的消息最终都位于同一个分区中。 主题O包含订单。它有两个分区。假设密钥也是(订购某样东西的人的)person-id。因此,在这里,属于一个人的每个订单消息总

  • 我有一个主题T,它有4个分区TP1、TP2、TP4和TP4。 假设我有8条消息M1到M8。现在当我的制作人将这些消息发送到主题T时,在以下场景下,Kafka经纪人将如何接收它们: 场景1:只有一个kafka broker实例具有前面提到的分区的主题T。 现在假设kafka broker实例1宕机,消费者会作何反应?我假设我的使用者正在读取broker实例1。

  • 问题内容: 我想派生一个go进程并获取新进程的ID,但是我在或库中看到的只是启动一个新进程。 问题答案: 您应该从包装中获取。 请注意,这是在根本不使用任何线程的情况下发明的,并且一个进程中始终只有一个执行线程,因此分叉是安全的。使用Go,情况完全不同,因为它大量使用OS级线程来为其goroutine调度提供动力。 现在,在Linux上未经修饰的子进程将在所有活动线程中只有一个线程(在父进程中调用