我想读取事务的元数据(在Kafka0.11.0.1中支持),这样我就可以确定特定事务ID的事务是否已经提交。目前,我正在从_transactional_state主题获取键和值,但它是某种编码格式。以下是我在轮询__transaction_state主题时收到的一些相同的键/值:键=10000000MMM,值=+')
您可以查看kafka/tools/dumplogsegments.scala
文件中的TransactionLogMessageParser
类的源代码作为示例。它使用TransactionLog
类中的ReadTxnRecordValue
函数。该函数的第一个参数可以通过同一类的readtxnrecordkey
函数检索。
下面的json数据示例 下面的错误消息 线程“main”org.apache.spark.sql.analysisException中出现异常:未能找到数据源:Kafka。请按照“结构化流+Kafka集成指南”的部署部分部署应用程序。;在org.apache.spark.sql.execution.datasources.datasource$.lookupdatasource(datasourc
我正在启动一个新的Flink应用程序,允许我的公司执行大量报告。我们有一个现有的遗留系统,我们需要的大部分数据都保存在SQL Server数据库中。在开始使用新部署的Kafka流中的更多数据之前,我们首先需要使用这些数据库中的数据。 我花了很多时间阅读Flink的书和网页,但我有一些简单的问题和假设,我希望你能帮助我进步。 首先,我想使用DataStream API,这样我们既可以使用历史数据,也
嗨,我正在阅读kafka主题,我想处理从kafka接收到的数据,例如tockenize,过滤掉不必要的数据,删除停用词,最后我想写回另一个kafka主题 然后我得到以下错误 线程"main"中的异常org.apache.spark.sql.Analysis Exception:具有流源的查询必须使用WriteStream.start()执行; 然后,我对代码进行了如下编辑,以从Kafka中读取并写
有什么方法可以让我的Kafka Stream应用程序自动从新创建的主题中读取? 即使主题是在流应用程序已经运行时创建的? 类似于在主题名称中使用通配符,如下所示: 现在,我有多个客户端将数据(都使用相同的模式)发送到它们自己的主题,我的流应用程序从这些主题中读取数据。然后,我的应用程序进行一些转换,并将结果写入单个主题。 虽然所有的客户都可以写同一个主题,但一个没有偏见的客户也可以代表其他人写。所
我们的生产Storm集群出现了一个我们无法解决的问题。 在某个时候,似乎kafka spout停止了从一半的主题分区中读取。有40个分区,它只读取其中的20个。在这种情况开始发生的时候,我们找不到我们对Storm星团或Kafka所做的任何改变。 我们更改了使用者组 ID,并将输出配置设置为它仍然只连接到相同的20个分区。我们已经查看了节点
有一个在Dataflow中使用过DynamicDestination的人,他有一个简单的描述示例。在git(https://github.com/googleCloudPlatform/dataflowTemplates/blob/master/src/main/Java/com/google/cloud/teleport/templates/dlpTextToBigQueryStreaming.