我有一个在请求/答复配置中使用的Kafka生产者。当producer的一个实例启动时,它可以完美地工作。然而,当启动生产者的第二个实例时,seconds实例将无法工作。它将正确地将消息写入主题,消费者将处理消息并将回复发送回来,然而生产者将找不到它正在等待的回复消息,它将超时。消息似乎是由生产者的第一个实例接收的。因为第一个实例不期望此答复消息。请求/应答消息失败。是否缺少任何使第二个实例工作的配置?此POC将用于Openshift POD,因此它应该能够扩展到多个生产者和多个消费者实例。下面是我对消费者和生产者的配置。谢谢
Kafka生产者配置
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${kafka.topic.request-reply-topic}")
String requestReplyTopic;
@Value("${kafka.request-reply.timeout-ms}")
private Long replyTimeout;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return props;
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// props.put(ProducerConfig.RETRIES_CONFIG, 0);
// props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
//props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
//props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ReplyingKafkaTemplate<String, InGetAccountInfo, AccountInquiryDto> replyKafkaTemplate(ProducerFactory<String, InGetAccountInfo> pf, KafkaMessageListenerContainer<String, AccountInquiryDto> container){
return new ReplyingKafkaTemplate(pf, container);
}
@Bean
public ProducerFactory<String, InGetAccountInfo> requestProducerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public ConsumerFactory<String, AccountInquiryDto> replyConsumerFactory() {
JsonDeserializer<AccountInquiryDto> jsonDeserializer = new JsonDeserializer<>();
jsonDeserializer.addTrustedPackages(InGetAccountInfo.class.getPackage().getName());
jsonDeserializer.addTrustedPackages(AccountInquiryDto.class.getPackage().getName());
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),jsonDeserializer);
}
@Bean
public KafkaMessageListenerContainer<String, AccountInquiryDto> replyContainer(ConsumerFactory<String, AccountInquiryDto> cf) {
ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);
return new KafkaMessageListenerContainer<>(cf, containerProperties);
}
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return new KafkaAdmin(configs);
}
@Bean
public KafkaAsyncService kafkaAsyncService(){
return new KafkaAsyncService();
}
}
public AccountInquiryDto getModelResponse(InGetAccountInfo accountInfo) throws Exception{
LOGGER.info("Received request for request for account " + accountInfo);
// create producer record
ProducerRecord<String, InGetAccountInfo> record = new ProducerRecord<String, InGetAccountInfo>(requestTopic,accountInfo);
// set reply topic in header
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));
// post in kafka topic
RequestReplyFuture<String, InGetAccountInfo, AccountInquiryDto> sendAndReceive = kafkaTemplate.sendAndReceive(record);
// confirm if producer produced successfully
SendResult<String, InGetAccountInfo> sendResult = sendAndReceive.getSendFuture().get();
// //print all headers
sendResult.getProducerRecord().headers().forEach(header -> System.out.println(header.key() + ":" + header.value().toString()));
// get consumer record
ConsumerRecord<String, AccountInquiryDto> consumerRecord = sendAndReceive.get();
ObjectMapper mapper = new ObjectMapper();
AccountInquiryDto modelResponse = mapper.convertValue(
consumerRecord.value(),
new TypeReference<AccountInquiryDto>() { });
LOGGER.info("Returning record for " + modelResponse);
return modelResponse;
}
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${kafka.topic.acct-info.request}")
private String requestTopic;
@Value("${kafka.topic.request-reply.timeout-ms}")
private Long replyTimeout;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return props;
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ConsumerFactory<String, InGetAccountInfo> requestConsumerFactory() {
JsonDeserializer<InGetAccountInfo> jsonDeserializer = new JsonDeserializer<>();
jsonDeserializer.addTrustedPackages(InGetAccountInfo.class.getPackage().getName());
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),jsonDeserializer);
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, InGetAccountInfo>> requestReplyListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, InGetAccountInfo> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(requestConsumerFactory());
factory.setConcurrency(3);
factory.setReplyTemplate(replyTemplate());
return factory;
}
@Bean
public ProducerFactory<String, AccountInquiryDto> replyProducerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, AccountInquiryDto> replyTemplate() {
return new KafkaTemplate<>(replyProducerFactory());
}
@Bean
public DepAcctInqConsumerController Controller() {
return new DepAcctInqConsumerController();
}
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic requestTopic() {
Map<String, String> configs = new HashMap<>();
configs.put("retention.ms", replyTimeout.toString());
return new NewTopic(requestTopic, 2, (short) 2).configs(configs);
}
}
@KafkaListener(topics = "${kafka.topic.acct-info.request}", containerFactory = "requestReplyListenerContainerFactory")
@SendTo
public Message<?> listenPartition0(InGetAccountInfo accountInfo,
@Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTo,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int id) {
try {
LOGGER.info("Received request for partition id = " + id);
LOGGER.info("Received request for accountInfo = " + accountInfo.getAccountNumber());
AccountInquiryDto accountInfoDto = getAccountInquiryDto(accountInfo);
LOGGER.info("Returning accountInfoDto = " + accountInfoDto.toString());
return MessageBuilder.withPayload(accountInfoDto)
.setHeader(KafkaHeaders.TOPIC, replyTo)
.setHeader(KafkaHeaders.RECEIVED_PARTITION_ID, id)
.build();
} catch (Exception e) {
LOGGER.error(e.toString(),e);
}
return null;
}
我可以通过修改生产者的配置来解决这个问题,添加变量CLIENT_ID_CONFIG如下所示:
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId + "-" + UUID.randomUUID().toString());
props.put(ProducerConfig.RETRIES_CONFIG,"2");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
我使用的是Kafka producer客户端,我的项目中没有任何log4j配置。 在运行时,程序打印了大量的Kafka调试日志,这是我不想要的。
我们使用StreamListeners for Spring Kafka,并使用基于JPA的发件箱发送消息。发件箱是从消费中异步清空的,因此我们不希望“从JPA表读取,向Kafka生成消息”上的事务。 但是我们也有重试主题,因此如果使用失败,我们会将失败的消息移动到重试主题(最终是DLT),这确实需要事务性的。 据我所知,只有在全局基础上(设置事务id前缀),而不是在具体绑定上,才有可能为生产者打
用例如下。我在Java代码中的许多对象实例上传递生产者或消费者引用。在其中一些地方,我想对Kafka的配置进行一些检查。这意味着我想回去,Kafka生产者/消费者(包括默认值)中存储了什么样的有效配置。我在java文档中没有看到显式的anthing: Kafka制作人 那么,如何找回Kafka制作人和消费者的配置呢?
主要内容:1 创建DefaultMQProducer实例,2 start启动生产者,2.1 getOrCreateMQClientInstance获取或者创建MQClientInstance,2.2 registerProducer注册生产者,3 start启动MQClientInstance,3.1 mQClientAPIImpl#start启动netty客户端,3.2 startScheduledTask启动各种定时任务,基于RocketMQ 4.9.3,详细介绍了RocketMQ的客户端P
考虑到以下同步Kafka制作人 请帮助我理解请求之间的区别。暂停。ms和max block。ms producer配置。是否包括所有重试的最长时间?还是每次重试都有自己的超时?
我们在Spark 2.1中使用Kafka0.10,我发现我们的制作人发布消息总是很慢。在给Spark executors提供8个内核后,我只能达到1k/s左右,而另一篇帖子则说它们很容易达到百万/秒。我试着调一下玲珑的曲调。ms和batch。大小来找出答案。然而我发现了玲儿。ms=0对我和这批人来说似乎是最佳选择。大小没有多大影响。我每次迭代发送160k个事件。看来我得让Kafka制作人知道到底发