当前位置: 首页 > 知识库问答 >
问题:

为Kafka实现Spring在BoundChannelAdapter中的集成

桂阳文
2023-03-14

我试图在spring集成中实现一个自定义入站通道适配器,以使用来自apache kafka的消息。基于spring集成示例,我发现需要创建一个实现MessageSource接口的类,并实现receive()方法,该方法将返回来自kafka的已使用消息。但根据kafka中的消费者示例,KafkaStream中的消息迭代器由BlockingQueue支持。因此,如果队列中没有消息,线程将被阻塞。

那么,实现接收()方法的最佳方法是什么,因为该方法可能会阻塞,直到有东西要消耗...?

从更一般的意义上讲,我们如何为流式消息源实现一个自定义入站通道,该通道会一直阻塞,直到有东西可以使用。。?

共有1个答案

符修杰
2023-03-14

receive()方法可以阻塞(只要底层操作正确响应中断的线程),并且从入站通道适配器的角度来看,根据底层源的期望,最好使用固定延迟触发器。例如,“长轮询”可以在提供非常小的延迟值时模拟事件驱动的行为。

在我们的JMS轮询MessageSource实现中也有类似的情况。在那里,底层行为由JmsTemboard的接收()方法之一处理。JmsTemboard本身允许配置超时值。这意味着,例如,您可以选择最多阻塞5秒,但在每次阻塞接收调用之间有一个非常短的延迟触发器。或者,您可以指定无限期接收超时。该决定最终取决于底层资源的期望、消息吞吐量等。

另外,我想让您知道我们自己正在探索Kafka适配器。也许您想在sping-集成扩展存储库中就此进行合作?

你好,马克

 类似资料:
  • 我的pom.xml是: 我错过了什么?

  • 在Spring MVC项目中,我试图通过Spring Websockets将使用过的Kafka数据发送到前端(JavaScript)。 为了建立服务器和客户端之间的通信,我有以下内容。 客户端(app.js) 服务器(KafkaController.java) 要使用来自特定Kafka主题的数据,我使用@KafkaListener注释如下: 我有一个适当的Kafkanconfig类,包含所有必要的

  • 本文向大家介绍Docker部署Kafka以及Spring Kafka实现,包括了Docker部署Kafka以及Spring Kafka实现的使用技巧和注意事项,需要的朋友参考一下 这篇文章主要介绍了Docker部署Kafka以及Spring Kafka实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 从https://hub.docker.co

  • 本文向大家介绍集成Spring Redis缓存的实现,包括了集成Spring Redis缓存的实现的使用技巧和注意事项,需要的朋友参考一下 这里的缓存主要是用于 Service 层的,所以下面的配置,都是针对 service 模块的。 本文来自内部分享,对特殊信息进行了简单处理。 本文都是在以缓存来讲 Redis 的使用,实际上 Redis 不仅仅用于缓存,本身还是 NoSQL 数据库,大家可以自

  • 我目前正在尝试编写一个适配器,它将使用来自ActiveMQ的消息并将其发布到Kafka。 我正在考虑使用Spring集成来集成这两个消息传递系统。 我的问题是,我的应用程序不会维护模型的注册表,许多应用程序将使用该注册表将记录发布到activeMQ。我想接收这些javax-jms消息,并想执行一些转换,比如将jmscorrelationId添加到kafka消息中。 另外,另一个要求是仅当kafka

  • 我正在编写基于int-IP:TCPendpoint的客户机-服务器应用程序。用Kotlin写的代码。应用程序包含两个tcp clienst,当它们首先建立了与服务器的连接并进行了一些初始化时,它们应该按照sertain顺序一个接一个地连接到服务器。 作为这种同步的解决方案,我想使用启动依赖tcp客户端的endpoint组。为此,在depended(第二个)客户机中,我将和属性添加到tcp-outb