当前位置: 首页 > 知识库问答 >
问题:

Flink Kafka制片人在发布键控流时引发异常

梁丘翔
2023-03-14

我在编写从接收器子任务到输出Kafka主题的键控流时遇到了问题。

作业的形式为:source-

异常来自kafka生产者,并导致检查点超时:

>

FlinkKafkaException:未能向Kafka发送数据:自批创建以来,mytopic-11:120000毫秒的16条记录已过期

作业在出现上述异常的情况下进入崩溃循环,偶尔会在再次崩溃循环之前短暂恢复。我认为这里的问题是,我正在使用这些键来确定输出分区,这会导致每个扇出写入N个输出分区的P个sink子任务。理想情况下,每个子任务只能写入单个分区。

作业具有以下约束/属性:

1:一旦一个键被写入输出kafka主题分区,它需要在未来总是被写入同一个kafka分区

2: 接收器子任务并行性最初将等于输出分区的数量

3: 我将来应该能够在不违反#1的情况下增加并行性

4: 我永远不会向输出kafka主题添加新分区

如果并行性==分区,那么我相信FlinkFixedPartitioner将是一个很好的解决方案。但是,我认为它不会尊重原始密钥-

这里有没有一种技术可以用来满足这些约束条件?可能是对Kafka制作人的设置进行了调整,或者是对键控流进行分区的另一种方法,或者是其他什么?

共有1个答案

云昊阳
2023-03-14

您假设Flink使用的分区逻辑与Kafka使用的分区逻辑相同。完全有可能(我怀疑正在发生的情况),给定4个键A、B、C和D,Flink会将A和B发送到一个接收器实例,而C和D会发送到另一个接收器实例。Kafka可能使用不同的分区逻辑,将A和C发送到一个分区,而B和D写入另一个分区。

Flink似乎不想公开给定密钥的密钥组,但如果接收器的并行度与Kafka分区的数量相同,则应该能够在自定义Kafka分区逻辑中使用接收器实例的task\u id。这有点野蛮,但它应该做你想做的事。

当我进一步思考时,您也可以为Flink编写一个自定义分区器,它使用与Kafka主题的自定义分区器相同的逻辑。这将处理扩展到更多接收器实例的问题。

 类似资料:
  • 大家好,我正在努力将一个简单的avro模式与模式注册表一起序列化。 设置: 两个用java编写的Flink jobs(一个消费者,一个生产者) 目标:生产者应该发送一条用ConfluentRegistryAvroSerializationSchema序列化的消息,其中包括更新和验证模式。 然后,使用者应将消息反序列化为具有接收到的模式的对象。使用。 到目前为止还不错:如果我将架构注册表上的主题配置

  • 我有一个包含属性的对象,该属性是包含两个字符串属性的对象列表。我的目标是将此映射的键与另一个包含字符串属性的对象进行比较。 为了更清楚我的请求包含地图: 我的sharePointDriveResponse包含以下列: 在这个阶段,我实现了一些工作正常的东西: 我的问题如下: 在这里传输数据时是否可能引发异常 在示例中:我发送一个创建文档的请求,并传递一个不允许文档使用但允许其他内容类型使用的字段。

  • 问题内容: 我已经为此工作了几天,已经找到了几种解决方案,但是都没有一个非常简单或轻巧的解决方案。问题基本上是这样的:我们有一个由10台计算机组成的群集,每台计算机都在多线程ESB平台上运行相同的软件。我可以很轻松地处理同一台计算机上线程之间的并发问题,但是不同机器上同一数据上的并发又如何呢? 本质上,该软件接收请求,以通过Web服务将客户数据从一家公司传送到另一家公司。但是,客户可能存在或可能不

  • 我尝试了很多方法在片段视图中成功实现了onitemselectedlistener,但不断地得到一个致命的java异常。它始终指向代码中的同一行,用于为spinner设置onitemselectedlistener方法。 android studio logcat错误输出为: 07-21 13:55:06.544 17277-17277/com。vaibhavtech。indoreveg E/An

  • 我已经创建了一个解码器来处理客户端发送的字节。给你 并在客户端发送字节时抛出下一个错误 Io.netty.handler.codec.DecoderException:java.lang.IllegalArgumentException:MinimumReadableBytes:-603652096(预期:>=0)在io.netty.handler.codec.ReplayingDecoder.C

  • spring MVC中controllin异常流的良好实践是什么? 假设我有一个DAO类,它将对象保存到数据库中,但如果违反了某些规则,例如名称太长、年龄太低,则抛出异常, 现在,如果我想保存名称超过10的A,它应该抛出异常。 但是有一个dataManipulator对象 和控制器 我希望在不抛出异常的情况下保留控制器(我听说这是一个很好的做法)。 但我的问题是,在这种情况下,A\u Data\u