我正在尝试制作一个定制的spring cloud stream活页夹,但它无法自动注册:
活页夹实现:
public class DPSBinder implements Binder<SubscribableChannel, ConsumerProperties, ProducerProperties> {
private DecisionPersistenceServiceClient dpsClient;
private MessageHandler dpsClientConsumerMessageHandler = null;
public DPSBinder(DecisionPersistenceServiceClient dpsClient) {
this.dpsClient = dpsClient;
}
@Override
public Binding<SubscribableChannel> bindConsumer(String name, String group, SubscribableChannel inboundBindTarget,
ConsumerProperties consumerProperties) {
return null;
}
@Override
public Binding<SubscribableChannel> bindProducer(String name, SubscribableChannel outboundBindTarget,
ProducerProperties producerProperties) {
switch (name) {
case "PERSIST_POST":
this.dpsClientConsumerMessageHandler = message -> dpsClient.persist((DPAPayload) message.getPayload());
break;
default:
this.dpsClientConsumerMessageHandler = null;
}
if (this.dpsClientConsumerMessageHandler != null)
this.subscribe(outboundBindTarget);
return () -> this.dpsClientConsumerMessageHandler = null;
}
public void subscribe(SubscribableChannel outboundBindTarget) {
outboundBindTarget.subscribe(this.dpsClientConsumerMessageHandler);
}}
配置类:
@Configuration
public class DPSBinderConfiguration {
@Bean
public DPSBinder dpsBinder(DecisionPersistenceServiceClient dpsClient) {
return new DPSBinder(dpsClient);
}}
Spring活页夹文件:
dps:something.something.DPSBinderConfiguration
application.yml
application.yml
spring:
cloud:
stream:
bindings:
input:
destination: DPP_EVENTS
group: dpp-local
binder: kafka
output:
destination: PERSIST_POST
binder: dps
binders:
kafka:
type: kafka
dps:
type: dps
我已经按照spring cloud stream的指导方针创建了一个custome活页夹,但这不起作用。此外,使用@Configuration创建绑定bean会禁用我在类路径上添加的kafka绑定。
我发现了问题。实际上,在声明绑定bean的地方不应该使用@Configuration。此外,我的活页夹实现中还存在一些逻辑问题,我已经解决了这些问题。
我试图弄清楚如何在Spring
我们有一个要求,我们正在消费来自一个主题的消息,然后发生了一些丰富,然后我们将消息发布到另一个主题。以下是事件 使用者 - 使用消息 扩充 - 扩充使用的消息 制作人 - 已发布 向其他主题发送的丰富消息 我正在使用Spring cloud kafka binder,一切正常。突然,我们观察到生产者正在向主题发送重复的消息,然后我们使生产者是幂等的。为了更好地控制,我们将autocommitOff
如何使用Spring Cloud Stream Kafka Binder为生产者启用压缩(例如GZIP)?
我有一个使用kafka活页夹的spring cloud stream应用程序,它可以消费和发送消息。在应用程序中,我使用重试策略配置自定义错误处理程序,并将不可重试的异常添加到处理程序中。配置示例: 但是我看到,如果异常抛出,比应用程序重试处理消息3次。预期行为-如果App. MyCustomException.class抛出,将不会重复消费消息。如何为Spring云流kafka绑定应用程序配置重
spring . cloud . stream . Kafka . binder . zknodes是必须的吗?如果价值缺失会发生什么?
我们正在使用2.1.3版本的Spring云流kafka流- 应用yml具有属性集- 但是我们仍然得到了Kafka活页夹未知的地位- 感谢对此的任何帮助。