当前位置: 首页 > 知识库问答 >
问题:

按键连接多个Kafka主题

於意蕴
2023-03-14

如何以可伸缩的方式编写连接多个Kafka主题的使用者?

我有一个主题用一个键发布事件,第二个主题用相同的键发布与第一个主题的子集相关的其他事件。我想编写一个订阅这两个主题的使用者,并为出现在这两个主题中的子集执行一些额外的操作。

理想情况下,我需要将主题绑定在一起,以便以相同的方式对它们进行分区,并同步地将分区分配给使用者。我怎么能这么做?

我知道Kafka Streams将主题连接在一起,这样键就分配给了相同的节点。他们是怎么做到的?附言。我不能使用Kafka流,因为我使用的是Python。

共有1个答案

姬锐
2023-03-14

可惜您使用的是Python--Kafka Streams将是一个完美的匹配:)

如果要手动完成此操作,则需要实现自己的PartitionAssignor--实现必须确保分区在赋值中位于同一位置:假设每个主题有4个分区(我们称它们为A和B),而分区A_0和B_0必须分配给同一个使用者(也是A_1和B_1,...)。

我希望Python consumer允许您通过配置参数partition.assignment.strategy指定自定义分区分配器。

Streams使用了tasks的概念--tasks获得不同主题的分区,并分配了相同的分区号。Streams还试图执行“粘性分配”--即,如果可能的话,在重新平衡的情况下不要移动任务(因此也不要移动分区)。因此,每个使用者都在rebalance元数据中编码其“旧分配”。

基本上,#subscription()方法对每个活动的使用者调用。它将向代理发送消费者的订阅信息(即消费者希望订阅的主题)和可选的元数据。

在第二步中,消费者组的领导者将在#assign()内计算实际的分配。负责代理在重新平衡的第一阶段收集#subscription()提供的所有信息,并将其交给#assign()。这样,领导者就可以得到整个组的全局概览,从而可以确保分区是以一种共同定位的方式分配的。

  • https://cwiki.apache.org/confluence/display/kafka/kafka+streams+architecture
  • http://docs.confluent.io/current/streams/architecture.html
 类似资料:
  • 分布式模式下Kafka Connect集群的偏移管理行为是什么,即运行多个连接器并监听同一组主题(或一个主题)? 因此,在分布式模式下,Kafka Connect 会将偏移量信息存储在 Kafka 中,此偏移量将由集群中的工作线程读取和提交。如果我在该 Kafka Connect 集群中运行多个连接器侦听同一主题,会发生什么情况?分区的偏移量是否与所有连接器相同,或者每个连接器在分区上的偏移量是否

  • 错误结果为 “OneToOne关系的JPA问题:外键引用的列数错误。应为2” 如何为join测试表指定主键? 表A:列id与表B:列test_id映射

  • 我们希望使用Kafka connect sink连接器将消息从Kafka复制到Mongo DB。在我们的用例中,我们有多个主题,每个主题都有一个分区(主题的名称可以用正则表达式表示,例如topic.XXX.name)。这些主题的数量在不断增加。我想知道Kafka connect架构是否适合这个用例。如果是这样,如何配置它的增益高可缩放性和并行性?任务是什么。最大值?工人数量?

  • 我是Kafka连线的新手。我有一个如下的用例: > 有一个共享主题,我在其中收到不同实体的消息,例如员工、部门(实际表名称不同) 员工和部门的模式在模式注册表中注册 使用Kafka接收器连接器,是否可以根据架构分离每个实体的数据并写入相应的表示例,进入主题的员工数据应转到员工表,部门数据应转到部门表 如果没有,还有其他更好的方法吗?

  • 我想我需要为每个StreamListener方法单独的应用程序id,但是如果我正在监听相同的主题,我如何在application.yml文件中配置它呢?

  • 我使用自己的自定义Sink插件运行Kafka Connect集群(本地有1个工人Docker Compose)。我想在连接器中使用几个主题:topicA、topicB、topicC,每个主题都有一个分区。 我的连接器启动时的配置子集如下: 使用此配置,我希望Kafka Connect为每个接收器任务分配一个主题,但遗憾的是,这不是我看到的。实践中发生的情况是,为分配了所有主题的每个任务调用Sink