当前位置: 首页 > 知识库问答 >
问题:

无待回复:消费者记录

章学义
2023-03-14

我正在尝试使用ReplyingKafkaTemplate,偶尔会看到下面的消息。

没有待处理的回复:消费者记录(主题=请求-回复-主题,分区=8,偏移量=1,创建时间=1544653843269,序列化密钥大小=-1,序列化值大小=1609,标题=记录标题(标题=[记录标题(键=kafka_correlationId,值=[-14,65,21,-118,70,-94,72,87,-113,-91,92,72,-124,-110,-64,-94])],isRead唯=假),键=null,带有相关ID:[-18271255759235816475365319231847350110],可能超时,或使用共享回复主题

这将源于以下代码

RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);
if (future == null) {
  if (this.sharedReplyTopic) {
    if (this.logger.isDebugEnabled()) {
      this.logger.debug(missingCorrelationLogMessage(record, correlationId));
    }
  }
  else if (this.logger.isErrorEnabled()) {
    this.logger.error(missingCorrelationLogMessage(record, correlationId));
  }
}

但只是间歇地发生

我还将共享回复主题设置为false,如下所示,并试图强制延长超时时间

ReplyingKafkaTemplate<String, Object, Object> replyKafkaTemplate = new ReplyingKafkaTemplate<>(pf, container);
        replyKafkaTemplate.setSharedReplyTopic(false);
        replyKafkaTemplate.setReplyTimeout(10000);
        return replyKafkaTemplate;

我的容器如下

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    factory.setBatchListener(false);
    factory.getContainerProperties().setPollTimeout(1000);
    factory.getContainerProperties().setIdleEventInterval(10000L);
    factory.setConcurrency(3);
    factory.setReplyTemplate(kafkaTemplate());
    return factory;
}

共有2个答案

郑桐
2023-03-14

已修复在消费者上使用属性@Header (KafkaHeaders)的问题。相关性_ID)

@KafkaListener(topics = "${kafka.topic.model}")
@SendTo("replymodeltopic")
@Override
public Model receive(ConsumerRecord<String, model> record, @Header(KafkaHeaders.CORRELATION_ID) byte[] correlation) { 
    record.headers().add(KafkaHeaders.CORRELATION_ID, correlation);
    return record.value();
}

在我的配置中

@Configuration
@EnableKafka
public class ReceiverConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.tunnel.group}")
    private String tunnelGroup;

    @Value("${kafka.topic.json.reply}")
    private String jsonTopicReply;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, tunnelGroup);

        return props;
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, Model> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(Model.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Model> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Model> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setReplyTemplate(replyTemplate());
        return factory;
    }

    @Bean
    public ProducerFactory<String, Model> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Model> replyTemplate() {
        KafkaTemplate<String, Model> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        kafkaTemplate.setDefaultTopic(jsonTopicReply);
        return kafkaTemplate;
    }

}
顾乐池
2023-03-14

如果是间歇性的,则很可能回复花了很长时间才到达。信息似乎很清楚

可能超时,或者使用共享回复主题

每个客户端实例都必须使用自己的回复主题或专用分区。

编辑

如果收到的消息的相关 ID 与当前 this.futures 中的条目不匹配(挂起的答复),则会收到日志。这只能在以下情况下发生:

  1. 请求超时(在这种情况下,将有相应的 WARN 日志)。
  2. 模板是 stop()ped(在这种情况下,this.futures 被清除)。
  3. 由于某种原因(不应发生),已处理的回复会重新传递。
  4. 在将密钥添加到 this.futures 之前收到回复(不会发生,因为它是在 send() 处理记录之前插入的)。
  5. 服务器端为同一请求发送 2 个或更多回复。
  6. 其他一些应用程序正在向同一回复主题发送数据。如果您可以使用 DEBUG 日志记录重现它,这将有所帮助,因为这样我们也会在发送时记录相关密钥。
 类似资料:
  • 我观察到,运动流中存在一些记录,但KCL消费者应用程序尚未收到这些记录。发生这种情况的原因是什么?之前和之后的记录都很好。 所有异常都在我的应用程序中消耗,KCL不会收到。所以http://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#w1ab1c11c11c15b9的情况也没有发生。 一些细节:打开分片

  • 我在ActiveMQ中使用异步消息使用者。我的制作人工作正常,向队列发送消息。现在,我的异步消息消费者正在等待调用onMessage(),但这从未发生过。因此,问题是: 异步使用者不会使用消息 ActiveMQ日志的快照还显示了许多刚刚堆积在挂起状态中的消息: 我想不出问题到底出在哪里。 计数: toPageIn 78 只是不断增加,信息仍然无法传递给消费者。 是服务器端问题还是客户端问题?

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我们观察到,其中一位消费者多次试图从Kafka主题中选取事件。我们在消费者应用程序方面有以下内容。spring.kafka.consumer.enable auto commit=false

  • 我花了几个小时想弄清楚发生了什么,但没能找到解决办法。 这是我在一台机器上的设置: 1名zookeeper跑步 我正在使用kafka控制台生成器插入消息。如果我检查复制偏移量(

  • 我们正在使用Kafka流将数据写入接收器主题。我正在运行一个avro消费者命令行来检查接收器主题中是否有数据: bin/kafka-avro控制台-消费者-主题sink.output.topic-从开始-新消费者-引导-服务器 当我在kafka streams应用程序运行时同时运行消费者时,我会看到数据,但如果我停止消费者并在几分钟后再次运行,我不会看到任何数据。几乎没有可能: 1) 这是因为Ka