我的Spring云流应用程序中有一个简单的Kafka生成器。当我的Spring应用程序启动时,我有一个@PostConstruct方法,它执行一些协调并尝试将事件发送给Kafka生产者。
问题是,当对账开始将enet发送到其中时,我的Kafka制作人还没有准备好,导致以下情况:
org . spring framework . messaging . messagedeliveryexception:调度程序没有通道“orderbook-service-1.orderbook”的订户。;嵌套异常为org . spring framework . integration . messagedispatchingexception:调度程序没有订户,failedMessage=GenericMessage..在org . spring framework . integration . channel . abstractsubscribablechannel . dosend(abstractsubscribablechannel . Java:77)在org . spring framework . integration . channel . abstractmessagechannel . send(abstractmessagechannel . Java:445)
是否有一种方法可以在我的应用程序启动期间获得通知,即Kafka通道已初始化,因此我只启动rec作业发布它。
以下是我的代码片段:
public interface OrderEventChannel {
String TOPIC_BINDING = "orderbook";
@Output(TOPIC_BINDING)
SubscribableChannel outboundEvent();
}
@Configuration
@EnableBinding({OrderEventChannel.class})
@ConditionalOnExpression("${aix.core.stream.outgoing.kafka.enabled:false}")
public class OutgoingKafkaConfiguration {
}
@Service
public class OutgoingOrderKafkaProducer {
@Autowired
private OrderEventChannel orderEventChannel;
public void onOrderEvent( ClientEvent clientEvent ) {
try {
Message<KafkaEvent> kafkaMsg = mapToKafkaMessage( clientEvent );
SubscribableChannel subscribableChannel = orderEventChannel.outboundEvent();
subscribableChannel.send( kafkaMsg );
} catch ( RuntimeException rte ) {
log.error( "Error while publishing Kafka event [{}]", clientEvent, rte );
}
}
..
..
}
@PostConstruct
在上下文生命周期中开始使用bean太早了;它们仍在创建、配置和连接在一起。
您可以使用 ApplicationListener
(或 @EventListener
)来侦听 ApplicationReadyEvent
(请务必将偶数的 applicationContext
与主应用程序上下文进行比较,因为您可能会获得其他事件)。
您还可以实现 SmartLifecycle
并将代码放入 start()
中;把你的豆子放在后期阶段
,这样它就可以在一切连接好后启动。
输出绑定在阶段 Integer.MIN_VALUE 1000
启动,输入绑定在阶段 Integer.MAX_VALUE - 1000
启动。
因此,如果您想在消息开始流动之前做一些事情,请在它们之间使用一个阶段(例如,0,这是默认值)。
Spring Cloud Kafka Streams与Spring Cloud Stream、Spring Cloud Function、Spring AMQP和Spring for Apache Kafka有什么区别?
读者在这章可以了解到整个内核初始化的完整周期,从内核解压之后的第一步到内核自身运行的第一个进程。 注意 这里不是所有内核初始化步骤的介绍。这里只有通用的内核内容,不会涉及到中断控制、 ACPI 、以及其它部分。此处没有详述的部分,会在其它章节中描述。 内核解压之后的首要步骤 - 描述内核中的首要步骤。 早期的中断和异常控制 - 描述了早期的中断初始化和早期的缺页处理函数。 在到达内核入口之前最后的
我用的是Apache Kafka 2.7.0和Spring Cloud Stream Kafka Streams。 在我的Spring Cloud Stream (Kafka Streams)应用程序中,我已经将我的application.yml配置为当输入主题中的消息出现反序列化错误时使用sendToDlq机制: 我启动了我的应用程序,但我看不到这个主题存在。文档指出,如果 DLQ 主题不存在,
问题:我试图逐行读取一个大文件,并将消息放入RabbitMQ中。我想在文件末尾提交给rabbitMQ。如果文件中的任何记录是坏的,那么我想撤销发布到队列的消息。 技术:Spring boot、Spring cloud stream、RabbitMQ 你能帮我实现这个过渡的东西吗。我知道如何使用spring cloud Stream读取文件并发布到队列。
我有以下Java代码: 但是,当我运行它时,它会抛出以下错误: 我在do之前初始化了变量。while 循环,并在 try. 中设置值。捕获循环。似乎尚未设置该变量。抱歉,如果这是一个相当基本的问题,但我似乎无法弄清楚。
版本:Spring Boot: 1.4.2 .发布春云Deps:布里克斯顿。SR7 这是我的申请。处理器应用程序的属性。 当我启动此应用程序时,将按预期创建事件交换,并将其绑定到名为:events exchange的队列。eventconsumers组(也可以)。但routingKey始终为“#”。我已经尝试了从各种文档中找到的所有选项。我在这里遗漏了什么吗? 我希望这个应用程序只订阅某些消息(我