我有一个模型对象,它是在多次转换和解析之后填充的。现在,我需要使用spring集成将模型中的消息属性发送给kafka。我可以使用messageKey方法构造键,但如何从m.getPayload()之类的模型中获取实际消息。getMessage()并将其发送给Kafka。
.publishSubscribeChannel(pubSub -> pubSub
.subscribe(flow -> flow
.bridge(e -> e.order(Ordered.HIGHEST_PRECEDENCE))
.handle(Kafka.outboundChannelAdapter(kafkaTemplate).
messageKey(m -> ((AcarsFlightInformation) m.getPayload()).getFlightNbr()).topic(acarsKafkaTopic)))
不完全清楚您在问什么。发送到适配器的消息的有效负载成为生产者记录的值。
我想你要问的是,你只想发送部分有效载荷。
在<代码>之前使用标题富集器和转换器。句柄。。。
.enrichHeaders(h -> h.headerExpression(KafkaHeaders.MESSAGE_KEY, "payload.flightNumber")
.transform("payload.message")
.handle(Kafka.outboundChannelAdapter(kafkaTemplate)
.topic(acarsKafkaTopic))
.get();
适配器将为密钥查找该标头。
问题内容: Spring Integration FTP中的入站通道适配器和出站通道适配器之间有什么区别?我应该使用哪一个?何时使用? 我从文档中了解到,出站可以发送任何类型的文件(例如byte [],String,java.io.File),但入站仅限于文件类型。那仅仅是区别还是其他? 问题答案: 我建议您首先阅读理论 。 任何Inbound适配器都旨在从外部系统获取数据。Outbound-放置
问题内容: 入站和出站通道适配器之间的根本区别是什么? 任何示例都将非常有帮助。 我已经查看过Spring文档,这种“方向性”的区别对我来说还不清楚。我支持配置了outbound-channel-adapter的应用程序,但是我发现使用 出站 标签可以直观地了解行为计数器。该适配器获取一个外部文件,然后 将其 引入应用程序中, 在 该应用程序中我们解析文件并保留数据。 这类似于这个问题,但是我想更
如果我创建一个SFTP入站通道适配器,并使用在SFTP中配置为channel属性的通道发送一些文件。文件将传输到SFTP远程目录本地目录,还是直接从通道流到本地目录
使用Spring Integration Kafka,使用出站通道适配器,我尝试向名为“test”的主题发送消息 通过命令行终端,我启动了动物园管理员、kafka并创建了名为“test”的主题 Spring XML配置 JUnit测试代码 测试用例成功,在调试时,我发现channel.send()返回true 我使用下面的命令通过命令行检查了主题,但是我在测试主题中看不到任何消息。 bin/kaf
我正在使用Spring集成jms出站通道适配器,它将消息发送到动态队列。我使用属性destination expression=“headers.DestinationQueueName”。在将出站消息写入OUT\U MSG通道之前,在代码中设置DestinationQueueName。 如何在队列上设置这些属性:,和?