当前位置: 首页 > 知识库问答 >
问题:

CommitFailedException:无法完成提交,因为组已重新平衡

江天宇
2023-03-14

环境:Hadoop2.75.+Flink1.4+Kafka0.10

我已经建立了一个实时数据处理项目。我使用Flink表源API(Kafka010JsonTableSource)作为表源。从kafka获取数据,然后执行一个SQL,最后输出到一个kafka主题。这是一个清晰的流程,但是我在Flink集群上执行时遇到了异常,下面是我的主要代码:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
env.enableCheckpointing(5000) 
val tableEnv = TableEnvironment.getTableEnvironment(env)
val tableSource:KafkaTableSource = Kafka010JsonTableSource.builder()
    .forTopic(kafkaConfig.topic)
    .withKafkaProperties(props)
    .withSchema(dynamicJsonSchema)
    .withRowtimeAttribute(
         enventTimeFieldName,  
         new ExistingField(enventTimeFieldName),  
         new MyBoundedOutOfOrderTimestamps(100L)) 
    .build()
tableEnv.registerTableSource(tableName, tableSource)
val tableResult:Table = tableEnv.sqlQuery(sql)
tableResult.writeToSink(new Kafka010JsonTableSink(kafkaOutput.topic, props))

我已经启用了检查点。第一次在flink上执行时,我只是遵循Consumer的默认配置。在Flink任务运行之后,我通过kafka shell命令(kafka-consumer-groups.sh)检查了偏移量,发现了一个奇怪的情况。根据shell命令的输出和Flink任务管理器的日志,我发现偏移量在几秒钟开始时提交成功,但后来我继续遇到许多异常,如下所示:

于是我根据上面的错误信息搜索解决方案,有人告诉我应该增加session.timeout.ms,然后我就照做了,但还是失败了。之后我尝试了以下多种组合配置来测试,kafka偏移量总是在开始的时候提交成功,但是后来提交的will失败了。我真的不知道解决它,你能帮我修好它吗?非常感谢!!!!!!

kafka使用者配置组合如下:{“PropertyKey”:“Session.Timeout.ms”,“PropertyValue”:“300000”},{“PropertyKey”:“Request.Timeout.ms”,“PropertyValue”:“505000”},{“PropertyKey”:“Auto.Commit.Interval.ms”,“PropertyValue”:“10000”},{“PropertyKey”:“Max.Poll.Records”,“PropertyValue”:“50”},{“PropertyKey”:“我希望您能帮助修复以上错误,非常感谢!!!


共有1个答案

贺浩壤
2023-03-14

我找到根本原因了。重新平衡错误总是发生的原因是两个消费者(一个是消费者输入数据,另一个是消费者输出数据)的组名是相同的。我怀疑仅仅一个协调器还不足以处理两个消费者的偏移量提交操作。当我更改了一个消费者的组名后,世界突然变得安静了。错误从未发生过。

 类似资料:
  • 我正在使用kafka 0.9.0.1代理和0.9.0.1消费者客户端。我的使用者实例正在使用处理时间小于1秒的记录。和其他主要配置 一小时一两次。每天消耗约60亿次事件。似乎偏移量只存储在主题“__consumer_offsets”的一个分区中。它还会增加特定代理的负载。 有人知道这些问题吗?

  • 今天,在我的Spring Boot和单实例Kafka应用程序中,我遇到了以下问题: CommitFailedException:无法完成提交,因为组已经重新平衡并将分区分配给了另一个成员。这意味着对poll()的后续调用之间的时间比配置的max.poll.interval.ms长,这通常意味着poll循环花费了太多时间处理消息。您可以通过增加会话超时,或者通过使用max.poll.records减

  • 我使用的是Kafka0.10.2,现在面临一个CommitFailedException。如: 无法完成提交,因为组已重新平衡并将分区分配给另一个成员。这意味着对poll()的后续调用之间的时间比配置的max.poll.interval.ms长,这通常意味着poll循环花费了太多时间处理消息。您可以通过增加会话超时,或者通过使用max.poll.records减小poll()中返回的批的最大大小来

  • 有人能帮忙吗?原因和解决方法是什么? 此外,当我的1个kafka代理关闭时,我的kafka流应用程序没有连接到其他代理?我已经设置了

  • 说明 协议2.1中读取state=2,3的结账请求后,从业务系统完成结账,并提交结果到服务端 请求地址 http://api.dc78.cn/Api/cash_post_cash 请求方式 GET 请求参数 参数 参数名称 必填 描述 范例 id 请求编号 此编号为协议2.1中返回的结算单id bzid 结算业务单号 返回 {"status":1,"info":"提交成功"} 请求方式 INI 请

  • 秒付业务,下行接口收到cash-pay后(下行接口详见0.5),完成相应的结账业务流程,并上传确认支付订单处理完成。 请求参数说明 参数 描述 必填 示例值 类型 最大长度 action 接口参数组 是 object └action 需要调用的接口名称 是 cash_post_cash string get GET参数组,本组参数需要参与签名 是 object └id 支付单流水号payid 是