当前位置: 首页 > 知识库问答 >
问题:

MessageConversionException尝试序列化Avro规范记录列表并使用@Splitter拆分它们时

孙和安
2023-03-14

我有一个带有RabbitMQ绑定器的Spring Cloud Stream(Elmhurst Releas)Splitter。我正在尝试更新它,以便它将使用Avro模式作为有效负载。我得到了一个转换异常,这使得它看起来像是Avro转换器没有被调用,JSON转换器接收消息并在其上跳闸。

原因:组织。springframework。消息传递。转换器。MessageConversionException:无法写入JSON:不是映射:{“type”:“record”,“name”:“SkinnyMessage”,“namespace”:“com.example.avro”,“doc”:“用于传递消息对象引用的轻消息”,“fields”:[{“name”:“id”,“type”:“string”},{“name”:“guid”,“type”:“string”}]}(通过引用链:com.example.avro.skinymessage[“schema”]-

我已经确认,我可以使用生成的Avro类(maven-Avro插件)创建对象并将其序列化到磁盘,所以这一部分似乎是正确的。我已经将该项目从Groovy转换为Java,但仍然会遇到相同的错误,所以我认为这也被排除了。

这是Avro模式:

{
  "namespace": "com.example.avro",
  "type": "record",
  "name": "SkinnyMessage",
  "doc": "Light message for passing references to Message objects",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "guid",
      "type": "string"
    }
  ]
}

以及课程的相关部分:

@EnableBinding(Processor.class)
@EnableSchemaRegistryClient
class PagingQueryProcessorApplication {
    @Timed(value = 'paging.query')
    @Splitter(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    List<SkinnyMessage> queryExecutor(def trigger){
        log.debug 'Building query'
        def query = queryConfiguration.buildQuery().toUriString()

        log.info "Executing query: ${query}"
        def response = service.getRecordings(query)

        log.info "Returning response collection: ${response.body.content.size()}"
        // We build a slim notification on each of the query responses
        def skinnyMessages = response.body.content.collect{
            new SkinnyMessage(it.getLink('self').getHref(), it.content.guid)
        }
        skinnyMessages
    }
...
}

编辑:当我逐步使用调试器时,我可以看到AvroSchemaRegistryClientMessageConverter失败了canConvertTo(有效负载,标头)调用,因为标头中的mimeType是application/json,而不是application/*avro,所以它继续尝试转换器链的其余部分。

共有1个答案

郎鸿
2023-03-14

如果我构建了一条消息并在其上设置了一个avro内容类型头,这似乎是可行的,但这似乎是一种黑客行为。

List<Message<SkinnyMessage>> skinnyMessages = response.body.content.collect{
    MessageBuilder.withPayload(
            new SkinnyMessage(it.getLink('self').getHref(), it.content.recordingGuid))
            .setHeader('contentType', 'application/*+avro')
            .build()
}

创建在RabbitMQ UI中正确显示的消息:

内容类型:应用程序/vnd。skinnymessage。v1 avro相关ID:f8be74d6-f780-efcc-295d-338a8b7f2ea0内容\u类型:应用程序/八位字节流有效负载96字节编码:字符串

thttps://example.com/message/2597061H9a688e40-3e30-4b17-80e9-cf4f897e8a91

如果我正确理解了文档,那么从应用程序中的设置来看,这应该是透明的。yml:(在模式注册表示例中):

spring:cloud:stream:bindings:output:contentType:application/*avro

 类似资料:
  • 我正在将avro序列化数据发布到kafka主题,然后尝试通过SQL CLI界面从该主题创建Flink表。我能够创建主题,但在执行SQL语句后无法查看主题数据。然而,我能够使用简单的Kafka消费者反序列化和打印已发布的数据。在SQL CLI上获取此错误: 表创建 表定义 Avro模式 消息值(消息键为无) 我不断发送相同的消息值使用简单的kafka生产者主题 Kafka主题描述 完整错误日志 来自

  • 期望:将XML文件中的整数列表读取到名为itemPool的列表中; 结果:“InvalidOperationException:未能将类型System.Collections.Generic.List`1[System.Int32]添加到已知的类型集合中。已为XML名称注册了一个类型http://schemas.microsoft.com/2003/10/Serialization/Arrays:

  • 我是Avro和Kafka的新手,我花了几天时间来发送关于Kafka主题的序列化数据...不成功。 让我来解释一下我想要达到的目标: 在生产者方面,我通过SOAP接收数据并发送关于Kafka主题的内容。我正在使用CXF从WSDL生成POJO,并且编写了相应的模式。我正在尝试做的是序列化由CXF解封的对象,并在我的Kafka主题上发送它们。 在web上找到的大多数示例中,Avro记录都是使用已知的模式

  • 问题内容: 在Apache Jackson和Jackson一起使用Apache Jersey进行JSON序列化时(在服务器和客户端上),在反序列化通用List时遇到问题。 我正在生成的JSON如下,“数据”中的所有3个类都实现“ CheckStatusDetail”: 产生此JSON的对象如下所示,我在客户端使用相同的类: 自从我将此注释添加到我的CheckStatusDetail接口后,就应用了

  • 目前,我正在使用Avro1.8.0序列化/反序列化对象,但面临一些问题,特别是java.util.Map对象。不面临其他类型对象的问题。 这里的示例代码- 在deserialize方法中,我试图根据输入数据获取模式,但avro抛出错误- 多谢了。

  • 问题内容: 如果我们在python中有一个,并且想要基于一些特殊的条件创建子列表,我们应该怎么做? 例如: 会产生: 问题答案: itertools.groupby是一种方法(通常是这样): 由于这种特殊情况,我们甚至可以作弊: