当前位置: 首页 > 知识库问答 >
问题:

Spring集成-Kafka出站通道适配器 发送消息

尉迟德惠
2023-03-14

使用Spring Integration Kafka,使用出站通道适配器,我尝试向名为“test”的主题发送消息

通过命令行终端,我启动了动物园管理员、kafka并创建了名为“test”的主题

Spring XML配置

<int:publish-subscribe-channel id="inputToKafka" />

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    auto-startup="false"
                                    channel="inputToKafka"
                                    kafka-template="template"
                                    sync="true"
                                    topic="test">
</int-kafka:outbound-channel-adapter>

<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
</bean>

JUnit测试代码

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {
        "classpath:kafka-outbound-context.xml"
        })
public class ProducerTest{

    @Autowired
    @Qualifier("inputToKafka")
    MessageChannel channel;

    @Test
    public void test_send_message() {

        channel.send(MessageBuilder.withPayload("Test Message")
                .setHeader(KafkaHeaders.TOPIC, "test").build());

    }

}

测试用例成功,在调试时,我发现channel.send()返回true

我使用下面的命令通过命令行检查了主题,但是我在测试主题中看不到任何消息。

bin/kafka控制台用户。sh--引导服务器localhost:9902--主题测试--从头开始

有人能解释为什么我没有看到关于我的测试主题的任何消息吗?

共有1个答案

景才英
2023-03-14

你看过日志了吗?您需要配置键和值序列化程序,否则您将得到

Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "key.serializer" which has no default value.

使用java时:

    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

映射键是< code>key.serializer和< code>value.serializer。

 类似资料:
  • 问题内容: 入站和出站通道适配器之间的根本区别是什么? 任何示例都将非常有帮助。 我已经查看过Spring文档,这种“方向性”的区别对我来说还不清楚。我支持配置了outbound-channel-adapter的应用程序,但是我发现使用 出站 标签可以直观地了解行为计数器。该适配器获取一个外部文件,然后 将其 引入应用程序中, 在 该应用程序中我们解析文件并保留数据。 这类似于这个问题,但是我想更

  • 我有一个模型对象,它是在多次转换和解析之后填充的。现在,我需要使用spring集成将模型中的消息属性发送给kafka。我可以使用messageKey方法构造键,但如何从m.getPayload()之类的模型中获取实际消息。getMessage()并将其发送给Kafka。

  • 我如何处理在Spring整合中未能向Kafka传达的信息? 我在“int kafka:outbound channel adapter”中没有看到“error channel”是一个选项,我想知道应该在哪里添加错误通道信息,以便我的ErrorHandler可以获得“failed to kafka”类型的错误。(包括所有类型的故障、配置、网络等) 此外,inputToKafka是排队通道,我应该在哪

  • 我正在使用Spring集成jms出站通道适配器,它将消息发送到动态队列。我使用属性destination expression=“headers.DestinationQueueName”。在将出站消息写入OUT\U MSG通道之前,在代码中设置DestinationQueueName。 如何在队列上设置这些属性:,和?

  • 如何通过注释而不是常规配置文件配置入站通道适配器?我可以为会话工厂定义bean,如下所示: 如何配置通过注释下给出的入站通道适配器? 我正在寻找的是在应用程序启动时连接所有bean,然后公开一些方法来开始轮询服务器,处理它们,然后从本地删除它们,类似于 其中getPollableChannel()为我提供了用于轮询的bean。

  • 问题内容: Spring Integration FTP中的入站通道适配器和出站通道适配器之间有什么区别?我应该使用哪一个?何时使用? 我从文档中了解到,出站可以发送任何类型的文件(例如byte [],String,java.io.File),但入站仅限于文件类型。那仅仅是区别还是其他? 问题答案: 我建议您首先阅读理论 。 任何Inbound适配器都旨在从外部系统获取数据。Outbound-放置