当前位置: 首页 > 知识库问答 >
问题:

在ApacheKafkaJava中实现请求-响应

柴嘉年
2023-03-14

请找到我们需要实现的用例。

首先,我们需要调用Kafka生产者将消息作为rest服务,他们将在另一个主题中处理并返回响应。

对于我们来说,这是一个请求-回复主题,我们需要回复相同的请求-响应,使用replykafka模板工作正常,但我们可以在标题中设置相关id。

作为主题消息元数据,存在发送属性,是否有任何方法将关联id映射到请求主题消息和回复主题消息。

给你好好解释一下。

一个微服务期望负载如下所示,负载中包含correlationId。

{
  "operationDate": "2020-09-16T11:58:25",
  "correlationId": "-5544538377183901824042719876882142227",
  "birthDate": "2013-12-12",
  "firstNameEn": "boby",
  "firstNameAr": "الشيخ",
}

微服务将处理有效负载,并在另一个主题中给出响应。

{
  "correlationId": -5544538377183901824042719876882142227,
  "consumerId": null,
  "userid": 123456,
  "statusCode": "SUCCESS",
  "errors": null
}

现在,我们需要实现使用Spring ReplyingKafkaTem板。

作为回复,KafkatTemplate将仅与标头中的correlationId一起使用

共有2个答案

朱承载
2023-03-14

谢谢你的提示。

我已经做了与Kafka头相关的有效载荷覆盖。

@Override
    protected ListenableFuture<SendResult> doSend(ProducerRecord producerRecord) {
        if(producerRecord.value()!=null){
           // i have appeneded the header correlationId in th payload
        }
        return super.doSend(producerRecord);
    }

在Replay onMessage中,我已将响应负载correlationId填充到头中。

@Override
    public void onMessage(List<ConsumerRecord<K, R>> data) {
        data.forEach(
            krConsumerRecord -> //update each record header
        );
        super.onMessage(data);
    }

通过这种方式,成功地将请求-响应语义与请求和响应负载中的correlationId集成在一起。

佟阳飙
2023-03-14

假设您的意思是希望在相关id中包含主题,请参阅

java prettyprint-override">/**
 * Set a function to be called to establish a unique correlation key for each request
 * record.
 * @param correlationStrategy the function.
 * @since 2.3
 */
public void setCorrelationIdStrategy(Function<ProducerRecord<K, V>, CorrelationKey> correlationStrategy) {

您可以创建自己的相关ID,基于producerRecords(其中包含主题())。

你只需要确保它是独一无二的。如果您手动设置KafkaHeaders。REPLY_TOPIC,它将对策略可见。

编辑

使用有效负载中的相关id,使用setCorrelationIdStrategy从有效负载中提取相关id,并添加RecordInterceptor在应答端执行相同操作。

 类似资料:
  • 当两个客户端都登录到游戏中并准备就绪后,每个客户端依次做出一个“移动”,并将其发送给服务器授权。在服务器授权移动后,它会发送2条消息:1)第一条消息给做出“移动”的玩家,告诉他它的移动被批准,现在它应该“等待”另一个对手上场。2)第二条消息,给对手,告诉他什么士兵移动了,现在轮到他上场了。...bla bla bla bla 我的问题是:当每个客户端登录到游戏时,为他创建了一个新的套接字,用于与服

  • 在SpringMVC项目中,客户机发送一个带有序列化对象的请求,客户机本身是一个小程序,所以它不希望收到一个web页面作为响应,而是一个带有字符串对象的响应,该响应将告诉他这是成功还是失败。那么解决方案是什么呢?我想在@Controller中使用返回void的方法,或者返回不存在页面的方法?(在这两种情况下,我还想知道是否有回复给客户)

  • 介绍 YurunHttp 的请求响应类,结果类。除了遵循 PSR-7 规范,另外还增加了一些人性化的方法。 类:Yurun\Util\YurunHttp\Http\Response use Yurun\Util\HttpRequest; $http = new HttpRequest; $response = $http->get('http://www.baidu.com'); 方法 响应内

  • 问题内容: 我正在使用以下方法将图像从Android上传到服务器。 现在,我想实现一个ProgressDialog,当上传完成时应将其关闭。我不知道多部分请求何时结束。 谢谢 问题答案: 上这堂课: 然后在您的活动中: 现在,您将在成功上传/失败后触发适当的回调。 资源

  • 概述 Django 使用Request 对象和Response 对象在系统间传递状态。 当请求一个页面时,Django会建立一个包含请求元数据的 HttpRequest 对象。 当Django 加载对应的视图时,HttpRequest 对象将作为视图函数的第一个参数。每个视图会返回一个HttpResponse 对象。 本文档对HttpRequest 和HttpResponse 对象的API 进行说

  • 本文向大家介绍Java Web请求与响应实例详解,包括了Java Web请求与响应实例详解的使用技巧和注意事项,需要的朋友参考一下   Servlet最主要作用就是处理客户端请求并作出回应,为此,针对每次请求,Web容器在调用service()之前都会创建两个对象,分别是HttpServletRequest和HttpServletResponse。其中HttpServletRequest封装HTT