当前位置: 首页 > 知识库问答 >
问题:

在Spring Cloud数据流中激活Avro消息转换器

段干宏硕
2023-03-14

我正在Spring云数据流上实现一个流应用程序。我想使用基于Avro的模式注册表客户端进行序列化和模式控制。我的基本目标是向源应用程序提供一些外部数据,将其转换为准备好的基于avro的模式,并将其发送到只接受此模式的接收器应用程序。我希望使用外部架构注册表服务器中的架构,而不是该架构的文件版本。我的代码如下所示:

@EnableBinding(Source.class)
@EnableSchemaRegistryClient
public class DisSampleSource {

    private final DisSampleSourceProperties properties;

    @Inject
    public DisSampleSource(DisSampleSourceProperties properties) {
        this.properties = properties;
    }

    @InboundChannelAdapter(Source.OUTPUT)
    public String feed() throws IOException {
        if (!Paths.get(properties.getPath()).toFile().exists()) {
            throw new InvalidPathException(this.properties.getPath(),
                    "The file does not exists or is of not proper type.");
        }
        return new String(Files.readAllBytes(Paths.get(properties.getPath())), StandardCharsets.UTF_8);
    }
}

POM:

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-schema</artifactId>
        <version>1.1.1.RELEASE</version>
    </dependency>

在启动应用程序期间,我正在传递属性:

 --spring.cloud.stream.bindings.output.contentType=application/foo.bar.v1+avro

目前,应用程序无法启动,但有以下例外:

2017-02-13 16:25:30.430  WARN 2444 --- [           main] ationConfigEmbeddedWebApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'disSampleSource' defined in URL [jar:file:/D:/git/dis/sample/source/target/dis-sample-source-1.0.0-SNAPSHOT.jar!/BOOT-INF/classes!/com/atsisa/bit/dis/sample/DisSampleSource.class]: Initialization of bean failed; nested exception is org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'org.springframework.cloud.stream.config.ChannelBindingAutoConfiguration': Unsatisfied dependency expressed through field 'adapters'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.cloud.stream.messaging.Source': Invocation of init method failed; nested exception is org.springframework.cloud.stream.converter.ConversionException: No message converter is registered for application/foo.bar.v1+avro
2017-02-13 16:25:30.440  INFO 2444 --- [           main] o.apache.catalina.core.StandardService   : Stopping service Tomcat
2017-02-13 16:25:30.480  INFO 2444 --- [           main] utoConfigurationReportLoggingInitializer :

Error starting ApplicationContext. To display the auto-configuration report re-run your application with 'debug' enabled.
2017-02-13 16:25:30.485 ERROR 2444 --- [           main] o.s.boot.SpringApplication               : Application startup failed

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'disSampleSource' defined in URL [jar:file:/D:/git/dis/sample/source/target/dis-sample-source-1.0.0-SNAPSHOT.jar!/BOOT-INF/classes!/com/atsisa/bit/dis/sample/DisSampleSource.class]: Initialization of bean failed; nested exception is org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'org.springframework.cloud.stream.config.ChannelBindingAutoConfiguration': Unsatisfied dependency expressed through field 'adapters'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.cloud.stream.messaging.Source': Invocation of init method failed; nested exception is org.springframework.cloud.stream.converter.ConversionException: No message converter is registered for application/foo.bar.v1+avro
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:562) ~[spring-beans-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:482) ~[spring-beans-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]
        at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:306) ~[spring-beans-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]
        at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230) ~[spring-beans-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]
        at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302) ~[spring-beans-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]
        at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197) ~[spring-beans-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]
        at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:754) ~[spring-beans-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]
        at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:866) ~[spring-context-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]

我做错了什么?

共有1个答案

邴奇逸
2023-03-14

Avro是Spring Cloud Stream模式的可选依赖项(其目的是在将来支持其他格式。为了激活模式支持,您只需添加

 <dependency>
     <groupId>org.apache.avro</groupId>
     <artifactId>avro</artifactId>
     <version>1.8.1</version>
 </dependency>

到项目。

 类似资料:
  • 原因:com.Google.API.client.googlejsonresponseException:403禁止{“代码”:403,“错误”:[{“域”:“全局”,“消息”:“项目尚未启用API。请使用谷歌开发人员控制台激活项目的'DataFlow'API。”,“原因”:“禁止”}],“消息”:“项目尚未启用API。请使用谷歌开发人员控制台激活项目的'DataFlow'API。”,“状态”:“

  • 是否有一种方法可以丢弃使用Spring Integration DSL方法定义的Spring Integration Service Activator中的消息? 更具体地说,假设下面的IntegrationFlow定义... 我发现,从返回似乎可以有效地结束流,但我不确定这是否是最优雅/正确的解决方案。如果这是推荐的解决方案,那很好。

  • 示例:http://undeniable.info/img/temp_ss.png 我使用的是用户审批系统,因此说您的帐户处于活动状态的消息根本不合适。我们仍然希望我们的用户确认他们的电子邮件,因此我想将邮件“帐户激活”更改为“帐户电子邮件验证”,并将“激活您的帐户”更改为“验证您的帐户电子邮件”或类似的内容。 在我的整个public_html dir中搜索这些帐户激活的消息时,不会返回任何内容,

  • 使用Spring Cloud DataFlow 1.2.2版本,配置如下: 我正在尝试创建一个流,它将从特定主题中读取并将其刷新到长水槽中,如下所示: 查看日志文件,我可以看到以下错误: 我还试图为kafka源代码的消费者/生产者配置一些属性 但我得到的结果是一样的 以下是Spring DataFlow打印的消费者详细信息: 我看到了类似的查询,但没有有效的答案,什么是属性来接受二进制json消息

  • 使用MVC Java编程配置方式时,如果你想替换Spring MVC提供的默认转换器,完全定制自己的HttpMessageConverter,这可以通过覆写configureMessageConverters()方法来实现。如果你只是想定制一下,或者想在默认转换器之外再添加其他的转换器,那么可以通过覆写extendMessageConverters()方法来实现。 下面是一段例子,它使用定制的Ob

  • 我对Kafka2.6.0中的消息大小配置有点困惑。但让我们讲一个故事: 我们正在使用由3个节点组成的Kafka集群。到目前为止,消息的标准配置。“zstd压缩”被激活。 相关的代理配置很简单: 此时,生产者配置也很简单: 现在我们想把一个8Mbyte的消息放到一个特定的主题中。这些数据的压缩大小只有200 KB。 如果我将这些数据放入主题中,会出现以下错误: 所以我改变了生产者配置如下: 现在制作