当前位置: 首页 > 知识库问答 >
问题:

防止在Spring Cloud Stream中使用\uu TypeId\uuuu

叶茂才
2023-03-14

我们有一个流氓生产者将Kafka Header__TypeId__设置为一个类,该类是生产者的一部分,但不是使用Kafka Streams绑定器在Spring Cloud Stream应用程序中实现的消费者的一部分。它导致了一个异常java.lang.IllegalArgumentException:类“com.bad.MyClass”不在受信任的包中:[java.util、java.lang、de.datev.pws.loon.dcp.foreignmodels.*]。如果您认为这个类可以安全地反序列化,请提供它的名称。如果序列化仅由受信任的源完成,您也可以启用信任所有 (*).

我们如何确保在消费者内部忽略这个TypeId头?

一些stackoverflow答案指向spring。json。使用类型headers=false,但它似乎是一个“旧”属性,不再有效。

应用yaml:

spring:
  json.use.type.headers: false
  application:
    name: dcp-all
  kafka:
    bootstrap-servers: 'xxxxx.kafka.dev.dvint.de:9093'
  cloud:
    stream:
      kafka:
        streams:
          binder:
            required-acks: -1 # all in-sync-replicas

...

堆栈跟踪:

    at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:129)
    at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103)
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:569)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:304)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:960)
    at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1068)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:962)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:751)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)

这是一个单元测试

    @Test
    void consumeWorksEvenWithBadTypesHeader() throws JsonProcessingException, InterruptedException {
        Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
        producerProps.put("key.serializer", StringSerializer.class.getName());
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(producerProps);

        List<Header> headers = Arrays.asList(new RecordHeader("__TypeId__", "com.bad.MyClass".getBytes()));
        ProducerRecord<String,String> p = new ProducerRecord(TOPIC1, 0, "any-key",
            "{ ... some valid JSON ...}", headers);

        try {
            KafkaTemplate<String, String> template = new KafkaTemplate<>(pf, true);
            template.send(p);

            ConsumerRecord<String, String> consumerRecord = KafkaTestUtils.getSingleRecord(consumer, TOPIC2, DEFAULT_CONSUMER_POLL_TIME);

            // Assertions ...
        } finally {
            pf.destroy();
        }
    }

共有1个答案

翟凯
2023-03-14

您有两种选择:

  1. 在producer端,将属性设置为省略添加类型信息头
  2. 在使用者端,将属性设置为不使用类型信息标头

https://docs.spring.io/spring-kafka/docs/current/reference/html/#json-塞尔德

它不是一个“旧”的财产。

    /**
     * Kafka config property for using type headers (default true).
     * @since 2.2.3
     */
    public static final String USE_TYPE_INFO_HEADERS = "spring.json.use.type.headers";

它需要在消费者属性中设置。

 类似资料:
  • 我目前正在使用JUnit4.x和AssertJ。我想确保没有使用Hamcrest偷偷摸摸的。但是,Hamcrest核心是JUnit运行时必需的依赖项,请参见JUnit问题#1429:JUnit4.12关于org/Hamcrest/selfdescription#1429,因此直接从pom.xml中排除依赖项或调整类路径都不起作用。 如何确保测试不使用Hamcrest断言?我想在构建时使用Maven

  • 我有一个有3个选项卡的ViewPager。根据我所看到的情况,当容器活动开始时,前两个片段将处于恢复状态。我还注意到,只有并排的两个片段被恢复,第三个被停止。我遇到的问题是,我从网络加载数据用于第一个片段,并将自定义对象存储在一个ArrayList中,这很正常,但当我导航到第三个片段时,数据会随着第一个片段进入停止状态而丢失。因此,当从第三个片段导航回第一个片段时,ArrayList就会为空(我在

  • 问题内容: 我遇到了一群黑客。他们几次入侵了我客户的网站,我的客户更加生气:(我的客户丢失了他的数据库(有数百条记录),不得不输入所有:( 现在,我将进行更多介绍。 固定文件权限 更改了ftp和主机登录信息 清除所有远程mysql访问 现在正在处理SQL注入问题。我在管理面板登录参数中添加了mysql_real_escape_string。所以我还应该在哪里使用mysql_real_escape_

  • 问题内容: 最近,我们对代码进行了安全审核,问题之一是我们的应用程序受到 Xml eXternal Entity (XXE)攻击。 基本上,该应用程序是一个计算器,可通过Web服务以XML形式接收输入。 这是对我们的应用程序进行此类XXE攻击的示例: 如您所见,我们可以引用指向外部文件()的实体。 关于XML输入本身(该部分)未与JAXB(v2.1)一起编组。Web服务部分基于jaxws- rt(

  • 问题内容: 我有一个现有的代码,其中应用程序根据很多条件生成不同的sql并通过hibernate会话createSQLQuery()执行它们。在这里,这些参数与作为普通字符串替换驻留在java类中的sql字符串相连接。现在的问题是,我需要防止sql注入。因此,为此,我必须使用getNamedQuery()并绑定参数,以便hibernate将处理特殊字符。但是问题在于将字符串sql的字符串移动到xm

  • 想知道如何使用Xstream API修复Xml外部实体(XXE)漏洞。 就像我们能做的一样 使用DocumentBuilderFactory。更多详细信息-https://www.owasp.org/index.php/XML_External_Entity_(XXE)\u预防\u备忘单 我的代码是这样的-