我有代码:
SqlTemplate
.forQuery(client, "SELECT * FROM user WHERE id=#{id}")
.execute(parameters)
.onSuccess(users -> {
users.forEach(row -> {
// exception here
System.out.println(row.get(UUID.class, "id1") + " " + row.getString("title"));
});
})
处理消费者异常的最佳方法是什么?现在,如果异常引发,它将被吞没。。。
假设希望在引发异常时使流失败,最好使用compose遍历用户并处理。onSuccess()
和。onFailure()
分别显示。
您可以使用CompositeFuture
来实现这一点。您可以有一个Future列表,并将成功的Future/失败的Future(在异常的情况下)添加到foreach
循环中的列表中。
SqlTemplate
.forQuery(client, "SELECT * FROM user WHERE id=#{id}")
.execute(parameters)
.compose(users -> {
List<Future> usersFuture = new ArrayList<>();
users.forEach(row -> {
try {
// exception here
System.out.println(row.get(UUID.class, "id1") + " " + row.getString("title"));
usersFuture.add(Future.succeededFuture());
} catch (Exception e) {
usersFuture.add(Future.failedFuture(e));
}
});
return CompositeFuture.all(usersFuture).mapEmpty();
})
.onSuccess(res -> {
// end the flow with success
})
.onFailure(e -> {
// Add error message and fail the flow
});
一般来说,我对这里可以抛出的异常感到好奇。由于这是您在验证后写入db的数据,因此您应该知道可能的错误场景,并相应地处理它们,而不会使流失败。
在使用Spring Kafka Consumer时,我有时会收到以下错误消息。如代码片段所示,我至少实现了一次语义 1)我的疑问是,我是否错过了来自消费者的任何信息? 2) 我需要处理这个错误吗。由于 org.apache.kafka.clients.consumer.提交失败异常:无法完成偏移提交,因为消费者不是自动分区分配的活动组的一部分;消费者很可能被踢出组。 我的SpringKafka消费
我对RabbitMQ很陌生,所以如果我的问题听起来很琐碎,请原谅。我想在RabbitMQ上发布消息,它将由RabbitMQ消费者处理。 我的消费者机器是一个多核机器(最好是azure上的工作者角色)。但QueueBasicConsumer一次推送一条消息。我如何编程来利用我可以同时处理多个消息的所有核心。 一种解决方案是在多个线程中打开多个通道,然后在那里处理消息。但在这种情况下,我将如何决定线程
本文向大家介绍Kafka 的消费者如何消费数据相关面试题,主要包含被问及Kafka 的消费者如何消费数据时的应答技巧和注意事项,需要的朋友参考一下 消费者每次消费数据的时候,消费者都会记录消费的物理偏移量(offset)的位置 等到下次消费时,他会接着上次位置继续消费
我正在使用spring-kafka“2.2.7.RELEASE”来创建一个批处理消费者,并且我正在尝试了解当我的记录处理时间超过 max.poll.interval.ms 时消费者重新平衡是如何工作的。 这是我的配置。 这是我的出厂设置。 我添加了自定义消费者监听器,如下所示。 现在我期望消费者群体能够重新平衡,因为处理时间超过 max.poll.interval.ms 但我没有看到任何这样的行为
我试图做一个简单的poc与Spring启动与版本(2.3.7发布)的SpringKafka,以实现消费者批处理的工作原理,以及如何再平衡工作,如果消费者需要更多的流转时长,因为我是全新的这个消息系统。 现在我看到kafka重新平衡单个消费者(不允许并发)的问题。 这些是我设置的max.poll.interval属性。ms=50000和factory.getContanerProperties。se
是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?