我已经遵循下面的文档,我有一个生产者和消费者与Kinesis流工作得非常好。我想了解如何处理生产者(源)和消费者(处理器)的错误,以防任何异常发生。
根据Spring Stream错误处理文档,我尝试了以下方法:
catch (Exception e) {
if (e instanceof MessagingException) {
throw (MessagingException) e;
}
else {
throw new MessagingException(requestMessage,
"Exception thrown while invoking "
+ this.invocableHandlerMethod.getShortLogMessage(),
e);
}
}
我该如何处理和解决这个问题?请建议和帮助我。
文档:https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc#error-channels
只是为了所有面临这个问题的人的利益。如果你错过了评论区。Deepak Chaudhary的答案对消费者来说很有效。
请查看stackoverflow.com/questions/63270755/…,如果有帮助的话。谢谢你。--迪帕克·乔德里
谢谢。迪帕克
想了解我们如何才能为制片人做到这一点。
我有一个生产者/消费者场景,我不希望一个生产者交付产品,多个消费者消费这些产品。然而,常见的情况是,交付的产品只被一个消费者消费,而其他消费者从未看到过这个特定的产品。我不想实现的是,一个产品被每个消费者消费一次,而没有任何形式的阻碍。 我的第一个想法是使用多个BlockingQueue,每个消费者使用一个,并使生产者将每个产品按顺序放入所有可用的BlockingQueues中。但是,如果其中一个
我有一个生产者/消费者模式,如下所示 固定数量的生成器线程,每个线程写入它们自己的BlockingQueue,通过执行器调用 单个使用者线程,读取生产者线程 每个生产者都在运行一个数据库查询,并将结果写入其队列。消费者轮询所有生产者队列。目前,如果出现数据库错误,生产者线程就会死掉,然后消费者就会永远停留在产品队列中等待更多的结果。 我应该如何构造它来正确处理catch错误?
本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要
我有一个简单的流处理器(不是消费者/生产者),看起来像这样(Kotlin) 这些类、和与处理器在同一个包中。下面是我的配置文件:
我想使用一个camel组件,它提供了使用和生成RESTful资源的能力。 对于这个例子,我想使用camel restlet组件。restlet组件一切正常,我已经使用REST DSL成功地实现了restlet consumer。然而,我有几个问题: 问题 1) 将restlet启用为异步是否安全?我读过restlet async可能会导致一些问题。这仍然正确吗?如何提高服务绩效?我应该改用码头吗?
在这种情况下,我是否需要求助于Kafka事务API来在消费者轮询循环中创建事务生产者,在该循环中,我在事务中执行:(1)处理消耗的记录和(2)在关闭事务之前提交它们的偏移量。在这种情况下,普通的commitsync/commitasync是否有效?