我使用的debezium带有一个重路由选项,它将所有表的更改发送到仅一个kafka主题。有了这样的配置,我确信我可以从spark中读到独特的Kafka主题。
但我的问题是:如果我使用debezium而不使用重路由选项,并且我在不同的Kafka主题中对每个表进行了更改,我如何保证我以正确的顺序阅读了所有主题的事件?
我知道我可以使用Spark来订购它,例如通过时间戳,但如果说,一个kafka主题离线10分钟,因为一个问题出现了,但其他kafka主题继续工作,我将在Spark一个订购问题。
我该如何面对这个问题呢?
我在Debezium上用这个配置解决了这个问题
{
"name": "name-connector",
"config": {
"plugin.name": "pgoutput",
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "0.0.0.0",
"database.port": "5433",
"database.user": "postgres",
"database.password": "******",
"database.dbname" : "database",
"database.server.name": "database",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.database",
"decimal.handling.mode" : "string",
"time.precision.mode" : "connect",
"tombstones.on.delete" : false,
"transforms":"routerTopic",
"transforms.routerTopic.type":"io.debezium.transforms.ByLogicalTableRouter",
"transforms.routerTopic.topic.regex":"database.public.(.*)",
"transforms.routerTopic.topic.replacement":"database.public",
}
}
使用Transforms.RouterTopic.topic.Regex和Transforms.RouterTopic.topic.Replacement配置主题路由
https://debezium.io/documentation/reference/0.10/configuration/topic-routing.html
问题1:表中中的行的事件总是具有相同的键,对吗? 问题2:由于kafka会将具有相同密钥的数据发送到相同分区,所以我可以说的事件可以有序地使用,对吗? 问题3:如果我将主键更改为varchar,那么该键将发生变化,因此分区号可能会发生变化,在这种情况下,我如何保证事件总是有序地消耗?
我有多个冗余的应用程序实例,希望消费一个主题的所有事件,并存储它们独立的磁盘查找(通过一个rocksdb)。 为了便于讨论,让我们假设这些冗余消费者正在服务无状态http请求;因此,不使用kafka共享负载,而是使用kafka将数据从生产者复制到每个实例LocalStore中。 在查看生成的主题时,每个消费应用程序创建了3个额外的主题: null null 下面是创建存储区的代码
我在Kafka中配置了3个代理运行在不同的端口上。我用的是春云流Kafka 我正在创建一个获得连续数据流的数据管道。我在kafka topic中存储3个代理运行的数据流。到目前为止没有问题。我担心的是假设3个经纪人倒下了5分钟,然后在那个时候我无法获得关于kafka主题的数据。将会有5分钟的数据丢失。从Spring开机我会得到警告 有没有一种方法可以在所有代理都停机时临时存储数据,并在代理再次启动
是否可以验证/筛选发送到Kafka主题的消息?
我正在使用debezium SQL Server跟踪生产基地上的更改。创建了主题,CDC的工作非常出色,但是当试图使用jdbcSinkConnector将数据转储到另一个Sql Server DB中时,我遇到了以下错误。 在源数据库上,sql数据类型为。Kafka事件为1549461754650000000。架构类型为Int64。架构名io.debezium.time.nanotimestamp。
我创建了一个python脚本raw\u tweets\u流。py使用twitter api流式传输twitter数据。twitter上的json数据使用下面的脚本传递给kafka producer。