当前位置: 首页 > 知识库问答 >
问题:

使用@KafkaListener勇敢地追踪Kafka消费者

洪鸿
2023-03-14

我正在使用Brave库https://github.com/openzipkin/brave进行跟踪,现在我也想将其用于Kafka消费者。我想避免添加Spring Sleuth,并利用Brave Kafka仪器https://github.com/openzipkin/brave/tree/master/instrumentation/kafka-clients.

对于Kafka消费者,我使用@KafkaListener。代码看起来像这样:

TestKafkaEndpoint。JAVA

@Service
public class TestKafkaEndpoint {

    @KafkaListener(topics = "myTestTopic", containerFactory = "testKafkaListenerContainerFactory")
    public void procesMyRequest(@Payload final MyRequest request) {
       // do some magic...
    }
}

和配置类TestKafkanconfig。JAVA


@Configuration
@EnableKafka
@ComponentScan
public class TestKafkaConfig {

    @Bean
    public ConsumerFactory<String, MyRequest> testConsumerFactory() {
        final Map<String, Object> consumerProperties = new HashMap<>();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01-localhost:9092");
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "TestGROUP");
        return new DefaultKafkaConsumerFactory<>(consumerProperties, new StringDeserializer(), new JsonDeserializer<>(MyRequest.class));
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MyRequest>> testKafkaListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, MyRequest> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(testConsumerFactory());
        factory.getContainerProperties().setErrorHandler(new LoggingErrorHandler());
        return factory;
    }

但是我不知道如何使用Kafka工厂或利用Kafka跟踪时使用Kafka消费者。有人有这方面的经验吗?

共有1个答案

凌经赋
2023-03-14

我不熟悉它,但它看起来像是一个简单的消费者包装器:https://github.com/openzipkin/brave/blob/363ceb4c922305ffb4a68ac47dc152e1d15da0fb/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingConsumer.java#L69-L79

您应该能够创建DefaultKafkanConsumerFactory的子类;重写createConsumer方法-侦听器容器使用。。。

this.consumer =
        KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
                this.consumerGroupId,
                this.containerProperties.getClientId(),
                KafkaMessageListenerContainer.this.clientIdSuffix,
                consumerProperties);

... 打电话给super。createConsumer(…)并将其包装在一个跟踪消费者中。

如果您使用的是2.5.3或更高版本,您可以向DKCF添加消费者后置处理器

侦探就是这样做的:

https://github.com/spring-cloud/spring-cloud-sleuth/blob/6e306e594d20361483fd19739e0f5f8e82354bf5/spring-cloud-sleuth-brave/src/main/java/org/springframework/cloud/sleuth/brave/instrument/messaging/TraceMessagingAutoConfiguration.java#L263-L285

 类似资料:
  • 我正在从数据库中提取数据,以检查我是否有可用的系统资源来处理来自KafkaListener的进一步消息。如果我的条件没有满足,那么我希望@KafkaListener暂停,当条件满足时,我希望@KafkaListener恢复。我如何在SpringKafka实现这一点? 另外,为特定分区暂停消费者有什么缺点吗?

  • 如何使用Spring Cloud sleuth跟踪基于Kafka的事件?我看到的例子都是RESTAPI。我在找Kafka客户图书馆。 另外,使用Spring cloud sleuth是一个好主意还是应该手动通过标头传递我的TraceID?

  • 在我们的spring boot应用程序中,我们注意到Kafka消费者偶尔会在prod env中随机消费两次消息。我们在PCF中部署了6个实例和6个分区。我们发现在同一主题中收到两次具有相同偏移量和分区的消息,这会导致重复,对我们来说是业务关键。我们在非生产环境中没有注意到这一点,在非生产环境中很难复制。我们最近转向Kafka,但我们无法找到根本问题。 我们使用的是spring cloud stre

  • 我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者

  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka