当前位置: 首页 > 知识库问答 >
问题:

如何正确创建集成测试的消费者,以验证消息是否已推送到Kafka?

易嘉胜
2023-03-14

我正在使用avro生成一个java类(Heartbeat),我正在使用Spring云消息处理器,以便使用这个Heartbeat类将消息推送到kafka。

以下是我的服务:

@Service
public class HeartbeatServiceImpl implements HeartbeatService {

  private Processor processor;

  public HeartbeatServiceImpl(Processor processor) {
    this.processor = processor;
  }

  @Override
  public boolean sendHeartbeat(Heartbeat heartbeat) {
    Message<Heartbeat> message =
        MessageBuilder.withPayload(heartbeat).setHeader(KafkaHeaders.MESSAGE_KEY, "MY_KEY").build();
    return processor.output().send(message);
  }

}

我的测试包中有一个消费者:

@Component
public class HeartbeatKafkaConsumer {


  private CountDownLatch latch = new CountDownLatch(1);
  private String payload = null;

  @KafkaListener(topics = "HEARTBEAT")
  public void receive(ConsumerRecord<?, ?> consumerRecord) {
    this.payload = consumerRecord.toString();
    this.latch.countDown();
  }


  public CountDownLatch getLatch() {
    return latch;
  }

  public Object getPayload() {
    return payload;
  }

}

现在在我的实际测试课上,我有这个:

public class HeartbeatServiceImplIntegrationTest {

  @Autowired
  private HeartbeatServiceImpl heartbeatService;

  @Autowired
  private HeartbeatKafkaConsumer heartbeatKafkaConsumer;

  @Test
  public void assertHeartbeatPushedToKafka() throws InterruptedException {
    Heartbeat heartbeat =
        Heartbeat.newBuilder().setID("my-Test ID").setINPUTSOURCE("my-Test IS")
            .setMSGID("my-Test 123").setMSGTIME(12345l).setRECEIVEDTIME(12345l).build();

    boolean isMessageSent = heartbeatService.sendHeartbeat(heartbeat);
    assertThat(isMessageSent).isTrue();

    heartbeatKafkaConsumer.getLatch().await(10000, TimeUnit.MILLISECONDS);

    assertThat(heartbeatKafkaConsumer.getLatch().getCount()).isEqualTo(0L);
    assertThat(heartbeatKafkaConsumer.getPayload()).isEqualTo(heartbeat);

  }

}

我可以通过运行ksql看到消息确实到达kafka。所以消息正如预期的那样在那里。我也收到了来自我的心跳Kafka消费者的消息,但当我做断言时,我收到了这个错误:

Expecting:
 <"ConsumerRecord(topic = HEARTBEAT, partition = 0, leaderEpoch = 0, offset = 4, CreateTime = 1622134829899, serialized key size = 12, serialized value size = 75, headers = RecordHeaders(headers = [], isReadOnly = false), key = [B@765aa560, value = [B@3582e1cd)">
to be equal to:
 <{"ID": "my-Test ID", "TYPE": "null", "MSG_ID": "my-Test 123", "MSG_TIME": 12345, "RECEIVED_TIME": 12345, "INPUT_SOURCE": "my-Test IS", "SBK_FEED_PROVIDER_ID": "null", "SBK_FEED_PROVIDER_NAME": "null"}>

现在,我试图以多种不同的方式阅读我的HeartbeatKafkaConsumer,但我就是无法将值正确解析回心跳。

我如何将Kafka中的内容作为心跳进行消费,以便根据最初发送的消息进行测试?我甚至不能作为字符串检索。

哦,这是我的申请表。Kafka的属性配置:

spring.cloud.stream.default.producer.useNativeEncoding=true
spring.cloud.stream.default.consumer.useNativeEncoding=true
spring.cloud.stream.bindings.input.destination=HEARTBEAT
spring.cloud.stream.bindings.input.content-type=application/*+avro
spring.cloud.stream.bindings.output.destination=HEARTBEAT
spring.cloud.stream.bindings.output.content-type=application/*+avro
spring.cloud.stream.kafka.binder.producer-properties.schema.registry.url=http://localhost:8081
spring.cloud.stream.kafka.binder.producer-properties.key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.cloud.stream.kafka.binder.producer-properties.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.cloud.stream.kafka.binder.consumer-properties.schema.registry.url=http://localhost:8081
spring.cloud.stream.kafka.binder.consumer-properties.key.serializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.cloud.stream.kafka.binder.consumer-properties.value.serializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.cloud.stream.kafka.binder.consumer-properties.specific.avro.reader=true
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.group-id=myclient
spring.kafka.consumer.auto-offset-reset=earliest

共有1个答案

桓信鸥
2023-03-14

使用消费者Record.value()而不是toString。

它将是一个字节[],您可以将其传递到ObjectMapper中,以将其反序列化为心跳

或者,只需将消费者配置为使用JsonDeserializer消费者记录。value()将是心跳信号。您需要配置反序列化程序来告诉它要创建哪种类型。

第三个(也是最简单的)选项是添加一个JsonMessageConverter@Bean(boot将把它连接到侦听器容器中),并将方法更改为

  @KafkaListener(topics = "HEARTBEAT")
  public void receive(Heartbeat heartbeat) {
      ...
  }

框架根据方法签名告诉转换器要创建什么类型。

https://docs.spring.io/spring-kafka/docs/current/reference/html/#messaging-message-conversion

 类似资料:
  • 本文向大家介绍Kafka 消费者是否可以消费指定分区消息?相关面试题,主要包含被问及Kafka 消费者是否可以消费指定分区消息?时的应答技巧和注意事项,需要的朋友参考一下 Kafa consumer消费消息时,向broker发出fetch请求去消费特定分区的消息,consumer指定消息在日志中的偏移量(offset),就可以消费从这个位置开始的消息,customer拥有了offset的控制权,可

  • 我们公司在Go中建立了push服务,为了保证传输速度,我们在四台机器上安装了push服务,当我们需要发送通知时,我们将消息发送给rabbitMQ,然后push服务从队列中获取消息,但有时我们发现只有一台机器获取消息。 我应该如何设置配置以确保每个消费者获得相同数量的消息?

  • 已使用生产者推送消息。它向主题推送了100000条消息。 使用命令:bin/kafka producer perf test。sh--代理列表localhost:9092--消息100000--主题perfAtlasTopic获取以下生产者指标。 开始时间,结束。时间、压缩、消息。大小,批次。大小,总计。数据发送。在里面MB,MB。秒,总计。数据发送。在里面nMsg,nMsg。第[2015-02-

  • 当我向主题“Test19”发送任何消息时,配置的ServiceActivator“ProcessMessage”方法将两条消息显示为配置的两个客户,但这里的问题是,在添加到消费者上下文之前,我需要为每个客户加载入站配置文件…否则,我只能在控制台中得到一条消息…是正确的方式还是我需要在这里改变什么? 谢了。

  • 由于它是一个Spring Boot应用程序,默认偏移量设置为Latest。我在这里做错了什么,请帮我弄明白。

  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认