我正在使用Brave库https://github.com/openzipkin/brave进行跟踪,现在我也想将其用于Kafka消费者。我想避免添加Spring Sleuth,并利用Brave Kafka仪器https://github.com/openzipkin/brave/tree/master/instrumentation/kafka-clients.
对于Kafka消费者,我使用@KafkaListener。代码看起来像这样:
TestKafkaEndpoint。JAVA
@Service
public class TestKafkaEndpoint {
@KafkaListener(topics = "myTestTopic", containerFactory = "testKafkaListenerContainerFactory")
public void procesMyRequest(@Payload final MyRequest request) {
// do some magic...
}
}
和配置类TestKafkanconfig。JAVA
@Configuration
@EnableKafka
@ComponentScan
public class TestKafkaConfig {
@Bean
public ConsumerFactory<String, MyRequest> testConsumerFactory() {
final Map<String, Object> consumerProperties = new HashMap<>();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01-localhost:9092");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "TestGROUP");
return new DefaultKafkaConsumerFactory<>(consumerProperties, new StringDeserializer(), new JsonDeserializer<>(MyRequest.class));
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MyRequest>> testKafkaListenerContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<String, MyRequest> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(testConsumerFactory());
factory.getContainerProperties().setErrorHandler(new LoggingErrorHandler());
return factory;
}
但是我不知道如何使用Kafka工厂或利用Kafka跟踪时使用Kafka消费者。有人有这方面的经验吗?
我不熟悉它,但它看起来像是一个简单的消费者包装器:https://github.com/openzipkin/brave/blob/363ceb4c922305ffb4a68ac47dc152e1d15da0fb/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingConsumer.java#L69-L79
您应该能够创建DefaultKafkanConsumerFactory
的子类;重写createConsumer
方法-侦听器容器使用。。。
this.consumer =
KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
this.consumerGroupId,
this.containerProperties.getClientId(),
KafkaMessageListenerContainer.this.clientIdSuffix,
consumerProperties);
... 打电话给super。createConsumer(…)并将其包装在一个跟踪消费者
中。
如果您使用的是2.5.3或更高版本,您可以向DKCF添加消费者后置处理器
。
侦探就是这样做的:
https://github.com/spring-cloud/spring-cloud-sleuth/blob/6e306e594d20361483fd19739e0f5f8e82354bf5/spring-cloud-sleuth-brave/src/main/java/org/springframework/cloud/sleuth/brave/instrument/messaging/TraceMessagingAutoConfiguration.java#L263-L285
我正在从数据库中提取数据,以检查我是否有可用的系统资源来处理来自KafkaListener的进一步消息。如果我的条件没有满足,那么我希望@KafkaListener暂停,当条件满足时,我希望@KafkaListener恢复。我如何在SpringKafka实现这一点? 另外,为特定分区暂停消费者有什么缺点吗?
如何使用Spring Cloud sleuth跟踪基于Kafka的事件?我看到的例子都是RESTAPI。我在找Kafka客户图书馆。 另外,使用Spring cloud sleuth是一个好主意还是应该手动通过标头传递我的TraceID?
在我们的spring boot应用程序中,我们注意到Kafka消费者偶尔会在prod env中随机消费两次消息。我们在PCF中部署了6个实例和6个分区。我们发现在同一主题中收到两次具有相同偏移量和分区的消息,这会导致重复,对我们来说是业务关键。我们在非生产环境中没有注意到这一点,在非生产环境中很难复制。我们最近转向Kafka,但我们无法找到根本问题。 我们使用的是spring cloud stre
我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者
我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认
Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka