当前位置: 首页 > 面试题库 >

如何从Http集成流程创建Spring Reactor Flux?

韦晟睿
2023-03-14
问题内容

区别在于消息来自Http端点而不是JMS队列。问题是由于某些原因而无法填充消息通道,或者Flux.from()不会拾取它。日志条目显示GenericMessage是从Http
Integration流中创建的,并带有有效负载作为路径变量,但是没有入队/未发布到通道?我尝试过.channel(MessageChannels.queue())并且.channel(MessageChannels.publishSubscribe())
没有任何区别,事件流为空。这是代码:

@Bean
public Publisher<Message<String>> httpReactiveSource() {
        return IntegrationFlows.
                from(Http.inboundChannelAdapter("/eventmessage/{id}")
                        .requestMapping(r -> r
                        .methods(HttpMethod.POST)                                                                                   
                        )
                        .payloadExpression("#pathVariables.id")
                        )                           
                        .channel(MessageChannels.queue())
                        .log(LoggingHandler.Level.DEBUG)                            
                        .log()  
                        .toReactivePublisher();
    }


@GetMapping(value="eventmessagechannel/{id}", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> eventMessages(@PathVariable String id){     
    return Flux.from(httpReactiveSource())              
            .map(Message::getPayload);

}

UPDATE1:

build.gradle

buildscript {
    ext {
        springBootVersion = '2.0.0.M2'
    }
    repositories {
        mavenCentral()
        maven { url "https://repo.spring.io/snapshot" }
        maven { url "https://repo.spring.io/milestone" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
    maven { url "https://repo.spring.io/snapshot" }
    maven { url "https://repo.spring.io/milestone" }
}


dependencies {
    compile('org.springframework.boot:spring-boot-starter-freemarker')
    compile('org.springframework.boot:spring-boot-starter-integration') 
    compile('org.springframework.boot:spring-boot-starter-web')
    compile('org.springframework.boot:spring-boot-starter-webflux') 
    compile('org.springframework.integration:spring-integration-http')
    testCompile('org.springframework.boot:spring-boot-starter-test')
    testCompile('io.projectreactor:reactor-test')

}

更新2

@SpringBootApplication@RestController在一个文件中定义时,它起作用,但是当@SpringBootApplication@RestController在单独的文件中时,它停止工作。

TestApp.java

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


@SpringBootApplication
public class TestApp {
     public static void main(String[] args) {
            SpringApplication.run(TestApp.class, args);
    }
}

TestController.java

package com.example.controller;


import org.springframework.context.annotation.Bean;
import org.reactivestreams.Publisher;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.http.dsl.Http;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.RestController;



import org.springframework.web.bind.annotation.GetMapping;
import reactor.core.publisher.Flux;



@RestController
public class TestController {
     @Bean
        public Publisher<Message<String>> httpReactiveSource() {
            return IntegrationFlows.
                    from(Http.inboundChannelAdapter("/message/{id}")
                            .requestMapping(r -> r
                                    .methods(HttpMethod.POST)
                            )
                            .payloadExpression("#pathVariables.id")
                    )
                    .channel(MessageChannels.queue())
                    .toReactivePublisher();
        }

        @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        public Flux<String> eventMessages() {
            return Flux.from(httpReactiveSource())
                    .map(Message::getPayload);
        }

}

问题答案:

这对我很好:

@SpringBootApplication
@RestController
public class SpringIntegrationSseDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringIntegrationSseDemoApplication.class, args);
    }

    @Bean
    public Publisher<Message<String>> httpReactiveSource() {
        return IntegrationFlows.
                from(Http.inboundChannelAdapter("/message/{id}")
                        .requestMapping(r -> r
                                .methods(HttpMethod.POST)
                        )
                        .payloadExpression("#pathVariables.id")
                )
                .channel(MessageChannels.queue())
                .toReactivePublisher();
    }

    @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> eventMessages() {
        return Flux.from(httpReactiveSource())
                .map(Message::getPayload);
    }

}

我在POM中有以下依赖关系:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.0.BUILD-SNAPSHOT</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-http</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>

我运行该应用程序,并有两个终端:

curl http://localhost:8080/events

听上交所。

在第二个中,我执行以下操作:

curl -X POST http://localhost:8080/message/foo

curl -X POST http://localhost:8080/message/bar

curl -X POST http://localhost:8080/message/666

因此,第一个终端的响应如下:

data:foo

data:bar

data:666

注意,我们不需要spring-boot-starter-webflux依赖。将Flux定期对MVC的Servlet容器上证所效果很好。

Spring
Integration也将很快支持WebFlux:https
://jira.spring.io/browse/INT-4300 。因此,您将可以在此处进行以下配置

   IntegrationFlows
    .from(Http.inboundReactiveGateway("/sse")
                        .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))

并且仅完全依赖WebFlux,而没有任何Servlet容器依赖性。



 类似资料:
  • 问题内容: 目前,每当我需要从数组创建流时,我都会 有一些直接的方法可以从数组创建流吗? 问题答案: 您可以使用Arrays.stream Eg 您也可以使用@fge所提到的,它看起来像 但是note 将返回,而如果您传递一个type数组,则将返回。因此,简而言之,您可以观察两种方法之间的区别,例如 将原始数组传递给时,将调用以下代码 当您将原始数组传递给以下代码时,将被调用 因此,您得到不同的结

  • 目前,每当我需要从数组创建流时,我都会

  • 问题内容: 我想在我的应用程序中创建一个流程。但是,从Java的API环顾四周之后,我还是不太了解它。 基本上,我想创建一个多进程应用程序。但是新过程是我的应用程序中的一类。 我知道有些人可能会问为什么不创建线程?因为该类正在调用Matlab代码,所以问题出在这里,而Java类在这里 有什么办法吗? 问题答案: 只有一种方法可以用Java创建进程,基本上,它允许您像通过命令行界面一样启动新的JVM

  • 我需要遍历一个形状像树的API。例如,目录结构或讨论线程。它可以通过以下流程进行建模: 如何遍历这些数据?我的工作如下: 然而,由于我使用的是带有缓冲区的流,所以流永远不会完成。 上游完成且缓冲元件已排空时完成 流缓冲器 我多次阅读了图表周期、活跃度和死锁部分,但仍在努力寻找答案。 这将创建一个活动锁: 编辑:我添加了一个git repo来测试你的解决方案https://github.com/Ma

  • 我想创建一个异步读取kafka消息的流,并使用队列通道累积大量要处理的消息,并且只有在处理完这些消息(例如50条消息)后,它才能处理另外50条消息,或者在释放队列中的空间时。我尝试使用一个从kafka委托读取到另一个流的流,该流具有一个带有PollerMetadata(Pollers.fixedDelay(500))的QueueChannel。maxMessagesPerPoll(50))但是轮询

  • 我的流在数据库中配置,我的程序不断创建和销毁流。 因此,流配置(例如cron配置)可以随时更改。 这些流是用方法IntegrationFlowContext注册的。使用IntegrationFlowRegistration方法注册并销毁。销毁。 流的运行从第0秒开始,可以在任何一分钟开始。销毁和创建新流从每分钟1秒开始。 这是一个好方法吗?当我测试这个时,它起作用了。但我在想,这是一种很好的方法吗