当前位置: 首页 > 知识库问答 >
问题:

如何通过REST将传入数据转发到Quarkus中的SSE流

滑弘扬
2023-03-14

在我的设置中,我希望通过SSE通道(服务器发送的事件)转发某些状态更改。状态更改是通过调用RESTendpoint来启动的。因此,我需要将传入的状态更改转发到SSE流。

在Quarkus中实现这一点的最佳/最简单方法是什么。

我能想到的一个解决方案是使用EventBus(https://quarkus.io/guides/reactive-messaging)。SSEendpoint将订阅状态更改并通过SSE通道推送它。状态更改endpoint发布适当的事件。

这是可行的解决方案吗?是否有其他(更简单的)解决方案?我是否需要在任何情况下使用反应性的东西来实现这一点?

任何帮助都非常感谢!

共有2个答案

何兴邦
2023-03-14

Dmytro,谢谢你为我指明了正确的方向。我选择了与静态编程语言相关的Mutiny。我的代码现在如下所示:

data class DeviceStatus(var status: Status = Status.OFFLINE) {
    enum class Status {OFFLINE, CONNECTED, ANALYZING, MAINTENANCE}
}

@ApplicationScoped
class DeviceStatusService {
    var deviceStatusProcessor: PublishProcessor<DeviceStatus> = PublishProcessor.create()
    var deviceStatusQueue: Flowable<DeviceStatus> = Flowable.fromPublisher(deviceStatusProcessor)

    fun pushDeviceStatus(deviceStatus: DeviceStatus) {
        deviceStatusProcessor.onNext(deviceStatus)
    }

    fun getStream(): Multi<DeviceStatus> {
        return Multi.createFrom().publisher(deviceStatusQueue)
    }
}

@Path("/deviceStatus")
class DeviceStatusResource {
    private val LOGGER: Logger = Logger.getLogger("DeviceStatusResource")

    @Inject
    @field: Default
    lateinit var deviceStatusService: DeviceStatusService

    @POST
    @Consumes(MediaType.APPLICATION_JSON)
    fun status(status: DeviceStatus): Response {
        LOGGER.info("POST /deviceStatus " + status.status);
        deviceStatusService.pushDeviceStatus(status)
        return Response.ok().build();
    }

    @GET
    @Path("/eventStream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @SseElementType(MediaType.APPLICATION_JSON)
    fun stream(): Multi<DeviceStatus>? {
        return deviceStatusService.getStream()
    }
}

作为最小设置,该服务可以直接使用deviceStatusProcess作为发布者。但是,Flowable增加了缓冲。欢迎对实现发表评论。

夏宏旷
2023-03-14

最简单的方法是使用rxjava作为流提供者。首先,您需要添加rxjava依赖项。它可以来自quarkus中的反应依赖项,如kafka,也可以直接使用它(如果您不需要任何流媒体库):

        <dependency>
            <groupId>io.reactivex.rxjava2</groupId>
            <artifactId>rxjava</artifactId>
            <version>2.2.19</version>
        </dependency>

下面是如何每秒发送随机双精度值的示例:

    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @SseElementType("text/plain")
    public Publisher<Double> stream() {
        return Flowable.interval(1, TimeUnit.SECONDS).map(tick -> new Random().nextDouble());
    }

我们创建了新的Flowable,它将每秒发射一次,并在每次滴答声中生成下一个随机双精度。研究有关如何创建Flowable的任何其他选项,例如<代码>Flowable。fromFuture()根据您的特定代码逻辑进行调整。

P、 每次查询此endpoint时,上面的代码都将生成新的可流动的,我这样做是为了节省空间,在您的情况下,我假设您将有一个事件源,您可以构建一次,并在每次查询endpoint时使用相同的实例

 类似资料:
  • 我正在Quarkus中实现一个方法,它应该向客户端发送大量数据。使用JPA/Hibernate从数据库中读取数据,序列化为JSON,然后发送到客户端。如果没有整个数据在内存中,如何有效地完成此操作?我尝试了以下三种可能性,但都没有成功: 使用JPA中的getResultList,返回一个以列表为主体的响应。MessageBodyWriter将负责将列表序列化为JSON。然而,这会将所有数据拉入内存

  • 我有一个批处理作业,它将从web服务调用,输入作业参数作为任何select查询。spring批处理作业然后使用JdbcCursorItemReader运行该查询,然后使用FlatFileItemWriter将数据写入CSV。我的问题是我无法将列名从读取器中可用的元数据转移到编写器(相同的步骤)。 对如何做到这一点有什么建议吗?注意:我试图从writer获得构建和访问的头字符串,但bean初始化过程

  • 问题内容: REST服务需要根据JSON模式验证所有传入的JSON数据。json模式是公共可访问的,可以通过http请求进行检索。 我正在使用jackson-framwork在Java和json之间进行编组和拆组。到目前为止,我找不到使用杰克逊根据模式验证数据的任何可能性。 我还尝试了JsonTools框架,该框架显然带有这种验证功能。但是很不幸,我无法通过验证工作。 我该如何进行验证? 问题答案

  • 当正文有“\r\n”表示“回车”和“换行”时,如何以正确的wiki语法发布主题? 当我使用data=json.dumps(%topic_body%)时,它使我的所有文本都带有字面上的“\r\n”! 这很重要,我不能在markdown中发送body,因为我的合流不理解宏的这种方式: 因此,我需要以这种方式发送我的主题正文,其中可以包括新的line方法(https://confluence.atlas

  • 问题内容: 我想发送一个用JavaScript构造的数组,其中包含多个select的选定值。有没有一种方法可以使用ajax将此数组发送到php脚本? 问题答案: 您可以使用XML或JSON发回到服务器。您的javascript将必须构造该帖子,在XML的情况下,则需要您在javascript中创建它。JSON不仅轻巧,而且更易于在javascript中制作。签出JSON- PHP 来解析JSON。

  • 问题内容: 我正在通过Json向Web服务传递一些数据。我的问题是我正在传递html(来自tinyMCE输入),所以var的内容使用引号引起了问题。我正在传递这样的值: javascript中是否有espace引号,所以我可以在news_body var中发送html? 谢谢 问题答案: 而不是使用一次性代码,而是使用Javascript JSON编码器(例如由MooTools的JSON实用工具或