我正在使用Spring和Spring Kafka编写一个小型PoC。我的目标是让生产者和消费者都从这个主题中写作(例如阅读)。
我有一个奇怪的情况:
下面是我的代码——它与文档示例非常相似。更准确地说,问题来自这样一个事实,即KafkanConsumerConfiguration中的bean不是由Spring创建的(也就是说,构建它们的方法永远不会被调用)。
Kafkaproducer配置。JAVA
@Configuration
public class KafkaProducerConfiguration {
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:32768");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
}
消息ender.java
@Component
public class MessageSender {
final static private Logger log = Logger.getLogger(MessageSender.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@PostConstruct
public void onConstruct() throws InterruptedException {
log.info("Sending messages...");
for (int i = 0; i < 100; ++i) {
kafkaTemplate.send("mytopic", "this is a message");
Thread.sleep(1000);
}
kafkaTemplate.flush(); // NOTE: no changes if I move this call in the loop
log.info("Done sending messages");
}
}
Kafka消费者配置。JAVA
@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:32768");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-service");
return props;
}
}
MyMessageListener。JAVA
@Service
public class MyMessageListener {
final static private Logger log = Logger.getLogger(MyMessageListener.class);
@PostConstruct
public void onConstruct() {
log.info("Message listener started");
}
@KafkaListener(topics = "mytopic")
public void onMessageReceived(String message) {
log.info("Got message: "+ message);
}
}
以下是应用程序生成的日志,供参考:https://pastebin.com/BY783jiL.如您所见,没有创建消费bean(否则将出现一个块ConsumerConfig values:…
)。
以下是我尝试过但没有成功的几件事:
MyMessageListener.onMessageReception
方法上添加注释属性容器工厂="myBeanName"
)Kafka消费者配置
更改为其他内容Kafka消费者配置
中添加一个与kafka无关的@Bean
以查看它是否会被创建(它会)版本:Spring Boot 1.5.9,Spring-Kafka:1.1.7。
我已经撕了几个小时的头发了,非常感谢您的帮助。
谢谢
刚刚发现问题<代码>消息发送者。onConstruct实际上需要花费大量时间来执行(100秒),在此期间,它会阻止Spring创建其他bean。
kafkaTemplate.send("mytopic", "this is a message");
您永远不应该以@PostConstruct
方法开始与外部服务交互——您需要等待应用程序构建完成后再这样做。
实现SmartLifeyle
,为isAutoStartup
返回true
并将该代码移动到start()
。
或者实现ApplicationListener
无论哪种方式都将确保应用程序已准备就绪。
我的问题是关于Kafka在爪哇的消费者 > 已启动Kafka服务器 创建的主题 创作者 创建的消费者 我在终端中做的所有这些事情,工作正常,能够在消费者处正确接收日志。运行下面的消费者(在Java),但没有收到任何记录。它继续汇集在 也没有收到任何记录。 下面给出的我的 java 消费者代码 请告诉我在java消费者类中接收消息,我在配置中做错了什么吗?属性中的“group.id”是怎么回事? 下
我写了一个Kafka消费者从主题中获取所有记录,然后只进入下一步,但它没有获取所有记录。
问题内容: 我有一个dropwizard应用程序,在该应用程序中,我配置了logger附加程序,使其文件如下: 并且,在我的应用中创建了记录器: 在main()中进行一些测试记录: 该应用程序启动并运行没有问题。但是我没有在stdout或mylogs.log文件中得到任何日志(当然,除了Jetty访问日志之外,这些日志也已正确打印到mylogs.log中)。相反,如果我删除configuratio
我在本地机器上安装了Kafka,并启动了zookeeper和一个代理服务器。 现在我有一个单独的主题,描述如下: 我有一个生产者在消费者启动之前产生了一些消息,如下所示: 当我使用--从头开始选项启动消费者时,它不会显示生产者生成的所有消息: 但是,它显示的是新添加的消息。 我在这里怎么了?有什么帮助吗?
我正在尝试使用reamer-kafka来消耗消息。其他一切都很好,但我想为失败的消息添加重试(2)。spring-kafka已经默认重试失败记录3次,我想使用reamer-kafka实现相同。 我用SpringKafka作为反应Kafka的包装。以下是我的消费者模板: 让我们考虑消耗方法如下 我使用以下逻辑在失败时重试消耗方法。 如果当前消费者记录异常失败,我想重试使用该消息。我试图用另一次重试(
我在一台Windows主机上安装了两个Kafka 2.1.0代理。默认复制因子设置为2。所有其他设置均为默认设置。 networkClient:[Consumer ClientID=Consumer-1,GroupID=SOUT]无法建立到节点-2(/192.168.0.1:19092)的连接。代理可能不可用。 消费者: 一个制作人: