当前位置: 首页 > 知识库问答 >
问题:

如何在春云流反序列化时信任所有包?

蒙弘图
2023-03-14

该消费者不需要受信任的包:

@Bean
    fun berichtStateStoreBuilder() = Consumer<GlobalKTable<String, BerichtEvent>> {}

这突然发生了:

@Bean
    fun berichtStateStoreBuilder() = Consumer<KStream<ByteArray, ByteArray>> {
        it
            .transform({ EventTypeAwareTransformer(EVENT_TYPE_MAPPING, objectMapper) })
            .mapValues { v -> v.payload as BerichtEvent }
            .groupByKey(Grouped.with(Serdes.StringSerde(), JsonSerde()))
            .aggregate(
                { BerichtAggregator() },
                { _, event, aggregator -> aggregator.add(event) },
                Named.`as`("aggregate"),
                Materialized.`as`<String, BerichtAggregator, KeyValueStore<Bytes, ByteArray>>(BerichtStore.NAME)
                    .withKeySerde(Serdes.String())
                    .withValueSerde(JsonSerde(BerichtAggregator::class.java))
            )

我已经尝试了以下方法,但它们不起作用,因为我只得到以下错误:

Caused by: java.lang.IllegalArgumentException: The class 'at.wrwks.smp.controlling.event.BerichtEvent' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:126)
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:100)
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:504)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:55)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    ... 8 more
    < li>
    @Bean
    fun defaultKafkaHeaderMapper(objectMapper: ObjectMapper): DefaultKafkaHeaderMapper {
        val mapper = DefaultKafkaHeaderMapper(objectMapper, "event_type")

        val rawMappedHeaders = HashMap<String, Boolean>()
        rawMappedHeaders[BaseEvent.EVENT_TYPE_HEADER] = true
        mapper.setRawMappedHeaders(rawMappedHeaders)
        mapper.addTrustedPackages("*")
        return mapper
    }
spring.cloud.stream.kafka.streams.binder.header-mapper-bean-name: defaultKafkaHeaderMapper
spring.cloud.stream.kafka.streams.binder.configuration.spring.json.use.type.headers: false
spring.cloud.stream.kafka.streams.binder.configuration.spring.json.trusted.packages: '*'

Spring Cloud Stream版本:3.1.2,带有Kafka Streams活页夹。

使用自定义JSON serde解决方法:

            .groupByKey(Grouped.with(Serdes.StringSerde(), Serdes.serdeFrom(
                SimpleJsonSerializer(objectMapper), SimpleJsonDeserializer(objectMapper, BerichtEvent::class.java)
            )))

共有2个答案

朱鹤轩
2023-03-14

虽然这次我已经这样做了几十次,但我忘记将目标类传递给构造函数 JsonSerde()。这是正确的:

.groupByKey(Grouped.with(Serdes.StringSerde(), JsonSerde(BerichtEvent::class.java)))

显然,当没有类被传递时,就不能将任何包添加到受信任的包中。通过传递类,Serde将使用目标传递所属的包进行配置。

爱乐邦
2023-03-14

我刚刚测试了它,它对我来说效果很好...

@SpringBootApplication
public class So67059860Application {

    public static void main(String[] args) {
        SpringApplication.run(So67059860Application.class, args);
    }

    @Bean
    public Consumer<KStream<String, Foo>> input() {
        return str -> str.foreach((k, v) -> System.out.println(v));
    }

}
public class Foo {

    private String bar;

    public String getBar() {
        return this.bar;
    }

    public void setBar(String bar) {
        this.bar = bar;
    }

    @Override
    public String toString() {
        return "Foo [bar=" + this.bar + "]";
    }

}
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde

spring.cloud.stream.kafka.streams.binder.configuration.spring.json.trusted.packages=*
spring.cloud.stream.kafka.streams.binder.configuration.spring.json.value.default.type=com.example.demo.Foo
spring.application.name=so67059860

spring.cloud.function.definition=input

#logging.level.root=debug
Foo [bar=baz]

启动2.4.4,云2020.0.2(SCSt 3,1.2)。

在JsonSerde中设置断点。configure()以查看正在使用的属性。

 类似资料:
  • 应用程序A写入用户对象(json)下面的Kafka主题: 应用程序 B 正在尝试使用此用户对象(用户.java驻留在应用程序 B 项目中)与以下 application.yml(多个绑定器): 下面是我的Spring Cloud Stream处理器类的外观: 但是,它没有将Message有效负载转换为“用户”类,而是不断抛出错误,未找到 父类“User事件”。 有什么想法吗?

  • 并将这些行添加到 但仍会出现以下错误: org.apache.kafka.common.errors.SerializationException:反序列化偏移量1处分区topic2-0的键/值时出错。如有需要,请通过记录继续使用。原因:java.lang.IllegalArgumentException:类“com.SpringMiddleware.Entities.Crime”不在受信任的包[

  • 问题内容: 我的直觉告诉我,必须以某种方式将其转换为字符串或byte [](在Go中甚至可能是相同的东西?),然后将其保存到磁盘。 我找到了这个包(http://golang.org/pkg/encoding/gob/),但似乎仅用于结构? 问题答案: 序列化数据有多种方法,Go为此提供了许多软件包。某些常见编码方式的软件包: 处理地图很好。以下示例显示了地图的编码/解码: 操场

  • 我有一个kdtree,其节点由以下字段组成:公共静态类节点实现可序列化{ 其中DataPoint定义: 公共静态类DataPoint实现可序列化{公共可比X;公共可比Y;公共可比Z; 我想序列化树,存储在文件中并在回答范围查询时反序列化。我对这个概念od序列化的理解并不好。从我收集的任何内容中,我编写了以下函数,但不起作用。有人能帮忙吗?

  • 我加载xml文件如下 使2类 我加载xml文件(temp.xml),然后序列化类2位置和头然后StringReader,它从文件中读取字符串,然后类序列化成为反序列化 但我看到error error msg在XML文档中有一个错误(1,1)。哪里?:at系统。Xml。序列化。XmlSerializer。在系统上反序列化(XmlReader XmlReader、String encodingStyl

  • Spring Cloud Kafka Streams与Spring Cloud Stream、Spring Cloud Function、Spring AMQP和Spring for Apache Kafka有什么区别?