当前位置: 首页 > 知识库问答 >
问题:

Spring云流Avro

段超
2023-03-14

我无法使用功能供应商发送Avro消息。SCSt尝试将消息作为JSON发送,但失败。有人能指出是否需要任何其他配置吗?

这是供应商的功能bean

@Bean
public Supplier<Sensor> supplier() {
    Random random = new Random();
    return () -> Sensor.newBuilder()
        .setId("id")
        .setTemperature(random.nextFloat())
        .setAcceleration(random.nextFloat())
        .setVelocity(random.nextFloat())
        .build();
}

和配置

spring:
  cloud:
    schema-registry-client:
      endpoint: http://localhost:8990
    schema:
      avro:
        schema-locations: classpath:avro/sensor.avsc
    stream:
      function:
        definition: supplier
        bindings:
          supplier-out-0: sensor
      bindings:
        sensor:
          destination: sensor-exchange
          group: sensor-queue
          content-type: application/*+avro

共有1个答案

穆嘉
2023-03-14

这是由于转换发生的问题。我在Spring Cloud Function中提交了一个问题:https://github.com/spring-cloud/spring-cloud-function/issues/611

作为一种解决方法,您可以将内容类型用作:spring。云流动绑定。。。内容类型:应用程序/avro,在生产者配置中没有通配符类型。当上述问题得到解决后,您可以继续使用包含通配符的内容类型(application/*avro)。很抱歉给您带来不便。

我更新了此示例以反映此解决方案,当我们得到适当的修复时,将恢复此解决方案。

更新:Spring云函数中的问题在3.0中得到了解决。x分支。请使用以下依赖项获取修复。

 <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-function-context</artifactId>
    <version>3.0.12.BUILD-SNAPSHOT</version>
</dependency>

有了这个,您可以在内容类型中使用通配符类型:Application/*avro

<代码>3.0.12。该版本将于下周发布。

 类似资料:
  • 使用Spring Cloud Stream版本Chelsea. SR2,RabbitMQ作为消息代理。要拥有多个消费者,我们使用属性并发(入站消费者的并发)。 如果我们将并发设置为50。它从1开始,慢慢地增加消费者计数。有没有任何可能的解决方案可以使用更高的数字而不是一个来启动初始消费者计数,以提高消费者性能。

  • 我正在尝试用《Spring的云流》和《Kafka》。下面是示例代码。但它似乎没有任何作用。它总是创建一个名为“输出”的主题。但这些价值观尚未公布。 应用亚马尔 我的目标就是创造价值。 依赖性-2.2.6。释放

  • 如何使用项目Reactor背压功能与Kafka粘合剂在斯佩林云流? 如果我使用这种方式,比发布者发送延迟1秒的消息,但消费者消费消息没有任何延迟。 有可能在春雨云流中使用cunsumer上的BackPereSure吗?

  • 我希望有一个Spring云流侦听器处理有关其中发送的所有消息的完整事务。或者,即使之后有异常,也会提交函数中使用StreamBridge手动发送的所有消息。 这是我的lib版本: 我的Spring云流形态: 我的测试java代码: 要运行的测试类: 我还添加了TransactionManager: 在最后的这个示例中,我的兔子队列中有: 或者我应该只有两条消息test.other.request.

  • 大家好,我完全是Spring Cloud Streams框架的新手。 在用于Kafka Streams的spring cloud stream文档中,我可以看到在示例中使用的应用程序yaml/properties文件中引用了前缀为spring.cloud.stream.function.definition等的属性。 我知道Cloud streams使用Cloud函数,但是Cloud stream