该消费者不需要受信任的包:
@Bean
fun berichtStateStoreBuilder() = Consumer<GlobalKTable<String, BerichtEvent>> {}
这突然发生了:
@Bean
fun berichtStateStoreBuilder() = Consumer<KStream<ByteArray, ByteArray>> {
it
.transform({ EventTypeAwareTransformer(EVENT_TYPE_MAPPING, objectMapper) })
.mapValues { v -> v.payload as BerichtEvent }
.groupByKey(Grouped.with(Serdes.StringSerde(), JsonSerde()))
.aggregate(
{ BerichtAggregator() },
{ _, event, aggregator -> aggregator.add(event) },
Named.`as`("aggregate"),
Materialized.`as`<String, BerichtAggregator, KeyValueStore<Bytes, ByteArray>>(BerichtStore.NAME)
.withKeySerde(Serdes.String())
.withValueSerde(JsonSerde(BerichtAggregator::class.java))
)
我已经尝试了以下方法,但它们不起作用,因为我只得到以下错误:
Caused by: java.lang.IllegalArgumentException: The class 'at.wrwks.smp.controlling.event.BerichtEvent' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:126)
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:100)
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:504)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:55)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
... 8 more
@Bean
fun defaultKafkaHeaderMapper(objectMapper: ObjectMapper): DefaultKafkaHeaderMapper {
val mapper = DefaultKafkaHeaderMapper(objectMapper, "event_type")
val rawMappedHeaders = HashMap<String, Boolean>()
rawMappedHeaders[BaseEvent.EVENT_TYPE_HEADER] = true
mapper.setRawMappedHeaders(rawMappedHeaders)
mapper.addTrustedPackages("*")
return mapper
}
spring.cloud.stream.kafka.streams.binder.header-mapper-bean-name: defaultKafkaHeaderMapper
spring.cloud.stream.kafka.streams.binder.configuration.spring.json.use.type.headers: false
spring.cloud.stream.kafka.streams.binder.configuration.spring.json.trusted.packages: '*'
Spring Cloud Stream版本:3.1.2,带有Kafka Streams活页夹。
使用自定义JSON serde解决方法:
.groupByKey(Grouped.with(Serdes.StringSerde(), Serdes.serdeFrom(
SimpleJsonSerializer(objectMapper), SimpleJsonDeserializer(objectMapper, BerichtEvent::class.java)
)))
虽然这次我已经这样做了几十次,但我忘记将目标类传递给构造函数 JsonSerde()。
这是正确的:
.groupByKey(Grouped.with(Serdes.StringSerde(), JsonSerde(BerichtEvent::class.java)))
显然,当没有类被传递时,就不能将任何包添加到受信任的包中。通过传递类,Serde
将使用目标传递所属的包进行配置。
我刚刚测试了它,它对我来说效果很好...
@SpringBootApplication
public class So67059860Application {
public static void main(String[] args) {
SpringApplication.run(So67059860Application.class, args);
}
@Bean
public Consumer<KStream<String, Foo>> input() {
return str -> str.foreach((k, v) -> System.out.println(v));
}
}
public class Foo {
private String bar;
public String getBar() {
return this.bar;
}
public void setBar(String bar) {
this.bar = bar;
}
@Override
public String toString() {
return "Foo [bar=" + this.bar + "]";
}
}
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde
spring.cloud.stream.kafka.streams.binder.configuration.spring.json.trusted.packages=*
spring.cloud.stream.kafka.streams.binder.configuration.spring.json.value.default.type=com.example.demo.Foo
spring.application.name=so67059860
spring.cloud.function.definition=input
#logging.level.root=debug
Foo [bar=baz]
启动2.4.4,云2020.0.2(SCSt 3,1.2)。
在JsonSerde中设置断点。configure()以查看正在使用的属性。
应用程序A写入用户对象(json)下面的Kafka主题: 应用程序 B 正在尝试使用此用户对象(用户.java驻留在应用程序 B 项目中)与以下 application.yml(多个绑定器): 下面是我的Spring Cloud Stream处理器类的外观: 但是,它没有将Message有效负载转换为“用户”类,而是不断抛出错误,未找到 父类“User事件”。 有什么想法吗?
并将这些行添加到 但仍会出现以下错误: org.apache.kafka.common.errors.SerializationException:反序列化偏移量1处分区topic2-0的键/值时出错。如有需要,请通过记录继续使用。原因:java.lang.IllegalArgumentException:类“com.SpringMiddleware.Entities.Crime”不在受信任的包[
问题内容: 我的直觉告诉我,必须以某种方式将其转换为字符串或byte [](在Go中甚至可能是相同的东西?),然后将其保存到磁盘。 我找到了这个包(http://golang.org/pkg/encoding/gob/),但似乎仅用于结构? 问题答案: 序列化数据有多种方法,Go为此提供了许多软件包。某些常见编码方式的软件包: 处理地图很好。以下示例显示了地图的编码/解码: 操场
我有一个kdtree,其节点由以下字段组成:公共静态类节点实现可序列化{ 其中DataPoint定义: 公共静态类DataPoint实现可序列化{公共可比X;公共可比Y;公共可比Z; 我想序列化树,存储在文件中并在回答范围查询时反序列化。我对这个概念od序列化的理解并不好。从我收集的任何内容中,我编写了以下函数,但不起作用。有人能帮忙吗?
我加载xml文件如下 使2类 我加载xml文件(temp.xml),然后序列化类2位置和头然后StringReader,它从文件中读取字符串,然后类序列化成为反序列化 但我看到error error msg在XML文档中有一个错误(1,1)。哪里?:at系统。Xml。序列化。XmlSerializer。在系统上反序列化(XmlReader XmlReader、String encodingStyl
Spring Cloud Kafka Streams与Spring Cloud Stream、Spring Cloud Function、Spring AMQP和Spring for Apache Kafka有什么区别?