当前位置: 首页 > 知识库问答 >
问题:

以可关闭资源作为累加器的Java收集器

温星华
2023-03-14

假设我试图创建一个收集器,将数据聚合到一个资源中,该资源在使用后必须关闭。有没有办法实现类似于收集器中的finally块的东西?在成功的情况下,这可以在finisher方法中完成,但在异常情况下似乎没有调用任何方法。

目标是以干净的方式实现如下操作,而不必首先将流收集到内存列表中。

stream.collect(groupingBy(this::extractFileName, collectToFile()));

共有2个答案

翟昊明
2023-03-14

好的,我已经研究了收集器实现,您需要收集器mpl来创建自定义收集器,但它不是公共的。因此,我使用它的副本实现了新的一个(您可能会感兴趣的最后2种方法):

public class CollectorUtils<T, A, R> implements Collector<T, A, R> {

    static final Set<Collector.Characteristics> CH_ID = Collections
            .unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH));

    private final Supplier<A> supplier;
    private final BiConsumer<A, T> accumulator;
    private final BinaryOperator<A> combiner;
    private final Function<A, R> finisher;
    private final Set<Characteristics> characteristics;

    CollectorUtils(Supplier<A> supplier, BiConsumer<A, T> accumulator, BinaryOperator<A> combiner,
            Function<A, R> finisher, Set<Characteristics> characteristics) {
        this.supplier = supplier;
        this.accumulator = accumulator;
        this.combiner = combiner;
        this.finisher = finisher;
        this.characteristics = characteristics;
    }

    CollectorUtils(Supplier<A> supplier, BiConsumer<A, T> accumulator, BinaryOperator<A> combiner,
            Set<Characteristics> characteristics) {
        this(supplier, accumulator, combiner, castingIdentity(), characteristics);
    }

    @Override
    public BiConsumer<A, T> accumulator() {
        return accumulator;
    }

    @Override
    public Supplier<A> supplier() {
        return supplier;
    }

    @Override
    public BinaryOperator<A> combiner() {
        return combiner;
    }

    @Override
    public Function<A, R> finisher() {
        return finisher;
    }

    @Override
    public Set<Characteristics> characteristics() {
        return characteristics;
    }

    @SuppressWarnings("unchecked")
    private static <I, R> Function<I, R> castingIdentity() {
        return i -> (R) i;
    }

    public static <C extends Collection<File>> Collector<String, ?, C> toFile() {
        return new CollectorUtils<>((Supplier<List<File>>) ArrayList::new, (c, t) -> {
            c.add(toFile(t));
        }, (r1, r2) -> {
            r1.addAll(r2);
            return r1;
        }, CH_ID);
    }

    private static File toFile(String fileName) {
        try (Closeable type = () -> System.out.println("Complete! closing file " + fileName);) {
            // stuff
            System.out.println("Converting " + fileName);

            return new File(fileName);
        } catch (FileNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        throw new RuntimeException("Failed to create file");

    }

}

然后我调用stream,如下所示:

public static void main(String[] args) {
        Stream.of("x.txt", "y.txt","z.txt").collect(CollectorUtils.toFile());
    }

输出:

Convertingx.txt
closing filex.txt
Convertingy.txt
closing filey.txt
Convertingz.txt
closing filez.txt
白智
2023-03-14

我认为您能够满足需求的唯一方法是通过向流提供一个紧密的处理程序。onClose方法。假设您有以下类:

class CloseHandler implements Runnable {
    List<Runnable> children = new ArrayList<>();

    void add(Runnable ch) { children.add(ch); }

    @Override
    public void run() { children.forEach(Runnable::run); }
}

现在,您需要按如下方式使用流:

CloseHandler closeAll = new CloseHandler();
try (Stream<Something> stream = list.stream().onClose(closeAll)) {
    // Now collect
    stream.collect(Collectors.groupingBy(
        this::extractFileName, 
        toFile(closeAll)));
}

这使用了try with resources构造,这样当使用或出现错误时,流就会自动关闭。请注意,我们正在将closeAllclose处理程序传递给流。onClose方法。

下面是下游收集器的草图,它将收集/写入/发送元素到Closeable资源(请注意,我们还将CloseAll关闭处理程序传递给它):

static Collector<Something, ?, Void> toFile(CloseHandler closeAll) {

    class Acc {

        SomeResource resource; // this is your closeable resource

        Acc() {
            try {
                resource = new SomeResource(...); // create closeable resource
                closeAll.add(this::close);        // this::close is a Runnable
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        void add(Something elem) {
            try {
                // TODO write/send to closeable resource here
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        Acc merge(Acc another) {
            // TODO left as an exercise
        }

        // This is the close handler for this particular closeable resource
        private void close() {
            try {
                // Here we close our closeable resource
                if (resource != null) resource.close();
            } catch (IOException ignored) {
            }
        }
    }
    return Collector.of(Acc::new, Acc::add, Acc::merge, a -> null);
}

因此,它使用一个本地类(名为Acc)来包装可关闭的资源,并声明方法来向可关闭的资源中添加流的一个元素,以及合并两个Acc实例,以防流是并行的(留作练习,以防值得努力)。

收集器。of用于基于Accclass'方法创建收集器,带有返回null的finisher,因为我们不想在收集器创建的映射中放入任何内容。分组依据

最后,还有close方法,如果包装好的可关闭资源已被创建,它将关闭该资源。

当流通过try-with-资源构造隐式关闭时,将自动执行CloseHandler.run方法,这将反过来执行以前在创建每个Acc实例时添加的所有子关闭处理程序。

 类似资料:
  • 问题内容: 我不了解以下方法的第三个参数的实用程序: 从javaDoc: 产生的结果等于: 如您所见,该参数未使用。例如,以下代码将字符串累积到ArrayList中: 但我期望这样: 问题答案: 在使用时,你的是平行的,因为在这种情况下,多个线程收集的元素到最终输出的子列表,并且这些子列表必须被组合以产生最终的。

  • 问题内容: 我正在编写一个连接到网站并从中读取一行的应用程序。我这样做是这样的: 好吗?我的意思是,我在最后一行关闭了BufferedReader,但没有关闭InputStreamReader。我是否应该从connection.getInputStream创建一个独立的InputStreamReader,并从独立的InputStreamReader创建一个BufferedReader,而不是关闭所

  • 问题内容: 执行以下代码时出错, 原因:java.lang.IllegalAccessError:尝试从类访问com.google.common.collect.AbstractTable类 出现错误-使用Oracle 1.8 jre的HashBasedTable :: putAll 问题答案: 有趣的是,我用Lambda表达式替换了方法引用,并且它起作用了。

  • 我不明白以下方法的第三个参数的效用: 来自Javadoc: 如您所见,没有使用参数。例如,以下内容将把字符串累加到ArrayList中: 但我预料到了这一点:

  • 资源泄漏:“扫描”永远不会关闭。 因此,我在代码末尾添加了来处理警告。 出现这个问题是因为我在同一个包中有其他类也使用scanner对象,而Eclipse告诉我分别关闭这些类中的scanner。然而,当我这样做时,它似乎关闭了所有的扫描器对象,并在运行时得到错误。 我遇到的一篇文章提到,当关闭时,我不能重新打开。如果是这种情况,我是否只需要确保一个带有System.in的扫描器对象在程序末尾关闭,

  • Hadoop Apache Tez – 它是一个针对Hadoop数据处理应用程序的新分布式执行框架,该框架基于YARN; SpatialHadoop – SpatialHadoop是Apache Hadoop的MapReduce扩展,专门用于处理空间数据; GIS Tools for Hadoop –用于Hadoop框架的大数据空间分析; Elasticsearch Hadoop – Elasti