当前位置: 首页 > 知识库问答 >
问题:

spring kafka集成动态创建消费者

晁聪
2023-03-14
public class Consumer1 {
private static final String CONFIG = "kafkaInboundMDCAdapterParserTests-context.xml";
static ClassPathXmlApplicationContext ctx;

public static void main(final String args[]) {
    ctx = new ClassPathXmlApplicationContext(CONFIG, Consumer1.class);
    ctx.start();
    addConsumer("test19", "default8");

    ctx = new ClassPathXmlApplicationContext(CONFIG, Consumer1.class);
    ctx.start();
    addConsumer("test19", "default10");

}

public static void addConsumer(String topicId, String groupId) {

    MessageChannel inputChannel = ctx.getBean("inputFromKafka", MessageChannel.class);

    ServiceActivatingHandler serviceActivator = new ServiceActivatingHandler(new MessageReceiver(), "processMessage");
    ((SubscribableChannel) inputChannel).subscribe(serviceActivator);

    KafkaConsumerContext<String, String> kafkaConsumerContext = ctx.getBean("consumerContext", KafkaConsumerContext.class);
    try {
        TopicFilterConfiguration topicFilterConfiguration = new TopicFilterConfiguration(topicId, 1, false);

        ConsumerMetadata<String,String> consumerMetadata = new ConsumerMetadata<String, String>();
        consumerMetadata.setGroupId(groupId);
        consumerMetadata.setTopicFilterConfiguration(topicFilterConfiguration);
        consumerMetadata.setConsumerTimeout("1000");
        consumerMetadata.setKeyDecoder(new AvroReflectDatumBackedKafkaDecoder<String>(java.lang.String.class));
        consumerMetadata.setValueDecoder(new AvroReflectDatumBackedKafkaDecoder<String>(java.lang.String.class));


        ZookeeperConnect zkConnect = ctx.getBean("zookeeperConnect", ZookeeperConnect.class);

        ConsumerConfigFactoryBean<String, String> consumer = new ConsumerConfigFactoryBean<String, String>(consumerMetadata,
                zkConnect);

        ConsumerConnectionProvider consumerConnectionProvider = new ConsumerConnectionProvider(consumer.getObject());
        MessageLeftOverTracker<String,String> messageLeftOverTracker = new MessageLeftOverTracker<String, String>();
        ConsumerConfiguration<String, String> consumerConfiguration = new ConsumerConfiguration<String, String>(consumerMetadata, consumerConnectionProvider, messageLeftOverTracker);

        kafkaConsumerContext.getConsumerConfigurations().put(groupId, consumerConfiguration);
    } catch (Exception exp) {
        exp.printStackTrace();
    }
}
<int:channel id="inputFromKafka"/>

<int-kafka:zookeeper-connect id="zookeeperConnect" zk-connect="localhost:2181"
        zk-connection-timeout="6000"
        zk-session-timeout="6000"
        zk-sync-time="2000"/>

<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
        kafka-consumer-context-ref="consumerContext"
        auto-startup="false"
        channel="inputFromKafka">
    <int:poller fixed-delay="1" time-unit="MILLISECONDS"/>
</int-kafka:inbound-channel-adapter>

<bean id="kafkaReflectionDecoder" class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaDecoder">
    <constructor-arg type="java.lang.Class" value="java.lang.String"/>
</bean>

<int-kafka:consumer-context id="consumerContext"
        consumer-timeout="1000"
        zookeeper-connect="zookeeperConnect">
    <int-kafka:consumer-configurations>
        <int-kafka:consumer-configuration group-id="default1"
                value-decoder="kafkaReflectionDecoder"
                key-decoder="kafkaReflectionDecoder"
                max-messages="5000">
            <int-kafka:topic id="mdc1" streams="1"/>
        </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>

当我向主题“Test19”发送任何消息时,配置的ServiceActivator“ProcessMessage”方法将两条消息显示为配置的两个客户,但这里的问题是,在添加到消费者上下文之前,我需要为每个客户加载入站配置文件…否则,我只能在控制台中得到一条消息…是正确的方式还是我需要在这里改变什么?

谢了。

共有1个答案

徐凌
2023-03-14

你想做什么并不清楚,但你做的事情有问题。

通过在订阅使用者之前启动上下文,可能会出现问题(在开始和订阅之间的短时间内,Dispatcher在inputfromkafka上没有订阅者)。

为什么要以编程方式创建服务激活器,而不是在上下文中声明它?

 类似资料:
  • 我在站点1(3个代理)有两个集群设置cluster-1,在站点2(3个代理)有两个集群设置cluster-2。使用spring kafka(1.3.6)消费者(一台机器)并通过@KafkaListener注释收听消息。我们如何为每个集群(c1和c2)实例化多个KafkaListenerContainerFactory,并同时监听来自这两个集群的数据。 我的侦听器应该同时使用来自这两个集群的消息。

  • 我正在使用Spring Kafka consumer,它从主题中获取消息并将其保存到数据库中。如果满足故障条件,例如db不可用,kafka消费者库是否提供重试机制?如果是,是否有方法设置不同的重试间隔,如第1次重试应在5分钟后进行,第2次重试应在30分钟后进行,第3次重试应在1小时后进行等。

  • 在使用Spring Kafka Consumer时,我有时会收到以下错误消息。如代码片段所示,我至少实现了一次语义 1)我的疑问是,我是否错过了来自消费者的任何信息? 2) 我需要处理这个错误吗。由于 org.apache.kafka.clients.consumer.提交失败异常:无法完成偏移提交,因为消费者不是自动分区分配的活动组的一部分;消费者很可能被踢出组。 我的SpringKafka消费

  • 我需要使用consume process Product模式来处理Kafka消息,并已使用Kafka事务管理器配置了Spring Kafka侦听器容器,还设置了事务id前缀以启用Kafka事务。我正在使用批处理的ack模式,并试图了解在这种模式下,在事务中何时提交偏移量。文档似乎表明,一旦使用了轮询中的所有记录,ack模式批提交偏移量——在事务上下文中也是这样吗,即每个轮询1个事务? 或者,在使用

  • 这部分包含新的 Apache Kafka consumer API. 兼容性 Apache Kafka 版本 0.10+ 写入Kafka 您可以通过创建 org.apache.storm.kafka.bolt.KafkaBolt 实例并将其作为组件附加到您的topology.如果您使用 trident ,您可以通过使用以下对象完成 org.apache.storm.kafka.trident.Tr

  • 我怎样才能暗示SpringKafka把每一个话题传播给一个不同的消费者呢? 干杯