当前位置: 首页 > 知识库问答 >
问题:

使用带有自定义头的Kafka消息

锺离霖
2023-03-14

我试图用Spring Cloud Stream创建一个kafka使用者,以便监听在任何Spring上下文之外构建的kafka消息,并使用自定义头(operationType)。

我使用的是Spring Boot 1.5.x/Spring Cloud egdware.sr5和1.1.1版本的kafka-client和Kafka2.11。

我的侦听器类包含此方法

@StreamListener(value = "dataset-changed", condition = "headers['operationType']=='UPDATE'")
public void onEvent(@Payload DatasetChangedMessage payload) {
  // my code should be execute only if the header operationType == UPDATE
}
spring.cloud.stream:
  bindings:
    dataset-changed:
      group: preparation
      content-type: application/json
      destination: dataset-changed
      consumer:
        headerMode: raw
        configuration:
          key.deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
          value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<byte[], String> producer = new KafkaProducer<>(producerConfig);

// Headers (to condition the kafka listener)
final List<Header> headers = new ArrayList<>();
headers.add(new RecordHeader("operationType", "UPDATE".getBytes()));

ProducerRecord<byte[], String> record =
        new ProducerRecord<>("dataset-changed", 0, "111".getBytes(), getJsonPayload(), headers);
Future<RecordMetadata> future = producer.send(record);
future.get();

producer.close();
2019-03-15 14:48:32.103  WARN [tdp-preparation,1e24b9764ef9bb14,1e24b9764ef9bb14,false] 34760 --- [           -L-1] .DispatchingStreamListenerMessageHandler : Cannot find a @StreamListener matching for message with id: ea27a446-69da-7b8d-1b94-50b46a40dfde

而operationType标头是存在的

共有1个答案

丌官开宇
2023-03-14

我认为您在使用者端丢失了header,因为您使用了headermode:raw。这实际上意味着none-map no header。

请考虑使用headermode:headers

有关更多信息,请参见文档:https://cloud.spring.io/spring-cloud-stream/spring-cloud-stream.html#_consumer_properties

 类似资料:
  • 我目前正在使用Spring Kafka来使用topic中的消息以及Spring的@Retry。因此,基本上,我正在尝试处理消费者消息以防出现错误。但在这样做时,我希望避免KafkaMessageListenerContainer抛出的异常消息。相反,我想显示一条自定义消息。我尝试在ConcurrentKafkAlisterContainerFactory中添加错误处理程序,但这样做时,我的重试没有

  • 我正在使用Spring Boot 2.3.0和Spring Kafka 2.5.0,在我的KafkaListener中,我试图将MessageHeaders映射到一个自定义类。下面的代码可以工作,但给出了byte[]格式的头,然后我必须将其转换为侦听器内部的类(并对每个侦听器重复此操作),这是我希望避免的。 当我将代码更改为: 我查看了https://docs.spring.io/spring-k

  • 我是springboot kafka的新手,我在这篇文章后面创建了一个例子。 https://www.codenotfound.com/spring-kafka-boot-example.html 我发现您可以将kafka元数据设置为标题,但这不符合我的目的。 我能做到这一点吗?如果可能的话,我很感激你能分享一个例子。

  • 我需要发送一个GET请求,标题是:。我期待服务器的XML响应。我用邮递员测试了请求和响应,一切都很好。但是,当我尝试在Spring中使用执行此操作时,我总是会收到400个错误请求。的异常是: 我的代码: 调试消息将标头显示为,这似乎是正确的。我想知道我的请求出了什么问题,以及是否有一种方法可以看到在线调试的请求。

  • 我使用方法处理消息。 我已经找到了这个任务,它仍然是打开的。https://issues.apache.org/jira/browse/kafka-5632?src=confmacro

  • 我知道https://swagger.io/docs/specification/authentication/bearer-authentication/,但它似乎并没有完全帮助我。 This(OpenAPI 3) 名为default的身份验证标头中的结果() 但这在Swagger编辑器中验证失败,因为我认为的不允许组件(就像类型一样)。老实说,我不能完全理解这里的医生。 我确实读过关于规范扩展