当前位置: 首页 > 知识库问答 >
问题:

为主题动态创建消息列表

严欣怡
2023-03-14
  1. 主题
  2. GroupID
  3. 分区
  4. 筛选器(如果有)

我能想到的唯一方法是,我们可以在解析xml配置后创建messagelisteners并且每个主题都有自己的concurrentmessagelistenercontainer。

使用spring是否有其他更好的方法可用?

附注:我对Spring和Kafka并不陌生。如果在解释要求时有混淆,请告诉我

谢谢,Rajasekhar

共有1个答案

黄伟
2023-03-14

也许你可以用话题模式。看看消费者属性。例如。听众

@KafkaListener(topicPattern = "topic1|topic2")

将侦听topic1topic2

如果您需要动态创建一个侦听器,则必须格外小心,因为您必须关闭它。

public class DynamicEndpointRegistrar {

    private BeanFactory beanFactory;
    private KafkaListenerContainerFactory<?> containerFactory;
    private KafkaListenerEndpointRegistry endpointRegistry;
    private MessageHandlerMethodFactory messageHandlerMethodFactory;

    public DynamicEndpointRegistrar(BeanFactory beanFactory,
            KafkaListenerContainerFactory<?> containerFactory,
            KafkaListenerEndpointRegistry endpointRegistry, MessageHandlerMethodFactory messageHandlerMethodFactory) {
        this.beanFactory = beanFactory;
        this.containerFactory = containerFactory;
        this.endpointRegistry = endpointRegistry;
        this.messageHandlerMethodFactory = messageHandlerMethodFactory;
    }

    public void registerMethodEndpoint(String endpointId, Object bean, Method method, Properties consumerProperties,
            String... topics) throws Exception {
        KafkaListenerEndpointRegistrar registrar = new KafkaListenerEndpointRegistrar();
        registrar.setBeanFactory(beanFactory);
        registrar.setContainerFactory(containerFactory);
        registrar.setEndpointRegistry(endpointRegistry);
        registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory);

        MethodKafkaListenerEndpoint<Integer, String> endpoint = new MethodKafkaListenerEndpoint<>();
        endpoint.setBeanFactory(beanFactory);
        endpoint.setMessageHandlerMethodFactory(messageHandlerMethodFactory);

        endpoint.setId(endpointId);
        endpoint.setGroupId(consumerProperties.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
        endpoint.setBean(bean);
        endpoint.setMethod(method);

        endpoint.setConsumerProperties(consumerProperties);
        endpoint.setTopics(topics);

        registrar.registerEndpoint(endpoint);
        registrar.afterPropertiesSet();
    }

}
DynamicEndpointRegistrar dynamicEndpointRegistrar = ...;
MyConsumer myConsumer = ...; // create an instance of your consumer
Properties properties = ...; // consumer properties

// the method that should be invoked
// (the method that's normally annotated with KafkaListener)
Method method = MyConsumer.class.getDeclaredMethod("consume", String.class);

dynamicEndpointRegistrar.registerMethodEndpoint("endpointId", myConsumer, method, properties, "topic");
 类似资料:
  • Atom的界面使用HTML渲染,并且通过Less来定义样式,它是CSS的超集。不要担心之前从未听说过Less,它类似于CSS,但是带有一些便捷的扩展。 Atom支持两种主题:UI和语法。UI主题为树视图、选择夹、下拉列表和状态栏之类的元素定义样式。语法主题为编辑器中的代码定义样式。 主题可以从设置视图安装和修改,你可以选择Atom > Preferences…菜单,然后在左侧的侧栏中选择“Inst

  • 我正在开发消息推送给订阅特定事件的所有用户子集。用户订阅RabbitMQ中的主题,格式为:user-id.event-type.id.我使用Spring WebSockJS、Stomp、RabbitMQ以及前端SockJS和Angular JS。用户应该被通知关于事件的所有行动(评论等,日期更改)。 到目前为止,我们拥有的: 首先,我通过REST webservice终结点进行身份验证,并将令牌放

  • 我是斯卡拉和Kafka的新手,遇到了一些麻烦。 我正在尝试将scala kafka producer连接到安装在cloudera express服务器上的kafka服务器。我已经用这些指令在VMs中这样做过一次了,没有任何问题。 当我运行producer时,所需的主题被创建,但没有任何消息被发送,或者我是这样认为的。 Kafka制作人 当我执行run方法时,我看到“producer-send:#”

  • 我想为几个主题创建一个kafka消费者。consumer的方法构造函数允许我在订阅中传输主题列表的参数,如下所示: 之后,我想轮询记录从Kafka流每3秒并处理它们,但我想知道什么是这个消费者-如何将不同主题的记录轮询-首先一个主题,然后另一个,或并行。会不会一个消息量大的主题会一直处理,另一个消息量小的主题会等待?

  • 我用Flink来读写来自不同Kafka主题的数据。具体来说,我使用的是FlinkKafkaConsumer和FlinkkafKapProducer。 我想知道是否有可能根据我程序中的逻辑或记录本身的内容,将我正在阅读和写作的Kafka主题更改为“即时”。 例如,如果读取带有新字段的记录,我希望创建一个新主题,并开始将带有该字段的记录转移到新主题。 谢谢。

  • 我想使用这个扩展:[Quarkus Smallrye响应消息Kafka] 但是在我的应用程序中,主题的名称是事先不知道的,它是根据运行时从用户那里收到的消息指定的。如何在没有注释的情况下以编程方式指定主题名称和与主题相关的设置?(仅用于向Kafka发送消息- 或者这些配置应该在运行时以编程方式设置 因为我不认识路,所以我用了本地的Kafka驱动程序