我正在使用Spring Can Data Flow 1.7.2。发布并尝试关注这篇博文
创建一个“将处理器接收器组合到单个应用程序中:“一个新的接收器”。”
当我将代码作为博客的示例进行结构化时,我遇到了一些问题,我认为这是因为博客的示例使用了java。util。类似于处理器的函数。
我想我应该使用java。util。消费者,因为我正在尝试将现有接收器更改为处理器接收器混合
我的班级是这样的:
@EnableBinding(Sink.class)
public class SampleCombinedSink extends Something {
String modifiedPayload;
Logger log;
Consumer<String> consumer = i -> { modifiedPayload="STUFF ADDED BY CONSUMER i=["+i+"]"; };
public void accept(String s){
log.info("SampleCombinedSink.accept() s="+s);
}
@StreamListener(Sink.INPUT)
public void doSink(String payload) {
consumer.accept(payload);
log.info("SampleCombinedSink.doSink() Payload received.");
log.info("SampleCombinedSink.doSink() payload="+ payload);
log.info("SampleCombinedSink.doSink() modifiedPayload="+ modifiedPayload);
}
}
我的输出如下所示:
SampleCombinedSink.doSink() Payload received.
SampleCombinedSink.doSink() payload=Friday 11 January 2019 19:03:53.330+0000
SampleCombinedSink.doSink() modifiedPayload=STUFF ADDED BY CONSUMER i=[Friday 11 January 2019 19:03:53.330+0000]
SampleCombinedSink.doSink() Payload received.
SampleCombinedSink.doSink() payload=Friday 11 January 2019 19:03:54.332+0000
SampleCombinedSink.doSink() modifiedPayload=STUFF ADDED BY CONSUMER i=[Friday 11 January 2019 19:03:54.332+0000]
SampleCombinedSink.doSink() Payload received.
SampleCombinedSink.doSink() payload=Friday 11 January 2019 19:03:55.333+0000
SampleCombinedSink.doSink() modifiedPayload=STUFF ADDED BY CONSUMER i=[Friday 11 January 2019 19:03:55.333+0000]
SampleCombinedSink.doSink() Payload received.
SampleCombinedSink.doSink() payload=Friday 11 January 2019 19:03:56.313+0000
SampleCombinedSink.doSink() modifiedPayload=STUFF ADDED BY CONSUMER i=[Friday 11 January 2019 19:03:56.313+0000]
我的源每秒发出一个时间戳。
我被我的消费者弄糊涂了。
Consumer<String> consumer = i -> { modifiedPayload="STUFF ADDED BY CONSUMER i=["+i+"]"; };
我想我可以做如下事情:
Consumer<String> consumer = i -> { i="STUFF ADDED BY CONSUMER i=["+i+"]"; };
然后输入有效载荷
@StreamListener(Sink.INPUT)
public void doSink(String payload) {
将包含“消费者添加的内容i=[时间戳]”
事实并非如此。
我想更改doSink的输入,并通过添加“消费者添加的内容”进行更改,这样当输入到达doSink(字符串有效负载)时,有效负载将包含“消费者添加的内容I=[时间戳]”
我如何做到这一点?
在这种情况下,您不必更改Sink
应用程序的任何内容,只需在Sink
应用程序端使用相同的功能管道。
例如,使:
a combination of processor + sink into a single application: “a new sink”."
您所需要的只是将您的函数
bean作为
接收器
应用程序的一部分,或者甚至将
函数
bean放在单独的工件中,但放在
接收器
应用程序的
类路径中。一旦有了它,就可以定义Spring了。云流动作用接收器应用程序的定义。
您可以在此处看到此示例。由日志组成的应用程序定义了函数bean。
要运行示例,请执行以下操作:
dataflow:>app register --name http-transformer --type source --uri file:///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer/target/http-transformer-2.1.0.BUILD-SNAPSHOT.jar
Successfully registered application 'source:http-transformer'
dataflow:>app register --name log-composed --type sink --uri file:///Users/igopinathan/.m2/repository/org/springframework/cloud/stream/app/log-composed/2.1.0.BUILD-SNAPSHOT/log-composed-2.1.0.BUILD-SNAPSHOT.jar
Successfully registered application 'sink:log-composed'
dataflow:>stream create helloComposed --definition "http-transformer --server.port=9001 | log-composed"
Created new stream 'helloComposed'
dataflow:>stream deploy helloComposed --properties "app.log-composed.spring.cloud.stream.function.definition=upper|concat,deployer.*.local.inheritLogging=true"
Deployment request has been sent for stream 'helloComposed'
dataflow:>http post --data "friend" --target "http://localhost:9001"
在stream deploy命令中,您可以看到用于指定函数组成的应用程序是由日志组成的。
我们正在使用kafka拓扑转发向kafka主题发送记录。 我们之前使用了一个单独的生产者来发布消息,我们能够获取消息的偏移量和分区。现在我们想用上下文替换它。向前地 如何使用上下文获取Kafka接收器处理器发送的记录的偏移量和分区。向前地
我的EJBTest有问题。 我安装了WildFly并配置了用户管理和应用程序管理。 我编写了一个EJB 3.0并进行了部署: 之后,我编写了一个简单的客户端来连接它: 用户名和密码都是应用程序用户凭据,而不是管理!对吗? 我收到以下错误: 线程“main”java中出现异常。lang.IllegalStateException:EJBClient00025:没有可用于处理调用上下文组织的[appN
问题内容: 我不了解以下方法的第三个参数的实用程序: 从javaDoc: 产生的结果等于: 如您所见,该参数未使用。例如,以下代码将字符串累积到ArrayList中: 但我期望这样: 问题答案: 在使用时,你的是平行的,因为在这种情况下,多个线程收集的元素到最终输出的子列表,并且这些子列表必须被组合以产生最终的。
在新的SpringBoot(2.0.2)中,通过RabbitBinder在源/处理器/接收器之间发送哈希图的支持似乎有所改变。 公共父pom。所有模块的xml如下所示: 明确规定如下: 如果出站通道上没有设置内容类型属性,Spring Cloud Stream将使用基于Kryo序列化框架的序列化器序列化有效负载。在目标位置反序列化消息需要有效负载类出现在接收方的类路径上。 根据这些规则,它可以使用
有没有办法把这两条流合并成一条?我使用第一个流在嵌套列表中进行过滤和查找,并使用第二个流根据流的结果创建地图。我想知道是否有一种方法可以用一条流来实现这一点。 像这样的
我不明白以下方法的第三个参数的效用: 来自Javadoc: 如您所见,没有使用参数。例如,以下内容将把字符串累加到ArrayList中: 但我预料到了这一点: