我有一个带有RabbitMQ绑定器的Spring Cloud Stream(Elmhurst Releas)Splitter。我正在尝试更新它,以便它将使用Avro模式作为有效负载。我得到了一个转换异常,这使得它看起来像是Avro转换器没有被调用,JSON转换器接收消息并在其上跳闸。
原因:组织。springframework。消息传递。转换器。MessageConversionException:无法写入JSON:不是映射:{“type”:“record”,“name”:“SkinnyMessage”,“namespace”:“com.example.avro”,“doc”:“用于传递消息对象引用的轻消息”,“fields”:[{“name”:“id”,“type”:“string”},{“name”:“guid”,“type”:“string”}]}(通过引用链:com.example.avro.skinymessage[“schema”]-
我已经确认,我可以使用生成的Avro类(maven-Avro插件)创建对象并将其序列化到磁盘,所以这一部分似乎是正确的。我已经将该项目从Groovy转换为Java,但仍然会遇到相同的错误,所以我认为这也被排除了。
这是Avro模式:
{
"namespace": "com.example.avro",
"type": "record",
"name": "SkinnyMessage",
"doc": "Light message for passing references to Message objects",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "guid",
"type": "string"
}
]
}
以及课程的相关部分:
@EnableBinding(Processor.class)
@EnableSchemaRegistryClient
class PagingQueryProcessorApplication {
@Timed(value = 'paging.query')
@Splitter(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
List<SkinnyMessage> queryExecutor(def trigger){
log.debug 'Building query'
def query = queryConfiguration.buildQuery().toUriString()
log.info "Executing query: ${query}"
def response = service.getRecordings(query)
log.info "Returning response collection: ${response.body.content.size()}"
// We build a slim notification on each of the query responses
def skinnyMessages = response.body.content.collect{
new SkinnyMessage(it.getLink('self').getHref(), it.content.guid)
}
skinnyMessages
}
...
}
编辑:当我逐步使用调试器时,我可以看到AvroSchemaRegistryClientMessageConverter失败了canConvertTo(有效负载,标头)调用,因为标头中的mimeType是application/json,而不是application/*avro,所以它继续尝试转换器链的其余部分。
如果我构建了一条消息并在其上设置了一个avro内容类型头,这似乎是可行的,但这似乎是一种黑客行为。
List<Message<SkinnyMessage>> skinnyMessages = response.body.content.collect{
MessageBuilder.withPayload(
new SkinnyMessage(it.getLink('self').getHref(), it.content.recordingGuid))
.setHeader('contentType', 'application/*+avro')
.build()
}
创建在RabbitMQ UI中正确显示的消息:
内容类型:应用程序/vnd。skinnymessage。v1 avro相关ID:f8be74d6-f780-efcc-295d-338a8b7f2ea0内容\u类型:应用程序/八位字节流有效负载96字节编码:字符串
thttps://example.com/message/2597061H9a688e40-3e30-4b17-80e9-cf4f897e8a91
如果我正确理解了文档,那么从应用程序中的设置来看,这应该是透明的。yml:(在模式注册表示例中):
spring:cloud:stream:bindings:output:contentType:application/*avro
我正在将avro序列化数据发布到kafka主题,然后尝试通过SQL CLI界面从该主题创建Flink表。我能够创建主题,但在执行SQL语句后无法查看主题数据。然而,我能够使用简单的Kafka消费者反序列化和打印已发布的数据。在SQL CLI上获取此错误: 表创建 表定义 Avro模式 消息值(消息键为无) 我不断发送相同的消息值使用简单的kafka生产者主题 Kafka主题描述 完整错误日志 来自
期望:将XML文件中的整数列表读取到名为itemPool的列表中; 结果:“InvalidOperationException:未能将类型System.Collections.Generic.List`1[System.Int32]添加到已知的类型集合中。已为XML名称注册了一个类型http://schemas.microsoft.com/2003/10/Serialization/Arrays:
我是Avro和Kafka的新手,我花了几天时间来发送关于Kafka主题的序列化数据...不成功。 让我来解释一下我想要达到的目标: 在生产者方面,我通过SOAP接收数据并发送关于Kafka主题的内容。我正在使用CXF从WSDL生成POJO,并且编写了相应的模式。我正在尝试做的是序列化由CXF解封的对象,并在我的Kafka主题上发送它们。 在web上找到的大多数示例中,Avro记录都是使用已知的模式
问题内容: 在Apache Jackson和Jackson一起使用Apache Jersey进行JSON序列化时(在服务器和客户端上),在反序列化通用List时遇到问题。 我正在生成的JSON如下,“数据”中的所有3个类都实现“ CheckStatusDetail”: 产生此JSON的对象如下所示,我在客户端使用相同的类: 自从我将此注释添加到我的CheckStatusDetail接口后,就应用了
目前,我正在使用Avro1.8.0序列化/反序列化对象,但面临一些问题,特别是java.util.Map对象。不面临其他类型对象的问题。 这里的示例代码- 在deserialize方法中,我试图根据输入数据获取模式,但avro抛出错误- 多谢了。
问题内容: 如果我们在python中有一个,并且想要基于一些特殊的条件创建子列表,我们应该怎么做? 例如: 会产生: 问题答案: itertools.groupby是一种方法(通常是这样): 由于这种特殊情况,我们甚至可以作弊: