当前位置: 首页 > 知识库问答 >
问题:

不使用@Kafkalistener的Spring boot consumer

仰雅昶
2023-03-14

我试图在不使用@Kafkalistener的情况下编写kafka consumer,下面是我用于配置侦听器的代码行:

@Configuration
    @EnableKafka
    public class KafkaConfig {

      @Value("${kafka.bootstrap-servers}")
      private String bootstrapServers;

      @Bean
      public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kafka cluster
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
        // allows a pool of processes to divide the work of consuming and processing records
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "org");
        // automatically reset the offset to the earliest offset
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return props;
      }

      @Bean
      public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
      }

      @Bean
      public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
          ContainerProperties containerProperties=new ContainerProperties("in.t");
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
      }

      @Bean
      public Consumer receiver() {
        return new Consumer();
      }
    }

在这里,我如何配置topic和listener方法,我的consumer类可以有多个方法。

另外,我想知道在将@kafkalistener与Kafka流一起使用时是否会遇到任何潜在问题。

附言:我不想使用@KafkaListener。

共有2个答案

戎永福
2023-03-14

使用Spring Boot和Spring Kafka,可以创建没有@KafkaListener的消费者,并创建额外的显式Kafka bean(ConcurrentMessageListenerContainer等):

  1. 申请。yml-创建连接配置。他们将被注入Kafka的资产
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: "org.apache.kafka.common.serialization.StringSerializer"
      value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
    listener:
      type: BATCH
    consumer:
      clientId: "applicationClientId"
      groupId: "applicationGroup"
      keyDeserializer: "org.apache.kafka.common.serialization.StringDeserializer"
      valueDeserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
      maxPollRecords: 50
      fetchMinSize: 50
      autoOffsetReset: earliest
      properties:
        spring:
          json:
            trusted:
              packages: "*"
@Configuration
@EnableKafka
@RequiredArgsConstructor //Lombok annotation
public class KafkaConsumerConfig implements KafkaListenerConfigurer {

    private final BeanFactory beanFactory;
    private final SomethingApplicationService somethingApplicationService

    @SneakyThrows //Lombok annotation
    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        // The following may be in a loop with different services and topics
        val endpoint = new MethodKafkaListenerEndpoint<String, KafkaReceiptRequest>();

        endpoint.setBeanFactory(beanFactory);
        endpoint.setBean(somethingApplicationService);
        endpoint.setMethod(somethingApplicationService.getClass().getDeclaredMethod("processList", List.class));
        endpoint.setId("Unique id for this endpoint");
        endpoint.setTopics("topic1", ...);
        endpoint.setMessageHandlerMethodFactory(messageHandlerMethodFactory());

        registrar.registerEndpoint(endpoint);
    }

    @Bean
    public MessageHandlerMethodFactory messageHandlerMethodFactory() {
        return new DefaultMessageHandlerMethodFactory();
    }

}
@Service
public class SomethingApplicationService {

    public void processList(List<Data> list) {
        //processing logic
    }

}

@Data //Lombok annotation
public class Data {
    private String name;
    private String value;
}
裴承安
2023-03-14

>

  • @kafkalistener不适用于Kafka Streams。它适用于普通的消费者。Kafka Stream可以通过StreamsBuilderFactoryBean和特定的@Bean来管理KStream。

    如果您不想使用@kafkalistener,则需要使用手册kafkalistener容器创建。Kafka的KafkaListenerContainerFactory可以用于此目的,但仅限于Spring Kafka2.2之后,而且绝对不能用于Spring Boot。

    因此,您别无选择,除非手动创建一个ConnettMessageListenerContainer。这里已经通过ContainerProperties注入了目标MessageListener。对于您自定义的消费者POJO,您需要考虑将其包装到RecordMessagingMessageListenerAdapter中。并且只有最后一个必须被注入到ConnettMessageListenerContainer中。

    这就是@KafkaListener底层工作原理。

  •  类似资料:
    • 我有一个Kafka的话题,我正在听。然后将消息内容写入websocket通道,在该通道中我有一个订阅了该通道的SockJS客户机。这很管用。然后我创建了一个新的主题,然后添加了第二个KafKalistener。但是,当调用secong侦听器时,我看到它正在尝试处理/读取与第一个KafkaListener和主题相对应的有效负载,由于它没有被配置为这样做,因此会引发一个MessageConversio

    • 我使用@KafkaListener,我需要一个动态的主题名,所以我使用SpEL'\uu listener'来实现这一点 它工作得非常好。 主要问题是当我想添加另一个注释时,它会触发某些方面的编程 @MyCustomAnnotationToRecordPerformance@KafkaListener(主题 = "#{__ listener.my道具}")公共无效监听器Kafka(@Payload

    • 我只想了解@kafkaListener的范围是什么,原型还是单例。在单个主题的多个消费者的情况下,返回的是单个实例还是多个实例。在我的情况下,我有多个客户订阅单个主题并获得报告。我只是想知道如果 > 多个客户希望同时查询报告。在我的例子中,我在成功使用消息后关闭容器,但同时如果其他人想要获取报告,则容器应该打开。 如何将作用域更改为与容器的id相关联的原型(如果不是),以便每次都可以生成单独的实例

    • 我使用的是spring-boot 2.3.2。使用spring-kafka->2.5.4发布。release kafka-clients->2.5.0 我有以下简单的监听器 null 如果我使用 然后它就会失败,不再有异常循环

    • 我正在尝试使用SpringKafka将kafka与我的SpringBoot(v2.0.6版本)应用程序集成。现在我想要一个消费者和一个生产者。我让制作人工作得很好,我可以看到通过控制台消费者发送到主题的消息。我无法使用消费者代码,当Kafka主题中出现新消息时,它不会被调用。 这是我的Kafka配置类: 以下是我的pom依赖项: 以及消费者代码: 我正在我的计算机上运行kafka,正如我所说的——

    • 所以我很想知道。这两个有什么不同?2.在哪种场景下选择哪一种?