在一个Spring Boot应用程序中,我想使用Spring集成从一个Kafka队列中读取。配置了以下内容:
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
adapter(KafkaMessageListenerContainer<String, String> container) {
KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(container);
kafkaMessageDrivenChannelAdapter.setOutputChannel(receiver());
return kafkaMessageDrivenChannelAdapter;
}
@Bean
public KafkaMessageListenerContainer<String, String> container() throws Exception {
ContainerProperties properties = new ContainerProperties(this.topic);
return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = ... // set properties
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public DirectChannel receiver() {
return new DirectChannel();
}
@Autowired
private Resolver resolver;
@Bean
public EventDrivenConsumer getEventDrivenConsumer() {
return new EventDrivenConsumer(receiver(), resolver);
}
解析器
bean实现MessageHandler
。
@SpringBootApplication(exclude = KafkaAutoConfiguration.class)
java.lang.NullPointerException: null
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:188) ~[spring-kafka-1.1.7.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72) ~[spring-kafka-1.1.7.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47) ~[spring-kafka-1.1.7.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:792) [spring-kafka-1.1.7.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:736) [spring-kafka-1.1.7.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2100(KafkaMessageListenerContainer.java:246) [spring-kafka-1.1.7.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:1025) [spring-kafka-1.1.7.RELEASE.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_20]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_20]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20]
调试时,很明显,在RecordMessagingMessageListenerAdapter
(stacktrace的顶部)中,This.MethodHandler
为空。
在Spring集成中,将通道连接到应该处理消息的bean的正确方法是什么?
决议如下:
项目的父级声明如下:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.10.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
对于以下依赖项:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>${spring-integration-kafka.version}</version>
</dependency>
3.0.1.RELEASE
2.1.0.RELEASE
Boot的优点之一是它可以处理依赖版本。也许应该有spring-boot-integration-kafka
依赖项,它可以防止这个问题。
我想创建一个异步读取kafka消息的流,并使用队列通道累积大量要处理的消息,并且只有在处理完这些消息(例如50条消息)后,它才能处理另外50条消息,或者在释放队列中的空间时。我尝试使用一个从kafka委托读取到另一个流的流,该流具有一个带有PollerMetadata(Pollers.fixedDelay(500))的QueueChannel。maxMessagesPerPoll(50))但是轮询
我的pom.xml是: 我错过了什么?
我目前正在尝试编写一个适配器,它将使用来自ActiveMQ的消息并将其发布到Kafka。 我正在考虑使用Spring集成来集成这两个消息传递系统。 我的问题是,我的应用程序不会维护模型的注册表,许多应用程序将使用该注册表将记录发布到activeMQ。我想接收这些javax-jms消息,并想执行一些转换,比如将jmscorrelationId添加到kafka消息中。 另外,另一个要求是仅当kafka
在Spring MVC项目中,我试图通过Spring Websockets将使用过的Kafka数据发送到前端(JavaScript)。 为了建立服务器和客户端之间的通信,我有以下内容。 客户端(app.js) 服务器(KafkaController.java) 要使用来自特定Kafka主题的数据,我使用@KafkaListener注释如下: 我有一个适当的Kafkanconfig类,包含所有必要的
我正在使用带有以下配置的spring集成kafka 1.1.0。我不太了解流的配置。当我增加这个值时,Spring会自动生成更多线程来处理消息吗?e、 g.当我有流=2时,相关的转换器和服务激活器是否都在2个线程中运行?我想错过一些线程执行器配置,但不知道如何配置。任何提示都将不胜感激。谢谢 已尝试使用消息驱动通道适配器,但无法使其工作,以下配置未拾取任何消息。还尝试了组织。springframe
我试图在Spring Integration中定义一个简单的消息流,它从一个通道读取消息,然后将消息转储到Kafka队列中。为此,我使用了spring集成kafka。问题是我得到了一个无法解读的错误。 以下是我的XML配置: 当我通过Spring启动运行我的应用程序时,我得到这个异常: 创建名称为org.springframework.integration.kafka.outbound.Kafk