当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud数据流自定义Scala处理器无法从Starter Apps(SCDF 2.5.1)发送/接收数据

柳胡媚
2023-03-14

我一直在为Spring云数据流在Scala中创建一个简单的自定义处理器,并且在向starter应用程序发送/接收数据时遇到了问题。我无法看到任何消息在流中传播。流的定义是时间触发器。时间单位=秒|传递日志|日志其中传递日志是我的自定义处理器。

我使用的是Spring Cloud Data Flow 2.5.1和Spring Boot 2.2.6。

这是处理器使用的代码-我使用的是功能模型。

@SpringBootApplication
class PassThroughLog {

  @Bean
  def passthroughlog(): Function[String, String] = {
    input: String => {
      println(s"Received input `$input`")
      input
    }
  }
}

object PassThroughLog {
  def main(args: Array[String]): Unit = SpringApplication.run(classOf[PassThroughLog], args: _ *)
}

应用yml公司

spring:
  cloud:
    stream:
      function:
        bindings:
          passthroughlog-in-0: input
          passthroughlog-out-0: output

build.gradle.kts

// scala
implementation("org.scala-lang:scala-library:2.12.10")

// spring
implementation(platform("org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR5"))
implementation(platform("org.springframework.cloud:spring-cloud-stream-dependencies:Horsham.SR5"))
implementation("org.springframework.boot:spring-boot-starter")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.springframework.cloud:spring-cloud-starter-function-web:3.0.7.RELEASE")
implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka:3.0.5.RELEASE")

如果这里缺少代码示例,我已经将整个项目发布到github。我还在那里发布了日志,因为它们很长。

当我引导本地Kafka集群并将任意数据推送到输入主题时,我能够看到数据流过处理器。然而,当我在Spring云数据流上部署应用程序时,情况并非如此。我正在Kubernetes通过Docker部署该应用程序。

此外,当我使用定义<代码>时间>触发器部署流时。时间单位=秒| log,我在日志接收器中看到消息。这让我确信问题出在定制处理器上。

我是否错过了一些简单的东西,例如依赖项或额外配置?非常感谢任何帮助。

共有2个答案

邴俊民
2023-03-14

原来问题出在我的Dockerfile上。为了便于配置,我有一个build参数来指定入口点中使用的jar文件。为了实现这一点,我使用了ENTRYPOINT的shell版本。将我的入口点更改为exec版本解决了我的问题。

shell版本的ENTRYPOINT不能很好地使用图像参数(docker run

更改Dockerfile的位置:

FROM openjdk:11.0.5-jdk-slim as build
ARG JAR
ENV JAR $JAR
ADD build/libs/$JAR .
ENTRYPOINT java -jar $JAR

FROM openjdk:11.0.5-jdk-slim as build
ARG JAR
ADD build/libs/$JAR program.jar
ENTRYPOINT ["java", "-jar", "program.jar"]

已修复问题。

郭思淼
2023-03-14

在SCDF中使用Spring Cloud Stream 3. x版本时,您必须设置一个附加属性,以让SCDF知道哪些通道绑定配置为输入和输出通道。

请参见:功能应用程序

特别注意以下特性:

应用程序。时间来源。Spring云流动作用绑定。timeSupplier-out-0=输出

应用程序。原木水槽。Spring云流动作用绑定。logConsumer-in-0=输入

在您的情况下,必须将函数绑定分别映射到输入和输出。

 类似资料:
  • 我有一个特定的要求,其中,我需要检查空的数据文件。如果为空,则填充默认值。这是我尝试过但没有得到我想要的东西。 这个想法是,如果df不是空的,就得到它。如果为空,则填写默认值为零。这似乎不起作用。以下是我得到的。 请帮忙。

  • 我试着做一个简单的服务器-客户机套接字通信,但服务器似乎无法正常工作。无论何时发送或接收,我都会收到此错误: 非插座上的插座操作:非插座上的插座操作 奇怪的是,这只会在服务器发送时出现。客户似乎还可以:

  • 我正在开发一个C#Java卡(智能卡)程序,并试图利用github上的PCSC sharp库。 这是我的请求的“短/tl; dr”版本: PCSC清晰示例涵盖了Iso7816 Case2短。有人能纠正我下面的例子吗,或者给我提供一个Iso7816 Case3短(命令数据。没有响应数据)和Case4短(命令数据。预期的响应数据)的例子? 这是我的请求的"长"版本:在Java卡上,我同时使用标准和自定

  • 因此,我正在用Java编写一个程序,在DatagramSocket和DataGramPacket的帮助下发送和接收数据。问题是,当我发送数据/接收数据时,数据在我发送的程序中也会有所不同,但只是在某些情况下,比如: 但有时会起作用,比如:

  • 问题内容: 我开始工作,并和奇迹是否可以定制响应期待。当前对DataTables插件的期望是这样的: 在服务器端,API由 服务器的响应是: 因此,有没有一种方法可以调整Datatables插件以接受/映射此响应,或者我必须找到一种将期望的字段添加到api的方法? 到目前为止,我已经做到了: 任何帮助将不胜感激。 提前致谢 :) 问题答案: 您可以将函数传递给DataTables 选项,这将使您完

  • 然而,我很难从数据库视图中获得最近的更新。主要问题是何时向数据库中添加新条目。由于数据库是在初始化处理器时查询的,因此自定义处理器将不会有新的条目。 我尝试在public void onTrigger()函数中实现查询;然而,这将导致管道备份,因为它将对每个flowfile查询数据库(如果每秒有数千个flowfile传入,这就不理想了)。 处理器启动时是否有查询数据库的方法;不在每个flowfil