当前位置: 首页 > 知识库问答 >
问题:

创建Flux.fromIterable后如何运行on关闭操作?

逄宁
2023-03-14

假设我们需要基于可关闭资源的内容创建通量。为了清楚起见,假设有一个BufferedReader要转换为通量

BufferedReader reader = createReader("my_resource_path");
Flux<String> flux = Flux.fromIterable(() -> iteratorOfLines(reader));

让我们假设iteratorOfLines生成一组有限的项。

我正在寻找一种关闭BufferedReader的方法,当Flux已消耗其中的所有数据或由于某种原因不需要剩余数据时(即订阅中止)。

有一个构造函数reactor.core.publisher.FLuxIterable(Iterable iterable, Runnable onClole),但是:

  1. 似乎无法从Reactor的公共API获取(甚至是传递性的)

Flux.fromIterable发布最后一项后清理/关闭资源的正确方法是什么?

可能有一种比fromIterable更好的方法来做类似的事情,所以欢迎所有选项。

共有1个答案

龙高超
2023-03-14

对于使用资源的等效方法,您可以使用

    Flux.using(
            //Set up resource
            () -> createReader("my_resource_path"),
            //Create flux from resource
            reader -> Flux.fromIterable(iteratorOfLines(reader)),
            //Perform action (cleanup/close) 
            //when resource completes/errors/cancelled
            reader -> {
                try{
                    reader.close();
                }catch(IOException e){
                    throw Exceptions.propagate(e);
                }
            }
    );
 类似资料:
  • 我们有两个应用程序,一个是用Tkinter接口开发的,另一个是用wxPython构建的。两者都相当复杂。运行完Tkinter应用程序后,我希望在Tkinter应用程序中选择一个按钮后运行wxPython应用程序。是否可以切换事件循环,以便Tkinter应用程序可以无缝切换到wxPython GUI? 而在:操作系统。系统('pythonwxgui.py') 最终的程序需要捆绑到一个用于多个操作系统

  • 问题内容: 我正在尝试执行getPendingSalesOrderIDs()方法,该方法调用方法selectInAsending(…)。 但这显示了一个SQLException,它说java.sql.SQLException:在ResultSet关闭后不允许进行该操作 此处db.endSelect()将关闭所有连接。 我认为问题就在于此。 问题答案: 如果关闭ResultSet,为什么不删除它(在

  • 我尝试使用Flyway6.0.beta2BaseJavaMigration和Mysql 出于某种原因,我得到了一个错误 无法回滚事务java.sql.sqlnontransientConnectionException:连接关闭后不允许任何操作。 原因是什么

  • 我使用ActiveMQ使用mvn构建运行集成测试。这是我的pom。xml首先触发activemq,然后触发集成测试,以便它们可以使用上面的activemq实例传输消息。 它工作正常,但不能顺利关闭..当mvn构建结束并且所有测试都成功时,构建看起来很好。但是activemq在关闭时会显示以下错误:- 有没有一种方法可以让Maven在maven构建完成时顺利关闭activeMQ而没有上述例外?以下是

  • 屏幕关闭时无法更新位置。 屏幕关闭后如何运行位置跟踪服务? 开启服务 在onLocationChanged方法中,我尝试Log. e(),但在屏幕关闭时不在logcat中显示纬度和经度 屏幕关闭时帮我介绍一下服务管理。谢谢你。

  • 目的是从命令行执行gatling perf测试。等效docker命令是 现在,为了映射上面使用kubectl在库伯内特斯运行的docker,我创建了一个pod,下面是gradlewcommand.yaml文件 现在,使用以下命令创建容器:- 现在是我的实际要求或问题,我如何运行或触发kubectl run命令,以便在上面创建的pod中运行容器?,请注意,您的 pod 名称是 gradlecomma