当前位置: 首页 > 知识库问答 >
问题:

KafkaConsumer在KafkaServer上出错(版本0.9.0.1)

芮承运
2023-03-14

我试图用Kafka客户端库(0.9.0.1)测试生产者和消费者。代理(0.9.0.1)正在服务器上运行,我已经测试了KafkaProducer,没有问题。但当我测试KafkanConsumer进行轮询时,代理会发出一条错误消息。

[2016-03-18 13:44:19,129]错误关闭/172.26.132.149的套接字,因为错误(kafka.network.处理器)kafka.common.KafkaException:错误的请求类型10在kafka.api.Request estKeys$. desializerForKey(Request estKeys.scala:57)在kafka.network.Request estChannel$Request.(Request estChannel.scala:53)在kafka.network.Processor.read(SocketServer.scala:353)在kafka.network.Processor.run(SocketServer.scala:245)

消费者测试代码如下。

class ConsumerRunner implements Runnable{
    private KafkaConsumer<String,String> consumer;
    private String topic;
    public ConsumerRunner(String topic,Properties props){
        consumer = new KafkaConsumer<String,String>(props);
        this.topic = topic;
        consumer.subscribe(Arrays.asList(this.topic));
    }
    public void run() {
        while(true){
            ConsumerRecords<String,String> records = consumer.poll(10000);
            for (ConsumerRecord<String, String> record : records)
                 System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
        }
    }

}

我猜轮询请求包含错误的请求类型键,但当我检查Kafka核心源时,我重新确认请求类型键“10”定义为“GroupCoordinatory”。我在Kafka身上发现了可疑代码。网络请求通道。scala'

   val requestObj =
      if ( RequestKeys.keyToNameAndDeserializerMap.contains(requestId))
        RequestKeys.deserializerForKey(requestId)(buffer)

      else
        null

测试使用者还显示错误消息

lient.clientEOFException: null atlient.java:320NetworkRorg.apache.kafka.clients.consumer.internals.FromReadableChannel(NetworkRlient.poll)atlient.java:213NetworkRorg.apache.kafka.clients.consumer.internals.from(NetworkRlient.poll)atlient.java:193KafkaCorg.apache.kafka.clients.consumer.internals.(KafkaClient.poll)atlient.java:163KafkaCorg.apache.kafka.clients.consumer.internals.(KafkaCoordinator.ensure)atorg.apache.kafka.common.network.Selector.poll(Selector.java:286)atorg.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)atorg.apache.kafka.clients.consumer.internals.消费者网络Cjava.io.投票(消费者网络Corg.apache.kafka.common.network.)ateceive.read消费者网络Ceceive.java:83(消费者网络Corg.apache.kafka.common.network.)ateceive.read消费者网络Ceceive.java:71(消费者网络Corg.apache.kafka.common.network.)athannel.receive消费者网络hannel.java:153(消费者网络Corg.apache.kafka.common.network.)athannel.readAbstractChannel.java:134协调器已知(Abstract协调器. java: 180)at org. apache. kafka. clients

有人有主意吗?是我的问题吗?还是其他人?请帮帮我。谢谢你。

共有1个答案

潘修为
2023-03-14

Minkoo公司

不确定您只是试图为0.9 Kafka代码创建一个消费者,还是您的kafka消息有导致此问题的特定内容,您可以分享更多详细信息吗?

但是,如果您只是想为0.9编写一个Kafka消费者,那么在Kafka0.9中就有了新的消费者API。如果您愿意使用新的消费者API,请查看此示例https://github.com/sdpatil/KafkaAPIClient/blob/master/src/main/java/com/spnotes/kafka/simple/Consumer.java用于样品。

苏尼尔

 类似资料:
  • 我正在部署Solr(4.10)。war on Websphere 8.5.5 对于web.xml中配置的所有类,我得到了“错误的主要版本”错误。 下面是类的错误日志:RedirectServlet _defineClassWebApp通用初始化完成SRVE0266E:初始化servlet时发生错误:{0}java.lang.不支持ClassVersionError: JVMCFRE003坏的主要版

  • 问题内容: 我在不同的android版本中上传图片时遇到问题。我需要为php服务器发送图像,所以我正在使用网络服务。我用Froyo e Jelly Beans的版本进行了测试,它们可以工作,但KitKat不能工作。我在阅读有关MediaStore的文章时,我看到了不同的方式,但不管是对还是错。 我调试了项目,然后发现,在KitKat中,路径为NULL,Logcat在KitKat中告诉我“ Java

  • 当我尝试通过MySQL工作台从本地服务器远程导出数据库时, 我收到一些以下版本错误: mysqldump版本不匹配[内容]mysqldump。exe的版本为5.5.16,但要转储的MySQL Server的版本为5.6.10-log。由于mysqldump的版本比服务器旧,因此可能无法正确备份某些功能。建议您将本地MySQL客户端程序(包括mysqldump)升级到等于或高于目标服务器的版本。然后

  • 在我使用eclipse ant和java 1.8在Windows上编译代码之前,它完全正常工作。 现在我在Linux上使用编译。我确保下载了jdk 1.8和ant。 然而,当我运行ant启动构建时。xml,它仍然在失败,因为javac似乎出了问题。还有什么我需要安排的吗? 当我做ant-v时