我有一个应用程序需要收听多个不同的主题;每个主题都有关于如何处理消息的单独逻辑。我曾想过为每个KafkaStreams实例使用相同的kafka属性,但我得到了如下所示的错误。
错误
java.lang.IllegalArgumentException: Assigned partition my-topic-1 for non-subscribed topic regex pattern; subscription pattern is my-other-topic
class KafkaSetup() {
companion html" target="_blank">object {
private val LOG = LoggerFactory.getLogger(this::class.java)
}
fun getProperties(): Properties {
val properties = Properties()
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app")
return properties
}
private fun listenOnMyTopic() {
val kStreamBuilder = KStreamBuilder()
val kStream: KStream<String, String> = kStreamBuilder.stream("my-topic")
kStream.foreach { key, value -> LOG.info("do stuff") }
val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
kafkaStreams.start()
}
private fun listenOnMyOtherTopic() {
val kStreamBuilder = KStreamBuilder()
val kStream: KStream<String, String> = kStreamBuilder.stream("my-other-topic")
kStream.foreach { key, value -> LOG.info("do other stuff") }
val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
kafkaStreams.start()
}
}
流处理应用程序的标识符。在Kafka集群中必须是唯一的。它用作1)默认的客户端ID前缀,2)用于成员资格管理的组ID,3)变更日志主题前缀。
问题
application.id
启动两个列有不同主题的Kafkastreams
吗?如果是,怎么做?细节:Kafka0.11.0.2
Kafaka是通过分区而不是主题来进行缩放的。因此,如果使用相同的application.id
启动多个应用程序,它们必须在订阅的输入主题和处理逻辑方面相同。应用程序使用application.id
作为group.id
来形成使用者组,因此输入主题的不同分区被分配给不同的实例。
如果您有相同逻辑的不同主题,您可以一次订阅所有主题(在您启动的每个实例中)。但是,缩放仍然是基于分区的。(它基本上是输入主题的“合并”。)
如果希望通过主题进行缩放和/或具有不同的处理逻辑,则必须对不同的Kafka Streams应用程序使用不同的application.id
。
我有2个Kafka的主题流完全相同的内容从不同的来源,所以我可以有高可用性的情况下,其中一个来源失败。我正在尝试使用Kafka Streams0.10.1.0将2个主题合并为1个输出主题,这样我就不会错过任何关于失败的消息,并且当所有源都启动时没有重复的消息。 当使用KStream的方法时,其中一个主题可以毫无问题地关闭(次要主题),但是当主主题关闭时,将不会向输出主题发送任何内容。这似乎是因为,
配置的application.yaml如下所示。这个想法是我有3个输入和3个输出主题。该组件从input topic获取输入,并将输出提供给OutputTopic。 引发的异常为 谁能帮助我与Kafka Streams Spring-Kafka代码样本处理与多个输入和输出主题。 更新:2021年1月21日
在文件中,我将作为默认值Serde,然后使用使用字符串值。 当我将以下流的配置作为值的默认值时,我看到Avro流(第一个)运行良好,并使用我在该主题上发布的内容。但是当我使用相同的配置发布到字符串值流时,会出现异常。 以下是发布topicTwo和TopicTrey的例外:
我试图在不同的机器上运行kafka streams应用程序的多个实例,但出现以下错误: 无法锁定全局状态目录。如果多个KafkaStreams实例使用同一状态目录在同一主机上运行,则可能会发生这种情况。kstream-test-prod-6f9bc47d9c-bmq2z:org.apache.kafka.streams.errors。锁定异常:无法锁定全局状态目录:/tmp/kafka-strea
我使用以下代码创建kafka流: 我给每个流不同的组ID。当我运行应用程序时,只接收到部分kafka消息,并且执行程序在foreachRDD调用中挂起。如果我只创建一个流,一切正常。日志信息没有任何例外。 我不知道为什么应用程序卡在那里。这是否意味着没有足够的资源?
假设我有2个Kafka主题登录和注销按用户名分区,并具有相等数量的分区。如果我运行一个由两个消费者组成的消费者组,同时使用两个主题,我是否可以确定每个用户的登录和注销事件将由同一个消费者处理?