我试图加载多个主题到一个单一的@KafkaListener
,但遇到了麻烦,因为我相信它正在寻找一个常量值,但初始化的主题
变量从application.yml
文件引起一些问题,我想知道是否有人可以帮助我解决这个问题,或者为我提供如何将多个Kafka主题加载到单个KafkaListener的方向。
我可以在同一个@KafkaListener
中通过逗号分隔的对象传递多个主题,如下所示:
@KafkaListener(topics = {
"flight-events",
"flight-time-events",
"service-events",
"flight-delay-events"
})
我意识到我可以用逗号分隔的值来表示主题,但我希望能够通过配置文件添加主题,而不是更改代码库中的代码。
我认为@KafkaListener需要接受一个常量值,并且我无法将注释定义为常量,有什么方法可以解决这个问题吗?
KafkaWebSocketConnector。java
@Component
public class KafkaWebSocketConnector
{
@Value("${spring.kafka.topics}")
private String[] topics;
@KafkaListener(topics = topics)
public void listen(ConsumerRecord<?, Map<String, String>> message)
{
log.info("Received messages on topic [{}]: [{}]", message.topic(), message.value());
String dest = "/" + message.topic();
log.info("destination = {}", dest);
log.info("msg: {}", message);
messageTemplate.convertAndSend(dest, message.value());
}
}
应用程序。yml
spring:
kafka:
consumer:
auto-offset-reset: earliest
group-id: kafka-websocket-connector
topics: flight-events,
flight-time-events,
canceled-events,
pax-events,
flight-delay-events
@Gary Russell在本期GitHub中提供的答案:
https://github.com/spring-projects/spring-kafka/issues/361
可以使用SpEL表达式;在EnableKafkaIntegrationTests中有一个例子。。。
@KafkaListener(id = "foo", topics = "#{'${topicOne:annotated1,foo}'.split(',')}")
在我的情况下"#{'${ spring.kafka.topics}'。拆分(',')}"
为了回答上述问题,我能够实现上述代码(由Gary Russell提供)。
我正在构建一个模块,将数据保存到数据库中。完成模块后,我将把它做成一个JAR,这是常见的,任何人都会调用静态方法。有一个参数,它的名称是application name,我不想传递这个值,我想在向任何spring boot应用程序添加jar后动态获取这个值,然后任何人调用这个静态方法检索name application dynamic,所以spring boot包含应用程序属性have value
我想知道一个应用程序的包名,我只知道那个应用程序的应用程序名。假设我想知道一个电子邮件应用程序的包名,只是它的名字,然后如何得到它 我只知道应用程序名。 这是代码,以获得所有的应用程序的包名称,但我需要知道特定的应用程序。
我们在Spring Boot应用程序中使用Kafka Cloud Stream向Kafka发送数据。这样地 我想知道除了直接从 yaml 文件中读取之外,是否可以从消息通道获取主题名称? 主题名称存在于kafka.yaml中
下面有一个类似的问题: 一个Spring的Kafka消费者听众能听多个话题吗? 现在我明白了,我可以为KafkaListener注释的topics参数提供一个字符串数组,但是我想知道以下几点: 如何从属性文件中获取主题名称作为字符串数组? 从多个主题中读取如何影响偏移?客户(SpringKafka)会保持每个主题的补偿吗?
我试图重命名每个构建变量的APK文件,以包括应用程序名称、版本名、版本代码和构建编号(如果存在)。到目前为止,除了应用程序名之外,我的一切都正常工作。 我想使用与< code>AndroidManifest.xml文件用于android:label相同的值。这来自字符串资源< code>@string/app_name。我看到了通过使用以下内容来替换资源值的能力: 但我只想读取这个值并用它来命名我
我有一个@KafkaListener方法来获取主题中的所有消息,但对于@Scheduled方法工作的每个间隔时间,我只获取一条消息。如何一次从topic获取所有消息? 这是我的课; 这是我在应用程序中的Kafka属性。yml; 还有我的KafkaConfiguration课程;