使用Spring Integration Kafka,使用出站通道适配器,我尝试向名为“test”的主题发送消息
通过命令行终端,我启动了动物园管理员、kafka并创建了名为“test”的主题
Spring XML配置
<int:publish-subscribe-channel id="inputToKafka" />
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
auto-startup="false"
channel="inputToKafka"
kafka-template="template"
sync="true"
topic="test">
</int-kafka:outbound-channel-adapter>
<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
</map>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
JUnit测试代码
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {
"classpath:kafka-outbound-context.xml"
})
public class ProducerTest{
@Autowired
@Qualifier("inputToKafka")
MessageChannel channel;
@Test
public void test_send_message() {
channel.send(MessageBuilder.withPayload("Test Message")
.setHeader(KafkaHeaders.TOPIC, "test").build());
}
}
测试用例成功,在调试时,我发现channel.send()返回true
我使用下面的命令通过命令行检查了主题,但是我在测试主题中看不到任何消息。
bin/kafka控制台用户。sh--引导服务器localhost:9902--主题测试--从头开始
有人能解释为什么我没有看到关于我的测试主题的任何消息吗?
你看过日志了吗?您需要配置键和值序列化程序,否则您将得到
Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "key.serializer" which has no default value.
使用java时:
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
映射键是< code>key.serializer和< code>value.serializer。
问题内容: 入站和出站通道适配器之间的根本区别是什么? 任何示例都将非常有帮助。 我已经查看过Spring文档,这种“方向性”的区别对我来说还不清楚。我支持配置了outbound-channel-adapter的应用程序,但是我发现使用 出站 标签可以直观地了解行为计数器。该适配器获取一个外部文件,然后 将其 引入应用程序中, 在 该应用程序中我们解析文件并保留数据。 这类似于这个问题,但是我想更
我有一个模型对象,它是在多次转换和解析之后填充的。现在,我需要使用spring集成将模型中的消息属性发送给kafka。我可以使用messageKey方法构造键,但如何从m.getPayload()之类的模型中获取实际消息。getMessage()并将其发送给Kafka。
我如何处理在Spring整合中未能向Kafka传达的信息? 我在“int kafka:outbound channel adapter”中没有看到“error channel”是一个选项,我想知道应该在哪里添加错误通道信息,以便我的ErrorHandler可以获得“failed to kafka”类型的错误。(包括所有类型的故障、配置、网络等) 此外,inputToKafka是排队通道,我应该在哪
我正在使用Spring集成jms出站通道适配器,它将消息发送到动态队列。我使用属性destination expression=“headers.DestinationQueueName”。在将出站消息写入OUT\U MSG通道之前,在代码中设置DestinationQueueName。 如何在队列上设置这些属性:,和?
如何通过注释而不是常规配置文件配置入站通道适配器?我可以为会话工厂定义bean,如下所示: 如何配置通过注释下给出的入站通道适配器? 我正在寻找的是在应用程序启动时连接所有bean,然后公开一些方法来开始轮询服务器,处理它们,然后从本地删除它们,类似于 其中getPollableChannel()为我提供了用于轮询的bean。
问题内容: Spring Integration FTP中的入站通道适配器和出站通道适配器之间有什么区别?我应该使用哪一个?何时使用? 我从文档中了解到,出站可以发送任何类型的文件(例如byte [],String,java.io.File),但入站仅限于文件类型。那仅仅是区别还是其他? 问题答案: 我建议您首先阅读理论 。 任何Inbound适配器都旨在从外部系统获取数据。Outbound-放置