当前位置: 首页 > 知识库问答 >
问题:

春云流Kafka binder初始化时的通知

杨曜瑞
2023-03-14

我的Spring云流应用程序中有一个简单的Kafka生成器。当我的Spring应用程序启动时,我有一个@PostConstruct方法,它执行一些协调并尝试将事件发送给Kafka生产者。

问题是,当对账开始将enet发送到其中时,我的Kafka制作人还没有准备好,导致以下情况:

org . spring framework . messaging . messagedeliveryexception:调度程序没有通道“orderbook-service-1.orderbook”的订户。;嵌套异常为org . spring framework . integration . messagedispatchingexception:调度程序没有订户,failedMessage=GenericMessage..在org . spring framework . integration . channel . abstractsubscribablechannel . dosend(abstractsubscribablechannel . Java:77)在org . spring framework . integration . channel . abstractmessagechannel . send(abstractmessagechannel . Java:445)

是否有一种方法可以在我的应用程序启动期间获得通知,即Kafka通道已初始化,因此我只启动rec作业发布它。

以下是我的代码片段:

public interface OrderEventChannel {
    String TOPIC_BINDING = "orderbook";
    @Output(TOPIC_BINDING)
    SubscribableChannel outboundEvent();
}

@Configuration
@EnableBinding({OrderEventChannel.class})
@ConditionalOnExpression("${aix.core.stream.outgoing.kafka.enabled:false}")
public class OutgoingKafkaConfiguration {
}

@Service
public class OutgoingOrderKafkaProducer {

    @Autowired
    private OrderEventChannel orderEventChannel;

   public void onOrderEvent( ClientEvent clientEvent ) {

        try {
            Message<KafkaEvent> kafkaMsg = mapToKafkaMessage( clientEvent );
            SubscribableChannel subscribableChannel = orderEventChannel.outboundEvent();
            subscribableChannel.send( kafkaMsg );
        } catch ( RuntimeException rte ) {
            log.error( "Error while publishing Kafka event [{}]", clientEvent, rte );
        }
    }
..
..

}

共有1个答案

左丘智渊
2023-03-14

@PostConstruct在上下文生命周期中开始使用bean太早了;它们仍在创建、配置和连接在一起。

您可以使用 ApplicationListener(或 @EventListener)来侦听 ApplicationReadyEvent(请务必将偶数的 applicationContext 与主应用程序上下文进行比较,因为您可能会获得其他事件)。

您还可以实现 SmartLifecycle 并将代码放入 start() 中;把你的豆子放在后期阶段,这样它就可以在一切连接好后启动。

输出绑定在阶段 Integer.MIN_VALUE 1000 启动,输入绑定在阶段 Integer.MAX_VALUE - 1000 启动。

因此,如果您想在消息开始流动之前做一些事情,请在它们之间使用一个阶段(例如,0,这是默认值)。

 类似资料:
  • Spring Cloud Kafka Streams与Spring Cloud Stream、Spring Cloud Function、Spring AMQP和Spring for Apache Kafka有什么区别?

  • 读者在这章可以了解到整个内核初始化的完整周期,从内核解压之后的第一步到内核自身运行的第一个进程。 注意 这里不是所有内核初始化步骤的介绍。这里只有通用的内核内容,不会涉及到中断控制、 ACPI 、以及其它部分。此处没有详述的部分,会在其它章节中描述。 内核解压之后的首要步骤 - 描述内核中的首要步骤。 早期的中断和异常控制 - 描述了早期的中断初始化和早期的缺页处理函数。 在到达内核入口之前最后的

  • 我用的是Apache Kafka 2.7.0和Spring Cloud Stream Kafka Streams。 在我的Spring Cloud Stream (Kafka Streams)应用程序中,我已经将我的application.yml配置为当输入主题中的消息出现反序列化错误时使用sendToDlq机制: 我启动了我的应用程序,但我看不到这个主题存在。文档指出,如果 DLQ 主题不存在,

  • 问题:我试图逐行读取一个大文件,并将消息放入RabbitMQ中。我想在文件末尾提交给rabbitMQ。如果文件中的任何记录是坏的,那么我想撤销发布到队列的消息。 技术:Spring boot、Spring cloud stream、RabbitMQ 你能帮我实现这个过渡的东西吗。我知道如何使用spring cloud Stream读取文件并发布到队列。

  • 我有以下Java代码: 但是,当我运行它时,它会抛出以下错误: 我在do之前初始化了变量。while 循环,并在 try. 中设置值。捕获循环。似乎尚未设置该变量。抱歉,如果这是一个相当基本的问题,但我似乎无法弄清楚。

  • 版本:Spring Boot: 1.4.2 .发布春云Deps:布里克斯顿。SR7 这是我的申请。处理器应用程序的属性。 当我启动此应用程序时,将按预期创建事件交换,并将其绑定到名为:events exchange的队列。eventconsumers组(也可以)。但routingKey始终为“#”。我已经尝试了从各种文档中找到的所有选项。我在这里遗漏了什么吗? 我希望这个应用程序只订阅某些消息(我