当前位置: 首页 > 知识库问答 >
问题:

Kakfa生产者消息传递与acks=all和flush

唐兴思
2023-03-14

使用“acks=all”配置创建Kafka制作人。

用上面的配置调用flush有什么意义吗?

在发送到代理之前,它是否会等待刷新被调用。

作为

acks=all这意味着领导者将等待完整的同步副本集来确认记录。这保证了只要至少有一个同步副本保持活动,记录就不会丢失。这是最强的可用保证。这相当于ack=-1设置。

共有2个答案

端木承业
2023-03-14

在我回答这两个问题之前,让我先试着说出flush()acks之间的区别。

flush()-这是一种在生产者中调用的方法,用于将消息从生产者端维护的缓冲区(可配置)推送到代理。您可以调用此方法或close()将消息从生产者缓冲区发送到代理。如果生产者可用的缓冲区html" target="_blank">内存已满(如Manoj在回答中所述),则会自动调用该函数。

acks=ALL是代理的责任,也就是说,在消息按照生产者要求的设置同步复制到其他代理后,向生产者发送确认。您将使用此设置来调整您的消息传递语义学。在这种情况下,一旦消息复制到指定的同步副本,代理将向生产者发送确认,并说“我收到您的消息了”。

现在,关于你的问题,即是否有任何意义的调用flush与ack设置和生产者是否会等待flush被调用之前被发送到代理。

生产者的异步特性将确保生产者不会等待。但是,如果您显式调用flush(),或者如果它自己被调用,那么任何进一步的发送都将被阻止,直到生产者得到代理的确认。所以,这两者之间的关系非常微妙。

我希望这能有所帮助!

邹海荣
2023-03-14

根据文件

齐平():

调用此方法可立即发送所有缓冲记录(即使linger_ms大于0),并在完成与这些记录相关的请求时阻止发送。flush()的post条件是之前发送的任何记录都已完成(例如Future.is_done()==True)。如果根据生产者的“acks”配置成功确认请求,或者请求导致错误,则认为请求已完成。

当一个线程被阻塞等待刷新调用完成时,其他线程可以继续发送消息;但是,不保证刷新调用开始后发送的消息是否完成。

flush()仍将阻止客户端应用程序,直到发送所有消息,即使ack=0。唯一的一点是,它不会等待ack,该块只在缓冲区被发送出去之前。

flush()with ack=all保证消息已发送,并已在集群上以所需的复制系数进行复制。

最后,回答您的问题:它会在被发送到代理之前等待flush被调用吗?

答:不一定,制作人一直在发信息 <罢工> 以间隔或 通过批处理大小(buffer.memory控制生产者可用存储器的总量以进行缓冲)。但是,flush()总是好的,以确保您发送所有消息。

有关更多信息,请参阅此链接。

 类似资料:
  • 本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要

  • 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前,使用Ctrl-C关闭zookeeper和kafka服务(这是通过在consumer方法中使用来模拟的)。 发现 在zookeeper和kafka服务被关闭后,消费者继续在控制台上写消息。 问题 我如何使消费者从上次消费的消息的索引+1继续。 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前

  • 我们使用activemq作为Java独立应用程序的消息队列。我的问题是,基于activemq web控制台,队列有一定数量的消息排队和出列。但是,根据我在代码中添加的sysout语句,应用程序消耗的消息数似乎少于activemq web控制台上显示的消息数。例如,在activemq控制台上,没有。排队和出列的消息约为1800条。但是,在控制台上显示的出列消息数(我每接收一条消息就增加一个计数器)只

  • 我很感激你在这方面的帮助。 我正在构建一个ApacheKafka消费者,以订阅另一个已经运行的Kafka。现在,我的问题是,当我的制作人将消息推送到服务器时。。。我的消费者没有收到。。我在打印的日志中得到以下信息: 我不确定我是否遗漏了任何重要的配置。。。但是,我可以使用WireShark看到一些来自我的服务器的消息,但是我的消费者没有消费这些消息。。。。 我的代码是示例消费者示例的精确副本:ht

  • 由于内容脚本在网页而不是扩展程序的上下文中运行,因此它们通常需要某种与扩展程序其余部分进行通信的方式。例如,RSS 阅读器扩展程序可以使用内容脚本来检测页面上 RSS 摘要的存在,然后通知后台页面以显示该页面的操作图标。 扩展及其内容脚本之间的通信使用消息传递来实现。任何一方都可以监听从另一端发送的消息,并在同一通道上进行响应。消息可以包含任何有效的 JSON 对象(空,布尔值,数字,字符串,数组

  • ms tcp nodelay 描述: 在信差的 TCP 会话上禁用 nagle 算法。 类型: Boolean 是否必需: No 默认值: true ms initial backoff 描述: 出错时重连的初始等待时间。 类型: Double 是否必需: No 默认值: .2 ms max backoff 描述: 出错重连时等待的最大时间。 类型: Double 是否必需: No 默认值: 15