当前位置: 首页 > 知识库问答 >
问题:

Spring Integration从REST服务获取分页结果

阎坚成
2023-03-14

我正在研究与REST服务的集成,其思想是由出站网关MarketingCategoryOutboundGateway轮询,该网关HttpRequestExecutingMessageHandler实现。网关向REST服务发出请求,并将其响应推送到MarketingCategory通道。网关本身由MarketingCategoryPollerMessageSource使用MaketRiggeringMessage工厂方法创建的消息触发。

问题是服务返回分页结果。除了我已经拥有的服务激活器之外,我还会监听MarketingCategory通道,检查响应并将带有由MaketriggeringMessage创建的递增页码的新消息推送到MarketingCategoryPoller通道,这样代码就会在循环中旋转,直到从REST服务获取所有页面。

Spring Integration是否允许制作这样的过滤器,在输入通道上接收一条消息,根据一个条件对其进行测试,并在条件为真的情况下将一条新消息推送到输出通道?

代码:

//Responses from the REST service go to this channel
@Bean("marketingCategory")
MessageChannel marketingCategory() { return new PublishSubscribeChannel();}

//This channel is used to trigger the outbound gateway which makes a request to the REST service
@Bean
MessageChannel marketingCategoryPoller() {return new DirectChannel();}

//An adapter creating triggering messages for the gateway
@Bean
@InboundChannelAdapter(channel = "marketingCategoryPoller", poller = @Poller(fixedDelay = "15000"))
public MessageSource<String> marketingCategoryPollerMessageSource() { return () -> makeTriggeringMessage(1);}

//A factory for producing messages which trigger the gateway
private Message<String> makeTriggeringMessage(int page) {
    //make a message for triggering marketingCategoryOutboundGateway
    return MessageBuilder.withPayload("")
            .setHeader("Host", "eclinic")
            .setHeader("page", page)
            .build();
}

//An outbound gateway, makes a request to the REST service and returns the response to marketingCategory channel
@Bean
@ServiceActivator(inputChannel = "marketingCategoryPoller")
public MessageHandler marketingCategoryOutboundGateway(@Qualifier("marketingCategory") MessageChannel channel) {
    //make a request to the REST service and push the response to the marketingCategory channel
}

//handler for REST service responses
@Bean
@ServiceActivator(inputChannel = "marketingCategory")
public MessageHandler marketingCategoryHandler() {
    return (msg) -> {
        //process the categories returned by marketingCategoryOutboundGateway
    };
}

共有1个答案

汪阳辉
2023-03-14

我已经找到了一个基于这篇文章的解决方案,从带有spring集成的分页REST服务中阅读和下载:

>

  • 触发与REST服务对话的出站网关,并使用带轮询器的入站通道适配器将响应推送到通道。入站通道适配器是一个消息源,它最初生成一个消息,消息头指示要从REST API获取的页码。出站网关使用页面消息头生成指定所需页面的url

    出站网关向其推送REST服务响应的信道有2个订户:

    2.2.筛选器,检查这是否是最后一页,如果不是,则将消息进一步发送到header enricher使用的另一个通道

    接收到消息后,报头充实器增加其页头并将该消息进一步推送到触发出站网关的通道,网关读取增加的页头并从REST服务获取下一页

    循环一直旋转,直到REST服务返回最后一页。过滤器不让此消息传递到header,从而打破了循环。

    完整代码:

    @Configuration
    public class IntegrationConfiguration {
    
        private final ApiGateConfig apiGateConfig;
    
        IntegrationConfiguration(ApiGateConfig apiGateConfig) {
            this.apiGateConfig = apiGateConfig;
        }
    
        @Bean("marketingCategory")
        MessageChannel marketingCategory() {
            return new PublishSubscribeChannel();
        }
    
        @Bean
        MessageChannel marketingCategoryPoller() {
            return new DirectChannel();
        }
    
        @Bean
        MessageChannel marketingCategoryPollerNextPage() {
            return new DirectChannel();
        }
    
        @Bean
        @InboundChannelAdapter(channel = "marketingCategoryPoller", poller = @Poller(fixedDelay = "15000"))
        public MessageSource<RestPageImpl<MarketingCategory>> marketingCategoryPollerMessageSource() {
            return () -> makeTriggeringMessage(0);
        }
    
        /**
         * Build a gateway triggering message
         */
        private Message<RestPageImpl<MarketingCategory>> makeTriggeringMessage(int page) {
            return MessageBuilder.withPayload(new RestPageImpl<MarketingCategory>())
                    .setHeader("Host", "eclinic")
                    .setHeader("page", page)
                    .build();
        }
    
        @Bean
        @ServiceActivator(inputChannel = "marketingCategoryPoller")
        public MessageHandler marketingCategoryOutboundGateway(@Qualifier("marketingCategory") MessageChannel channel) {
    
            String uri = apiGateConfig.getUri() + "/marketingCategories?page={page}";
    
            //the type of the payload
            ParameterizedTypeReference<RestPageImpl<MarketingCategory>> type = new ParameterizedTypeReference<>() {
            };
    
            //page number comes from the message
            SpelExpressionParser expressionParser = new SpelExpressionParser();
            var uriVariables = new HashMap<String, Expression>();
            uriVariables.put("page", expressionParser.parseExpression("headers.page"));
    
            HttpRequestExecutingMessageHandler handler = new HttpRequestExecutingMessageHandler(uri);
            handler.setHttpMethod(HttpMethod.GET);
            handler.setExpectedResponseTypeExpression(new ValueExpression<>(type));
            handler.setOutputChannel(channel);
            handler.setUriVariableExpressions(uriVariables);
    
            return handler;
        }
    
        @Bean
        @ServiceActivator(inputChannel = "marketingCategory")
        public MessageHandler marketingCategoryHandler() {
            return (msg) -> {
                var page = (RestPageImpl<MarketingCategory>) msg.getPayload();
    
                System.out.println("Page #" + page.getNumber());
    
                page.getContent().forEach(c -> System.out.println(c.getMarketingCategory()));
    
            };
        }
    
        @Filter(inputChannel = "marketingCategory", outputChannel = "marketingCategoryPollerNextPage")
        public boolean marketingCategoryPaginationFilter(RestPageImpl<MarketingCategory> page) {
            return !page.isLast();
        }
    
        @Bean
        @Transformer(inputChannel = "marketingCategoryPollerNextPage", outputChannel = "marketingCategoryPoller")
        HeaderEnricher incrementPage() {
            Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
            Expression expression = new SpelExpressionParser().parseExpression("headers.page+1");
    
            var valueProcessor = new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, Integer.class);
            valueProcessor.setOverwrite(true);
    
            headersToAdd.put("page", valueProcessor);
            return new HeaderEnricher(headersToAdd);
        }
    }
    

  •  类似资料:
    • 我正在使用Get请求调用部署在本地Apache Tomcat服务器上的war文件中的Rest服务,URL“HTTP://localhost:8080/usermanagement/Rest/userservice/users/1”返回一个用户,但URL“HTTP://localhost:8080/usermanagement/Rest/userservice/users/1”返回HTTP状态404

    • 我试图使用Angular 2 HTTP从REST web服务获取数据。 我首先将服务注入调用它的客户端组件类的构造函数中: ps:jsonData是客户端组件类的字符串属性。

    • 例如,我有一个来自URL的分页响应https://swapi.dev/api/people . 此endpoint每页仅提供9个人。我想在Spring Boot应用程序中使用WebClient收集所有星球大战角色,但我不知道如何使用WebClient爬过页面并以非阻塞方式一次检索所有人。有人知道如何做到这一点吗?谢谢您的帮助。

    • 我正在使用jsoup库,今天我遇到了一个问题。我必须刮取DuckDuckGo并为每个页面获取查询结果的所有标题,但使用 我只得到关于第一页的结果。我怎样才能继续下一页?

    • 使用邮递员,我可以毫无问题地获得令牌。如果单击和,我会得到以下代码片段: 当我尝试在Rest Assured中对请求建模时,我得到了一个400错误,但不清楚为什么: 我哪里出了问题很明显吗?是否应该将键/值传递到中?

    • 大家好,我有一个spring boot rest api,每个键和值都有重复的结果,如下代码所示 正如您所看到的,每一列都是重复的,一列首字母大写,另一列全是小写 这是我的课: 我使用我的数据库中的postgres,这是我的代码 如有任何帮助或指导将不胜感激