自3.1版以来,用于处理队列的主要API已被弃用。课堂评论中写道:
从3.1开始就不推荐使用函数式编程模型
我在网上搜索了很多解决方案,但没有找到关于如何迁移的可靠的E2E解释。
正在寻找以下方面的示例:
如果有几种方法可以做到这一点(正如我在网上看到的),我很乐意为每个选项提供解释和典型用例。
首先,一些参考资料可能会有所帮助:
TL; DR
Spring现在不再使用基于注释的配置,而是使用检测到的消费者
/功能
/供应商
的bean来为您定义流。
输入/消费
而之前的代码是这样的:
interface BindableGradesChannel {
@Input
fun gradesChannel(): SubscribableChannel
companion object {
const val INPUT = "gradesChannel"
}
}
其用法类似于:
@Service
@EnableBinding(BindableGradesChannel::class)
class GradesListener {
private val log = LoggerFactory.getLogger(GradesListener::class.java)
@StreamListener(BindableScoresChannel.INPUT)
fun listen(grade: Grade) {
log.info("Received $grade")
// do something
}
}
现在,整个定义都无关紧要,可以这样做:
@Service
class GradesListener {
private val log = LoggerFactory.getLogger(GradesListener::class.java)
@Bean
fun gradesChannel(): Consumer<Grade> {
return Consumer { listen(grade = it) }
}
fun listen(grade: Grade) {
log.info("Received $grade")
// do something
}
}
请注意,消费者bean是如何替换@StreamListener和@Input的。
关于配置,如果之前为了配置,您有一个应用程序。yml
看起来是这样的:
spring:
cloud:
stream:
bindings:
gradesChannel:
destination: GradesExchange
group: grades-updates
consumer:
concurrency: 10
max-attempts: 3
现在应该是这样:
spring:
cloud:
stream:
bindings:
gradesChannel-in-0:
destination: GradesExchange
group: grades-updates
consumer:
concurrency: 10
max-attempts: 3
请注意gradesChannel
是如何被gradesChannel-in-0
取代的——要了解完整的命名约定,请参阅顶部的命名约定链接。
一些细节:
spring.cloud.function.definition
属性。gradesChannel
,您可以设置spring.cloud.stream.function.bindings.gradesChannel-in-0=gradesChannel
,并在配置gradesChannel
.输出/供应商
这里的概念类似,您可以替换如下所示的配置和代码:
interface BindableStudentsChannel {
@Output
fun studentsChannel(): MessageChannel
}
和
@Service
@EnableBinding(BindableStudentsChannel::class)
class StudentsQueueWriter(private val studentsChannel: BindableStudentsChannel) {
fun publish(message: Message<Student>) {
studentsChannel.studentsChannel().send(message)
}
}
现在可以替换为:
@Service
class StudentsQueueWriter {
@Bean
fun studentsChannel(): Supplier<Student> {
return Supplier { Student("Adam") }
}
}
正如你所看到的,我们有一个主要的区别——它是什么时候被调用的,由谁调用的?
以前我们可以手动触发它,但现在它是由spring每秒(默认情况下)触发的。对于需要每秒发布传感器数据的用例来说,这很好,但对于想要发送事件消息的用例来说,这并不好。除了出于任何原因使用函数
,spring还提供了两种选择:
StreamBridge-link
使用StreamBridge
,您可以像这样显式地定义目标:
@Service
class StudentsQueueWriter(private val streamBridge: StreamBridge) {
fun publish(message: Message<Student>) {
streamBridge.send("studentsChannel-out-0", message)
}
}
通过这种方式,您不需要将目标通道定义为bean,但仍然可以发送消息。缺点是类中有一些显式配置。
ReactorAPI-link
另一种方法是使用某种反应性机制,如Emitter处理器
,并返回它。使用此代码,您的代码将类似于:
@Service
class StudentsQueueWriter {
val students: EmitterProcessor<Student> = EmitterProcessor.create()
@Bean
fun studentsChannel(): Supplier<Flux<Student>> {
return Supplier { students }
}
}
其用法可能类似于:
class MyClass(val studentsQueueWriter: StudentsQueueWriter) {
fun newStudent() {
studentsQueueWriter.students.onNext(Student("Adam"))
}
}
我有多个API,它们通过kafka(生成和使用消息)相互通信。在其中一个API中,我基于HTTP请求触发器生成消息(当调用endpoint时,生成一个meesage并发送给kafka),带有@Output和@EnableBinding注释。这些会议被订阅此主题的其他API使用。 现在,我尝试迁移到新的Spring-Cloud-Stream函数编程模型,并从文档中得出结论,使用外部源数据的Strea
输出 用print加上字符串,就可以向屏幕上输出指定的文字。比如输出'hello, world',用代码实现如下: >>> print 'hello, world' print语句也可以跟上多个字符串,用逗号“,”隔开,就可以连成一串输出: >>> print 'The quick brown fox', 'jumps over', 'the lazy dog' The quick brown
输出 用print()在括号中加上字符串,就可以向屏幕上输出指定的文字。比如输出'hello, world',用代码实现如下: >>> print('hello, world') print()函数也可以接受多个字符串,用逗号“,”隔开,就可以连成一串输出: >>> print('The quick brown fox', 'jumps over', 'the lazy dog') The qu
Ruby提供了两种I/O例程,第一种是简单的接口,我们已经用了很多了。 print "Enter your name: " name = gets Kernel模块提供了一整套和I/O相关的方法:gets, open, print, printf, putc, puts, readline, readlines, 和test等,这些方法能使你简单方便的进行Ruby编程。这些方法典型的对标准输入输
C++ 标准库提供了一组丰富的输入/输出功能,我们将在后续的章节进行介绍。本章将讨论 C++ 编程中最基本和最常见的 I/O 操作。 C++ 的 I/O 发生在流中,流是字节序列。如果字节流是从设备(如键盘、磁盘驱动器、网络连接等)流向内存,这叫做输入操作。如果字节流是从内存流向设备(如显示屏、打印机、磁盘驱动器、网络连接等),这叫做输出操作。 I/O 库头文件 下列的头文件在 C++ 编程中很重
到目前为止,我们讨论的所有示例本质上都是静态的。在本章中,我们将学习如何Haskell与用户动态交互。学习Haskell中使用的不同输入和输出技术。 1. 文件和流 到目前为止,我们已经对程序本身中的所有输入进行了硬编码,在前面学习的内容中都是从静态变量获取输入。本小节中,我们学习如何从外部文件读取和写入。 创建一个文件并命名为abc.txt。接下来,在此文本文件中输入以下一行: 接下来,我们将编