当前位置: 首页 > 知识库问答 >
问题:

关于组合处理器和接收器的问题

云文栋
2023-03-14

我正在使用Spring Can Data Flow 1.7.2。发布并尝试关注这篇博文

创建一个“将处理器接收器组合到单个应用程序中:“一个新的接收器”。”

当我将代码作为博客的示例进行结构化时,我遇到了一些问题,我认为这是因为博客的示例使用了java。util。类似于处理器的函数。

我想我应该使用java。util。消费者,因为我正在尝试将现有接收器更改为处理器接收器混合

我的班级是这样的:

@EnableBinding(Sink.class)
public class SampleCombinedSink extends Something {

    String modifiedPayload;
    Logger log;

    Consumer<String> consumer = i -> { modifiedPayload="STUFF ADDED BY CONSUMER i=["+i+"]"; };

    public void accept(String s){
      log.info("SampleCombinedSink.accept() s="+s);
    }

    @StreamListener(Sink.INPUT)
    public void doSink(String payload) {

      consumer.accept(payload);
      log.info("SampleCombinedSink.doSink() Payload received.");
      log.info("SampleCombinedSink.doSink() payload="+ payload);
      log.info("SampleCombinedSink.doSink() modifiedPayload="+ modifiedPayload);
    }
}

我的输出如下所示:

SampleCombinedSink.doSink() Payload received.
SampleCombinedSink.doSink() payload=Friday 11 January 2019 19:03:53.330+0000
SampleCombinedSink.doSink() modifiedPayload=STUFF ADDED BY CONSUMER i=[Friday 11 January 2019 19:03:53.330+0000]
SampleCombinedSink.doSink() Payload received.
SampleCombinedSink.doSink() payload=Friday 11 January 2019 19:03:54.332+0000
SampleCombinedSink.doSink() modifiedPayload=STUFF ADDED BY CONSUMER i=[Friday 11 January 2019 19:03:54.332+0000]
SampleCombinedSink.doSink() Payload received.
SampleCombinedSink.doSink() payload=Friday 11 January 2019 19:03:55.333+0000
SampleCombinedSink.doSink() modifiedPayload=STUFF ADDED BY CONSUMER i=[Friday 11 January 2019 19:03:55.333+0000]
SampleCombinedSink.doSink() Payload received.
SampleCombinedSink.doSink() payload=Friday 11 January 2019 19:03:56.313+0000
SampleCombinedSink.doSink() modifiedPayload=STUFF ADDED BY CONSUMER i=[Friday 11 January 2019 19:03:56.313+0000]

我的源每秒发出一个时间戳。

我被我的消费者弄糊涂了。

Consumer<String> consumer = i -> { modifiedPayload="STUFF ADDED BY CONSUMER i=["+i+"]"; };

我想我可以做如下事情:

Consumer<String> consumer = i -> { i="STUFF ADDED BY CONSUMER i=["+i+"]"; };

然后输入有效载荷

@StreamListener(Sink.INPUT)
public void doSink(String payload) {

将包含“消费者添加的内容i=[时间戳]”

事实并非如此。

我想更改doSink的输入,并通过添加“消费者添加的内容”进行更改,这样当输入到达doSink(字符串有效负载)时,有效负载将包含“消费者添加的内容I=[时间戳]”

我如何做到这一点?

共有1个答案

庞意智
2023-03-14

在这种情况下,您不必更改Sink应用程序的任何内容,只需在Sink应用程序端使用相同的功能管道。

例如,使:

a combination of processor + sink into a single application: “a new sink”."

您所需要的只是将您的函数bean作为接收器应用程序的一部分,或者甚至将函数bean放在单独的工件中,但放在接收器应用程序的类路径中。一旦有了它,就可以定义Spring了。云流动作用接收器应用程序的定义。

您可以在此处看到此示例。由日志组成的应用程序定义了函数bean。

要运行示例,请执行以下操作:

dataflow:>app register --name http-transformer --type source --uri file:///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer/target/http-transformer-2.1.0.BUILD-SNAPSHOT.jar
Successfully registered application 'source:http-transformer'

dataflow:>app register --name log-composed --type sink --uri file:///Users/igopinathan/.m2/repository/org/springframework/cloud/stream/app/log-composed/2.1.0.BUILD-SNAPSHOT/log-composed-2.1.0.BUILD-SNAPSHOT.jar
Successfully registered application 'sink:log-composed'

dataflow:>stream create helloComposed --definition "http-transformer --server.port=9001 | log-composed"
Created new stream 'helloComposed'


dataflow:>stream deploy helloComposed --properties "app.log-composed.spring.cloud.stream.function.definition=upper|concat,deployer.*.local.inheritLogging=true"
Deployment request has been sent for stream 'helloComposed'

dataflow:>http post --data "friend" --target "http://localhost:9001"

在stream deploy命令中,您可以看到用于指定函数组成的应用程序是由日志组成的。

 类似资料:
  • 我们正在使用kafka拓扑转发向kafka主题发送记录。 我们之前使用了一个单独的生产者来发布消息,我们能够获取消息的偏移量和分区。现在我们想用上下文替换它。向前地 如何使用上下文获取Kafka接收器处理器发送的记录的偏移量和分区。向前地

  • 我的EJBTest有问题。 我安装了WildFly并配置了用户管理和应用程序管理。 我编写了一个EJB 3.0并进行了部署: 之后,我编写了一个简单的客户端来连接它: 用户名和密码都是应用程序用户凭据,而不是管理!对吗? 我收到以下错误: 线程“main”java中出现异常。lang.IllegalStateException:EJBClient00025:没有可用于处理调用上下文组织的[appN

  • 问题内容: 我不了解以下方法的第三个参数的实用程序: 从javaDoc: 产生的结果等于: 如您所见,该参数未使用。例如,以下代码将字符串累积到ArrayList中: 但我期望这样: 问题答案: 在使用时,你的是平行的,因为在这种情况下,多个线程收集的元素到最终输出的子列表,并且这些子列表必须被组合以产生最终的。

  • 在新的SpringBoot(2.0.2)中,通过RabbitBinder在源/处理器/接收器之间发送哈希图的支持似乎有所改变。 公共父pom。所有模块的xml如下所示: 明确规定如下: 如果出站通道上没有设置内容类型属性,Spring Cloud Stream将使用基于Kryo序列化框架的序列化器序列化有效负载。在目标位置反序列化消息需要有效负载类出现在接收方的类路径上。 根据这些规则,它可以使用

  • 有没有办法把这两条流合并成一条?我使用第一个流在嵌套列表中进行过滤和查找,并使用第二个流根据流的结果创建地图。我想知道是否有一种方法可以用一条流来实现这一点。 像这样的

  • 我不明白以下方法的第三个参数的效用: 来自Javadoc: 如您所见,没有使用参数。例如,以下内容将把字符串累加到ArrayList中: 但我预料到了这一点: