当前位置: 首页 > 知识库问答 >
问题:

HDFS上带有查找数据的Kafka流

单于煌
2023-03-14

我正在用Kafka Streams(V0.10.0.1)编写一个应用程序,并希望用查找数据丰富我正在处理的记录。该数据(带有时间戳的文件)每天(或每天2-3次)写入HDFS目录。

如何将其加载到Kafka Streams应用程序中并连接到实际的KStreams
当一个新文件到达HDFS时,从那里重新读取数据的最佳实践是什么?

或者切换到Kafka connect并将RDBMS表内容写入一个Kafka主题更好,该主题可以被所有Kafka Streams应用程序实例使用?

更新:
正如所建议的那样,Kafka连接将是一种可行的方式。因为RDBMS中的查找数据是每天更新的,所以我考虑将Kafka Connect作为一个计划好的一次性作业来运行,而不是保持连接始终打开。是的,因为语义和保持连接始终打开并确保它不会被中断的开销...等等。对我来说,在这种情况下有一个预定的提取看起来更安全。

查找数据不大,可能会删除/添加/修改记录。我也不知道我怎么能总是有一个充分的倾倒到一个Kafka的主题和截断以前的记录。启用日志压缩并为已删除的键发送空值可能不会起作用,因为我不知道源系统中删除了什么。另外,当压缩发生时,我没有html" target="_blank">控制。

共有1个答案

邓才
2023-03-14

推荐的方法确实是将查找数据也摄取到Kafka中--例如通过Kafka Connect--正如您在上面所建议的那样。

但是在这种情况下,我如何安排连接作业每天运行,而不是连续地从源表中提取,这在我的情况下是不必要的呢?

也许您可以更新您的问题您不希望有一个连续的Kafka连接作业运行?您是否关心资源消耗(DB上的负载),是否关心处理的语义(如果它不是“Daily UPDATEs”),或者...?

更新:正如建议的那样,Kafaka连接将是一条道路。因为RDBMS中的查找数据是每天更新的,所以我考虑将Kafka Connect作为一个计划好的一次性作业来运行,而不是保持连接始终打开。是的,因为语义和保持连接始终打开并确保它不会被中断的开销...等等。对我来说,在这种情况下有一个预定的提取看起来更安全。

Kafka Connect是安全的,构建JDBC连接器的目的正是为了以健壮、容错和性能良好的方式将DB表输入Kafka(已经有许多生产部署)。因此,我建议不要仅仅因为“批处理更新”看起来更安全,就退回到“批处理更新”模式;就我个人而言,我认为触发每日摄取在操作上不如仅仅保持其连续(和实时)运行那么方便摄取,并且它还会导致实际用例的几个缺点(请参见下一段)。

但当然,您的里程数可能会有所不同--所以如果您设定为每天只更新一次,那么就去做吧。但是您失去了a)在充实发生的时间点用最新的DB数据充实传入记录的能力,并且相反,b)实际上您可能在下一个每日更新完成之前用陈旧/旧的数据充实传入记录,这很可能会导致您向下游发送/提供给其他应用程序以供使用的错误数据。例如,如果一个客户更新了她的送货地址(在DB中),但您每天只向流处理应用程序(可能还有许多其他应用程序)提供一次该信息,那么订单处理应用程序将把包裹送到错误的地址,直到下一个每日摄取完成。

查找数据不大,可能会删除/添加/修改记录。我也不知道我怎么能总是有一个充分的倾倒到一个Kafaka的主题和截断以前的记录。启用日志压缩并为已删除的键发送空值可能不会起作用,因为我不知道源系统中删除了什么。

Kafka Connect的JDBC连接器已经为您自动处理了:1。它确保数据库的插入/更新/删除正确地反映在Kafka主题中。Kafaka的日志压缩确保了目标主题不会超出界限。您可能需要阅读文档中的JDBC连接器,以了解哪些功能是免费获得的:http://docs.confluent.io/current/connect/connect-jdbc/docs/?

 类似资料:
  • 现在还不清楚你是否能像在《水槽》中那样在Kafka中做一个扇出(复制)。 我想让Kafka将数据保存到HDFS或S3,并将该数据的副本发送到Storm进行实时处理。Storm集合/分析的输出将存储在Cassandra中。我看到一些实现将所有数据从Kafka流到Storm,然后从Storm输出两个。但是,我想消除Storm对原始数据存储的依赖。 这可能吗?您知道任何类似的文档/示例/实现吗? 还有,

  • 我正在使用Kafka连接分布。命令是:bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties 工作人员配置为: Kafka连接重新开始没有错误! java代码如下: 奇怪的事情发生了。我从kafka-logs中获取数据,但在hdfs中没有数据(没有主题目录)。我尝试connector命令: 出什

  • 我在执行配置单元查询时遇到异常。我关注以下链接:http://www.thecloudavenue.com/2013/03/analysis-tweets-using-flume-hadoop-and.html 终端数据在这里:

  • 我一直在关注这篇关于如何用Hive分析twitter数据的文章:http://blog . cloud era . com/blog/2012/11/analyzing-Twitter-data-with-Hadoop-part-3-query-semi-structured-data-with-Hive/ 我设置了水槽来收集twitter数据并写入HDFS。我已经设置了一个指向同一HDFS位置的

  • 我正在尝试将kafka-jdbc连接器(源代码和接收器)与非常旧的数据库(cloudscape)一起使用。我有这个数据库的 JDBC 驱动程序。我将驱动程序放在Confluent(版本5)的“/share/java/kafka/connect/jdbc”文件夹中,并创建了属性文件。 启动连接器时,日志如下: 我想JDBC驱动程序很旧(它使用JAVA1.3)这一事实存在问题。驱动程序使用RMI协议进

  • 我参考了以下链接来了解Kafka的HDFS连接https://docs.confluent.io/2.0.0/Connect/connect-hdfs/docs/index.html 我能够通过配置单元集成将数据从Kafka导出到HDFS。 现在我正尝试在Java程序的帮助下将avro记录写入Kafka 当我把Avro记录写到Kafka主题时,我在Connect中出现以下错误