当前位置: 首页 > 知识库问答 >
问题:

为什么我的 Kafka 连接接收器群集只有一个工作线程处理消息?

卢志行
2023-03-14

我最近在我的计算机上安装了一个本地Kafka,用于测试和开发:

  • 3 经纪人
  • 一个输入主题
  • 主题和弹性搜索之间的 Kafka 连接接收器

我设法在独立模式下配置它,所以一切都是本地主机,Kafka connect是使用./connect standalone启动的。sh脚本。

我现在想做的是在分布式模式下运行我的连接器,这样Kafka消息就可以分成两个工作者。我已经启动了两个工作者(仍然是同一台机器上的所有东西),但是当我向我的Kafka主题发送消息时,只有一个工作者(最后启动的)正在处理消息。

所以我的问题是:为什么只有一个工作线程处理 Kafka 消息而不是两个?

当我杀死其中一个工作线程时,另一个工作线程会带回消息流,因此我认为集群设置良好。

我认为:

> < li>

我没有将钥匙放在我的Kafka信息中,这可能与此有关吗?

我在本地主机中运行所有内容,分布式模式可以这样工作吗?(我已经正确配置了特定的唯一字段,例如 ret.port

共有1个答案

扈运浩
2023-03-14

断然的:

来自Kafka文档:

任务之间的工作分工由分配每个任务的分区显示

如果不使用分区(在同一分区中推送所有消息),工作人员将无法拆分消息。您不需要使用消息密钥,只需以循环方式将消息推送到不同的分区即可。

请参阅:https://docs.confluent.io/current/connect/concepts.html#distributed-workers

 类似资料:
  • 我正在浏览Kafka连接,我试图得到一些概念。 假设我有kafka集群(节点k1、k2和k3)设置并且正在运行,现在我想在不同的节点上运行kafka连接工作器,比如分布式模式下的c1和c2。 很少有问题。 1) 要在分布式模式下运行或启动kafka connect,我需要使用命令,这在kaffa集群节点中可用,所以我需要从任何一个kafka集群节点启动kafka连接?或者我启动kafka conn

  • 对于托管在 Confluent Cloud 中的 Kafka 集群,会创建一个审核日志集群。似乎可以将接收器连接器挂接到此群集,并从“汇合审核日志事件”主题中排出事件。 但是,当我运行连接器执行相同操作时,我遇到了以下错误。 在我的connect-distributed.properties文件中,我的设置如下: 需要授予哪些额外的权限,以便连接器可以在集群中创建所需的主题?connect-dis

  • 我使用自己的自定义Sink插件运行Kafka Connect集群(本地有1个工人Docker Compose)。我想在连接器中使用几个主题:topicA、topicB、topicC,每个主题都有一个分区。 我的连接器启动时的配置子集如下: 使用此配置,我希望Kafka Connect为每个接收器任务分配一个主题,但遗憾的是,这不是我看到的。实践中发生的情况是,为分配了所有主题的每个任务调用Sink

  • 在Tomcat 8中设置数据库连接时,由于某些原因,Tomcat没有遵循我在上下文中配置的内容。xml,导致连接耗尽,导致应用程序服务器端的资源冲突(阻塞/等待线程)。在池初始化之后,我总是有8个连接(在mariadb/mysql中显示processlist)。我的配置说明至少10个连接,最多100个连接。 我测试了不同的配置,但这根本没有任何区别,至少很奇怪。上下文。使用xml,否则它将无法连接

  • 我正在尝试将来自主题的数据(json数据)写入MySql数据库。我想我需要一个JDBC接收器连接器。 我如何配置连接器以将主题中的json数据映射到如何将数据插入数据库。 我能找到的文件只有这个。 “接收器连接器需要了解架构,因此您应该使用合适的转换器,例如架构注册表附带的Avro转换器,或启用了架构的JSON转换器。如果存在Kafka记录键,则可以是基元类型或连接结构,记录值必须是连接结构。从连

  • 我有一个Kafka连接接收器记录从Kafka主题到S3。它在工作,但太慢了。Kafka主题每秒接收约30000条消息。连接接收器无法跟上。我已经尝试增加Kafka连接器的任务。最大值从1到3,这会创建更多任务,但这似乎无助于提高消息/秒的速度。我试着增加Kafka连接工人的CPU分配,这似乎也没有帮助。 我还能试什么?哪些指标有助于监控以进一步识别瓶颈? 更新:Kafka主题有5个分区。Kafka