当前位置: 首页 > 知识库问答 >
问题:

Spring cloud stream自定义活页夹未注册。禁用kafka活页夹(如果在@Configuration中使用)

周翰池
2023-03-14

我正在尝试制作一个定制的spring cloud stream活页夹,但它无法自动注册:

活页夹实现:

public class DPSBinder implements Binder<SubscribableChannel, ConsumerProperties, ProducerProperties> {

private DecisionPersistenceServiceClient dpsClient;

private MessageHandler dpsClientConsumerMessageHandler = null;

public DPSBinder(DecisionPersistenceServiceClient dpsClient) {
    this.dpsClient = dpsClient;
}

@Override
public Binding<SubscribableChannel> bindConsumer(String name, String group, SubscribableChannel inboundBindTarget,
        ConsumerProperties consumerProperties) {

    return null;
}

@Override
public Binding<SubscribableChannel> bindProducer(String name, SubscribableChannel outboundBindTarget,
        ProducerProperties producerProperties) {

    switch (name) {

    case "PERSIST_POST":

        this.dpsClientConsumerMessageHandler = message -> dpsClient.persist((DPAPayload) message.getPayload());

        break;

    default:
        this.dpsClientConsumerMessageHandler = null;

    }

    if (this.dpsClientConsumerMessageHandler != null)
        this.subscribe(outboundBindTarget);

    return () -> this.dpsClientConsumerMessageHandler = null;

}

public void subscribe(SubscribableChannel outboundBindTarget) {

    outboundBindTarget.subscribe(this.dpsClientConsumerMessageHandler);
}}

配置类:

@Configuration
public class DPSBinderConfiguration {
@Bean
public DPSBinder dpsBinder(DecisionPersistenceServiceClient dpsClient) {

    return new DPSBinder(dpsClient);
}}

Spring活页夹文件:

dps:something.something.DPSBinderConfiguration

application.yml

application.yml
spring:
 cloud:
  stream:
   bindings:
    input:
      destination: DPP_EVENTS
      group: dpp-local
      binder: kafka
    output:
      destination: PERSIST_POST
      binder: dps
  binders:
    kafka:
      type: kafka
    dps:
      type: dps

我已经按照spring cloud stream的指导方针创建了一个custome活页夹,但这不起作用。此外,使用@Configuration创建绑定bean会禁用我在类路径上添加的kafka绑定。

共有1个答案

狄阳秋
2023-03-14

我发现了问题。实际上,在声明绑定bean的地方不应该使用@Configuration。此外,我的活页夹实现中还存在一些逻辑问题,我已经解决了这些问题。

 类似资料:
  • 我试图弄清楚如何在Spring

  • 我们有一个要求,我们正在消费来自一个主题的消息,然后发生了一些丰富,然后我们将消息发布到另一个主题。以下是事件 使用者 - 使用消息 扩充 - 扩充使用的消息 制作人 - 已发布 向其他主题发送的丰富消息 我正在使用Spring cloud kafka binder,一切正常。突然,我们观察到生产者正在向主题发送重复的消息,然后我们使生产者是幂等的。为了更好地控制,我们将autocommitOff

  • 如何使用Spring Cloud Stream Kafka Binder为生产者启用压缩(例如GZIP)?

  • 我有一个使用kafka活页夹的spring cloud stream应用程序,它可以消费和发送消息。在应用程序中,我使用重试策略配置自定义错误处理程序,并将不可重试的异常添加到处理程序中。配置示例: 但是我看到,如果异常抛出,比应用程序重试处理消息3次。预期行为-如果App. MyCustomException.class抛出,将不会重复消费消息。如何为Spring云流kafka绑定应用程序配置重

  • spring . cloud . stream . Kafka . binder . zknodes是必须的吗?如果价值缺失会发生什么?

  • 我们正在使用2.1.3版本的Spring云流kafka流- 应用yml具有属性集- 但是我们仍然得到了Kafka活页夹未知的地位- 感谢对此的任何帮助。