当前位置: 首页 > 知识库问答 >
问题:

Flink、Kafka和JDBC水槽

陆文斌
2023-03-14

我有一个Flink 1.11作业,它使用来自Kafka主题的消息,键入它们,过滤它们(keyBy后跟自定义ProcessFunction),并通过JDBC接收器将它们保存到db中(如下所述:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/jdbc.html

Kafka消费者使用以下选项初始化:

properties.setProperty("auto.offset.reset", "earliest")
kafkaConsumer = new FlinkKafkaConsumer(topic, deserializer, properties)
kafkaConsumer.setStartFromGroupOffsets()
kafkaConsumer.setCommitOffsetsOnCheckpoints(true)

在集群上启用了检查点。

我想要实现的是保证将所有过滤后的数据保存到数据库中,即使数据库关闭了6个小时,或者在保存到数据库时出现编程错误并且需要更新、重新部署和重新启动作业。

要做到这一点,Kafka偏移的任何检查点都应该意味着

  1. 从Kafka读取的数据处于Flink操作符状态,等待过滤/传递到接收器,并将作为Flink操作符检查点的一部分进行检查,或

在查看JdbcSink的实现时,我发现它并没有真正保留任何将被检查点/恢复的内部状态——相反,它的检查点是对数据库的写入。现在,如果此写入在检查点期间失败,并且Kafka偏移量确实被保存,我将处于“丢失”数据的情况——从Kafka的后续读取将从已提交的偏移量恢复,并且当db写入失败时正在飞行的任何数据现在不再从Kafka读取,也不在db中。

那么,有没有办法停止推进Kafka偏移量,只要管道已满(Kafka-

共有1个答案

郭曾笑
2023-03-14

我可以看到3个选项:

  1. 使用您的Flink版本试用JDBC 1.13连接器。它很有可能只是工作。
  2. 如果这不能立即起作用,请检查是否可以将其反向移植到1.11。不应该有太多更改。
  3. 编写您自己的2阶段提交接收器,可以通过扩展TwoPhaseCONSinkFunction或使用Checkpoint edFunctionCheckpoint Listener实现您自己的SinkFunction。基本上,您在成功的检查点后创建一个新事务,并使用通知检查点完成提交它。
 类似资料:
  • 在Flink中,我发现了2种设置水印的方法, 第一个是 第二个是 我想知道哪个最终会生效。

  • 我正在试用Kafka和Flink: 我使用flink制作人向Kafka发送推特流 如果我创建一个基本的RESTWebServices,我想我会失去流媒体的兴趣,对吗? 我应该向我的网络应用程序提供flink数据,还是应该将其发送到另一个Kafka主题,以便将其提供给网络应用程序? 非常感谢。 安托万

  • 我们正在构建一个流处理管道,以使用Flink v1.11和事件时间特性来处理Kinesis消息。在定义源水印策略时,在官方留档中,我遇到了两个开箱即用的水印策略;forBoundedOutOfOrthy和forMonotonousTimestamps。但根据我对上述内容的理解,我认为这些不适合我的用法。以下是我的用法细节: 来自输入流的数据:(包含每分钟带有时间戳的数据) 现在,我想处理11:00

  • Flink源函数引入水印,这些水印向下传递给下游操作符,根据这些操作符可以执行不同的基于时间的操作。对于使用多个流的操作员,将传入水印的最小值视为此时操作员的水印。 将源流拆分为多个逻辑流,然后将这些逻辑流传递给下游操作员(例如处理函数)。 Eg. 假设Process函数有4个子任务(例如),并且有100个关键组(假设),每个子任务处理25个关键组,即,等等。 如果从下午5点开始DriverStr

  • 我正在尝试编写一个Flink应用程序,它从Kafka读取事件,从MySQL丰富这些事件并将这些数据写入HBase。我正在中进行MySQL丰富,我现在正在尝试弄清楚如何最好地写入HBase。我想批量写入HBase,所以我目前正在考虑使用,后跟标识(仅返回),然后编写,它获取记录列表并批处理放入。 这是正确的做事方式吗?仅仅为了进行基于时间的缓冲而使用所有窗口和应用窗口感觉很奇怪。

  • 我是JDBC java编程的新手。尝试使用OCI驱动程序连接到oracle数据库(10.2)时,出现以下错误: C:\Java实践\JDBC 我已将CLASSPATH设置为ojdbc14.jaroracle_home和TNS_ADMIN也设置为指向tnsnames.ora文件。以下是JDBC URL: DriverManager.getConnection(“jdbc:oracle:oci:@XE