当前位置: 首页 > 知识库问答 >
问题:

使用StreamBridge的Spring Cloud Stream中的生产者回调

汪深
2023-03-14

我想知道一个方法来执行一个回调使用StreamBridge,我想做类似于KafkaTemplate.send返回一个ListenableFuture.

spring cloud stream是否可以使用kafka活页夹发布一些事件,并使用onSuccess和onFailure之类的回调?

示例:<代码>制作人。发送(记录,新回调{…})

共有1个答案

钮瀚
2023-03-14

您可以在生产者绑定上设置同步,发送将在未来完成时在内部等待,也可以配置录制元数据通道以异步获取发送结果。

https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#kafka-生产者属性

记录元数据通道

成功发送结果应发送到的MessageChannel的bean名称;bean必须存在于应用程序上下文中。发送到通道的消息是已发送的消息(转换后,如果有),带有附加标头KafkaHeaders。RECORD_METADATA。标头包含Kafka客户端提供的RecordMetadata对象;它包括在主题中写入记录的分区和偏移量。

ResultMetadata meta = 
    sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)

失败发送到生产者错误通道(如果配置);请参阅错误通道。

https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#kafka-error-channels

 类似资料:
  • 我有一个消费者作为生产者消费者模式的一部分: 简化: 如果我移除 通过将线程设置为睡眠,CPU使用率攀升到极高的水平(13%),而不是0%。 此外,如果我实例化该类的多个实例,则每个实例的CPU使用率都会以13%的增量攀升。 大约每分钟(可能每30秒)都会向BlockingCollection添加一个新的LogItem,并将适用的消息写入文件。 有没有可能线程以某种方式阻止了其他线程的运行,而系统

  • 使用StreamBridge,我将包含两种不同类型的对象的消息发送到单个Kafka主题。有没有办法定义一个能够使用两种类型消息的Spring Cloud Stream的功能消费者?

  • 我正在开发SpringCloudStream的Brooklyn.Release版本。我的用例具有多个接收器的HttpSource。当我将初学者应用程序依赖项添加到应用程序中并使用它时,如下所示: 我的聚合应用程序是 一直得到如下响应:

  • 我有一个循环缓冲区(数组/先进先出),一个消费者和一个生产者。生产者将随机数放入数组中,消费者获取第一个数字并检查它是否是相对素数。 我的代码工作正常,我认为它工作正常,但我想改进它。我不太确定我的“空运行”方法。我应该在其他地方做异常处理吗?改变“无限循环”?不应更改方法签名(它们是预定义的)。 我会很高兴每一个改进代码的建议。(不在乎知名度(公开,...),还有静态的东西,我刚刚把它们放在一个

  • 问题内容: 我想创建某种线程应用程序。但是我不确定在两者之间实现队列的最佳方法是什么。 因此,我提出了两个想法(这两个想法可能都是完全错误的)。我想知道哪种更好,如果它们都烂了,那么实现队列的最佳方法是什么。我关心的主要是这些示例中队列的实现。我正在扩展一个内部类的Queue类,它是线程安全的。下面是两个示例,每个示例有4个类。 主班 消费阶层 生产者类别 队列类 要么 主班 消费阶层 生产者类别

  • 我试图找到一种在以下场景中使用ThreadPoolExecutor的方法: 我有一个单独的线程在线程池中生成和提交任务 为了提供更多的上下文,我目前只需一次提交所有任务,并取消ExecutorService返回的所有未来。在最长生成时间到期后提交。我忽略所有产生的取消异常,因为它们是预期的。问题是未来的行为。cancel(false)很奇怪,不适合我的用例: 它可以防止任何未启动的任务运行(良好)