当前位置: 首页 > 知识库问答 >
问题:

如何在kafka 0.9.0中使用多线程消费者?

淳于健
2023-03-14

Kafka的doc给出了一种方法,大约用以下描述:

每个线程一个消费者:一个简单的选择是为每个线程提供自己的消费者

我的代码:

public class KafkaConsumerRunner implements Runnable {

    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final CloudKafkaConsumer consumer;
    private final String topicName;

    public KafkaConsumerRunner(CloudKafkaConsumer consumer, String topicName) {
        this.consumer = consumer;
        this.topicName = topicName;
    }

    @Override
    public void run() {
        try {
            this.consumer.subscribe(topicName);
            ConsumerRecords<String, String> records;
            while (!closed.get()) {
                synchronized (consumer) {
                    records = consumer.poll(100);
                }
                for (ConsumerRecord<String, String> tmp : records) {
                    System.out.println(tmp.value());
                }
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            System.out.println(e);
            //if (!closed.get()) throw e;
        }
    }

    // Shutdown hook which can be called from a separate thread
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }

    public static void main(String[] args) {
        CloudKafkaConsumer kafkaConsumer = KafkaConsumerBuilder.builder()
                .withBootstrapServers("172.31.1.159:9092")
                .withGroupId("test")
                .build();
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        executorService.execute(new KafkaConsumerRunner(kafkaConsumer, "log"));
        executorService.execute(new KafkaConsumerRunner(kafkaConsumer, "log.info"));
        executorService.shutdown();
    }
}

但它不起作用,引发了一个异常:

JAVAutil。ConcurrentModificationException:KafkaConsumer对于多线程访问不安全

此外,我还阅读了Flink(一个用于分布式流和批处理数据的开源平台)的源代码。Flink使用多线程消费程序与我的类似。

long pollTimeout = Long.parseLong(flinkKafkaConsumer.properties.getProperty(KEY_POLL_TIMEOUT, Long.toString(DEFAULT_POLL_TIMEOUT)));
pollLoop: while (running) {
    ConsumerRecords<byte[], byte[]> records;
    //noinspection SynchronizeOnNonFinalField
    synchronized (flinkKafkaConsumer.consumer) {
        try {
            records = flinkKafkaConsumer.consumer.poll(pollTimeout);
        } catch (WakeupException we) {
            if (running) {
                throw we;
            }
            // leave loop
            continue;
        }
    }

多线程的flink代码

发生了什么?

共有3个答案

濮阳靖
2023-03-14

也许不是你的情况,但如果你正在处理多个主题的数据,那么你可以用同一个消费者从多个主题中读取数据。如果不是这样,那么最好创建独立的工作岗位来处理每个主题。

司空高义
2023-03-14

它是抛出异常在您的呼叫订阅。this.consumer.subscribe(topicName);

将该块移动到同步块中,如下所示:

@Override
public void run() {
    try {
        synchronized (consumer) {
            this.consumer.subscribe(topicName);
        }
        ConsumerRecords<String, String> records;
        while (!closed.get()) {
            synchronized (consumer) {
                records = consumer.poll(100);
            }
            for (ConsumerRecord<String, String> tmp : records) {
                System.out.println(tmp.value());
            }
        }
    } catch (WakeupException e) {
        // Ignore exception if closing
        System.out.println(e);
        //if (!closed.get()) throw e;
    }
}
那博瀚
2023-03-14

Kafka消费者不是线程安全的。正如你在问题中指出的,文件中说

一个简单的选择是给每个线程自己的消费者实例

但在代码中,相同的消费者实例由不同的KafkanconSumerRunner实例包装。因此,多个线程正在访问同一个使用者实例。Kafka的文件明确指出

Kafka消费者不是线程安全的。所有网络I/O都发生在发出调用的应用程序的线程中。用户有责任确保多线程访问正确同步。不同步的访问将导致ConcurrentModificationException。

这正是你收到的例外。

 类似资料:
  • 问题内容: 我是一名学习Kafka的新学生,在了解多个消费者(到目前为止,文章,文档等对他们没有太大帮助)方面,我遇到了一些基本问题。 我尝试做的一件事是编写我自己的高级Kafka生产者和消费者并同时运行它们,将100条简单消息发布到某个主题,然后让消费者检索它们。我已经成功地做到了这一点,但是当我尝试引入另一个使用者来使用与刚刚发布消息的主题相同的主题时,它没有收到消息。 据我了解,对于每个主题

  • 我编写了一个Kafka消费者(使用Spring Kafka),它从单个主题读取并且是消费者组的一部分。一旦消息被消费,它将执行所有下游操作并继续下一个消息偏移量。我已将其打包为WAR文件,我的部署管道将其推送到单个实例。使用我的部署管道,我可能可以将此工件部署到我的部署池中的多个实例。 但是,当我希望多个消费者作为我的基础设施的一部分时,我无法理解以下内容- > 实际上,我可以在部署池中定义多个实

  • 我是一个学习Kafka的新学生,我遇到了一些关于理解多个消费者的基本问题,到目前为止,文章、文档等都没有太大的帮助。 我尝试做的一件事是编写我自己的高级Kafka生产者和消费者,并同时运行他们,发布100个简单的消息到一个主题,并让我的消费者检索他们。我成功地做到了这一点,但是当我试图引入第二个消费者来消费刚刚发布消息的同一主题时,它没有收到任何消息。 我的理解是,对于每个主题,您可以有来自不同消

  • 我希望flink consumer stream中的每条消息都能使用flink kafka producer生成多条消息,每条消息都通过一个单独的线程指向Kafka中的某个主题。我正在用Scala编写程序,但用Java就可以了 类似这样: 因此,对于flink消费者中的每个输入,我希望使用多线程向其他队列生成10条消息。

  • 我正在使用Spring boot2.1.7。RELEASE和spring-kafka 2.2.7。RELEASE。我正在使用@KafkaListener注释来创建一个消费者,我正在使用消费者的所有默认设置。 这是我的消费者配置: 由于某些原因,我在同一个应用程序中有多个使用者,如下所示。 尽管如此,根据关于“消费者线程安全”的合流文件 一个线程中不能有多个属于同一组的使用者,也不能有多个线程安全地

  • 关于破坏者,我有以下问题: 消费者(事件处理器)没有实现他们实现EventHandler的任何可调用或可运行接口,那么他们如何能够并行运行,因此,例如,我有一个disruptor实现,其中有这样一个菱形模式 其中c1到c3可以在p1之后并联工作,C4和C5在p1之后工作。 所以通常我会有这样的东西(P1和C1-C5是可运行/可调用的) 但是在Disruptor的情况下,我的事件处理程序都没有实现R