我正在尝试使用spring-integration-kafka-2.1.0。在我公司的项目中发布。但是,由于下面列出的例外情况,它不起作用:org。springframework。信息。MessageDeliveryException:Dispatcher没有频道“org”的订户。springframework。网状物上下文WebApplicationContext:/order。“奥Kafka”。;嵌套的异常是org。springframework。整合。MessageDispatchingException:Dispatcher没有订阅服务器。
xml配置如下:ctx kafka producer。xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">
<int:channel id="inputToKafka"/>
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
kafka-template="kafkaTemplate"
auto-startup="false"
channel="inputToKafka"
topic="customerpos-order"/>
<bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg index="0" ref="producerFactory"/>
</bean>
<bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="127.0.0.1:9092"/>
<entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
<entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
<!--<entry key="acks" value="${acks}"/>-->
<!--<entry key="buffer.memory" value="${buffer.memory}"/>-->
<!--<entry key="compression.type" value="${compression.type}"/>-->
<!--<entry key="retries" value="${retries}"/>-->
<!--<entry key="batch.size" value="${batch.size}"/>-->
<!--<entry key="max.block.ms" value="${max.block.ms}"/>-->
<!--<entry key="max.request.size" value="${max.request.size}"/>-->
<!--<entry key="partitioner.class" value="${partitioner.class}"/>-->
<!--<entry key="receive.buffer.bytes" value="${receive.buffer.bytes}"/>-->
<!--<entry key="request.timeout.ms" value="${request.timeout.ms}"/>-->
<!--<entry key="security.protocol" value="${security.protocol}"/>-->
<entry key="send.buffer.bytes" value="${send.buffer.bytes}"/>
<!--<entry key="ssl.protocol" value="${ssl.protocol}"/>-->
<!--<entry key="ssl.truststore.type" value="${ssl.truststore.type}"/>-->
<!--<entry key="timeout.ms" value="${timeout.ms}"/>-->
<!--<entry key="block.on.buffer.full" value="${block.on.buffer.full}"/>-->
<!--<entry key="max.in.flight.requests.per.connection" value="${max.in.flight.requests.per.connection}"/>-->
<!--<entry key="metadata.fetch.timeout.ms" value="${metadata.fetch.timeout.ms}"/>-->
<!--<entry key="metadata.max.age.ms" value="${metadata.max.age.ms}"/>-->
<!--<entry key="metrics.num.samples" value="${metrics.num.samples}"/>-->
<!--<entry key="metrics.sample.window.ms" value="${metrics.sample.window.ms}"/>-->
<!--<entry key="reconnect.backoff.ms" value="${reconnect.backoff.ms}"/>-->
<!--<entry key="retry.backoff.ms" value="${retry.backoff.ms}"/>-->
<!--<entry key="sasl.kerberos.min.time.before.relogin" value="${sasl.kerberos.min.time.before.relogin}"/>-->
<!--<entry key="sasl.kerberos.ticket.renew.jitter" value="${sasl.kerberos.ticket.renew.jitter}"/>-->
<!--<entry key="sasl.kerberos.ticket.renew.window.factor" value="${sasl.kerberos.ticket.renew.window.factor}"/>-->
<!--<entry key="ssl.keymanager.algorithm" value="${ssl.keymanager.algorithm}"/>-->
<!--<entry key="linger.ms" value="1"/>-->
</map>
</constructor-arg>
</bean>
<!--<bean id="kafkaProducer" class="com.angho.cloud.manager.kafka.impl.KafkaProducerImpl"/>-->
</beans>
ctxKafka消费者。xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">
<int-kafka:message-driven-channel-adapter id="kafkaListener"
listener-container="container1"
auto-startup="true"
phase="100"
send-timeout="5000"
channel="inputFromChannel"
error-channel="errorChannel"/>
<int:service-activator input-channel="inputFromChannel" ref="kafkaOrderConsumer"/>
<!--<bean id="kafkaOrderConsumer" class="com.angho.cloud.manager.kafka.KafkaOrderConsumer"/>-->
<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="127.0.0.1:9092"/>
<entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
<entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
<!--<entry key="fetch.min.bytes" value="${fetch.min.bytes}"/>-->
<entry key="group.id" value="customerpos-order"/>
<!--<entry key="heartbeat.interval.ms" value="${heartbeat.interval.ms}"/>-->
<!--<entry key="max.partition.fetch.bytes" value="${max.partition.fetch.bytes}"/>-->
<entry key="session.timeout.ms" value="15000"/>
<!--<entry key="auto.offset.reset" value="${auto.offset.reset}"/>-->
<!--<entry key="connections.max.idle.ms" value="${connections.max.idle.ms}"/>-->
<entry key="enable.auto.commit" value="false"/>
<!--<entry key="exclude.internal.topics" value="${exclude.internal.topics}"/>-->
<!--<entry key="fetch.max.bytes" value="${fetch.max.bytes}"/>-->
<!--<entry key="max.poll.interval.ms" value="${max.poll.interval.ms}"/>-->
<!--<entry key="max.poll.records" value="${max.poll.records}"/>-->
<!--<entry key="partition.assignment.strategy" value="${partition.assignment.strategy}"/>-->
<!--<entry key="receive.buffer.bytes" value="${receive.buffer.bytes}"/>-->
<!--<entry key="request.timeout.ms" value="${request.timeout.ms}"/>-->
<entry key="auto.commit.interval.ms" value="100"/>
<!--<entry key="fetch.max.wait.ms" value="${fetch.max.wait.ms}"/>-->
<!--<entry key="metadata.max.age.ms" value="${metadata.max.age.ms}"/>-->
<!--<entry key="metrics.num.samples" value="${metrics.num.samples}"/>-->
<!--<entry key="metrics.recording.level" value="${metrics.recording.level}"/>-->
<!--<entry key="metrics.sample.window.ms" value="${metrics.sample.window.ms}"/>-->
<!--<entry key="reconnect.backoff.ms" value="${reconnect.backoff.ms}"/>-->
<!--<entry key="retry.backoff.ms" value="${retry.backoff.ms}"/>-->
</map>
</constructor-arg>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topics" value="customerpos-order"/>
</bean>
</constructor-arg>
</bean>
</beans>
使用depend-depen属性在生产者之前初始化消费者。
Java代码如下:
制片人:
package com.angho.cloud.manager.kafka.impl;
import com.angho.cloud.bo.data.order.SyncOrderBO;
import com.angho.cloud.bo.result.order.CPosOrderSyncResult;
import com.angho.cloud.manager.kafka.CPosOrderSyncManager;
import com.angho.data.common.ResultConstant;
import org.apache.log4j.Logger;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
public class CPosOrderKafkaProducer implements CPosOrderSyncManager {
private static final Logger LOG = Logger.getLogger(CPosOrderKafkaProducer.class);
@PostConstruct
public void init(){
System.out.println("KafkaProducer start...");
}
@Resource(name = "inputToKafka")
private MessageChannel messageChannel;
@Override
public CPosOrderSyncResult sendToKafka(SyncOrderBO order) {
CPosOrderSyncResult result = new CPosOrderSyncResult();
result.setStatus(ResultConstant.CODE.ERROR);
Message<SyncOrderBO> message = new GenericMessage<>(order);
try {
boolean flag = this.messageChannel.send(message);
if (flag) {
result.setStatus(ResultConstant.CODE.SUCCESS);
result.setMessage(ResultConstant.MESSAGE.DEFAULT_SUCCESS_MESSAGE);
} else {
result.setMessage("Failed to send message to Kafka.");
}
} catch (Exception ex) {
LOG.error(ex);
result.setException(ex);
}
return result;
}
}
生产者被定义为bean:
<bean id="kafkaOrderProducer" class="com.angho.cloud.manager.kafka.impl.CPosOrderKafkaProducer" depends-on="kafkaOrderConsumer"/>
消费者Java代码:
package com.angho.cloud.manager.kafka;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component("kafkaOrderConsumer")
public class KafkaOrderConsumer {
@PostConstruct
public void init(){
System.out.println("what?KafkaConsumer start...");
}
@SuppressWarnings("unchecked")
@ServiceActivator
public void process(Message<?> message){
System.out.println("Message =======>" + message);
System.out.println("Content =======>" + message.getPayload());
}
}
我不知道发生异常的原因。
我该怎么做才能让它工作?
PS:我试图通过xml而不是@Bean之类的Java代码来配置它。
原谅我糟糕的英语
。。。。。
非常感谢...
自动启动=false
你怎么会有那个?
适配器将不会订阅通道,直到它是start()
ed。
我想使用PublishSubject创建一个广播系统,一个后台任务将轮询一些endpoint,并定期使用该主题广播结果。我希望在第一个订阅者订阅主题时开始轮询,并在没有更多订阅者时停止轮询。如果有新订阅者订阅,则应继续轮询。
我试图利用固有的WSO2ESB主题发布到jms队列。我已经创建了主题,并提供了一个订阅者URL:jms:/topictest?transport.jms.destinationtype=queue。然而,当我将消息发布到主题时,它不能被传递到队列。日志生成以下内容 “系统无法从jms:/queue?destination=topictest URL推断传输信息。” 另外,我似乎不知道如何发布到WS
我在spring中有一个服务,它需要使用十种不同的方法获取数据。 我希望这些方法并行执行,以执行一些DB操作并返回到父线程。但是父线程应该等到所有响应出现,然后返回响应。 在我当前的方法中,我使用反应式mono异步执行所有方法,但主线程不等待订阅者方法完成。 下面是我订阅的两种方法 下面是我的主要方法 以下是我的输出: 我的输出显示,主线程没有等待订阅服务器完成其任务,因此我如何处理这种情况?
我有一个使用ActiveMQ的JMS生产者/订阅者的简单Spring应用程序,配置如下: 我试过所有可能的解决办法,但没有一个奏效。我们非常感谢任何帮助
来自第三次订阅的消息会发生什么情况,是否会在TTL之后发送到死信队列 有没有办法找出消息未被使用的订阅
在我的帐户页面/视图订阅中,在操作段落中,我有一个取消按钮,但没有挂起按钮。你知道这是否正常吗? 我跟踪了这个函数,它只返回取消数据:wcs_get_all_user_actions_for_subscription 谢谢你的帮助