当前位置: 首页 > 知识库问答 >
问题:

Spring Boot 2启用MessageHistory时无法发送kafka消息

松俊才
2023-03-14
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is java.lang.IllegalArgumentException: Incorrect type specified for header 'history'. Expected [class org.springframework.integration.history.MessageHistory] but actual type is [class org.springframework.kafka.support.DefaultKafkaHeaderMapper$NonTrustedHeaderType]. 

通过深入研究代码,在转换messagehistory标头https://github.com/spring-projects/spring-kafka/blob/14157742d7fa51ce8a22dfbdcdccc2e3c5b43c1c6f/spring-kafka/src/main/java/org/springframework/kafka/support/defaultkafkaheadermapper.java#l253-l275时,defaultkafkaheadermapper中引发

>

  • MessageHistory是否应该在发送前写入消息?

    如果1)的答案是肯定的,那么将messagehistory设置为受信任包的正确方法是什么?

    如果1)为否,那么在阻止将历史记录写入标头时,可能做了哪些错误的操作?

    提前感谢!


  • 共有1个答案

    安高翰
    2023-03-14

    您可以将org.springframework.integration.history包添加到白名单以还原以前的行为,方法是:defaultkafkaheadermapper.addTrustedPackages():

    /**
     * Add packages to the trusted packages list (default {@code java.util, java.lang}) used
     * when constructing objects from JSON.
     * If any of the supplied packages is {@code "*"}, all packages are trusted.
     * If a class for a non-trusted package is encountered, the header is returned to the
     * application with value of type {@link NonTrustedHeaderType}.
     * @param packagesToTrust the packages to trust.
     */
    public void addTrustedPackages(String... packagesToTrust) {
    

    https://docs.spring.io/spring-kafka/docs/current/reference/html/#headers

    或者您可以从映射中完全排除这样的标头。

     类似资料:
    • 我正在使用kafka java客户端和kafka服务器。 我的代码: Kafka马纳格 当我的循环长度如果在1000左右(在类)时,我就能成功地向Kafka主题发送数据。 但当我的循环长度为1或小于10时,我无法向Kafka主题发送数据。注意我没有得到任何错误。 根据我的发现,如果我想发送一个单一的消息到Kafka主题,根据这个程序我得到了成功的消息,但从来没有得到一个关于我的主题的消息。 但是如

    • 我正在测试Spring Kafka的示例代码。它适用于连接,但不适用于连接。 我已通过成功运行控制台使用者来验证密钥和证书对 kafka 代理有效: 但是我不能使用Spring Boot(2.0.1.RELEASE)和Spring Kafka,使用相同的密钥和证书发送消息。 应用程序.属性 有人成功用SSL配置Spring Boot 2.0 Spring Kafka吗?

    • spring-boot消费者微服务无法在kafka重新启动后向主题发送消息。 > 消费者和生产者(spring boot微服务)位于同一覆盖网络“Net-Broker”上,因此它们使用服务名“kafka:9092”访问kafka。 一开始一切都很顺利。 然后kafka仅被重新启动,在此之后,消费者不能再从kafka主题发送消息。 由于docker-compose.yml中的一个小更改(例如max_

    • 如何在Kafka中发送同步消息 实现这一点的一种方法是设置properties参数 。 但是我想知道是否有一种甚至直接或替代的方式在Kafka中发送同步消息。(比如producer.sync发送(...)等等)。

    • 我们使用sping-cloud-stream-binder-kafka(3.0.3.RELEASE)向我们的Kafka集群(2.4.1)发送消息。时不时地,其中一个生产者线程会收到NOT_LEADER_FOR_PARTITION异常,甚至超过重试(当前设置为12,由依赖sping-retry激活)。我们限制了重试,因为我们发送了大约1kmsg/s(每个生产者实例),并且担心缓冲区的大小。这样我们会

    • 我使用的是Simado GDT11调制解调器和丰富的USB到串行端口驱动程序。新连接出现在我的设备管理器中 我使用的是smslib示例中指定的基本类,但总有一些错误表明设备没有响应。由于使用USB端口,我添加了轮询参数,但没有效果。Commtest实用程序可以连接到同一个端口,所以我认为端口号不是问题。 Stacktrace: 代码:

    • 我的机器人不能发送嵌入或常规文本信息,代码执行,但没有什么是发送在不和谐的通道。我遵循了JDA维基的设置,我不知道我做错了什么。 机器人登录码

    • 问题内容: 我们已经编写了一个Java客户端,用于将消息发布到kafka。代码如下所示 当我们执行此代码时,我们得到以下消息和异常 这发生在无限循环中,并且应用程序挂起…当我们检查kafka代理时,发现该主题已创建…但是我们没有收到消息…我们已经坚持了一段时间。 .. 请帮忙 问题答案: 我们终于解决了这个问题…我们在混合环境中运行kafka,如下文所述- https://medium.com/@