当前位置: 首页 > 知识库问答 >
问题:

Spring集成消息驱动通道适配器不能与Spring-Kafka 2.3+一起工作

蒋寒
2023-03-14

试图调用不存在的方法。尝试是从以下位置进行的:

org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.onInit(KafkaMessageDrivenChannelAdapter.java:318)

以下方法不存在:

org.springframework.kafka.listener.ContainerProperties.isDeliveryAttemptHeader()Z

3.如果您使用kafka版本2.5及以上版本,但却被2021-03-22 13:56:05.102-0400 org{local_sparta}WARN[data-pipeline,,,][DP-ACCOUNT][DPA][]annotationConfigServletWebServerApplicationContext:上下文初始化过程中遇到的主要异常-取消刷新尝试:org.springframework.beans.factory.beanCreationException:创建名为嵌套异常是org.springframework.beans.beanInstantiationException:无法实例化[org.springframework.boot.autoconfigure.kafka.kafkaAnnotationDrivenConfiguration]:构造函数引发异常;嵌套异常是org.springframework.beans.factory.unsatisfieddependencyexception:创建类路径资源[org/springframework/boot/autoconfigure/kafka/kafkaautoconfiguration.class]中定义的名为“kafka template”的bean时出错:通过方法“kafka template”参数0表示不满足的依赖关系;嵌套异常是org.springframework.beans.factory.NosuchBeanDefinitionException:没有类型为'org.springframework.kafka.core.producerFactory '的合格bean可用:至少需要1个符合autowire候选的bean。依赖项批注:{}

@Configuration
@Slf4j
public class KafkaChannelConsumer {

    @Autowired
    MessageChannel preRouterLOB;

    @Value("${spring.kafka.bootstrap-servers:localhost9092}")
    private String bootstrapServers;

    @Value("${spring.kafka.topic:55iptest}")
    private String springIntegrationKafkaTopic;

    @Bean
    public KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter() {
        KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter(
                kafkaListenerContainer());
        kafkaMessageDrivenChannelAdapter.setOutputChannel(preRouterLOB);
        return kafkaMessageDrivenChannelAdapter;
    }

    @SuppressWarnings("unchecked")
    @Bean
    public ConcurrentMessageListenerContainer kafkaListenerContainer() {
        ContainerProperties containerProps = new ContainerProperties(springIntegrationKafkaTopic);

        return (ConcurrentMessageListenerContainer) new ConcurrentMessageListenerContainer(
                consumerFactory(), containerProps);
    }

    @Bean
    public ConsumerFactory consumerFactory() {
        return new DefaultKafkaConsumerFactory(consumerConfigs());
    }

    @Bean
    public Map consumerConfigs() {
        Map properties = new HashMap();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "dummy");
        return properties;
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
  xmlns:tx="http://www.springframework.org/schema/tx" xmlns:jms="http://www.springframework.org/schema/integration/jms"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:int="http://www.springframework.org/schema/integration"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">


<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        mode="record"
        channel="someChannel"
        error-channel="errorChannel" />

<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                <entry key="bootstrap.servers" value="localhost:9092" />
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.kafka.listener.config.ContainerProperties">
            <constructor-arg name="topics" value="foo" />
        </bean>
    </constructor-arg>

</bean>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>5.4.5</version>
        </dependency>

这包括Spring-Kafka 2.3.6版本

第3期POM

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>5.4.5</version>
    <exclusions>
       <exclusion>
           <groupId>org.springframework.kafka</groupId>
           <artifactId>spring-kafka</artifactId>
       </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.7</version>
</dependency>
       

共有1个答案

沈子昂
2023-03-14

5.4.5

这包括Spring-Kafka 2.3.6版本

不,它没有;spring-integration-kafka的5.4.x版本需要2.6.x;该方法被添加到2.5中的属性中。

 类似资料:
  • 我试图将从Quickfix读取消息(读取修复消息)配置到spring集成中。我知道我可以使用入站通道适配器从外部源(如QuickFix)读取数据。您能提供如何编写事件驱动入站通道适配器的示例吗?我有以下配置不起作用

  • 使用Spring Integration Kafka,使用出站通道适配器,我尝试向名为“test”的主题发送消息 通过命令行终端,我启动了动物园管理员、kafka并创建了名为“test”的主题 Spring XML配置 JUnit测试代码 测试用例成功,在调试时,我发现channel.send()返回true 我使用下面的命令通过命令行检查了主题,但是我在测试主题中看不到任何消息。 bin/kaf

  • 我使用了spring io文档中列出的示例配置,它运行良好。 然而,当我用下游应用程序测试它时,我从Kafka那里消费并将其发布到下游。如果下游关闭,则消息仍在使用中,不会重播。 或者说,在使用kafka主题后,如果我在service activator中发现一些异常,我还想抛出一些异常,这些异常应该回滚事务,以便可以重播kafka消息。 简而言之,如果消费应用程序有一些问题,那么我想回滚事务,这

  • 我需要在我的Spring集成上下文中动态地将消息分配给MessageChannel。当我知道我想要的MessageChannel的名称时,我可以通过从上下文中获取MessageChannel bean来做到这一点。 我需要做的是通过编程查找在ChannelAdapter/服务中设置的消息通道的名称/id。 但是,MessageChannel API没有与之关联的getName()或getId()方

  • 问题内容: 入站和出站通道适配器之间的根本区别是什么? 任何示例都将非常有帮助。 我已经查看过Spring文档,这种“方向性”的区别对我来说还不清楚。我支持配置了outbound-channel-adapter的应用程序,但是我发现使用 出站 标签可以直观地了解行为计数器。该适配器获取一个外部文件,然后 将其 引入应用程序中, 在 该应用程序中我们解析文件并保留数据。 这类似于这个问题,但是我想更

  • 我使用的是< code>SpringXD,我的配置如下: Spring集成kafka 2.1.0.释放 kafka客户端0.10.0.1 Kafka0.10.x.x Spring-xd-1.3.1.释放 我的 xml 文件中有以下配置: 这是我用来启动/停止频道的Java类: 然后我创建了一个基本流来检查我发送到主题的一些消息是否通过 我检查了创建的文件,它包含我发送到 Kafka 主题的所有消息