我有一个int-kafka: outsting-channel-适配器
,我使用它向kafka发送消息,然后使用int-kafka: insting-channel-适配器
接收消息。通信似乎工作正常,我能够发送和接收消息,但格式有点奇怪。我单独向我的出站适配器发送单个消息,但当我收到消息时,我会收到一条消息,所有消息都聚合到该消息的有效负载中。
这就是我收到消息时消息负载的样子
[有效负载={mytopic={0=[字符串消息1,字符串消息2,字符串消息3,字符串消息4,字符串消息5,…]}},标头={id=3934de02-1f42-ab90-6aa5-9c15f3cd0b6e,时间戳=143920669762}]
接收集成流如下所示
<int-kafka:inbound-channel-adapter
id="kafkaInboundAdapter" kafka-consumer-context-ref="consumerContext"
auto-startup="true" channel="inputFromKafka">
<int:poller fixed-delay="10" time-unit="MILLISECONDS"
max-messages-per-poll="5" />
</int-kafka:inbound-channel-adapter>
<int:channel id="inputFromKafka" />
<int:service-activator id="kakfaMessageHandler"
input-channel="inputFromKafka">
<bean class="com...broker.MessageHandler"></bean>
</int:service-activator>
为什么我在一条spring集成消息中接收所有消息,而不是在发送给kafka时接收单独的消息。
KafkaHighLevelConsumerMessageSource与其他轮询MessageSource一样设计
在这种情况下,我们从Kafka的数据流中得到以下结果:
Message<Map<String, Map<Integer, List<Object>>>>
其中有效负载
是Kafka主题
s的Map
和分区
s和消息
s的映射。
如果您在消费者上下文
上只使用一个主题
,您可以简单地将顶级Map
转换为其分区
的映射。或者甚至继续转换到有效负载
的列表,如果您只有一个分区
在那里。最后您可以使用拆分器
。
如果您想一个接一个地接收来自主题的消息,并在消息出现时尽快接收,您应该查看
这就是我的配置 这个想法是每3秒轮询一个目录,并根据通道向调度程序发送3条消息,以允许异步执行。然后根据消息数量聚合消息,然后发送到下一个服务激活器。第一个服务激活器将文件放在源目录中,第二个服务激活器获取聚合列表以将这些文件移动到暂存目录。 似乎发生的情况是,源文件夹跳过了一些文件,但临时文件夹确实获取了所有文件。我的猜测是,轮询器将消息发送到dispatcher通道,但当其线程池变满时,它会忽
问题内容: Spring Integration FTP中的入站通道适配器和出站通道适配器之间有什么区别?我应该使用哪一个?何时使用? 我从文档中了解到,出站可以发送任何类型的文件(例如byte [],String,java.io.File),但入站仅限于文件类型。那仅仅是区别还是其他? 问题答案: 我建议您首先阅读理论 。 任何Inbound适配器都旨在从外部系统获取数据。Outbound-放置
如果我创建一个SFTP入站通道适配器,并使用在SFTP中配置为channel属性的通道发送一些文件。文件将传输到SFTP远程目录本地目录,还是直接从通道流到本地目录
问题内容: 入站和出站通道适配器之间的根本区别是什么? 任何示例都将非常有帮助。 我已经查看过Spring文档,这种“方向性”的区别对我来说还不清楚。我支持配置了outbound-channel-adapter的应用程序,但是我发现使用 出站 标签可以直观地了解行为计数器。该适配器获取一个外部文件,然后 将其 引入应用程序中, 在 该应用程序中我们解析文件并保留数据。 这类似于这个问题,但是我想更