当前位置: 首页 > 知识库问答 >
问题:

Spring云流动态通道

李意致
2023-03-14

我正在使用Spring Cloud Stream,希望以编程方式创建和绑定通道。我的用例是,在应用程序启动期间,我收到要订阅的Kafka主题的动态列表。然后如何为每个主题创建频道?

共有3个答案

任伟
2023-03-14

我有一个任务,我事先不知道主题。我通过一个输入通道来解决这个问题,它可以收听我需要的所有主题。

https://docs.spring.io/spring-cloud-stream/docs/Brooklyn.RELEASE/reference/html/_configuration_options.html

目的地

绑定中间件上通道的目标目标(例如,RabbitMQ交换或Kafka主题)。如果通道绑定为使用者,则可以将其绑定到多个目标,并且目标名称可以指定为逗号分隔的字符串值。如果未设置,则使用通道名称。

所以我的配置

spring:
  cloud:
    stream:
      default:
        consumer:
          concurrency: 2
          partitioned: true
      bindings:
        # inputs
        input:
          group: application_name_group
          destination: topic-1,topic-2
          content-type: application/json;charset=UTF-8

然后我定义了一个消费者来处理来自所有这些主题的消息。

@Component
@EnableBinding(Sink.class)
public class CommonConsumer {

    private final static Logger logger = LoggerFactory.getLogger(CommonConsumer.class);

    @StreamListener(target = Sink.INPUT)
    public void consumeMessage(final Message<Object> message) {
        logger.info("Received a message: \nmessage:\n{}", message.getPayload());
        // Here I define logic which handles messages depending on message headers and topic.
        // In my case I have configuration which forwards these messages to webhooks, so I need to have mapping topic name -> webhook URI.
    }
}

请注意,在您的情况下,这可能不是一个解决方案。我需要将消息转发到webhooks,这样我就可以进行配置映射。

我还考虑了其他想法。1) 您是没有Spring云的kafka客户端消费者。

2) 创建预定义数量的输入,例如50。

input-1
intput-2
...
intput-50

然后对其中一些输入进行配置。

相关讨论

  • 支持动态路由消息的Spring Cloud stream
  • https://github.com/spring-cloud/spring-cloud-stream/issues/690
  • https://github.com/spring-cloud/spring-cloud-stream/issues/1089

我们使用Spring Cloud 2.1.1版本

冷俊健
2023-03-14

我必须为Camel Spring Cloud Stream组件做类似的事情。也许绑定目的地的消费者代码“真的只是一个指示频道名称的String”对您有用?

在我的例子中,我只绑定一个目的地,但是我不认为多个目的地在概念上有多大不同。

以下是它的要点:

    @Override
    protected void doStart() throws Exception {
        SubscribableChannel bindingTarget = createInputBindingTarget();
        bindingTarget.subscribe(message -> {
            // have your way with the received incoming message
        });

        endpoint.getBindingService().bindConsumer(bindingTarget,
                endpoint.getDestination());

       // at this point the binding is done
    }

    /**
     * Create a {@link SubscribableChannel} and register in the
     * {@link org.springframework.context.ApplicationContext}
     */
    private SubscribableChannel createInputBindingTarget() {
        SubscribableChannel channel = endpoint.getBindingTargetFactory()
                .createInputChannel(endpoint.getDestination());
        endpoint.getBeanFactory().registerSingleton(endpoint.getDestination(), channel);
        channel = (SubscribableChannel) endpoint.getBeanFactory().initializeBean(channel,
                endpoint.getDestination());
        return channel;
    }

有关更多上下文,请参阅此处获取完整的源代码

江德海
2023-03-14

我最近遇到了类似的情况,下面是我动态创建SubscriberChannel的示例。

    ConsumerProperties consumerProperties = new ConsumerProperties();
    consumerProperties.setMaxAttempts(1); 
    BindingProperties bindingProperties = new BindingProperties();
    bindingProperties.setConsumer(consumerProperties);
    bindingProperties.setDestination(retryTopic);
    bindingProperties.setGroup(consumerGroup);

    bindingServiceProperties.getBindings().put(consumerName, bindingProperties);
    SubscribableChannel channel = (SubscribableChannel)bindingTargetFactory.createInput(consumerName);
    beanFactory.registerSingleton(consumerName, channel);
    channel = (SubscribableChannel)beanFactory.initializeBean(channel, consumerName);
    bindingService.bindConsumer(channel, consumerName);
    channel.subscribe(consumerMessageHandler);
 类似资料:
  • 我正在开发一个路由器(事件代理)应用程序与Spring云流在Kafka,在功能范式。应用程序从不断输入的主题消耗,映射和过滤消息,然后应该根据一些输入字段将其发送到某个主题(一次只有单个消息,而不是多个结果)。 最好的方法是设置Spring。云流动发送到。输出消息的目标标题?如果是这样,我应该如何为生产者设置绑定?

  • 我开始开发一个spring cloud stream项目。我已通过@Streamlistener注释成功接收到来自Kafka的消息。在将消息发送到任何输出通道之前,我必须通过调用externalservice或DB调用来转换有效负载。我不想从同一个streamlistener方法调用外部服务或DB方法。我的问题是,我们能否在Spring云流中创建内部通道(如Spring集成DSL流)?

  • 我想创建一个公共项目(使用spring cloud stream),根据消息内容动态地将消息路由到不同的(消费者)项目。(rabbitmq作为消息代理) spring cloud stream支持吗?如果没有,有什么建议的方法来实现这一点?thx公司

  • 我想使用bean向Kafka发送带有在运行时解析的主题名称的消息。我的问题是,本主题的中的默认值为1。 当Kafka中已经存在此主题时,以及当客户端创建此主题时(分区计数等于配置值),此值并不反映真正的分区数。 如何使此属性反映分区的真实数目?

  • 我们在Spring Cloud Stream上有一个应用程序,它与Project React集成在一起。我们通过在消息标题中设置spring.cloud.stream.sendto.destination来动态设置目标主题并发布消息。我们正在寻找处理错误的情况下,如Kafka服务器断断续续或主题不可用而发布。我们已经实现了@ServiceActivator来处理所有错误。动态设置主题时,Servi

  • 我无法使用功能供应商发送Avro消息。SCSt尝试将消息作为JSON发送,但失败。有人能指出是否需要任何其他配置吗? 这是供应商的功能bean 和配置