我向Kafka传达的信息如下:
private KafkaTemplate<String, MyMessage > kafkaTemplate;
public void sendMessage(MyMessage data) {
Message<MyMessage > message = MessageBuilder
.withPayload(data)
.setHeader(KafkaHeaders.TOPIC,topic)
.setHeader(KafkaHeaders.MESSAGE_KEY,data.getKey())
.build();
kafkaTemplate.send(message);
我有自己的MyMessage自定义解/序列化程序,关键字是一个简单的字符串
@StreamListener
public void process(@Input("input") KStream<String,MyMessage> myStream){
final Serde<String> stringSerde = Serdes.String();
final MyMessageSerde myMessageSerde = new MyMessageSerde();
myStream
.groupBy((key,value)-> value.getObjectKey(), Serialized.with(stringSerde,myMessageSerde))
.aggregate(ArrayList::new,
(newKey,val,agg) -> {
agg.add(val);
return agg;
},
Materialized.<String, ArrayList<MyMessage>, KeyValueStore<Bytes, byte[]>>as("object-keys")
.withKeySerde(stringSerde)
.withValueSerde(new ArrayListSerde(myMessageSerde)));
...
interface MyStreamProcessor {
@Input("input")
KStream<?, ?> input();
}
@StreamListener
public void process(@Input("input") KTable<String,MyMessage> myTable){
final Serde<String> stringSerde = Serdes.String();
final MyMessageSerde myMessageSerde = new MyMessageSerde();
KStream<String,MyMessage> myStream = myTable.toStream();
myStream
.groupBy((key,value)-> value.getObjectKey(), Serialized.with(stringSerde,myMessageSerde))
.aggregate(ArrayList::new,
(newKey,val,agg) -> {
agg.add(val);
return agg;
},
Materialized.<String, ArrayList<MyMessage>, KeyValueStore<Bytes, byte[]>>as("object-keys")
.withKeySerde(stringSerde)
.withValueSerde(new ArrayListSerde(myMessageSerde)));
...
interface MyStreamProcessor {
@Input("input")
KTable<?, ?> input();
}
这是因为您在.group
方法中指定了自定义(de)序列化。看看文档的这部分和这部分,看看如何指定自定义值SERDE。
我正在尝试让Tomcat在关机时坚持我的会话。我已经解决了这样一个要求,即我在会话中存储的所有内容都是可序列化的,当我在Eclipse下运行Tomcat时,这很好:当代码更改迫使Eclipse重新加载webapp时,会话现在仍然存在。 然而,当我试图在一个独立的Tomcat中做同样的事情时,我遇到了一个问题。 我取消了${catalina.base}/conf/context.xml中的Manag
我试图使用javascript中的Fetch API向spring应用程序发送一些表单数据。我有以下代码来发送表单数据: 但是我得到一个415状态错误“不支持的媒体类型”。即使我将标头“Content-Type”专门设置为“Application/json”,它也会像“text/平原”一样发送 这是我从服务器得到的响应: 以下是在Spring中接受请求的方法: 我不知道为什么请求是以“text/p
Jackson可以在2.6.5中为以下类反序列化json,但在2.8.8中失败。 型号: JSON: 例外情况是: 我发现lombok创建的子构造函数导致了这个错误。当我去掉lombok注释或手动创建构造函数时,这种情况就会停止。无论如何,它都应该使用no args Child()构造函数。是什么导致了这个问题?
问题内容: 嗨,大家好,我是AngularJS的新手,我需要您的帮助。 我需要做的就是将json发布到API并获得正确的响应。 这是我不知道在哪里编写此代码的JSON。 JSON格式 如果我做的正确,则不确定。 控制器 服务.JS INDEX.HTML 希望你能帮助我。 我不断收到诸如“不受支持的媒体类型”之类的回复…我不知道该怎么办… 任何建议或评论都将很棒。。谢谢! 问题答案: 在Angula
我正在尝试使用apache camel jackson xml对xml进行反序列化,然后发生了一些奇怪的事情,我无法解释。这是我正在尝试反序列化的xml(它是简单的xmltd v xml文件): 这是我的POJO: 电视。Java语言 程序Java语言 Credits.java 其他POJO并不重要。下面是路由配置: 使用此设置和我在开始时提供的xml,会发生以下异常: 通用域名格式。faster
当我尝试将content-type设置为时,我会丢失边界,并在响应中得到一个错误: 添加UTF-8字符集的正确方法是什么?