当前位置: 首页 > 知识库问答 >
问题:

Kafka流:使用相同的'application.id'使用多个主题

呼延俊风
2023-03-14

我有一个应用程序需要收听多个不同的主题;每个主题都有关于如何处理消息的单独逻辑。我曾想过为每个KafkaStreams实例使用相同的kafka属性,但我得到了如下所示的错误。

错误

java.lang.IllegalArgumentException: Assigned partition my-topic-1 for non-subscribed topic regex pattern; subscription pattern is my-other-topic
class KafkaSetup() {
    companion html" target="_blank">object {
        private val LOG = LoggerFactory.getLogger(this::class.java)
    }

    fun getProperties(): Properties {
        val properties = Properties()
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app")
        return properties
    }

    private fun listenOnMyTopic() {
        val kStreamBuilder = KStreamBuilder()
        val kStream: KStream<String, String> = kStreamBuilder.stream("my-topic")

        kStream.foreach { key, value -> LOG.info("do stuff") }

        val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
        kafkaStreams.start()
    }

    private fun listenOnMyOtherTopic() {
        val kStreamBuilder = KStreamBuilder()
        val kStream: KStream<String, String> = kStreamBuilder.stream("my-other-topic")

        kStream.foreach { key, value -> LOG.info("do other stuff") }

        val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
        kafkaStreams.start()
    }
}

流处理应用程序的标识符。在Kafka集群中必须是唯一的。它用作1)默认的客户端ID前缀,2)用于成员资格管理的组ID,3)变更日志主题前缀。

问题

  1. 此错误意味着什么,以及导致此错误的原因。
  2. 假设您可以有应用程序的多个实例以相同的id运行以从多个主题分区消费,那么“在Kafka集群中必须是唯一的”是什么意思?
  3. 可以使用相同的Kafka streamsapplication.id启动两个列有不同主题的Kafkastreams吗?如果是,怎么做?

细节:Kafka0.11.0.2

共有1个答案

邓越泽
2023-03-14

Kafaka是通过分区而不是主题来进行缩放的。因此,如果使用相同的application.id启动多个应用程序,它们必须在订阅的输入主题和处理逻辑方面相同。应用程序使用application.id作为group.id来形成使用者组,因此输入主题的不同分区被分配给不同的实例。

如果您有相同逻辑的不同主题,您可以一次订阅所有主题(在您启动的每个实例中)。但是,缩放仍然是基于分区的。(它基本上是输入主题的“合并”。)

如果希望通过主题进行缩放和/或具有不同的处理逻辑,则必须对不同的Kafka Streams应用程序使用不同的application.id

 类似资料:
  • 我有2个Kafka的主题流完全相同的内容从不同的来源,所以我可以有高可用性的情况下,其中一个来源失败。我正在尝试使用Kafka Streams0.10.1.0将2个主题合并为1个输出主题,这样我就不会错过任何关于失败的消息,并且当所有源都启动时没有重复的消息。 当使用KStream的方法时,其中一个主题可以毫无问题地关闭(次要主题),但是当主主题关闭时,将不会向输出主题发送任何内容。这似乎是因为,

  • 配置的application.yaml如下所示。这个想法是我有3个输入和3个输出主题。该组件从input topic获取输入,并将输出提供给OutputTopic。 引发的异常为 谁能帮助我与Kafka Streams Spring-Kafka代码样本处理与多个输入和输出主题。 更新:2021年1月21日

  • 在文件中,我将作为默认值Serde,然后使用使用字符串值。 当我将以下流的配置作为值的默认值时,我看到Avro流(第一个)运行良好,并使用我在该主题上发布的内容。但是当我使用相同的配置发布到字符串值流时,会出现异常。 以下是发布topicTwo和TopicTrey的例外:

  • 我试图在不同的机器上运行kafka streams应用程序的多个实例,但出现以下错误: 无法锁定全局状态目录。如果多个KafkaStreams实例使用同一状态目录在同一主机上运行,则可能会发生这种情况。kstream-test-prod-6f9bc47d9c-bmq2z:org.apache.kafka.streams.errors。锁定异常:无法锁定全局状态目录:/tmp/kafka-strea

  • 我使用以下代码创建kafka流: 我给每个流不同的组ID。当我运行应用程序时,只接收到部分kafka消息,并且执行程序在foreachRDD调用中挂起。如果我只创建一个流,一切正常。日志信息没有任何例外。 我不知道为什么应用程序卡在那里。这是否意味着没有足够的资源?

  • 假设我有2个Kafka主题登录和注销按用户名分区,并具有相等数量的分区。如果我运行一个由两个消费者组成的消费者组,同时使用两个主题,我是否可以确定每个用户的登录和注销事件将由同一个消费者处理?