当前位置: 首页 > 知识库问答 >
问题:

我可以使用spring WebFlux来实现通过Kafka请求/响应主题获取数据的REST服务吗?

周瀚
2023-03-14

我正在开发REST服务,反过来,它将查询缓慢的遗留系统,因此响应时间将以秒为单位。我们还期望有大量的负载,所以我考虑了异步/非阻塞方法,以避免数百个“servlet”线程在调用慢速系统时被阻塞。

正如我所看到的,这可以使用AsyncContext实现,它出现在新的servlet API规范中。我甚至开发了小的原型,它似乎是工作的。

另一方面,它看起来像我可以实现同样的使用spring WebFlux。不幸的是,我没有找到自定义“后端”调用用Mono/Flux包装的任何示例。大多数示例只是重用已经准备好的反应连接器,如ReactiveCassandraOperations.java等。

我的数据流如下:

JS客户端-->spring RestController-->向Kafka主题发送请求-->从Kafka回复主题读取响应-->向客户端返回数据

我能把Kafka步骤包装成Mono/Flux吗?如何做到这一点?我的RestController方法应该是什么样子?

下面是我的简单实现,它使用Servlet3.1 API实现了同样的功能

//took the idea from some Jetty examples
public class AsyncRestServlet extends HttpServlet {
...
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {

    String result = (String) req.getAttribute(RESULTS_ATTR);

    if (result == null) { //data not ready yet: schedule async processing
        final AsyncContext async = req.startAsync();

        //generate some unique request ID
        String uid = "req-" + String.valueOf(req.hashCode()); 

        //share it to Kafka receive together with AsyncContext
        //when Kafka receiver will get the response it will put it in Servlet request attribute and call async.dispatch()
        //This doGet() method will be called again and it will send the response to client
        receiver.rememberKey(uid, async); 

        //send request to Kafka
        sender.send(uid, param); 

        //data is not ready yet so we are releasing Servlet thread
        return;
    }

    //return result as html response
    resp.setContentType("text/html");
    PrintWriter out = resp.getWriter();
    out.println(result);
    out.close();
}

共有1个答案

韩博厚
2023-03-14

这里有一个简短的示例--不是您可能想到的WebFlux客户机,但至少它可以使您利用Flux和Mono进行异步处理,我认为这是您问题的重点。web对象应该在不进行额外配置的情况下工作,但当然您需要配置Kafka,因为KafkaTemplate对象不能单独工作。

    @Bean // Using org.springframework.web.reactive.function.server.RouterFunction<ServerResponse>
    public RouterFunction<ServerResponse> sendMessageToTopic(KafkaController kafkaController){
        return RouterFunctions.route(RequestPredicates.POST("/endpoint"), kafkaController::sendMessage);
    }

    @Component
    public class ResponseHandler {
        public getServerResponse() {
            return ServerResponse.ok().body(Mono.just(Status.SUCCESS), String.class);
        }
    }

    @Component 
    public class KafkaController {
        public Mono<ServerResponse> auditInvalidTransaction(ServerRequest request) {
            return request.bodyToMono(TopicMsgMap.class) 
                // your HTTP call may not return immediately without this
                .subscribeOn(Schedulers.single()) // for a single worker thread
                .flatMap(topicMsgMap -> {
                    MyKafkaPublisher.sendMessages(topicMsgMap);
                }.flatMap(responseHandler::getServerResponse);
        }
    }

    @Data // model class just to easily convert the ServerRequest (from json, for ex.)
    // + ~@constructors
    public class TopicMsgMap() {
        private Map<String, String> topicMsgMap;
    }

    @Service // Using org.springframework.kafka.core.KafkaTemplate<String, String>
    public class MyKafkaPublisher {
        @Autowired
        private KafkaTemplate<String, String> template;

        @Value("${topic1}")
        private String topic1;
        @Value("${topic2}")
        private String topic2;

        public void sendMessages(Map<String, String> topicMsgMap){
            topicMsgMap.forEach((top, msg) -> {
                if (topic.equals("topic1") kafkaTemplate.send(topic1, message);
                if (topic.equals("topic2") kafkaTemplate.send(topic2, message);
            });
        }
    }

我猜这不是您想要的用例,但希望您发现这个通用结构有用。

 类似资料:
  • 我使用Java和Apache CXF为AngularJS(单页web站点)编写后端。在REST服务中,我需要访问http请求的头(即参数和cookies),也需要访问响应头(即参数和cookies)。其主要原因是安全性、身份验证目的和会话管理。这些都是重要的原因。

  • 本文向大家介绍通过jsonp获取json数据实现AJAX跨域请求,包括了通过jsonp获取json数据实现AJAX跨域请求的使用技巧和注意事项,需要的朋友参考一下 AJAX(异步的 JavaScript 和 XML)是用于创建快速动态网页的一种技术,它在不重新加载整个页面的情况下,与服务器交换数据并更新部分网页,ajax 使用XMLHttpRequest对象在后台与服务器交换数据,XMLHttpR

  • 我需要在get请求中以JSON格式获取数据,数据应为以下格式: 但数据以以下形式返回,响应为字符串: 代码: 如何使用Rest Assured在get请求中获取JSON格式?

  • 我想用retforIt从服务器得到响应。下面是一些代码: 怎么了?我在哪里可以找到改装文档?是什么?请帮忙

  • 问题内容: 我通过JavaScript收到了带有AJAX请求的JSON响应。 这是响应: 我的目标是获取内容: json变量是我的情况下的数据。因此,我尝试了: 但是我得到一个空字符串。 关于如何访问字符串的任何想法? 先感谢您。 问题答案: 您是否首先解析json? 比阅读

  • 但我在普罗米修斯控制台收到: level=warn ts=2019-08-06t08:25:36.643z caller=scrape.go:937 component=“scrape manager”scrape_pool=Prometheus target=http://localhost:4567/hello msg=“append failed”err=“\”invalid\“不是有效的启