当前位置: 首页 > 知识库问答 >
问题:

为现有消费者服务Spring Boot应用程序创建生产者配置时出现问题

陆信瑞
2023-03-14

我有一个成功消耗来自Kafka主题的消息的Spring引导服务。作为进一步开发的一部分,我需要在之前创建并运行良好的相同消费者服务中将消息发布到另一个Kafka主题。但是当我介绍Kafka生产者配置时,它会给我消费者配置中的错误,说

 WARN  | [main] | org.springframework.context.support.AbstractApplicationContext.refresh | Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'kafkaConsumerService' defined in file 
 [C:\Workspaces\springboot\test-app\target\classes\com\testapp\execution\carrier\km\consumer\KafkaConsumerService.class]: Initialization of bean failed; nested exception is org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'kafkaListenerContainerFactory' defined in class path resource [com/testapp/execution/carrier/km/config/KafkaConsumerConfig.class]: 
Unsatisfied dependency expressed through method 'kafkaListenerContainerFactory' parameter 0; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.KafkaTemplate<java.lang.String, com.testapp.execution.carrier.km.model.CarrierStopEBO>' 
available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}


Description:

Parameter 0 of method kafkaListenerContainerFactory in com.testapp.execution.carrier.km.config.KafkaConsumerConfig required a bean of type 'org.springframework.kafka.core.KafkaTemplate' that could not be found.

The following candidates were found but could not be injected:
    - Bean method 'kafkaTemplate' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.KafkaTemplate; SearchStrategy: all) found beans of type 'org.springframework.kafka.core.KafkaTemplate' kafkaTemplatetest


Action:

Consider revisiting the entries above or defining a bean of type 'org.springframework.kafka.core.KafkaTemplate' in your configuration.

我的工作Kafka消费者配置,这是突然抛出问题后,介绍生产者配置

Kafka消费者配置。JAVA

@Slf4j
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Value("${spring.kafka.offset-reset-policy}")
    private String offsetResetPolicy;
    @Value("${spring.kafka.group-id}")
    private String groupId;
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.max-poll-interval}")
    private Integer maxPollInterval;

    @Value("${spring.kafka.max-poll-records}")
    private Integer maxPollRecords;

    @Value("${spring.kafka.session-timeout}")
    private Integer sessionTimeout;
    @Value("${spring.kafka.trusted-packages}")
    private String trustedPacakges;

    public KafkaConsumerConfig() {
    }

    @Bean
    public Map<String, Object> consumerConfigs() {

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetResetPolicy);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, trustedPacakges);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);//
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT);

        return props;
    }

    @Bean
    public ConsumerFactory<String, CarrierStopEBO> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new ErrorHandlingDeserializer2<>(new JsonDeserializer<>(carrierStopEBO.class, false)));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, CarrierStopEBO> kafkaListenerContainerFactory(
            KafkaTemplate<String, CarrierStopEBO> kafkaTemplate1) {
        ConcurrentKafkaListenerContainerFactory<String, CarrierStopEBO> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setSyncCommits(true);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setAckDiscarded(true);
        factory.getContainerProperties().setAuthorizationExceptionRetryInterval(Duration.ofMillis(30000));
  

        return factory;
    }


}

将其添加到服务后导致问题的生产者配置

Kafka普罗德里格。JAVA

@Slf4j
@EnableKafka
@Configuration
public class KafkaProducerConfig {

    @Value("${sni.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${sni.kafka.producer.properties.security.protocol}")
    private String securityProtocol;

    @Value("${sni.kafka.producer.properties.sasl.mechanism}")
    private String saslMechanism;

    @Value("${sni.kafka.producer.properties.sasl.jaas.config}")
    private String jaasConfig;

    public KafkaProducerConfig() {
   }
    @Bean
    public ProducerFactory<String, CarrierModelDTO> producerFactory() {
        Map<String, Object> configProperties = new HashMap<String, Object>();
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configProperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
        configProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
        return new DefaultKafkaProducerFactory<String, CarrierModelDTO>(configProperties);
    }

    @Bean
    public KafkaTemplate<String, CarrierModelDTO> kafkaTemplatetest() {
        return new KafkaTemplate<String, CarrierModelDTO>(producerFactory());
    }
}

我能够找出消费者和生产者配置与下面的生产者配置代码之间存在一些冲突

 @Bean
    public KafkaTemplate<String, CarrierModelDTO> kafkaTemplatetest() {
        return new KafkaTemplate<String, CarrierModelDTO>(producerFactory());
    }

有人能就这个问题提出建议吗

共有1个答案

刘俊语
2023-03-14

你期待一个KafkaTemplate

 类似资料:
  • 所谓的生产者消费者模型就是 某个模块(函数)负责生产数据,这些数据由另一个模块来负责处理 一般生产者消费者模型包含三个部分 生产者、缓冲区、消费者 为什么生产者消费者模型要含三个部分?直接生产和消费不行么? 一个案例说明一切 生产者好比现实生活中的某个人 缓冲区好比现实生活中的邮箱 消费者好比现实生活中的邮递员 如果只有生产者和消费者, 那么相当于只有写信的人和邮递员,那么如果将来过去的邮递员离职

  • 我如何将电话限制在每5秒一次。注意:只能修改reallySlowApi。 编辑:我知道,但是如果Api变得更慢,它就不能解决问题。我需要使用的最佳方式。

  • 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前,使用Ctrl-C关闭zookeeper和kafka服务(这是通过在consumer方法中使用来模拟的)。 发现 在zookeeper和kafka服务被关闭后,消费者继续在控制台上写消息。 问题 我如何使消费者从上次消费的消息的索引+1继续。 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前

  • 我有两个线程的问题,似乎没有正确同步。我基本上有一个布尔值名为“已占用”。当没有线程启动时,它被设置为false。但是当一个线程启动时,线程集被占用是真的,我有一个类,它有线程(run),它们调用下面的函数。 这是一个模拟银行的示例,它接收一个金额(初始余额),然后随机执行取款和存款。我的教授提到了一些关于从取款线程到存款线程的信号?这是怎么回事?在提取线程中,它应该运行到余额为2低,并等待存款线

  • 我有一个消费者作为生产者消费者模式的一部分: 简化: 如果我移除 通过将线程设置为睡眠,CPU使用率攀升到极高的水平(13%),而不是0%。 此外,如果我实例化该类的多个实例,则每个实例的CPU使用率都会以13%的增量攀升。 大约每分钟(可能每30秒)都会向BlockingCollection添加一个新的LogItem,并将适用的消息写入文件。 有没有可能线程以某种方式阻止了其他线程的运行,而系统

  • 尝试学习线程的多线程和进程间通信。实施了一个典型的生产者-消费者问题。然而,am获得的输出是相当连续的,这在理想情况下不应该是使用线程的情况。 好的,下面是完整的代码: 生产者线程: 使用者线程: 现在,当我运行程序时,生产者线程总是比消费者线程先运行。即使我创建了多个生产者/消费者,结果也是一样的。以下是单个生产者和单个消费者的产量: 有人能解释一下这里的行为吗?我已经在这里读了很多答案,但我想