当前位置: 首页 > 知识库问答 >
问题:

Kafka-Message自定义头中允许的包

丁子石
2023-03-14

在spring-kafka中,如何从包中添加类作为自定义头字段来信任?

消息是这样发送的:

@Autowired
private KafkaTemplate kafkaTemplate;

Message<BouquetMQDTO> m = MessageBuilder
            .withPayload(payload)
            .setHeader(KafkaHeaders.TOPIC, "topic")
            .setHeader("EVENT_TYPE", MessageType.UPSERT)
            .build();
kafkaTemplate.send(m);

接收端如下所示:

@Component
@KafkaListener(topics = "topic")
public class KafkaController {

    @KafkaHandler
    public void listen(
        @Payload Object objectDTO,
        @Header(value = "EVENT_TYPE") MessageType messageType
    ) { 
        System.out.println(messageType);
    }
}

我不断得到的例外是:

@Bean
public KafkaHeaderMapper defaultKafkaHeaderMapper() {
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    mapper.addTrustedPackages("my.package");
    return mapper;
}
props.put(JsonDeserializer.TRUSTED_PACKAGES, "my.other.package,my.package");
return new DefaultKafkaConsumerFactory<>(props);

共有1个答案

吕岳
2023-03-14

jsonDeserializer.trusted_packages与头完全无关。它可以处理`ConsumerRecord的。头映射器发生在不同的地方。

不确定是否使用Spring Boot,但是有一个MessagingMessageListenerAdapter,它附带了一个默认的MessagingMessageConverter,因此,默认的DefaultKafkAheaderMapper。要自定义您自己的HeaderMapper,您需要将MessagingMessageConverter、引用HeaderMapper并将该转换器注入到AbstractKafKalistenerContainerFactorybean中。

如果您处理Spring Boot,只需声明MessagingMessageConverter,它将自动配置到框架创建的AbstractKafKalistenerContainerFactory中。

通过这种方式,您可以访问受信任的包。但是,我认为它还不能起作用,因为enum默认情况下对JSON不太友好:https://www.baeldung.com/jackson-serialize-enums

 类似资料:
  • 我需要使用 restlet 客户端放置以下 rest 请求: 我可以在一个单独的调用中获取XToken,但是在当前的调用中,我无法在我的ClientResource对象中设置“Authorization”头。 有人可以建议我需要为我拥有的以下代码行添加哪些代码: 在restlet中创建和添加“Authorization”标头会导致restlet客户端库中出现错误。 提前谢谢 阿希什·夏尔马

  • Spring Cloud Bus使用 Spring Cloud Stream广播消息,以便获取消息流,只需要在类路径中包含您选择的binder实现。有AMQP(RabbitMQ)和Kafka(spring-cloud-starter-bus-[amqp,kafka])的公共汽车专用起动方便。一般来说,Spring Cloud Stream依赖于用于配置中间件的Spring Boot自动配置约定,因

  • 我使用方法处理消息。 我已经找到了这个任务,它仍然是打开的。https://issues.apache.org/jira/browse/kafka-5632?src=confmacro

  • 我试图用Spring Cloud Stream创建一个kafka使用者,以便监听在任何Spring上下文之外构建的kafka消息,并使用自定义头(operationType)。 我使用的是Spring Boot 1.5.x/Spring Cloud egdware.sr5和1.1.1版本的kafka-client和Kafka2.11。 我的侦听器类包含此方法 而operationType标头是存在

  • 我正在使用Spring Boot 2.3.0和Spring Kafka 2.5.0,在我的KafkaListener中,我试图将MessageHeaders映射到一个自定义类。下面的代码可以工作,但给出了byte[]格式的头,然后我必须将其转换为侦听器内部的类(并对每个侦听器重复此操作),这是我希望避免的。 当我将代码更改为: 我查看了https://docs.spring.io/spring-k

  • 我是android开发的新手,我正在尝试创建一个自定义的适配器,为RecyclerView提供一个只包含一个图像和一个文本视图的视图。 但是,我试图为onBindViewHolder(VH,int)定义一个简单的扩展ViewHolder,而Android studio根本不会采用自定义ViewHolder,'MyViewHolder' - 说“该方法不会覆盖其超类中的方法。 如果我把参数变成一个普