当前位置: 首页 > 知识库问答 >
问题:

Kafka制作人关闭后如何重新连接?

魏风华
2023-03-14

我有一个多线程应用程序,它使用producer类生成消息,之前我使用下面的代码为每个请求创建producer。其中KafkaProducer是新建的,每个请求如下:

KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(prop);

ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, objBytes);
producer.send(data, new Callback() {

                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        isValidMsg[0] = false;
                        exception.printStackTrace();
                        saveOrUpdateLog(msgBean, producerType, exception);
                        logger.error("ERROR:Unable to produce message.",exception);
                    }
                }
            });
producer.close();

然后我阅读了关于生产者的Kafka文档,并了解到我们应该使用单个生产者实例来获得良好的性能。

然后我在一个singleton类中创建了KafkaProducer的单个实例。

现在什么时候

java.lang.IllegalStateException: Cannot send after the producer is closed.

或者我们如何在关闭后重新连接到生产者。问题是如果程序崩溃或有异常,那么?

共有3个答案

怀经赋
2023-03-14

Kafka producer应该是线程安全的,并且对它的线程池很节俭。你可能想用

producer.flush();

代替

producer.close();

让制作人保持开放状态,直到程序终止或您确定不再需要它。

如果仍要关闭生产者,请按需重新创建。

producer = new KafkaProducer<String, byte[]>(prop);
张森
2023-03-14

javadoc for KafkaProducer中所述:

public void close()

Close this producer. This method blocks until all previously sent requests complete.
This method is equivalent to close(Long.MAX_VALUE, TimeUnit.MILLISECONDS).

SRC:https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#close()

所以你不需要担心你的消息不会被发送,即使你在发送后立即调用close。

如果您打算多次使用KafkaProducer,请仅在使用完毕后将其关闭。如果您仍然希望确保消息在方法完成之前实际发送出去,而不是在缓冲区中等待,那么请使用< code > KafkaProducer # flush(),它将一直阻塞到当前缓冲区发送出去。如果您愿意,也可以在< code>Future#get()上阻塞。

如果你不打算关闭你的KafkaProducer,也需要注意一个警告(例如,在短期应用程序中,你只发送一些数据,应用程序在发送后立即终止)。KafkaProducer IO线程是一个守护进程线程,这意味着JVM不会等到该线程完成后才终止VM。因此,要确保您的消息确实被发送,请使用KafkaProducer#flush(),不要在Future#get。

龙华翰
2023-03-14

通常,在 KafkaProducer 上调用 close() 足以确保所有机上记录都已完成:

/**
 * Close this producer. This method blocks until all previously sent requests complete.
 * This method is equivalent to <code>close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)</code>.
 * <p>
 * <strong>If close() is called from {@link Callback}, a warning message will be logged and close(0, TimeUnit.MILLISECONDS)
 * will be called instead. We do this because the sender thread would otherwise try to join itself and
 * block forever.</strong>
 * <p>
 *
 * @throws InterruptException If the thread is interrupted while blocked
 */

如果在应用程序的整个生命周期中都在使用生产者,那么在得到终止信号之前不要关闭它,然后调用< code>close()。如文档中所述,在多线程环境中使用生成器是安全的,因此您应该重用同一个实例。

如果您在多个线程中共享您的 Kafka 生产者,则有两种选择:

    < li >在通过< code>Runtime.getRuntime()注册关闭回调时调用< code>close()。addShutdownHook来自您的主执行线程 < li >让您的多线程方法竞赛结束,只允许一个方法获胜。

2的粗略草图可能如下所示:

object KafkaOwner {
  private var producer: KafkaProducer = ???
  @volatile private var isClosed = false
     
  def close(): Unit = {
    if (!isClosed) {
      kafkaProducer.close()
      isClosed = true
    }
  }
    
  def instance: KafkaProducer = {
    this.synchronized {
      if (!isClosed) producer 
      else {
        producer = new KafkaProducer()
        isClosed = false
      }
    }
  }
}
 类似资料:
  • 我通过以下代码将阿帕奇 Avro 格式的消息发送到 Kafka 代理实例: 代码工作正常,消息最终在Kafka中并被处理以最终在ImphxDB中。问题是每次发送操作都会产生大量INFO消息(客户端ID号就是一个例子): [生产者客户端 Id=生产者-27902] 关闭 Kafka 生产者超时Millis = 10000 毫秒。 [创建者客户端 Id=创建者-27902] 使用超时关闭 Kafka

  • 我正在使用Spring-Cloud-sleuth-stream和Spring-Cloud-starter-stream-kafka发送跨度到kafka,异常发生在连接中。SR1 192.168.1.177是我的localhost 初始化连接 重试例外 我不明白为什么,主机更改为,以及如何修复它

  • 我正在处理xml,我需要每条记录发送一条消息,当我收到最后一条记录时,我关闭了kafka生产者,这里的问题是kafka生产者的发送方法是异步的,因此,有时当我关闭生产者时,它会拖曳我在某个地方读到过,我可以让制片人敞开心扉。我的问题是:这意味着什么,或者是否有更好的解决方案。 -编辑- 想象以下场景: 我们阅读标签并创建kafka生产者 对于每个元素,我们读取其属性,生成一个json对象并使用se

  • 我正在使用sping-kafka 2.2.7-RELEASE并编写生产者。我正在阅读从这里留档如下所述。 “从2.5版开始,每个版本都扩展了KafkaResourceFactory。这允许在运行时通过向它们的配置添加一个供应商来更改引导服务器:setBootstrapServersSupply ier(() → … ). 这将被调用以获取所有新连接的服务器列表。消费者和生产者通常是长期存在的。要关

  • 我使用weblogic应用服务器和oracle数据库。我使用jdbc与oracle数据库通信。我从weblogic数据源获得连接,并向表中插入一条记录。问题是,当我想关闭连接(插入数据库后)时,我会遇到一个异常(连接已经关闭)。这是我的代码: 但是联系。close语句引发异常: 我试图避免连接。close语句(因为我教过连接是自动关闭的!!但过了一段时间,所有的连接都打开了,因此引发了一个异常)

  • 我使用的代码女巫只允许4个不同端口上的4个连接。此代码正在工作,但当客户端关闭连接时,它无法重新建立连接。连接被拒绝。认为是因为线程关闭。如何解决这个问题?我无法更改端口号... 从套接字导入* BUFF=25 def服务器(主机、端口): 如果名称==“main”:导入线程