当前位置: 首页 > 知识库问答 >
问题:

kafka中的某些提交偏移调用失败

龚沛
2023-03-14

Kafka 服务器和客户端 JAR 移至最新库:0.10.0.1

我的消费者和生产者代码使用如上所述的最新kafka jars,但仍然使用旧的消费者API(0 . 8 . 2)。

我在调用commit offset时在消费者端遇到问题。

2017-04-10 00:07:14,547 ERROR kafka.consumer.ZookeeperConsumerConnector [groupid1_x.x.x.x-1491594443834-b0e2bce5], Error while committing offsets. java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
    at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
    at kafka.consumer.ZookeeperConsumerConnector.liftedTree2$1(ZookeeperConsumerConnector.scala:354)
    at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:351)
    at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:331)
    at kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:111)
    at com.zoho.mysqlbackup.kafka.consumer.KafkaHLConsumer.commitOffset(KafkaHLConsumer.java:173)
    at com.zoho.mysqlbackup.kafka.consumer.KafkaHLConsumer.run(KafkaHLConsumer.java:271)

kafka服务器端配置:

listeners=PLAINTEXT://:9092
log.message.format.version=0.8.2.1
broker.id.generation.enable=false
unclean.leader.election.enable=false

以下 Kafka 消费者的配置:

auto.commit.enable is overridden to false
auto.offset.reset is overridden to smallest
consumer.timeout.ms is overridden to 100
dual.commit.enabled is overridden to true
fetch.message.max.bytes is overridden to 209715200
group.id is overridden to crm_topic1_hadoop_tables
offsets.storage is overridden to kafka
rebalance.backoff.ms is overridden to 6000
zookeeper.session.timeout.ms is overridden to 23000
zookeeper.sync.time.ms is overridden to 2000

要创建消费者,我使用以下api:

ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(propForHLConsumer));

和提交调用

consumer.commitOffsets();

在从 Kafka 读取消息时,我们使用以下方法来处理超时

private boolean hasNext(ConsumerIterator<byte[], byte[]> it)
{
    try
    {
        it.hasNext();
        return true;
    }
    catch (ConsumerTimeoutException e)
    {
        return false;
    }
}

这是必需的,因为我们希望仅在从kafka接收到的特定时间间隔或消息大小(字节)之后才开始处理。

相同的异常,即使在设置dual . commit . enabled = false consumer . time out . ms = 1000之后,其他设置仍保持旧配置。

更多细节:

对于0.8.2.1版本,我从未遇到过这样的问题。移动到0.10.0.1(客户端和服务器)后,开始获取此异常。

在处理/推送到hadoop之前,我们正在阅读多条消息。处理/写入hadoop部分需要时间(约5分钟)。在这一过程之后,当我们试图推动时,我们就超越了例外。这个例外情况我每第二个commitOffset都会遇到。有时(其中commitOffset在前一次提交的10秒内调用),第二次提交没有例外。

以供参考。如果提交偏移失败,那么消费者只需读取下一条消息,而不返回上一个成功的提交偏移位置。但如果提交偏移失败并重新启动使用者进程,则它将从旧的提交位置读取。

共有1个答案

屈俊远
2023-03-14

正如我在问题细节中提到的,我正在使用最新的kafka jars,但仍然使用旧的消费者客户端:

kafka.javaapi.consumer.ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(propForHLConsumer));

我通过调用第二次commitOffset解决了这个问题。

实际上,与connections.max.idle.ms.这个属性相关的问题是用最新的kafka引入的(经纪人=10分钟,消费者=9分钟,生产者=9分钟)。

因此,每当我的老用户在10分钟后调用第二次提交偏移时,我都会出现上述异常。

使用旧的使用者API,无法设置此属性。和我无法更改的代理配置(由其他团队处理并为其他用户提供相同的代理)。。。

在这里,我认为旧的commitOffset调用需要另一个连接(而不是迭代器),当理想状态超过10分钟时,该连接将会关闭。我对此不是很确定。

如果第一次commitOffset调用发生任何失败,则第二次调用将确保成功。如果第一次执行成功,那么下一次执行不会有任何问题。无论如何,我们很少调用提交偏移。

接下来,我将使用最新的kafka消费者和生产者javaAPI移动代码。

 类似资料:
  • 我目前正在从具有特定偏移量的主题中获取消息。我正在使用寻求()来实现它。但是当我将enable.auto.commit设置为true或使用手动同步(委托同步()/委托同步())时,Seek()不起作用,因为它没有轮询来自特定偏移量的消息,而是从最后提交的偏移量中选择。 因此,在使用Seek()时,是否必须将偏移量存储在外部DB中,而不提交给Kafka?Seek和Commit不能并行工作吗? 客户端

  • 我有一个ReactorKafka项目,它消耗来自Kafka主题的消息,转换消息,然后写入到另一个主题。 我的理解是,只有在Reactor中成功完成所有顺序步骤后,才会提交偏移量。对吗?我想确保不会处理下一条记录,除非当前记录成功发送到目标Kafka主题。

  • 我有一个Kafka消费者,我从它消费数据从一个特定的主题,我看到下面的例外。我使用的是Kafka版本。 我添加了这两个额外的消费者属性,但仍然没有帮助: 那个错误意味着什么?我该如何解决它?我需要添加一些其他消费者属性吗?

  • 我正试着把我的头绕在Kafka的交易上,而且只绕了一次。 我已经创建了一个事务性消费者,我想确保阅读和处理某个主题的所有消息。如果事务失败,消息因此丢失,Kafka仍会提交偏移量。 更正式地说,如果流处理应用程序使用消息A并生成消息B,使得B=F(A),那么恰好一次处理意味着当且仅当成功生成B时才认为A被消耗,反之亦然。来源 基于此,我假设消息A没有被消费,因此将再次被重新处理。但这条信息将如何重

  • 我正在使用spring with Kafka来消费来自Kafka主题的数据。我已经将并发配置为10。因此不同的线程轮询代理以获取消息并处理消息。即使在一段时间后(成功处理),我们也会收到相同的消息返回给使用者的不同线程。我们能够在配置的max.poll.interval.ms=1500000内处理接收到的消息。 请找到以下配置的Kafka消费者属性。我已经通过Kafka配置了自动提交。 你能帮我解

  • 我正在设计一个ApacheStorm拓扑(使用streamparse),它由一个喷口(ApacheKafka喷口)和一个具有并行性的螺栓构建 螺栓分批读取信息。如果批量成功完成,我手动提交apache kafka偏移。 当mysql上的螺栓插入失败时,我不会在Kafka中提交偏移量,但是一些消息已经在喷口发送到螺栓的消息队列中。 应该删除队列中已经存在的消息,因为我无法在不丢失先前失败消息的情况下