当前位置: 首页 > 知识库问答 >
问题:

Spring integration kafka入站适配器聚合传入消息

尤祖鹤
2023-03-14

我有一个int-kafka: outsting-channel-适配器,我使用它向kafka发送消息,然后使用int-kafka: insting-channel-适配器接收消息。通信似乎工作正常,我能够发送和接收消息,但格式有点奇怪。我单独向我的出站适配器发送单个消息,但当我收到消息时,我会收到一条消息,所有消息都聚合到该消息的有效负载中。

这就是我收到消息时消息负载的样子

[有效负载={mytopic={0=[字符串消息1,字符串消息2,字符串消息3,字符串消息4,字符串消息5,…]}},标头={id=3934de02-1f42-ab90-6aa5-9c15f3cd0b6e,时间戳=143920669762}]

接收集成流如下所示

<int-kafka:inbound-channel-adapter
    id="kafkaInboundAdapter" kafka-consumer-context-ref="consumerContext"
    auto-startup="true" channel="inputFromKafka">
    <int:poller fixed-delay="10" time-unit="MILLISECONDS"
        max-messages-per-poll="5" />
</int-kafka:inbound-channel-adapter>

<int:channel id="inputFromKafka" />

<int:service-activator id="kakfaMessageHandler"
    input-channel="inputFromKafka">
    <bean class="com...broker.MessageHandler"></bean>
</int:service-activator>

为什么我在一条spring集成消息中接收所有消息,而不是在发送给kafka时接收单独的消息。

共有1个答案

郝乐心
2023-03-14

KafkaHighLevelConsumerMessageSource与其他轮询MessageSource一样设计

在这种情况下,我们从Kafka的数据流中得到以下结果:

Message<Map<String, Map<Integer, List<Object>>>>

其中有效负载是Kafka主题s的Map分区s和消息s的映射。

如果您在消费者上下文上只使用一个主题,您可以简单地将顶级Map转换为其分区的映射。或者甚至继续转换到有效负载的列表,如果您只有一个分区在那里。最后您可以使用拆分器

如果您想一个接一个地接收来自主题的消息,并在消息出现时尽快接收,您应该查看

 类似资料:
  • 这就是我的配置 这个想法是每3秒轮询一个目录,并根据通道向调度程序发送3条消息,以允许异步执行。然后根据消息数量聚合消息,然后发送到下一个服务激活器。第一个服务激活器将文件放在源目录中,第二个服务激活器获取聚合列表以将这些文件移动到暂存目录。 似乎发生的情况是,源文件夹跳过了一些文件,但临时文件夹确实获取了所有文件。我的猜测是,轮询器将消息发送到dispatcher通道,但当其线程池变满时,它会忽

  • 问题内容: Spring Integration FTP中的入站通道适配器和出站通道适配器之间有什么区别?我应该使用哪一个?何时使用? 我从文档中了解到,出站可以发送任何类型的文件(例如byte [],String,java.io.File),但入站仅限于文件类型。那仅仅是区别还是其他? 问题答案: 我建议您首先阅读理论 。 任何Inbound适配器都旨在从外部系统获取数据。Outbound-放置

  • 如果我创建一个SFTP入站通道适配器,并使用在SFTP中配置为channel属性的通道发送一些文件。文件将传输到SFTP远程目录本地目录,还是直接从通道流到本地目录

  • 问题内容: 入站和出站通道适配器之间的根本区别是什么? 任何示例都将非常有帮助。 我已经查看过Spring文档,这种“方向性”的区别对我来说还不清楚。我支持配置了outbound-channel-adapter的应用程序,但是我发现使用 出站 标签可以直观地了解行为计数器。该适配器获取一个外部文件,然后 将其 引入应用程序中, 在 该应用程序中我们解析文件并保留数据。 这类似于这个问题,但是我想更