当前位置: 首页 > 知识库问答 >
问题:

Confluent Cloud Apache Kafka消费者主题[test-1]存在/不存在,missingTopicsFatal为真

隆睿
2023-03-14

在Confluent Cloud上使用Kafka时,在ServiceA将消息发布到主题之后,我在我的消费者(ServiceB)上遇到以下错误。但是,当我登录到我的汇流云时,我看到消息已经成功发布到主题。

 org.springframework.context.ApplicationContextException: Failed to start bean 
'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is 
 java.lang.IllegalStateException: Topic(s) [topic-1] is/are not present and 
 missingTopicsFatal is true 

当我在本地服务器上运行Kafka时,我不会遇到这个问题。ServiceA能够将消息发布到我的本地Kafka服务器上的主题,ServiceB能够成功地使用该消息。

我在application.properties(注释掉的代码)中提到了我的本地Kafka服务器配置

app.topic=test-1
#Remote
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
request.timeout.ms=20000
bootstrap.servers=pkc-4kgmg.us-west-2.aws.confluent.cloud:9092
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
requiredusername="*******"
password="****"

#Local
#ssl.endpoint.identification.algorithm=https
#security.protocol=SASL_SSL
#sasl.mechanism=PLAIN
#request.timeout.ms=20000
#bootstrap.servers=localhost:9092
#retry.backoff.ms=500
#sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
public class Sender {

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Value("${app.topic}")
private String topic;

public void send(String data){
    Message<String> message = MessageBuilder
            .withPayload(data)
            .setHeader(KafkaHeaders.TOPIC, topic)
            .build();
    kafkaTemplate.send(message);
  }
}
@Configuration
@EnableKafka
public class KafkaProducerConfig {

@Value("${bootstrap.servers}")
private String bootstrapServers;

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return props;
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory(producerConfigs());
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate(producerFactory());
 }

}
app.topic=test-1
#Remote
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
request.timeout.ms=20000
bootstrap.servers=pkc-4kgmg.us-west-2.aws.confluent.cloud:9092
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
requiredusername="*******"
password="****"

#Local
#ssl.endpoint.identification.algorithm=https
#security.protocol=SASL_SSL
#sasl.mechanism=PLAIN
#request.timeout.ms=20000
#bootstrap.servers=localhost:9092
#retry.backoff.ms=500
#sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule

KafkaConsumerConfig.java

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
  @Value("${bootstrap.servers}")
private String bootstrapServers;

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "confluent_cli_consumer_040e5c14-0c18-4ae6-a10f-8c3ff69cbc1a"); // confluent cloud consumer group-id
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return props;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory(
            consumerConfigs(),
            new StringDeserializer(), new StringDeserializer());
}

@Bean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory());
    return factory;
 }
}

KafkaConsumer.java

@Service
public class KafkaConsumer {
private static final Logger LOG = LoggerFactory.getLogger(KafkaListener.class);

@Value("{app.topic}")
private String kafkaTopic;

  @KafkaListener(topics = "${app.topic}", containerFactory = "kafkaListenerContainerFactory")
  public void receive(@Payload String data) {
    LOG.info("received data='{}'", data);
  }
}

共有1个答案

戚哲
2023-03-14

@Cricket_007的答案是正确的。您需要在sasl.jaas.config属性值中嵌入用户名和密码(特别是集群API密钥和API秘密)。

您可以通过下面的官方示例来仔细检查Java客户端应该如何连接到汇流云:https://github.com/confluentinc/examples/blob/5.3.1-post/clients/Cloud/Java/src/main/Java/io/Confluent/examples/clients/Cloud

谢谢,

 类似资料:
  • 生产者发送消息到一个有四个分区的主题。我们有一个消费者在消费来自这个主题的消息。应用程序在工作日一直运行周末例外:它不会在周末期间调用poll方法。 使用者配置:自动提交,自动提交时间为5s(默认)。 应用程序一直运行良好,直到一个星期天,当它重新开始调用poll方法。我们看到有数百万条消息从这个话题中被轮询出来。消费者基本上是轮询来自主题的所有消息。将新的偏移量与它在周末停止之前的偏移量进行比较

  • 我有一个主题列表(目前是10个),其大小可以在未来增加。我知道我们可以产生多个线程(每个主题)来消耗每个主题,但在我的例子中,如果主题的数量增加,那么消耗主题的线程数量也会增加,这是我不希望的,因为主题不会太频繁地获取数据,所以线程将是理想的。 有没有办法让单个消费者从所有话题中消费?如果是的话,我们怎样才能做到呢?另外,Kafka将如何维护抵消?请建议答案。

  • 我有多个Kafka消费者和制作人,主题不同。使用独立应用程序,我想监控Kafka消费者的延迟。 我使用Kafka0.10.0.1,因为Kafka现在存储消费者偏移Kafka本身,所以我怎么能读到相同的。 我能够读取每个分区的主题偏移量。

  • 我已经编写了一个streams应用程序,用于在由5个代理和10个分区组成的集群上与主题对话。我在这里尝试了多种组合,比如10个应用程序实例(在10台不同的机器上),每个实例有1个流线程,5个实例每个实例有2个线程。但由于某种原因,当我签入kafka manager时,分区和流线程之间的1:1映射没有发生。一些线程正在拾取2个分区,而一些线程没有拾取任何分区。你能帮我做同样的事吗??所有线程都是同一

  • 我们有一个服务器,负责处理消息的生成和消费。我们有4台笔记本电脑,所有带有confluent的Mac都运行相同的命令行。。。 /kafka avro控制台使用者--从一开始--引导服务器0.0.0.0:9092,0.0.0.0:9092--主题主题名称--属性schema.registry.url=http://0.0.0.0:8081 4台笔记本电脑中有3台没有问题使用这些消息,但是第四台不会。

  • 我有两个组id相同的消费者服务器订阅了相同的主题。kafka服务器仅使用一个分区运行。据我所知,消息应该在这两个消费者服务器中随机使用。但现在似乎总是同一个消费者服务器A消费消息,另一个不消费消息。如果我停止消费者服务器A,另一个将正常工作。我所期望的是,他们可以随机消费信息。