当前位置: 首页 > 知识库问答 >
问题:

Debezium如何设置Kafka消息的时间戳

姜明贤
2023-03-14

我没有找到任何关于Debezium如何设置Kafka消息时间戳的文档,以及是否设置了时间戳。

通过比较这些值,Kafka消息的时间戳总是在数据库表(source.ts_ms)更改的时间戳之后,也在Debezium(ts_ms)处理更改的时间之后。这表明Kafka消息时间戳只是Kafka代理设置为摄入时间的时间戳。

有人知道关于Debezium是否以及如何在其填充的接收器主题中设置Kafka消息时间戳的一些细节吗?

我使用Debezium connector for SQL server来解决这个问题。

共有2个答案

廉宇
2023-03-14

Debezium既不设置Kafka Connect时间戳,也不设置Kafka消息时间戳。前者为空,后者由Kafka Connect管理。

巴星华
2023-03-14

值得检查source.timestamp.mode参数。根据留档(以前喜欢),它的默认值提交应该将ts_ms设置为记录提交到数据库的时间。

 类似资料:
  • 我正在使用Kafka(与雅虎Kafka经理) 我想为重置消息设置一个规则,或者他们如何称呼它:“分区偏移量的总和” 在server.properties上是否有滚动kafka偏移量的参数? (即:我想重置或删除所有影响邮件保留的参数) 谢谢。

  • 我目前使用的是Kafka0.9.0.1。根据我找到的一些来源,设置消息大小的方法是修改中的以下键值。 message.max.bytes replica.fetch.max.bytes fetch.message.max.bytes 我的文件实际上有这些设置。 其他可能相关的设置如下。 但是,当我试图发送具有4到6 MB大小的有效负载的消息时,使用者永远不会得到任何消息。生产者似乎在发送消息时没有

  • 使用Kafka Streams,我们无法确定在处理写入接收器主题的消息后压缩这些消息所需的配置。 另一方面,使用经典的Kafka Producer,可以通过在KafkaProducer属性上设置配置“compression.type”轻松实现压缩 然而,似乎没有任何记录在案的Kafka Streams压缩处理过的消息的例子。 至于这次(2019年初),有没有一种方法可以使用Kafka流进行压缩?

  • 我正在使用Debezium Postgres连接器。我在Postgres中有两个表,分别命名为'publications'和'comments'。根据标准示例,kafka和zookeeper运行在docker容器中。postgres正在本地运行。在使用debezium postgres connect之后,我有以下主题: __consumer_offsets dbserver1.public.co

  • 任何建议或忠告都会真的很有帮助。提前道谢。

  • , 我想知道如何控制聚合时间窗口,这样它就不会为每个传入的消息吐出一条消息,而是等待并聚合其中的一些消息。想象一下流应用程序使用这些消息: null 我的代码很简单 目前,它有时会批量聚合,但不确定如何调整它: