当前位置: 首页 > 知识库问答 >
问题:

o、 s.k.r.回复KafkatTemplate:回复因以下原因超时:生产记录

慕乐语
2023-03-14

我试图发布一些消息使用Kafka消费者与"回复Kafka模板"。我的主要工作是订阅消息,修改消息,并发回修改后的消息。我已经尝试了增加回复时间。但即使如此,我也没有得到订户的回应。生产者控制台显示如下。

我已尝试增加事务超时、请求超时。但对我来说什么都不管用。任何帮助都将不胜感激。

提前谢谢

这些是我的配置bean:

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000);
    return properties;
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    return properties;
}

 @Bean
public ReplyingKafkaTemplate<String, User, User> replyKafkaTemplate(ProducerFactory<String, User> pf,
        KafkaMessageListenerContainer<String, User> container) {

    ReplyingKafkaTemplate<String, User, User> replyTemplate = new ReplyingKafkaTemplate<>(pf, container);
    replyTemplate.setReplyTimeout(30000);

    return replyTemplate;
}

这是我的消费者:

@KafkaListener(topics = "user",containerFactory="kafkaListenerContainerFactory")
@SendTo
public User listen(User user) throws InterruptedException {
    System.out.println("************* message published *************");
    user.setName("myName");
  return user;
}

警告8088---[TaskScheduler-1]o.s.k.r.ReplyingKafkatTemplate:回复超时,原因是:ProducerRecord(主题=用户,分区=null,标题=记录标题(标题=[RecordHeader(键=kafka_replyTopic,值=[117,115,101,114]),记录标题(键=kafka_correlationId,值=[85,92,37,-119,89,32,77,-1,-75,-107,106,42,68,12,-124,-105]),记录头(key=uuuuuu TypeId,value=[99111109,46107,97102107,97,46109111100101108,4685115101114]),isReadOnly=true),key=null,value=com.kafka.model。User@71178924,timestamp=null),correlationId:[11346283228369987274421922180807230615]

共有1个答案

卫胜
2023-03-14

您可能需要在配置回复KafkatTemplate中添加sharedReplyTopic:

@Bean
public ReplyingKafkaTemplate<String, User, User> replyKafkaTemplate(ProducerFactory<String, User> pf,
    KafkaMessageListenerContainer<String, User> container) {
ReplyingKafkaTemplate<String, User, User> replyTemplate = new ReplyingKafkaTemplate<>(pf, container);
replyTemplate.setReplyTimeout(30000);
replyTemplate.setSharedReplyTopic(true);
return replyTemplate;

}

下面是我的完整配置示例:

@Override
@Bean
public Map<String, Object> consumerConfigs() {

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "record-reader");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put("ssl.endpoint.identification.algorithm", ``sslEndpointIdentificationAlgorithm);
props.put("sasl.mechanism", saslMechanism);
props.put("request.timeout.ms", requestTimeoutMs);
props.put("retry.backoff.ms", retryBackoffMs);
props.put("security.protocol", securityProtocol);
props.put("sasl.jaas.config", saslJaasConfig);

return props;
}

@Override
@Bean
public ConsumerFactory<String, String> consumerFactory() {

return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
    new StringDeserializer());
}

@Override
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

ConcurrentKafkaListenerContainerFactory<String, String> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setReplyTemplate(kafkaTemplate);
return factory;
}

@Override
@Bean
public KafkaMessageListenerContainer<String, String> replyContainer() {
ContainerProperties containerProperties = new ContainerProperties(customerIndexTopic,customerReplyTopic);
return new KafkaMessageListenerContainer<>(consumerFactory(), containerProperties);
 }
 @Override
 @Bean
 public Map<String, Object> producerConfigs() {

Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put("ssl.endpoint.identification.algorithm", sslEndpointIdentificationAlgorithm);
props.put("sasl.mechanism", saslMechanism);
props.put("request.timeout.ms", requestTimeoutMs);
props.put("retry.backoff.ms", retryBackoffMs);
props.put("security.protocol", securityProtocol);
props.put("sasl.jaas.config", saslJaasConfig);

return props;
}

@Override
@Bean
public ProducerFactory<String, String> producerFactory() {

return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Override
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {

return new KafkaTemplate<>(producerFactory());
}

@Override
@Bean
public ReplyingKafkaTemplate<String, String, String> replyKafkaTemplate(
  KafkaMessageListenerContainer<String, String> container) {
ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate = new ReplyingKafkaTemplate<>(producerFactory(),container);
replyingKafkaTemplate.setSharedReplyTopic(true);
replyingKafkaTemplate.setReplyTimeout(10000);
return replyingKafkaTemplate;
}
 类似资料:
  • 问题内容: 我有一个非常简单的,我正在尝试设置自定义错误消息。但是由于某种原因,该错误没有显示出来。 这是我的控制器: 这是我得到的答复: 我正在传递JSON,但我没有验证任何内容,我只是在尝试设置自定义消息。如果更改状态代码,则会在响应中看到它,但是始终为空。 为什么这不按预期工作?这是一个非常简单的示例,我看不到可能缺少的内容。当我调试代码时,我可以看到错误消息设置了所有字段。但是由于某种原因

  • 我有一个非常简单的< code>@RestController,我试图设置一个自定义的错误消息。但是由于某种原因,错误的< code >消息没有显示出来。 这是我的控制器: 这是我得到的回应: 我正在传递一个JSON,但我没有验证任何东西,我只是试图设置自定义消息。如果我更改状态代码,我会在响应中看到,但< code>message始终为空。 为什么这没有像预期的那样工作?这是一个如此简单的示例,

  • 我正在使用mybatis将数据插入到postgresql DB中。我有19629个记录要插入。我正在尝试一次插入所有记录。但是,如果我向查询传递超过6K条记录,我将得到原因:org.PostgreSQL L.util.psqlException:发送到后端时发生I/O错误。 设置参数SQL:insert到temp_overdrive_csv_dtls(LPAT_LIBRARY_CARD_NUMER

  • 我有一个登录配置问题。下面是我的logback.xml: 问题是当我为根记录器设置以下行时:

  • 我最近在Spring Boot应用程序中从使用标准Rabbit模板改为使用异步Rabbit模板。在此过程中,我从标准的方法切换到使用方法。 进行此更改似乎不会影响向RabbitMQ发布消息,但是现在在发送消息时,我确实看到了如下所示的堆栈跟踪: 我确信我只是缺少一个配置选项,但任何帮助都将是徒劳的。 谢谢。:)

  • 我有一个如下的集成,我从rest控制器调用这个方法,但回复超时并没有像我预期的那样工作。 我期望的是:如果在我给出的回复超时时间内没有响应,则返回timeout作为对客户端的响应。 对于通道配置中的超时持续时间,是否需要执行一些操作? 谢谢。