当前位置: 首页 > 知识库问答 >
问题:

如何将“Kafka Streams Binder”与“Functional Style”和DI结合使用?

方夜洛
2023-03-14

https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.M3/reference/html/spring-cloud-stream-binder-kafka.html#_programming_model显示了一个示例,其中可以使用属性spring.cloud.stream.bindings.process_in.destination设置输入主题。

现在我想使用依赖注入,例如。

@Bean
public java.util.function.Consumer<KStream<Object, String>> process(JavaMailSender mailSender) {...}

启动应用程序(基于Spring Boot)时,属性为Spring。云流动绑定。process\u in。忽略目标,而订阅输入主题“输入”。

编辑:这是Kotlin代码(不含导入)

梅勒。kt:

@Configuration
class Mailer {
    @Bean
    fun sendMail(/*mailSender: JavaMailSender*/) = Consumer<KStream<Any, Mail>> { input ->
        input.foreach { _, mail -> println("mail = $mail") }
    }
}

邮政kt:

data class Mail(var from: String = "", var to: String = "", var subject: String = "", var body: String = "")

pplication.kt:

@SpringBootApplication
class Application

fun main(args: Array<String>) {
    runApplication<Application>(*args) {
    }
}

应用yml::

spring.cloud.stream:
  bindings.sendMail_in.destination: mail
  kafka.binder.configuration.listeners: PLAINTEXT://localhost:9092

共有1个答案

郝池暝
2023-03-14

活页夹中存在一些问题,无法正确autowire提供给函数/消费者bean的bean。不过,最新的快照解决了这些问题。请确保您使用的是最新的快照(3.0.0. BUILD-SNAPSHOT)。这是一个示例应用程序,适用于您提供的相同场景。

 类似资料:
  • 问题内容: 我有一个带有MySQL的远程数据库,并且将我的应用程序用户的照片存储在数据库中,作为LONGTEXT类型的数据库的一行。 我使用Base64将照片转换为字符串。 我使用JSON和PHP连接到远程数据库,因此,我必须使用Base64,因为据我所知,JSON和PHP需要在参数上发送字符串,而使用Base64可以将照片转换为字符串。 可以,但是非常慢。当我加载100 KB的照片时,会花费很多

  • 问题内容: 我针对正常运行的Web应用程序进行了一系列功能测试,但是每个功能测试都需要通过和注释提供的类级别的设置和拆卸,因此需要JUnit 4.0或更高版本。 现在,我想使用少量的这些功能测试来执行负载测试,这些功能测试可以模拟大量请求Web应用程序相关页面的用户。为了让每个用户在JWebUnit中都有自己的“模拟浏览器”,我需要在JUnitPerf中使用TestFactory来实例化测试中的类

  • 问题内容: 我正在尝试开发一个JMS 独立应用程序 来读写MQSeries上的Queue。我的老板要求我使用 纯Java JMS (而不是ibm.mq lib)来执行此操作。 这是建立jms连接所需的信息: 您知道该怎么做?还是您有任何链接教我做到这一点。 问题答案: 这里的问题是“我的老板要求我使用纯Java JMS(不是ibm.mq lib)来做到这一点”的要求。JMS是一个规范,每个实现都必

  • 问题内容: 我正在尝试使用node.js制作Samsung Smart TV应用程序。 在我的项目中,我想使我的应用程序与服务器PC通信。 根据许多网站,我可以使用“ jsonp”来做到这一点。 这是我发现的客户端代码。 而且,这是我发现的服务器端代码。 这些代码在我的PC(服务器PC)上正常工作,但是当我在其他计算机上打开客户端页面时,它不起作用。 控制台只给我这个日志: 我想使用jsonp处理

  • 我想用Netty ChannelHandler用Gzip压缩和解压,我试了一段时间,但总是有点困难。我的代码如下: 有什么问题吗?

  • 问题内容: 如何创建使芹菜任务看起来像的包装器?还是有更好的方法与Celery集成? Celery的创建者@asksol这样说: 将Celery用作异步I / O框架之上的分布式层是很常见的(提示:将CPU绑定的任务路由到prefork worker意味着它们不会阻塞事件循环)。 但是我找不到任何专门针对框架的代码示例。 问题答案: 如官方网站上所述,这可以通过Celery 5.0版实现: htt