当前位置: 首页 > 知识库问答 >
问题:

Spring启动Kafka请求-回复场景

吴胜
2023-03-14

我正在实现请求/应答场景的POC,以便使用Kafka移动基于事件的微服务堆栈。

Spring有两种选择。我不知道哪一个更好用<代码>回复Kafka模板或云流

第一个是回复KafkaTemplate,它可以很容易地配置为有专门的频道来回复每个实例的主题<代码>记录。标题()。添加(新记录头(KafkaHeaders.REPLY_主题,provider.getReplyChannelName()。getBytes()) 消费者不需要知道回复主题的名称,只需收听主题并返回给定的回复主题即可。

@KafkaListener(topics = "${kafka.topic.concat-request}")
@SendTo
public ConcatReply listen(ConcatModel request) {
       .....
}

第二种选择是结合使用StreamListenerspring集成IntegrationFlows。应配置网关并筛选回复主题。

@MessagingGateway
public interface StreamGateway {
    @Gateway(requestChannel = START, replyChannel = FILTER, replyTimeout = 5000, requestTimeout = 2000)
    String process(String payload);
}
@Bean
public IntegrationFlow headerEnricherFlow() { 
    return IntegrationFlows.from(START)
            .enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
            .enrichHeaders(headerEnricherSpec -> headerEnricherSpec.header(Channels.INSTANCE_ID ,instanceUUID)) 
            .channel(Channels.REQUEST)
            .get();
}
@Bean
public IntegrationFlow replyFiltererFlow() {
    return IntegrationFlows.from(GatewayChannels.REPLY)
            .filter(Message.class, message -> Channels.INSTANCE_ID.equals(message.getHeaders().get("instanceId")) )
            .channel(FILTER)
            .get();
}

建筑答复

@StreamListener(Channels.REQUEST)
@SendTo(Channels.REPLY)
public Message<?> process(Message<String> request) { 

必须指定回复渠道。所以收到的回复主题会根据instanceID进行过滤,这是一种解决方案(可能会使网络膨胀)。另一方面,DLQ场景是通过添加

              consumer:
                enableDlq: true

在与RabbitMQ和其他功能的互操作性方面,使用SpringCloudStreams看起来很有希望,但目前还没有正式支持请求-应答场景。问题仍然悬而未决,未被驳回。(https://github.com/spring-cloud/spring-cloud-stream/issues/1800)

欢迎提出任何建议。

共有1个答案

濮阳原
2023-03-14

Spring Cloud Stream不是为请求/回复而设计的;它可以完成,但不简单,你必须编写代码。

使用@KafkaListener框架可以为您处理所有事情。

如果您希望它也能与RabbitMQ一起使用,还可以使用@RabbitListener对其进行注释。

 类似资料:
  • 我有几个关于API-网关的问题和请求节流 我们在zuul/ribbon中是否有任何内置功能来限制请求?如果没有 spring是否提供了任何功能 我可以使用番石榴速率限制器来限制请求或桶模式

  • 在进行内容协商测试时,尽管响应状态为200,但mock GET在响应体中返回null。 这里是完整的测试类代码。我想验证内容类型是json。 这是我的控制器的endpoint。 有没有想过为什么身体会返回为空?

  • 我正在使用改进后的Spring Boot服务请求,但总是被称为失败回调方法。这是我的简化代码: Spring Boot服务(管理员): 我的客户端界面: 异步 POST 请求: 这是我的用户类(POJO): 注意:我在Android开发,当从邮递员手动发送请求时,我得到200 OK。此外,我在logcat中得到消息:400错误请求

  • 是否有一个请求-回复模式应该与spring-cloud-stream一起使用?我在spring-cloud-stream上能找到的所有留档都面向MessageChannel.send即发即弃类型的生产者,我熟悉spring-集成中的@MessagingGateway,但我不确定这将如何与spring-cloud-stream一起使用。当您有一个REST POSTendpoint来保存具有分配标识符

  • 我对骆驼很陌生,如果这很明显,请原谅。 我们正在尝试设置一条骆驼路线(在 talend esb 中),它执行以下操作: < li >通过JMS接收消息 < li >数据库更新 < li >通过JMS使用请求/回复将消息发送到另一个系统 < li >使用回复中的信息进行另一次数据库更新 这一切都在一条路线上。我发现该路由不再接受 1 中的任何消息。当它正在等待 3 中的回复时。 我曾尝试在JMS组件

  • 谢谢你调查我的问题。 我正在使用Spring Boot和Spring Security进行练习。我已经创建了一个简单的项目,带有基本的注册,但是我无法登录工作。我正在尝试手动将用户登录到一个方法中,但是该方法没有启动。当我尝试用POST: /login登录时,它只是将302重定向到。我非常确定我已经正确地设置了安全配置和方法注释。但是post方法甚至没有运行。(我知道,因为我在帖子方法中有一个打印