<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.1</version>
</dependency>
配置类:
@Slf4j
@EnableKafka
@Configuration
@PropertySource("dv/application.properties")
public class KafkaConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "BROKER_1.FOO.NET:9094");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
消费阶层:
@Slf4j
@Component
public class KafkaConsumer {
@KafkaListener(topics = "foo_topic", groupId = "group_1")
public void consume(String message) {
log.info(message);
}
}
每当我试图将消息从esb发布到消息代理的主题时,我都会收到此错误 我的代理代码是 我的jndi配置和axis 2配置配置正确。我的MB在端口9444上运行,Publisher_esb在端口9443上运行,subscriber esb在端口9446上运行。当我使我的订阅者保持活动状态时,如果我从我的发布者发布一条消息,该消息会反映到订阅者。 从订户获取消息的代码是 当我第一次运行订阅者时,它给了我结
我们有一个服务器,负责处理消息的生成和消费。我们有4台笔记本电脑,所有带有confluent的Mac都运行相同的命令行。。。 /kafka avro控制台使用者--从一开始--引导服务器0.0.0.0:9092,0.0.0.0:9092--主题主题名称--属性schema.registry.url=http://0.0.0.0:8081 4台笔记本电脑中有3台没有问题使用这些消息,但是第四台不会。
我正在探索一个小Spring Boot,我正在查看一些前端库——我承认我几乎从未见过:) 我可能犯了一些愚蠢的错误,导致引导程序无法工作。我正在尝试设置标题页。因为我讨厌做简单的事情,所以我使用Spring Boot、Webjars和Thymeleaf。这是第一次。但似乎一切都很好,我不明白怎么了。 首先,这里是Gradle依赖项。我使用的是spring 2.0.0。M7。我剪切了应用插件库和组/
我已经用C语言编写了kafka消费者和生产者,使用的库是Library dkafka库。kafka代理版本是kafka_2.12-2.3.0。生产者正在成功生成消息,dr_msg_cb函数确认成功传递。但是,消费者没有收到来自代理的消息。有人能帮助进一步调试吗? 我可以看到,从消费者到代理的TCP连接已经建立。但TCPdump显示代理并没有向消费者发送任何数据。我在消费者代码上启用了调试,下面是消
问题内容: 我有一个简单的Java生产者,如下所示 我正在尝试读取以下数据 但是消费者没有阅读来自kafka的任何消息。如果我在下面添加以下内容 然后,消费者开始阅读该主题。但是,每当使用者重新启动时,它都会从我不希望的主题开头读取消息。如果我在启动Consumer时添加以下配置 然后它从主题读取消息,但是如果使用者在处理所有消息之前重新启动,则它不会读取未处理的消息。 有人可以让我知道出了什么问
我已经在我的sonar实例上测试了规则“正确性--可能的空指针取消引用”和“正确性--在异常路径的方法中可能的空指针取消引用”。不幸的是,以下代码从未被检测为错误 Netbeans能够正确检测到此问题,但sonar 3.6.1不能。 谢谢你的帮助