我正在为一个简单的Spring Boot应用程序编写Kafka联调。该应用程序简单地发布到Kafka主题。
我正在使用一个嵌入式Kafka实例进行测试。当通过Intellij运行时,测试运行得非常好,但当我通过gradle运行时,测试失败。看起来好像锁存
倒计时从未达到0,测试最终超时。
生产者配置:
public class KafkaProducerConfig {
@Value(value = "${kafka.bootstrap-address}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, String> articleProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> articleKafkaTemplate() {
return new KafkaTemplate<>(articleProducerFactory());
}
}
制片人:
public class KafkaProducer {
@Value(value = "kafka.topic-name")
String topicName;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message, String topic) throws KafkaPublishException {
try {
ListenableFuture<SendResult<String, String>> future =
kafkaTemplate.send(topic, message);
future.get();
} catch (Exception e) {
throw new KafkaPublishException(e.getMessage());
}
}
public String getTopicName() {
return topicName;
}
消费者:
@Component
public class KafkaConsumerHelper {
private CountDownLatch latch = new CountDownLatch(1);
private String payload = null;
@KafkaListener(topics = "${test.topic}")
public void receive(ConsumerRecord<?, ?> consumerRecord) {
setPayload(consumerRecord.toString());
latch.countDown();
}
public CountDownLatch getLatch() {
return latch;
}
public String getPayload() {
return payload;
}
private void setPayload(String payload) {
this.payload = payload;
}
}
测试:
@SpringBootTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
public class KafkaProducerTest {
@Autowired
private KafkaProducer producer;
@Autowired
private KafkaConsumerHelper consumer;
@Value("${test.topic}")
private String topic;
@Test
public void shouldSuccessfullyPublishAnArticleMessageToEmbeddedKafka()
throws Exception {
String message = createArticle();
producer.sendMessage(message, topic);
consumer.getLatch().await();
assertThat(consumer.getLatch().getCount(), equalTo(0L));
assertThat(consumer.getPayload(), containsString(message));
}
application.yml:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
auto-offset-reset: earliest
group-id: my-id
test:
topic: embedded-test-topic
partitions-number: 1
replication-factor: 1
知道问题出在哪里吗?
对于将来研究这个问题的人来说,我的问题是我没有正确使用@EmbeddedKafka
。
修复方法是将bootstrapServersProperty="spring.kafka.bootstrap-server"
属性添加到@EmbeddedKafka
注释中。
@EmbeddedKafka(partitions = 1, bootstrapServersProperty = "spring.kafka.bootstrap-servers")
更多信息请参见Kafka文档。
我正在做一个Kafka的消费者计划。最近我们在PROD环境下进行了部署。在那里,我们面临以下问题: 我的理解是,当组协调器不可用并被重新发现时,心跳间隔(根据文档为3秒)过期,消费者被踢出组。这是正确的吗?。如果是这样的话,应该为这个工作做些什么呢?。如果我错了,请帮助我理解这个问题,并建议您有任何想法,以解决这个问题。如果需要,我可以分享代码。
我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者
我有一个带注释的kafka消费者方法@kafkalistener。我已经在容器上设置了重试模板,并且重试配置是这样的,如果在处理消息时发生了一些异常,它将始终重试。我已将最大轮询记录设置为1。如果这种情况实时发生,并且消费者一直在重试消息,经纪人会认为该消费者已经死亡并触发重新平衡吗?或者,在重试时,消费者是否会对未能处理的相同消息进行投票?如果这是真的,因为民意调查正在进行,我的假设是不会有任何
关于KafkaConsumer(>=0.9),我在尝试实现满足自己需求的解决方案时遇到了一些严重的问题。 假设我有一个函数,它只能从一个Kafka主题中读取n条消息。 例如:-->获取主题中的下5条kafka消息。 所以,我有一个类似这样的循环。用实际正确的参数编辑。在本例中,使用者的max参数设置为1,因此实际循环只循环一次。不同的消费者(他们中的一些人迭代了许多消息)共享一个抽象的父亲(这一个
Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka
这是创建ListenerContainerFactory的类 这是我用@KafKalistener注释的Listener类 这是KafkaListenerConfig类,它接受引导服务器、主题名称等。