当前位置: 首页 > 知识库问答 >
问题:

MessageDeliveryException:Dispatcher没有订阅服务器

韩楷
2023-03-14

我正在尝试使用spring-integration-kafka-2.1.0。在我公司的项目中发布。但是,由于下面列出的例外情况,它不起作用:org。springframework。信息。MessageDeliveryException:Dispatcher没有频道“org”的订户。springframework。网状物上下文WebApplicationContext:/order。“奥Kafka”。;嵌套的异常是org。springframework。整合。MessageDispatchingException:Dispatcher没有订阅服务器

xml配置如下:ctx kafka producer。xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
       http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">

    <int:channel id="inputToKafka"/>

    <int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                        kafka-template="kafkaTemplate"
                                        auto-startup="false"
                                        channel="inputToKafka"
                                        topic="customerpos-order"/>


    <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg index="0" ref="producerFactory"/>
    </bean>

    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="127.0.0.1:9092"/>
                <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
                <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
                <!--<entry key="acks" value="${acks}"/>-->
                <!--<entry key="buffer.memory" value="${buffer.memory}"/>-->
                <!--<entry key="compression.type" value="${compression.type}"/>-->
                <!--<entry key="retries" value="${retries}"/>-->
                <!--<entry key="batch.size" value="${batch.size}"/>-->
                <!--<entry key="max.block.ms" value="${max.block.ms}"/>-->
                <!--<entry key="max.request.size" value="${max.request.size}"/>-->
                <!--<entry key="partitioner.class" value="${partitioner.class}"/>-->
                <!--<entry key="receive.buffer.bytes" value="${receive.buffer.bytes}"/>-->
                <!--<entry key="request.timeout.ms" value="${request.timeout.ms}"/>-->
                <!--<entry key="security.protocol" value="${security.protocol}"/>-->
                <entry key="send.buffer.bytes" value="${send.buffer.bytes}"/>
                <!--<entry key="ssl.protocol" value="${ssl.protocol}"/>-->
                <!--<entry key="ssl.truststore.type" value="${ssl.truststore.type}"/>-->
                <!--<entry key="timeout.ms" value="${timeout.ms}"/>-->
                <!--<entry key="block.on.buffer.full" value="${block.on.buffer.full}"/>-->
                <!--<entry key="max.in.flight.requests.per.connection" value="${max.in.flight.requests.per.connection}"/>-->
                <!--<entry key="metadata.fetch.timeout.ms" value="${metadata.fetch.timeout.ms}"/>-->
                <!--<entry key="metadata.max.age.ms" value="${metadata.max.age.ms}"/>-->
                <!--<entry key="metrics.num.samples" value="${metrics.num.samples}"/>-->
                <!--<entry key="metrics.sample.window.ms" value="${metrics.sample.window.ms}"/>-->
                <!--<entry key="reconnect.backoff.ms" value="${reconnect.backoff.ms}"/>-->
                <!--<entry key="retry.backoff.ms" value="${retry.backoff.ms}"/>-->
                <!--<entry key="sasl.kerberos.min.time.before.relogin" value="${sasl.kerberos.min.time.before.relogin}"/>-->
                <!--<entry key="sasl.kerberos.ticket.renew.jitter" value="${sasl.kerberos.ticket.renew.jitter}"/>-->
                <!--<entry key="sasl.kerberos.ticket.renew.window.factor" value="${sasl.kerberos.ticket.renew.window.factor}"/>-->
                <!--<entry key="ssl.keymanager.algorithm" value="${ssl.keymanager.algorithm}"/>-->
                <!--<entry key="linger.ms" value="1"/>-->
            </map>
        </constructor-arg>
    </bean>

    <!--<bean id="kafkaProducer" class="com.angho.cloud.manager.kafka.impl.KafkaProducerImpl"/>-->
</beans>

ctxKafka消费者。xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">

    <int-kafka:message-driven-channel-adapter id="kafkaListener"
                                              listener-container="container1"
                                              auto-startup="true"
                                              phase="100"
                                              send-timeout="5000"
                                              channel="inputFromChannel"
                                              error-channel="errorChannel"/>

    <int:service-activator input-channel="inputFromChannel" ref="kafkaOrderConsumer"/>

    <!--<bean id="kafkaOrderConsumer" class="com.angho.cloud.manager.kafka.KafkaOrderConsumer"/>-->

    <bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
        <constructor-arg>
            <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
                <constructor-arg>
                    <map>
                        <entry key="bootstrap.servers" value="127.0.0.1:9092"/>
                        <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                        <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                        <!--<entry key="fetch.min.bytes" value="${fetch.min.bytes}"/>-->
                        <entry key="group.id" value="customerpos-order"/>
                        <!--<entry key="heartbeat.interval.ms" value="${heartbeat.interval.ms}"/>-->
                        <!--<entry key="max.partition.fetch.bytes" value="${max.partition.fetch.bytes}"/>-->
                        <entry key="session.timeout.ms" value="15000"/>
                        <!--<entry key="auto.offset.reset" value="${auto.offset.reset}"/>-->
                        <!--<entry key="connections.max.idle.ms" value="${connections.max.idle.ms}"/>-->
                        <entry key="enable.auto.commit" value="false"/>
                        <!--<entry key="exclude.internal.topics" value="${exclude.internal.topics}"/>-->
                        <!--<entry key="fetch.max.bytes" value="${fetch.max.bytes}"/>-->
                        <!--<entry key="max.poll.interval.ms" value="${max.poll.interval.ms}"/>-->
                        <!--<entry key="max.poll.records" value="${max.poll.records}"/>-->
                        <!--<entry key="partition.assignment.strategy" value="${partition.assignment.strategy}"/>-->
                        <!--<entry key="receive.buffer.bytes" value="${receive.buffer.bytes}"/>-->
                        <!--<entry key="request.timeout.ms" value="${request.timeout.ms}"/>-->
                        <entry key="auto.commit.interval.ms" value="100"/>
                        <!--<entry key="fetch.max.wait.ms" value="${fetch.max.wait.ms}"/>-->
                        <!--<entry key="metadata.max.age.ms" value="${metadata.max.age.ms}"/>-->
                        <!--<entry key="metrics.num.samples" value="${metrics.num.samples}"/>-->
                        <!--<entry key="metrics.recording.level" value="${metrics.recording.level}"/>-->
                        <!--<entry key="metrics.sample.window.ms" value="${metrics.sample.window.ms}"/>-->
                        <!--<entry key="reconnect.backoff.ms" value="${reconnect.backoff.ms}"/>-->
                        <!--<entry key="retry.backoff.ms" value="${retry.backoff.ms}"/>-->
                    </map>
                </constructor-arg>
            </bean>
        </constructor-arg>
        <constructor-arg>
            <bean class="org.springframework.kafka.listener.config.ContainerProperties">
                <constructor-arg name="topics" value="customerpos-order"/>
            </bean>
        </constructor-arg>
    </bean>

</beans> 

使用depend-depen属性在生产者之前初始化消费者。

Java代码如下:

制片人:

package com.angho.cloud.manager.kafka.impl;

import com.angho.cloud.bo.data.order.SyncOrderBO;
import com.angho.cloud.bo.result.order.CPosOrderSyncResult;
import com.angho.cloud.manager.kafka.CPosOrderSyncManager;
import com.angho.data.common.ResultConstant;
import org.apache.log4j.Logger;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;


import javax.annotation.PostConstruct;
import javax.annotation.Resource;

public class CPosOrderKafkaProducer implements CPosOrderSyncManager {

    private static final Logger LOG = Logger.getLogger(CPosOrderKafkaProducer.class);

    @PostConstruct
    public void init(){
        System.out.println("KafkaProducer start...");
    }

    @Resource(name = "inputToKafka")
    private MessageChannel messageChannel;

    @Override
    public CPosOrderSyncResult sendToKafka(SyncOrderBO order) {
        CPosOrderSyncResult result = new CPosOrderSyncResult();
        result.setStatus(ResultConstant.CODE.ERROR);

        Message<SyncOrderBO> message = new GenericMessage<>(order);

        try {
            boolean flag = this.messageChannel.send(message);
            if (flag) {
                result.setStatus(ResultConstant.CODE.SUCCESS);
                result.setMessage(ResultConstant.MESSAGE.DEFAULT_SUCCESS_MESSAGE);
            } else {
                result.setMessage("Failed to send message to Kafka.");
            }
        } catch (Exception ex) {
            LOG.error(ex);
            result.setException(ex);
        }

        return result;
    }
}

生产者被定义为bean:

<bean id="kafkaOrderProducer" class="com.angho.cloud.manager.kafka.impl.CPosOrderKafkaProducer" depends-on="kafkaOrderConsumer"/>

消费者Java代码:

package com.angho.cloud.manager.kafka;

import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component("kafkaOrderConsumer")
public class KafkaOrderConsumer {

    @PostConstruct
    public void init(){
        System.out.println("what?KafkaConsumer start...");
    }

    @SuppressWarnings("unchecked")
    @ServiceActivator
    public void process(Message<?> message){
        System.out.println("Message =======>" + message);
        System.out.println("Content =======>" + message.getPayload());
    }
}

我不知道发生异常的原因。

我该怎么做才能让它工作?

PS:我试图通过xml而不是@Bean之类的Java代码来配置它。

原谅我糟糕的英语
。。。。。

非常感谢...

共有1个答案

朱翔
2023-03-14

自动启动=false

你怎么会有那个?

适配器将不会订阅通道,直到它是start()ed。

 类似资料:
  • 我想使用PublishSubject创建一个广播系统,一个后台任务将轮询一些endpoint,并定期使用该主题广播结果。我希望在第一个订阅者订阅主题时开始轮询,并在没有更多订阅者时停止轮询。如果有新订阅者订阅,则应继续轮询。

  • 我试图利用固有的WSO2ESB主题发布到jms队列。我已经创建了主题,并提供了一个订阅者URL:jms:/topictest?transport.jms.destinationtype=queue。然而,当我将消息发布到主题时,它不能被传递到队列。日志生成以下内容 “系统无法从jms:/queue?destination=topictest URL推断传输信息。” 另外,我似乎不知道如何发布到WS

  • 我在spring中有一个服务,它需要使用十种不同的方法获取数据。 我希望这些方法并行执行,以执行一些DB操作并返回到父线程。但是父线程应该等到所有响应出现,然后返回响应。 在我当前的方法中,我使用反应式mono异步执行所有方法,但主线程不等待订阅者方法完成。 下面是我订阅的两种方法 下面是我的主要方法 以下是我的输出: 我的输出显示,主线程没有等待订阅服务器完成其任务,因此我如何处理这种情况?

  • 我有一个使用ActiveMQ的JMS生产者/订阅者的简单Spring应用程序,配置如下: 我试过所有可能的解决办法,但没有一个奏效。我们非常感谢任何帮助

  • 来自第三次订阅的消息会发生什么情况,是否会在TTL之后发送到死信队列 有没有办法找出消息未被使用的订阅

  • 在我的帐户页面/视图订阅中,在操作段落中,我有一个取消按钮,但没有挂起按钮。你知道这是否正常吗? 我跟踪了这个函数,它只返回取消数据:wcs_get_all_user_actions_for_subscription 谢谢你的帮助