当前位置: 首页 > 知识库问答 >
问题:

正在从Kafka 0.11.0.1中的_transaction_state主题读取数据

胡泓
2023-03-14

我想读取事务的元数据(在Kafka0.11.0.1中支持),这样我就可以确定特定事务ID的事务是否已经提交。目前,我正在从_transactional_state主题获取键和值,但它是某种编码格式。以下是我在轮询__transaction_state主题时收到的一些相同的键/值:键=10000000MMM,值=+')

共有1个答案

涂羽
2023-03-14

您可以查看kafka/tools/dumplogsegments.scala文件中的TransactionLogMessageParser类的源代码作为示例。它使用TransactionLog类中的ReadTxnRecordValue函数。该函数的第一个参数可以通过同一类的readtxnrecordkey函数检索。

 类似资料:
  • 下面的json数据示例 下面的错误消息 线程“main”org.apache.spark.sql.analysisException中出现异常:未能找到数据源:Kafka。请按照“结构化流+Kafka集成指南”的部署部分部署应用程序。;在org.apache.spark.sql.execution.datasources.datasource$.lookupdatasource(datasourc

  • 我正在启动一个新的Flink应用程序,允许我的公司执行大量报告。我们有一个现有的遗留系统,我们需要的大部分数据都保存在SQL Server数据库中。在开始使用新部署的Kafka流中的更多数据之前,我们首先需要使用这些数据库中的数据。 我花了很多时间阅读Flink的书和网页,但我有一些简单的问题和假设,我希望你能帮助我进步。 首先,我想使用DataStream API,这样我们既可以使用历史数据,也

  • 嗨,我正在阅读kafka主题,我想处理从kafka接收到的数据,例如tockenize,过滤掉不必要的数据,删除停用词,最后我想写回另一个kafka主题 然后我得到以下错误 线程"main"中的异常org.apache.spark.sql.Analysis Exception:具有流源的查询必须使用WriteStream.start()执行; 然后,我对代码进行了如下编辑,以从Kafka中读取并写

  • 有什么方法可以让我的Kafka Stream应用程序自动从新创建的主题中读取? 即使主题是在流应用程序已经运行时创建的? 类似于在主题名称中使用通配符,如下所示: 现在,我有多个客户端将数据(都使用相同的模式)发送到它们自己的主题,我的流应用程序从这些主题中读取数据。然后,我的应用程序进行一些转换,并将结果写入单个主题。 虽然所有的客户都可以写同一个主题,但一个没有偏见的客户也可以代表其他人写。所

  • 我们的生产Storm集群出现了一个我们无法解决的问题。 在某个时候,似乎kafka spout停止了从一半的主题分区中读取。有40个分区,它只读取其中的20个。在这种情况开始发生的时候,我们找不到我们对Storm星团或Kafka所做的任何改变。 我们更改了使用者组 ID,并将输出配置设置为它仍然只连接到相同的20个分区。我们已经查看了节点

  • 有一个在Dataflow中使用过DynamicDestination的人,他有一个简单的描述示例。在git(https://github.com/googleCloudPlatform/dataflowTemplates/blob/master/src/main/Java/com/google/cloud/teleport/templates/dlpTextToBigQueryStreaming.