当前位置: 首页 > 知识库问答 >
问题:

Kafka Producer:在带回调的异步发送中处理异常

慕永年
2023-03-14

我需要捕获异步发送到Kafka时的异常。Kafka producer Api附带一个函数send(ProducerRecord记录、回调)。但当我针对以下两种情况进行测试时:

  • Kafka经纪人倒下
  • 主题没有预创建回调没有被调用。相反,我在代码中收到发送不成功的警告(如下所示)。

问题:

>

  • 那么回调是否只针对特定的异常调用?

    Kafka客户端何时尝试在异步发送时连接到Kafka代理:每次批处理发送还是定期发送?

    Kafka警告图像

    注意:我也使用linger.ms设置25秒批量发送我的记录。

    
    public class ProducerDemo {
    
        static KafkaProducer<String, String> producer;
    
        public static void main(String[] args) throws IOException {
    
             final Logger logger = LoggerFactory.getLogger(ProducerDemo.class);
            Properties properties = new Properties();
            properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
            properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
            properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "30000");
    
            producer = new KafkaProducer<String, String>(properties);
            String topic = "first_topic";
    
            for (int i = 0; i < 5; i++) {
                String value = "hello world " + Integer.toString(i);
                String key = "id_" + Integer.toString(i);
    
                ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);
    
                  producer.send(record, new Callback() {
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            //execute everytime a record is successfully sent or exception is thrown
                            if(e == null){
                               // No Exception
                            }else{
                                //Exception Handling
                            }
                        }
                    });
            }
            producer.close();
        }
    
  • 共有3个答案

    卓瀚
    2023-03-14

    总的来说,我注意到很多与KafkaProducer交付语义学和保证相关的问题。它肯定可以更好地记录下来。

    还有一件事,自从你提到玲儿。ms:

    请注意,及时到达的记录通常会批处理在一起,即使存在延迟。ms=0,因此在重负载下,无论玲珑配置如何,都会进行批处理

    林华皓
    2023-03-14

    那么回调是否只针对特定的异常调用?

    是的,这就是它的工作原理。从留档(2.5.0):

         * Fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that
         * will be invoked when the request is complete.
    

    请注意重要的部分:当请求完成时,什么意味着生产者必须接受记录并将产品请求发送给Kafka Broker。无需深入挖掘内部,这意味着代理元数据必须存在,分区必须存在。

    谈到正式规范,您需要仔细了解send()的Javadoc,可能还需要了解KafkaProducer对doSend方法的实现。在那里,您将看到在提交调用时可以抛出多个异常(而不是返回未来并调用回调),例如:

    • 如果代理元数据在超时时不可用,
    • 如果数据不能序列化,
    • 如果序列化的表单太大等
    汪阳辉
    2023-03-14

    对于第一个问题,这里是答案。根据apache kafka留档,当您实现回调接口时,您可以使用onComplments方法捕获以下异常

    https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/producer/Callback.html

    对于第二个问题,以下属性的组合控制何时发送记录,据我所知,同步或异步调用也是如此。

    徘徊马克斯·布洛克女士。太太

    https://kafka.apache.org/documentation/#linger.ms

     类似资料:
    • 本文向大家介绍深入浅析NodeJs并发异步的回调处理,包括了深入浅析NodeJs并发异步的回调处理的使用技巧和注意事项,需要的朋友参考一下 这里说并发异步,并不准确,应该说连续异步。NodeJs单线程异步的特性,直接导致多个异步同时进行时,无法确定最后的执行结果来回调。举个简单的例子: 连续发起了5次读文件的异步操作,很简单,那么问题来了,我怎么确定所有异步都执行完了呢?因为要在它们都执行完后,才

    • 和响应的侦听器 它不想进入侦听器中的这个句柄消息。和应用程序容器显示消息

    • 本文向大家介绍Nodejs异步回调的优雅处理方法,包括了Nodejs异步回调的优雅处理方法的使用技巧和注意事项,需要的朋友参考一下 前言 Nodejs最大的亮点就在于事件驱动, 非阻塞I/O 模型,这使得Nodejs具有很强的并发处理能力,非常适合编写网络应用。在Nodejs中大部分的I/O操作几乎都是异步的,也就是我们处理I/O的操作结果基本上都需要在回调函数中处理,比如下面的这个读取文件内容的

    • 问题内容: 什么是处理这种情况的最佳方法。我处于受控环境中,所以我不想崩溃。 从setTimeout内抛出时,我们将始终获得: 如果抛出发生在setTimeout之前,那么bluebirds catch将捕获它: 结果是: 很棒-但是如何在节点或浏览器中处理这种性质的恶意异步回调。 问题答案: 承诺不是域,它们不会捕获异步回调中的异常。你就是做不到。 然而诺言来捕捉从内抛出的异常/ / 构造函数的

    • 目标- 到来自(源)MQ队列的消费者消息并发布到 a) 另一个(目标)MQ队列和 b)事务中的Kafka主题,从而避免在MQ或Kafka发布失败的情况下从源MQ中删除消息。 使用的框架 Spring启动版本-2.1.5 Spring JMS-5.1.7 SpringKafka-2.2.6 融合Kafka-5.3 MQ-9 Kafka **应用程序配置类* 实际消费者和发布代码 主Spring靴类

    • 问题内容: 现在,我有几次遇到使用Firebase的同步和异步功能的问题。我的问题通常是我需要在我编写的函数中进行异步Firebase调用。作为一个简单的示例,假设我需要计算并显示对象的速度,而我的Firebase存储距离和时间: 当然,上述代码将无法使用,因为它是异步调用,因此在到达时尚未设置。如果我们将内部放置在回调函数中,则什么也不会返回。 一种解决方案是使我的函数也异步。 另一种解决方案是