当前位置: 首页 > 知识库问答 >
问题:

当检查点被禁用时,Kafka010JsonTableSource将不会自动提交kafka偏移量

陆阳曜
2023-03-14

我已经设置了一个本地的Kafka0.10+Flink1.4环境。

我使用下面的代码来消费来自Kafka主题的数据:

val tableSource:KafkaTableSource = Kafka010JsonTableSource.builder()
.forTopic(kafkaConfig.topic)
.withKafkaProperties(props)
.withSchema(dynamicJsonSchema)
.withRowtimeAttribute(enventTimeFieldName,new ExistingField(enventTimeFieldName), new BoundedOutOfOrderTimestamps(30000L))
.build() 

tableEnv.registerTableSource(tableName, tableSource)

val tableResult:Table = tableEnv.sqlQuery(sql)

在我执行这段代码后,总是会发现警告消息:

自动提交组TaxidataGroup的偏移量{taxidata-0=offsetandMetadata{offset=728461,Metadata=“}}失败:无法完成提交,因为该组已重新平衡并将分区分配给其他成员。这意味着对poll()的后续调用之间的时间比配置的max.poll.interval.ms长,这通常意味着poll循环花费了太多时间处理消息。您可以通过增加会话超时,或者通过使用max.poll.records减小poll()中返回的批的最大大小来解决这一问题。

无论我在Kafka中设置了什么属性,它总是显示上面的警告消息。

  {
    "propertyKey": "enable.auto.commit",
    "propertyValue": "true"
  },
  {
    "propertyKey": "session.timeout.ms",
    "propertyValue": "250000"
  },
  {
    "propertyKey": "request.timeout.ms",
    "propertyValue": "305000"
  },
  {
    "propertyKey": "auto.commit.interval.ms",
    "propertyValue": "800000"
  },
  {
    "propertyKey": "max.poll.records",
    "propertyValue": "300"
  },
  {
    "propertyKey": "max.poll.interval.ms",
    "propertyValue": "300000"
  }

我不确定如果Kafka010JsonTableSource flink1.4会自动提交偏移量。但测试结果表明它不会自动提交偏移量。有谁能帮忙证实一下这个问题吗?或者你能在我的代码中看到任何其他问题吗?

共有1个答案

段干长恨
2023-03-14

是否尝试设置低于代理的group.max.session.timeout.ms值的session.timeout.ms值?根据https://github.com/dpkp/kafka-python/issues/746,这似乎是问题所在。

 类似资料:
  • 我目前正在从具有特定偏移量的主题中获取消息。我正在使用寻求()来实现它。但是当我将enable.auto.commit设置为true或使用手动同步(委托同步()/委托同步())时,Seek()不起作用,因为它没有轮询来自特定偏移量的消息,而是从最后提交的偏移量中选择。 因此,在使用Seek()时,是否必须将偏移量存储在外部DB中,而不提交给Kafka?Seek和Commit不能并行工作吗? 客户端

  • 我使用的是Spring Kafka 1.2.2版。我有一个Kafka Listener作为消费者,它监听一个主题并在弹性中索引文档。我的自动提交偏移量属性设置为true//default。

  • 我试图使用Kafka Utils Api从Kafka(0.10.0.0)到Spark(1.6.0)流媒体应用程序使用数据 Kafka提尔。createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,InputOpicSet) 要求是将偏移范围手动提交给Kafka本身。 请注意,当在java中使用Kafk

  • 我有一个ReactorKafka项目,它消耗来自Kafka主题的消息,转换消息,然后写入到另一个主题。 我的理解是,只有在Reactor中成功完成所有顺序步骤后,才会提交偏移量。对吗?我想确保不会处理下一条记录,除非当前记录成功发送到目标Kafka主题。

  • 简而言之,我想从一开始就对Kafka的数据重新运行Flink管道。 Flink0.10.2,Kafka0.8.2。 我在Kafka中有一个保留2小时的推文主题,以及Flink中的一个管道,该管道以每10秒5分钟的滑动窗口计算推文。 如果我中断管道并重新运行它,我希望它重新读取旧推文,从而发出价值5分钟的推文计数。相反,它似乎从新到达的推文重新开始,因此需要5分钟才能计数为“处于状态”。 我已经尝试

  • 我有一个Kafka消费者,我从它消费数据从一个特定的主题,我看到下面的例外。我使用的是Kafka版本。 我添加了这两个额外的消费者属性,但仍然没有帮助: 那个错误意味着什么?我该如何解决它?我需要添加一些其他消费者属性吗?