当前位置: 首页 > 知识库问答 >
问题:

Apache Kafka的Spring:具有异步请求、批处理和最大飞行时间为1的Kafka模板行为

卢黎明
2023-03-14

场景/用例:我有一个Spring Boot应用程序,使用Spring for Kafka向Kafka主题发送消息。完成特定事件(由http请求触发)后,将创建一个新线程(通过Spring@Async),该线程调用kafkatemplate。send()并对其返回的ListenableFuture进行回调。处理http请求的原始线程向调用客户端返回一个响应,并被释放。

正常行为:在正常的应用程序负载下,我已经验证了各个消息都按照需要发布到主题(回调成功或失败时的应用程序日志条目,以及在kafka集群上查看主题中的消息)。如果我在3-5分钟内关闭所有kafka代理,然后将群集恢复在线,应用程序的发布者会立即重新建立与kafka的连接,并继续发布消息。

但是,如果在执行所有群集代理的联机测试时,Kafki尝试将所有的问题恢复到Spring上,则会在5分钟内显示所有群集代理的联机测试失败。这种情况持续了大约7个小时,出版商能够再次成功地与Kafka重新建立沟通(通常会出现管道破裂异常,但并不总是如此)。

使用Kafki和producer执行测试时,使用当前的测试结果:约10分钟连接到producer。制作人在重载的前约30秒内,缓冲区可用字节数继续减少,直到达到0并保持在0。等待线程保持在6到10之间(每次点击刷新时交替),缓冲区可用字节保持在0大约6.5小时。在此之后,缓冲区可用字节显示所有最初分配的已恢复内存,但kafka发布尝试继续失败约30分钟,最终恢复所需的行为。

当前生产者配置

request.timeout.ms=3000
max.retry.count=2
max.inflight.requests=1
max.block.ms=10000
retry.backoff.ms=3000

所有其他属性都使用其默认值

问题:

  1. 鉴于我的用例会改变batch.size或linger.ms在消除重载时遇到的问题方面有任何积极的影响吗?
  2. 假设我有单独的线程,所有调用kafkatemplate.send()的消息和回调都是单独的,我havemax.in.flight.requests.per.connection设置为1,batch.sizelinger.ms忽略每条信息?我的理解是,在这种情况下实际上没有批处理,每个消息都作为单独的请求发送。
  3. 鉴于我max.block.ms设置为10秒,为什么缓冲区内存仍然被使用了这么长时间,为什么所有的消息仍然不能发布这么多小时。我的理解是,10秒后,每个新的发布尝试都应该失败,并返回失败的回调,这反过来又释放了相关的线程

更新:尝试澄清线程的用法。我使用的是JavaDocs中推荐的单一生产者实例。有一些线程,比如https-jsse-nio-22443-exec-*正在处理传入的https请求。当一个请求进入某个处理时,一旦所有与Kafka无关的逻辑完成,就会调用另一个用@Async修饰的类中的方法。此方法调用kafkatemplate。send()。在执行发布到kafka之前,返回到客户端的响应会显示在日志中(这是Im通过单独的线程验证其执行的方式,因为服务在返回响应之前不会等待发布)。有一些任务调度器-*线程似乎正在处理来自kafkatemplate的回调。send()。我的猜测是,Kafka制作人网络的单一线程处理所有的出版。

共有1个答案

汤乐家
2023-03-14

我的应用程序发出http请求,并在每次kafka发布失败时将每条消息发送到数据库平台上的死信表。为了向Kafka发布而旋转的线程被重新用于对数据库的调用。我将数据库调用逻辑移到了另一个类中,并用它自己的@Async和自定义TaskExecutor对其进行了修饰。这样做之后,我已经监控了JConsole,可以看到对Kafka的调用似乎正在重新使用相同的10个线程(TaskExecutor:core Pool size-10、QueueCapacity-0和MaxPoolSize-80),而对数据库服务的调用现在正在使用一个单独的线程池(TaskExecutor:core Pool size-10、QueueCapacity-0和MaxPoolSize-80)它始终关闭和打开新线程,但线程数保持相对恒定。有了这个新的行为缓冲区,可用字节保持在一个健康的恒定水平,一旦代理恢复在线,应用程序的Kafka发行商将成功地重新建立连接。

 类似资料:
  • 在 php-fpm 中有 max_execution_time 这个选项,用来限定请求最大执行时间。 imi 提供了一个中间件,用以支持设置最大请求执行时间,如果超时可以做提前返回结果的处理。 使用方法 启用 在服务器配置 beans 节中配置中间件ExecuteTimeoutMiddleware: [ 'HttpDispatcher' => [ 'middle

  • 我需要执行对一些外部API的循环调用,并有一定的延迟,以防止“超出用户速率限制”限制。 谷歌地图地理编码API对req/sec敏感,允许10 req/sec。我应该为数百个联系人进行地理编码,这样的延迟是必需的。所以,我需要有一个10异步地理编码函数,每个函数延迟1秒。所以,我收集数组中的所有触点,然后以异步方式在数组中循环。 一般来说,我需要有N个同时线程,每个线程的末尾延迟为D毫秒。整个循环遍

  • Spring MVC 3.2开始引入了基于Servlet 3的异步请求处理。相比以前,控制器方法已经不一定需要返回一个值,而是可以返回一个java.util.concurrent.Callable的对象,并通过Spring MVC所管理的线程来产生返回值。与此同时,Servlet容器的主线程则可以退出并释放其资源了,同时也允许容器去处理其他的请求。通过一个TaskExecutor,Spring M

  • 问题内容: 将NodeJS与MongoDB + Mongoose结合使用。 首先,我知道异步非阻塞代码的优点。所以我确实处理回调。但是最后我遇到了以下问题。 可以说我有一个可以随时被用户调用的函数。超级“闪电般的”用户可能几乎同时调用两次。 当然它是这样执行的:查找查询,查找查询,保存查询,保存查询 这完全破坏了应用程序的逻辑(应查找查询,保存查询,查找查询,保存查询)。因此,我决定通过“锁定”特

  • 从浏览器到我的自助主机OWIN WebAPI的所有预飞行请求都不会被中间件处理。如果我从邮递员提出选项请求,它们将被处理。为什么是这样的行为? 请求URL:http://localhost:9000/api/v1/conversations/create?connectionId=13509f44-eacb-4950-8cc8-71bd37098975 请求方法:选项 状态代码:401未经授权的远

  • 我是webflux的新手,无法找到正确的材料来继续实现。 null