我有一个spring boot项目,我是spring-kafka来连接底层的kafka事件枢纽。
我不得不在同一节消费者课上听2个不同的话题。我有两种方法可以这样做。
一个是要有两个这样的Kafka听众:
@KafkaListener(topics = "topic1")
public void consumeTopic1(String message) throws Exception {
//do something
}
@KafkaListener(topics = "topic2")
public void consumeTopic2(String message) throws Exception {
//do something
}
另一种方法是在同一个kafkaListener中有两个主题,如下所示
@KafkaListener(topics = {"topic1", "topic2"})
public void consumeTopics(String message) throws Exception {
//do something
}
===================edit===============application.yml中的Kafka属性如下所示:
kafka:
properties:
topics:
topic1: topic1
topic2: topic2
bootstrap-servers: server1,server2
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
retries: 4
consumer:
group-id: mygroupid
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
>
创建多个主题的目的是在多个
内核/进程/线程之间传播数据,并
随后传播数据的处理。
即使这样,您也可以从多个主题的单个使用者开始,直到在队列中处理事件时观察到延迟。然后你就可以把它们分开。
我有两个组id相同的消费者服务器订阅了相同的主题。kafka服务器仅使用一个分区运行。据我所知,消息应该在这两个消费者服务器中随机使用。但现在似乎总是同一个消费者服务器A消费消息,另一个不消费消息。如果我停止消费者服务器A,另一个将正常工作。我所期望的是,他们可以随机消费信息。
如何在apache/kafka中使用regex消费所有主题?我尝试了上面的代码,但不起作用。
我的问题与单个消费者从多个话题消费有关。假设所有主题都加载了1M个记录,一个使用者必须处理这些记录。它将按照什么顺序从主题中读取(我的意思是首先读取哪个主题/分区,等等) Kafka内部资料的任何链接会有帮助吗?
我有一个主题列表(目前是10个),其大小可以在未来增加。我知道我们可以产生多个线程(每个主题)来消耗每个主题,但在我的例子中,如果主题的数量增加,那么消耗主题的线程数量也会增加,这是我不希望的,因为主题不会太频繁地获取数据,所以线程将是理想的。 有没有办法让单个消费者从所有话题中消费?如果是的话,我们怎样才能做到呢?另外,Kafka将如何维护抵消?请建议答案。
我们正在开发一个应用程序,我们想听Kafka中不止一个主题。所有主题都有一个分区。所有主题名称都有一个公共的前缀,例如“test-x”、“test-y”,所以我们可以对它使用spring。 我们希望编写一个java spring使用者,它使用模式监听所有主题。我们的想法是,我们可以运行同一个消费者(属于同一个组)的多个实例,Kafka将为不同的消费者分发来自不同主题的消息。 然而,这似乎并不奏效。
null 我在这一页上读到以下内容: 使用者从任何单个分区读取,允许您以与消息生成类似的方式扩展消息消耗的吞吐量。 也可以将使用者组织为给定主题的使用者组-组内的每个使用者从唯一分区读取,并且组作为一个整体使用来自整个主题的所有消息。 如果使用者多于分区,则某些使用者将空闲,因为它们没有可从中读取的分区。 如果分区多于使用者,则使用者将从多个分区接收消息。 如果使用者和分区的数量相等,则每个使用者