正如标题所述,我使用的是debezium Postgres源连接器,我希望MongoDB汇连接器根据名称将kafka主题分组到不同的集合和数据库中(也可以使用不同的dbs来隔离不相关的数据)。在查询时,我在mongo文档中遇到了topic.regex
connector属性。不幸的是,这只为每个成功匹配指定正则表达式的kafka主题在mongo中创建了一个集合,我计划使用同一个mongodb服务器来存放从多个debezium源连接器捕获的许多dbs。你能帮助我吗?
注意:我阅读了这个mongo接收器设置FieldPathNamespaceMapper
,但我不确定它是否符合我的需求,也不确定如何正确配置它。
在 kafka 中,工作线程是可以运行多个连接器的简单容器。对于每个连接器,工作人员根据内部规则和您的配置生成任务。因此,如果您看一下mongodb接收器连接器配置:
https://www . MongoDB . com/docs/Kafka-connector/current/sink-connector/configuration-properties/all-properties/
您可以使用相同的connection.uri、数据库和集合或不同的值创建不同的连接器。因此,您可以使用topics.regex或topics参数对具有自己的connection.uri、数据库和集合的单个连接器的主题进行分组,并同时运行多个连接器。请记住,如果tasks.max
< code>topics.regex是一个通用的接收器连接器,并不是Mongo独有的。
如果我正确理解了这个问题,显然只会在配置的数据库中为实际存在的 Kafka 主题(匹配模式)创建集合并被接收器使用。
如果您想要与模式不匹配的集合,那么您仍然需要使用它们,但需要在将记录写入Mongo之前通过RegexRouter转换显式重命名主题
我正在尝试使用Debezium将上游数据库中的表同步到下游数据库,遵循Debezium博客中描述的方法。 我正在使用: 两个数据库的SQL Server 2019;和 Debezium 1.3(但也曾尝试使用Debezium 1.2,结果相同)。 我在下面列出了我的数据库和连接器设置的全部详细信息。 下游表的DDL为: 特别要注意的是,上游表中的'id'列(它是主键)应该映射到下游表中的'exte
我在Kafka Topic内部有500万条消息。 我必须加入具有相同分区密钥的消息作为单个消息的一部分,并发送给消费者主题[例如:对于密钥1234-Messge1,消费者应该收到单个消息而不是100万消息] Kafka端是否有可用的Kafka API,使用它我可以读取组中具有相同Partition键的所有消息,而不是像传统的spring boot Kafka Listener那样一次读取单个消息。
我有一个生产者,它正在为一个主题生成protobuf消息。我有一个消费者应用程序,它反序列化protobuf消息。但hdfs接收器连接器直接从Kafka主题接收消息。中的键和值转换器将设置为什么?做这件事最好的方法是什么?提前道谢!
这是我的kafka连接器属性 这是我用来创建Elasticsearch水槽的POST主体 我遇到的问题是,有时这个接收器会工作并将数据发送到Elasticsearch并显示 〔2020-09-15 20:27:05904〕INFO WorkerLinkTask{id=test-distributed-connector-0}使用序列号1异步提交偏移。。。。。。。 但大多数时候,它只会卡住并重复这一
我需要一个Kafka主题存储的消息数量。这与任何消费者是否消费了消息无关。 以上是否等于Kafka主题中当前存储的消息数?
分布式模式下Kafka Connect集群的偏移管理行为是什么,即运行多个连接器并监听同一组主题(或一个主题)? 因此,在分布式模式下,Kafka Connect 会将偏移量信息存储在 Kafka 中,此偏移量将由集群中的工作线程读取和提交。如果我在该 Kafka Connect 集群中运行多个连接器侦听同一主题,会发生什么情况?分区的偏移量是否与所有连接器相同,或者每个连接器在分区上的偏移量是否
上下文 我编写了几个小的Kafka Connect连接器。一个每秒只生成随机数据,另一个将其记录在控制台中。它们与模式注册表集成,因此数据使用Avro序列化。 我使用Landoop提供的fast data dev Docker映像将它们部署到本地Kafka环境中 基本设置工作,并每秒生成一条记录的消息 但是,我想更改主题名称策略。默认设置生成两个主题:
我正在使用融合的 http 接收器连接器从 kafka 主题读取消息并将其发送到endpoint。下面是 http 接收器连接器, 主题中的消息具有以下 json 格式, 因此,使用超文本传输协议接收器连接器,我需要从主题消息中检索“endpointurl”和“凭据”的值,并在同一连接器中使用与“http.api.url”、“connection.user”和“connection.passwor