我需要捕获异步发送到Kafka时的异常。Kafka producer Api附带一个函数send(ProducerRecord记录、回调)。但当我针对以下两种情况进行测试时:
问题:
>
那么回调是否只针对特定的异常调用?
Kafka客户端何时尝试在异步发送时连接到Kafka代理:每次批处理发送还是定期发送?
Kafka警告图像
注意:我也使用linger.ms设置25秒批量发送我的记录。
public class ProducerDemo {
static KafkaProducer<String, String> producer;
public static void main(String[] args) throws IOException {
final Logger logger = LoggerFactory.getLogger(ProducerDemo.class);
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "30000");
producer = new KafkaProducer<String, String>(properties);
String topic = "first_topic";
for (int i = 0; i < 5; i++) {
String value = "hello world " + Integer.toString(i);
String key = "id_" + Integer.toString(i);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
//execute everytime a record is successfully sent or exception is thrown
if(e == null){
// No Exception
}else{
//Exception Handling
}
}
});
}
producer.close();
}
总的来说,我注意到很多与KafkaProducer交付语义学和保证相关的问题。它肯定可以更好地记录下来。
还有一件事,自从你提到玲儿。ms:
请注意,及时到达的记录通常会批处理在一起,即使存在延迟。ms=0,因此在重负载下,无论玲珑配置如何,都会进行批处理
那么回调是否只针对特定的异常调用?
是的,这就是它的工作原理。从留档(2.5.0):
* Fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that
* will be invoked when the request is complete.
请注意重要的部分:当请求完成时,什么意味着生产者必须接受记录并将产品请求发送给Kafka Broker。无需深入挖掘内部,这意味着代理元数据必须存在,分区必须存在。
谈到正式规范,您需要仔细了解send()
的Javadoc,可能还需要了解KafkaProducer对doSend
方法的实现。在那里,您将看到在提交调用时可以抛出多个异常(而不是返回未来并调用回调),例如:
对于第一个问题,这里是答案。根据apache kafka留档,当您实现回调接口时,您可以使用onComplments方法捕获以下异常
https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/producer/Callback.html
对于第二个问题,以下属性的组合控制何时发送记录,据我所知,同步或异步调用也是如此。
徘徊马克斯·布洛克女士。太太
https://kafka.apache.org/documentation/#linger.ms
本文向大家介绍深入浅析NodeJs并发异步的回调处理,包括了深入浅析NodeJs并发异步的回调处理的使用技巧和注意事项,需要的朋友参考一下 这里说并发异步,并不准确,应该说连续异步。NodeJs单线程异步的特性,直接导致多个异步同时进行时,无法确定最后的执行结果来回调。举个简单的例子: 连续发起了5次读文件的异步操作,很简单,那么问题来了,我怎么确定所有异步都执行完了呢?因为要在它们都执行完后,才
和响应的侦听器 它不想进入侦听器中的这个句柄消息。和应用程序容器显示消息
本文向大家介绍Nodejs异步回调的优雅处理方法,包括了Nodejs异步回调的优雅处理方法的使用技巧和注意事项,需要的朋友参考一下 前言 Nodejs最大的亮点就在于事件驱动, 非阻塞I/O 模型,这使得Nodejs具有很强的并发处理能力,非常适合编写网络应用。在Nodejs中大部分的I/O操作几乎都是异步的,也就是我们处理I/O的操作结果基本上都需要在回调函数中处理,比如下面的这个读取文件内容的
问题内容: 什么是处理这种情况的最佳方法。我处于受控环境中,所以我不想崩溃。 从setTimeout内抛出时,我们将始终获得: 如果抛出发生在setTimeout之前,那么bluebirds catch将捕获它: 结果是: 很棒-但是如何在节点或浏览器中处理这种性质的恶意异步回调。 问题答案: 承诺不是域,它们不会捕获异步回调中的异常。你就是做不到。 然而诺言来捕捉从内抛出的异常/ / 构造函数的
目标- 到来自(源)MQ队列的消费者消息并发布到 a) 另一个(目标)MQ队列和 b)事务中的Kafka主题,从而避免在MQ或Kafka发布失败的情况下从源MQ中删除消息。 使用的框架 Spring启动版本-2.1.5 Spring JMS-5.1.7 SpringKafka-2.2.6 融合Kafka-5.3 MQ-9 Kafka **应用程序配置类* 实际消费者和发布代码 主Spring靴类
问题内容: 现在,我有几次遇到使用Firebase的同步和异步功能的问题。我的问题通常是我需要在我编写的函数中进行异步Firebase调用。作为一个简单的示例,假设我需要计算并显示对象的速度,而我的Firebase存储距离和时间: 当然,上述代码将无法使用,因为它是异步调用,因此在到达时尚未设置。如果我们将内部放置在回调函数中,则什么也不会返回。 一种解决方案是使我的函数也异步。 另一种解决方案是