当前位置: 首页 > 面试题库 >

Kafka Consumer挂在Java中的.hasNext

廖令
2023-03-14
问题内容

我在Java中有一个简单的Kafka Consumer,带有以下代码

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext()&& !done){
            try {
                System.out.println("Parsing data");
                byte[] data = it.next().message();
                System.out.println("Found data: "+data);
                values.add(data); // array list
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        }
        done = true;
    }

发布消息后,将成功读取数据,但是当它返回检查它时。hasNext(),它将保持待处理状态,再也不会返回。

什么会拖延这个?

m_stream是通过以下方式获得的KafkaStream:

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
executor = Executors.newFixedThreadPool(a_numThreads);
for (final KafkaStream stream : streams) {
   // m_stream is one of these streams
}

问题答案:

解决方案是添加属性

“ consumer.timeout.ms”

现在,当达到超时时,将引发ConsumerTimeoutException



 类似资料:
  • :) 我已经在一个(奇怪的)情况中结束了自己,简单地说,我不想使用来自Kafka的任何新记录,因此暂停主题中所有分区的sparkStreaming消费(InputStream[ConsumerRecord]),执行一些操作,最后,恢复消费记录。 首先这可能吗? 我一直在尝试这样的事情: 但是我得到了这个: 任何帮助我理解我遗漏了什么,以及为什么当消费者明确分配了分区时我会得到空结果的帮助都将受到欢

  • 如果我需要使用特定消费组的最新提交偏移量(用于从Spark结构化流开始偏移),我应该使用什么。 我的代码显示已弃用。 官方文件: 偏移量和使用者位置Kafka为分区中的每个记录维护一个数字偏移量。该偏移量充当该分区内记录的唯一标识符,还表示使用者在分区中的位置。例如,位于位置5的使用者已使用偏移量为0到4的记录,并将接下来接收偏移量为5的记录。实际上,与消费者的用户相关的位置有两个概念:消费者的位

  • committed:获取给定分区的上次提交偏移量(无论提交是由这个进程还是另一个进程进行的)。此偏移量将在发生故障时用作消费者的位置。此调用将阻止执行远程调用,以从服务器获取最新提交的偏移量。 这是否意味着如果consumer.poll()获取了50条从偏移量101到150的消息,并且consumer有手动偏移量提交。并且使用者仍在处理这50条消息,所以最后提交的偏移量是100。现在committ

  • 问题内容: 首先,我是Java的新手,并试图完成学校创建自动售货机的作业。我的程序将2个文件用作cli参数,一个用于产品,另一个用于金钱。 为了我的一生,我无法弄清楚代码为什么挂在第42行上 我尝试使用断点在eclipse上进行调试,并注意到代码永远不会超过这一行。将print语句放入while循环内,我没有得到输出,所以我知道它没有循环。 Java文档说hasNextLine可以阻止等待用户输入

  • 我试图用Kafka客户端库(0.9.0.1)测试生产者和消费者。代理(0.9.0.1)正在服务器上运行,我已经测试了KafkaProducer,没有问题。但当我测试KafkanConsumer进行轮询时,代理会发出一条错误消息。 [2016-03-18 13:44:19,129]错误关闭/172.26.132.149的套接字,因为错误(kafka.network.处理器)kafka.common.

  • 我要做的是暂停< code>KafkaConsumer,如果在使用消息的过程中出现错误。 这是我写的 然后我写了一个REST服务来恢复消费者 现在,我有两个问题。第一个问题:当我打电话给消费者时。来自<code>@KafkaListener</code>注释方法的pause()会发生什么?消费者立即暂停,或者我可以接收到同一主题分区的其他偏移量上的其他消息。例如,我有偏移量为3的“message1