当前位置: 首页 > 知识库问答 >
问题:

如何从分区分配器的实现中调用kafkaconsumer api

蔚丰
2023-03-14

我已经通过在我的spring boot应用程序中实现RangeAssignor实现了我自己的分区分配策略。我已经覆盖了它的subscriptionUserData方法并添加了一些用户数据。每当数据发生变化时,我都希望通过调用下面的kafkaconsumer apis来触发分区重新平衡kafkaconsumer apis强制重新平衡

我不确定如何获取 kafka 使用者的对象并调用此 api。请建议

共有1个答案

漆雕修德
2023-03-14

您可以调用consumer.wakeup()函数

consumer.wakeup()是唯一可以安全地从不同html" target="_blank">线程调用的消费者方法。调用wakeup将导致轮询()与WakeupException一起退出,或者如果线程没有等待轮询时调用了consumer.wakeup(),则在调用轮询()时将在下一次迭代中引发异常。不需要处理WakeupException,但在退出线程之前,您必须调用consumer.close()。如果需要,关闭消费者将提交偏移集,并将向组协调器发送消费者正在离开组的消息。消费者协调器将立即触发重新平衡

  Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            System.out.println("Starting exit...");
            consumer.wakeup();   **//1**
            try {
                mainThread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
} });
    ...
    Duration timeout = Duration.ofMillis(100);
    try {
        // looping until ctrl-c, the shutdown hook will cleanup on exit
        while (true) {
            ConsumerRecords<String, String> records =
                movingAvg.consumer.poll(timeout);
            System.out.println(System.currentTimeMillis() +
                "--  waiting for data...");
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s\n",
                    record.offset(), record.key(), record.value());
            }
            for (TopicPartition tp: consumer.assignment())
                System.out.println("Committing offset at position:" +
                    consumer.position(tp));
                movingAvg.consumer.commitSync();
        }
    } catch (WakeupException e) {
        // ignore for shutdown. **//2**
    } finally {
        consumer.close(); **//3**
        System.out.println("Closed consumer and we are done");
    }
  1. ShutdownHook在一个单独的线程中运行,因此我们可以采取的唯一安全操作是调用wakeup来打破轮询循环。
  2. 另一个调用wakeup的线程将导致轮询抛出WakeupException。您需要捕获异常以确保您的应用程序不会意外退出,但无需对其执行任何操作。
  3. 在退出消费者之前,请确保干净地关闭它。

完整示例:

https://github.com/gwenshap/kafka-examples/blob/master/SimpleMovingAvg/src/main/java/com/shapira/examples/newconsumer/simplemovingavg/SimpleMovingAvgNewConsumer.java

 类似资料:
  • TL;DR;我试图理解一个被分配了多个分区的单个使用者是如何处理reach分区的消费记录的。 例如: 在移动到下一个分区之前,会完全处理一个分区。 每次处理每个分区中的可用记录块。 从第一个可用分区处理一批N条记录 以循环旋转方式处理来自分区的N条记录 我找到了或分配程序的配置,但这只决定了使用者如何分配分区,而不是它如何从分配给它的分区中使用。 我开始深入研究KafkaConsumer源代码,#

  • 问题内容: 我有一个使用Python的scikit- learn训练的分类器。如何使用Java程序中的分类器?我可以使用Jython吗?有什么方法可以在Python中保存分类器并在Java中加载分类器?还有其他使用方式吗? 问题答案: 您不能使用jython,因为scikit- learn非常依赖numpy和scipy,它们具有许多已编译的C和Fortran扩展,因此无法在jython中工作。 在

  • 我已经在c中创建了kafka消费者,并创建了一个具有10个分区的主题,当我尝试使用消费者读取数据时,它仅从2个分区读取,然后说没有更多的消息。我尝试使用这两种方法,即订阅和分配,但它们都不起作用。我应该如何将所有10个分区分配给单个使用者,这是将分区分配给使用者的正确方法吗?我已经使用此存储库构建了自定义消费者 https://github.com/edenhill/librdkafka/blob

  • 问题:我已经启动了五个elasticsearch节点,但只有66,84%的数据在kibana中可用。当我用localhost检查集群运行状况时:9200/u cluster/health?pretty=true我得到了以下信息: 除kibana指数外,我所有的指数都是红色的。 小部分:

  • 我正在编写一个记录服务,它可以适当地获取意图和功能中的附加数据。我使用START\u REDELIVER\u INTENT,以便在每个START命令上使用与原始意图相同的设置开始录制,但我必须知道当前调用onStartCommand是由于显式startService还是进程崩溃。 因此,我的问题是如何区分启动服务请求导致的onStartCommand调用和服务崩溃\关闭后系统在没有显式停止自己\停

  • 查询示例: 典型错误消息: 处理语句时出错:失败:执行错误,从org.apache.hadoop.hive.ql.exec.mr.MapredTask返回代码2 问题2:当我运行命令?我是否只运行相同的命令,但使用STRING而不是bigint?**完整错误消息:**