当前位置: 首页 > 知识库问答 >
问题:

Java 8泛型:将消费者流减少为单个消费者

微生俊捷
2023-03-14

如何使用消费者编写一种方法,将消费者的组合成单个消费者。然后(消费者)

我的第一个版本是:

<T> Consumer<T> combine(Stream<Consumer<T>> consumers) {
    return consumers
            .filter(Objects::nonNull)
            .reduce(Consumer::andThen)
            .orElse(noOpConsumer());
}

<T> Consumer<T> noOpConsumer() {
    return value -> { /* do nothing */ };
}

这个版本使用JavaC和Eclipse编译。但是它太具体了:Stream不能是Stream

Stream<? extends Consumer<? super Foo>> consumers = ... ;
combine(consumers);

这是不可能编译的。改进后的版本将是:

<T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {
    return consumers
            .filter(Objects::nonNull)
            .reduce(Consumer::andThen)
            .orElse(noOpConsumer());
}

但是Eclipse和JavaC都不会编译:
Eclipse(4.7.3a):

类型消费者没有定义,然后(捕获#7-of?扩展消费者)

JavaC(1.8.0172):

错误:不兼容类型:无效的方法引用
。减少(消费者::andSo)
不兼容类型:消费者

但它应该是有效的:Consumer的每个子类也可以用作Consumer。超级X的每个消费者也可以消费X。我试图向流版本的每一行添加类型参数,但这没有帮助。但如果我用传统的循环写下来,它会编译:

<T> Consumer<T> combine(Collection<? extends Consumer<? super T>> consumers) {
    Consumer<T> result = noOpConsumer()
    for (Consumer<? super T> consumer : consumers) {
        result = result.andThen(consumer);
    }
    return result;
}

(为了简洁起见,过滤掉空值。)

因此,我的问题是:如何让JavaC和Eclipse相信我的代码是正确的?或者,如果它不正确:为什么循环版本正确,而流版本不正确?


共有3个答案

蒋岳
2023-03-14

如果您有很多消费者,应用Consumer.and然后()将创建一个巨大的消费者包装树,该树被递归处理以调用每个原始消费者。

因此,简单地建立一个消费者列表,并创建一个简单的消费者来对其进行迭代,可能会更有效:

<T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {
    List<Consumer<? super T>> consumerList = consumers
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
    return t -> consumerList.forEach(c -> c.accept(t));
}

或者,如果您可以保证生成的使用者只被调用一次,并且在那时仍然有效,那么您可以直接在流上迭代:

return t -> consumers
        .filter(Objects::nonNull)
        .forEach(c -> c.accept(t));
何超英
2023-03-14

您在方法定义中忘记了一件小事。它目前是:

<T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {}

但你是在报复消费者

<T> Consumer<? super T> combine(Stream<? extends Consumer<? super T>> consumers) {
    return consumers
        .filter(Objects::nonNull)
        .map(c -> (Consumer<? super T>) c)
        .reduce(Consumer::andThen)
        .orElse(noOpConsumer());
}

现在应该可以了

农诚
2023-03-14

使用一个参数流。减少(累加器)具有以下签名的版本:

Optional<T> reduce(BinaryOperator<T> accumulator);

二进制运算符

<? extends Consumer<? super T>>

我建议您使用一个三参数版本的Stream.reduce(...)方法来代替:

<U> U reduce(U identity,
             BiFunction<U, ? super T, U> accumulator
             BinaryOperator<U> combiner);

双功能

<T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {
    return consumers.filter(Objects::nonNull)
                    .reduce(t -> {}, Consumer::andThen, Consumer::andThen);
}

第三个参数二进制运算符

此外,为了更好地理解,可以将上述代码表示为:

<T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {

    Consumer<T> identity = t -> {};
    BiFunction<Consumer<T>, Consumer<? super T>, Consumer<T>> acc = Consumer::andThen;
    BinaryOperator<Consumer<T>> combiner = Consumer::andThen;

    return consumers.filter(Objects::nonNull)
                    .reduce(identity, acc, combiner);
}

现在你可以写:

Stream<? extends Consumer<? super Foo>> consumers = Stream.of();
combine(consumers);

 类似资料:
  • 我正在建立一个新的Kafka集群,为了测试目的,我创建了一个有1个分区和3个副本的主题。 有什么想法哪种配置或其他东西可以帮助我消费更多的数据吗?? 提前致谢

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我是Kafka的新手,我对消费者的理解是,基本上有两种类型的实现 1)高级消费者/消费者群体 2)简单消费者 高级抽象最重要的部分是当Kafka不关心处理偏移量,而Simple消费者对偏移量管理提供了更好的控制时使用它。让我困惑的是,如果我想在多线程环境中运行consumer,并且还想控制偏移量,该怎么办。如果我使用消费者组,这是否意味着我必须读取存储在zookeeper中的最后一个偏移量?这是我

  • 问题内容: 我有一个JMS客户端,它正在生成消息并通过JMS队列发送到其唯一的使用者。 我想要的是不止一个消费者收到这些消息。我想到的第一件事是将队列转换为主题,以便现有用户和新用户都可以订阅并将相同的消息传递给他们。 显然,这将涉及在生产者和消费者方面修改当前的客户代码。 我还要查看其他选项,例如创建第二个队列,这样就不必修改现有的使用者。我相信这种方法有很多优点,例如(如果我错了,请纠正我)在

  • 我花了几个小时想弄清楚发生了什么,但没能找到解决办法。 这是我在一台机器上的设置: 1名zookeeper跑步 我正在使用kafka控制台生成器插入消息。如果我检查复制偏移量(

  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认