当前位置: 首页 > 知识库问答 >
问题:

如果我不关闭Kafka制作人会发生什么

微生德运
2023-03-14

我正在处理xml,我需要每条记录发送一条消息,当我收到最后一条记录时,我关闭了kafka生产者,这里的问题是kafka生产者的发送方法是异步的,因此,有时当我关闭生产者时,它会拖曳java.lang.IllegalStateException:在生产者关闭后无法发送。我在某个地方读到过,我可以让制片人敞开心扉。我的问题是:这意味着什么,或者是否有更好的解决方案。

-编辑-

<list>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
...
</list>

想象以下场景:

  • 我们阅读标签并创建kafka生产者
  • 对于每个元素,我们读取其属性,生成一个json对象并使用send方法将其发送给kafka-当我们读取元素时,我们在生成器中调用close方法

因此,元素的数量可以是80k的问题,有时当我们调用sresals方法时,它继续以异步方式发送消息。因此,我们需要先调用 flush 方法,但这会影响性能

共有1个答案

姜景焕
2023-03-14

您应该先调用生产者.flush() 然后再调用生产者.close()。这是一个阻止呼叫,不会在发送所有记录之前返回。

如果不调用< code>close(),根据实现/语言的不同,可能会导致资源/内存泄漏

 类似资料:
  • 编辑问题,以包括所需的行为、特定问题或错误,以及重现问题所需的最短代码。这将帮助其他人回答这个问题。 我们知道:load with memory_order_acquire,store with memory_order_release但是,我发现用gcc4.8.2,open -O2时,抛出了一个编译错误,/usr/include/c/4 . 8 . 2/atomic:199:9:error:对于

  • 我无法向Kafka主题发布消息,无法得到Kafka制作人的任何回应,它完全卡住了应用程序 Kafka生产者服务代码 2021-05-30 13:29:13.209[0;39M[32M信息[0;39M[35M2472[0;39M[2M---[0;39M[2M[nio-8084-exec-2][0;39M[36MO.apache.coyote.http11.HTTP11Processor[0;39M[

  • 我通过以下代码将阿帕奇 Avro 格式的消息发送到 Kafka 代理实例: 代码工作正常,消息最终在Kafka中并被处理以最终在ImphxDB中。问题是每次发送操作都会产生大量INFO消息(客户端ID号就是一个例子): [生产者客户端 Id=生产者-27902] 关闭 Kafka 生产者超时Millis = 10000 毫秒。 [创建者客户端 Id=创建者-27902] 使用超时关闭 Kafka

  • 我有一个多线程应用程序,它使用producer类生成消息,之前我使用下面的代码为每个请求创建producer。其中KafkaProducer是新建的,每个请求如下: 然后我阅读了关于生产者的Kafka文档,并了解到我们应该使用单个生产者实例来获得良好的性能。 然后我在一个singleton类中创建了KafkaProducer的单个实例。 现在什么时候 或者我们如何在关闭后重新连接到生产者。问题是如

  • 问题内容: 如果我将两个不同版本的jar文件放在类路径中,会发生什么? 例如: 保存在classpath中 会发生什么? 问题答案: 尽管我也建议不要这样做,但我仍然想尝试回答您的原始问题: Java具有类加载器层次结构,因此,如果两个JAR都处于层次结构的不同级别,则类加载器将定义其优先级。最受欢迎的示例是Web应用程序类加载器层次结构(例如Tomcat),其中应用程序类的优先级高于容器类的优先

  • 假设我有一个普通的应用程序,其中我正在使用ApplicationContext ApplicationContext=new FileSystemXmlApplicationContext(“bean.xml”)创建一个Spring应用程序上下文 现在,假设在这个bean.xml有Spring bean的bean定义,所以当我创建应用程序上下文时,Spring容器将为这个实例化和初始化一个对象。