当前位置: 首页 > 知识库问答 >
问题:

如何保证debezium生成、存储在kafka并发送到Spark的主题事件顺序?

淳于坚壁
2023-03-14

我使用的debezium带有一个重路由选项,它将所有表的更改发送到仅一个kafka主题。有了这样的配置,我确信我可以从spark中读到独特的Kafka主题。

但我的问题是:如果我使用debezium而不使用重路由选项,并且我在不同的Kafka主题中对每个表进行了更改,我如何保证我以正确的顺序阅读了所有主题的事件?

我知道我可以使用Spark来订购它,例如通过时间戳,但如果说,一个kafka主题离线10分钟,因为一个问题出现了,但其他kafka主题继续工作,我将在Spark一个订购问题。

我该如何面对这个问题呢?

共有1个答案

劳英华
2023-03-14

我在Debezium上用这个配置解决了这个问题

{
"name": "name-connector",
"config": {
    "plugin.name": "pgoutput",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "0.0.0.0",
    "database.port": "5433",
    "database.user": "postgres",
    "database.password": "******",
    "database.dbname" : "database",
    "database.server.name": "database",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.database",
    "decimal.handling.mode" : "string",
    "time.precision.mode" : "connect",
    "tombstones.on.delete" : false,
    "transforms":"routerTopic",
    "transforms.routerTopic.type":"io.debezium.transforms.ByLogicalTableRouter",
    "transforms.routerTopic.topic.regex":"database.public.(.*)",
    "transforms.routerTopic.topic.replacement":"database.public",
}
}

使用Transforms.RouterTopic.topic.Regex和Transforms.RouterTopic.topic.Replacement配置主题路由

https://debezium.io/documentation/reference/0.10/configuration/topic-routing.html

 类似资料:
  • 问题1:表中中的行的事件总是具有相同的键,对吗? 问题2:由于kafka会将具有相同密钥的数据发送到相同分区,所以我可以说的事件可以有序地使用,对吗? 问题3:如果我将主键更改为varchar,那么该键将发生变化,因此分区号可能会发生变化,在这种情况下,我如何保证事件总是有序地消耗?

  • 我有多个冗余的应用程序实例,希望消费一个主题的所有事件,并存储它们独立的磁盘查找(通过一个rocksdb)。 为了便于讨论,让我们假设这些冗余消费者正在服务无状态http请求;因此,不使用kafka共享负载,而是使用kafka将数据从生产者复制到每个实例LocalStore中。 在查看生成的主题时,每个消费应用程序创建了3个额外的主题: null null 下面是创建存储区的代码

  • 我在Kafka中配置了3个代理运行在不同的端口上。我用的是春云流Kafka 我正在创建一个获得连续数据流的数据管道。我在kafka topic中存储3个代理运行的数据流。到目前为止没有问题。我担心的是假设3个经纪人倒下了5分钟,然后在那个时候我无法获得关于kafka主题的数据。将会有5分钟的数据丢失。从Spring开机我会得到警告 有没有一种方法可以在所有代理都停机时临时存储数据,并在代理再次启动

  • 是否可以验证/筛选发送到Kafka主题的消息?

  • 我正在使用debezium SQL Server跟踪生产基地上的更改。创建了主题,CDC的工作非常出色,但是当试图使用jdbcSinkConnector将数据转储到另一个Sql Server DB中时,我遇到了以下错误。 在源数据库上,sql数据类型为。Kafka事件为1549461754650000000。架构类型为Int64。架构名io.debezium.time.nanotimestamp。

  • 我创建了一个python脚本raw\u tweets\u流。py使用twitter api流式传输twitter数据。twitter上的json数据使用下面的脚本传递给kafka producer。