当前位置: 首页 > 知识库问答 >
问题:

用java-8流混合显式和隐式并行性

甄正信
2023-03-14

在过去,我写过一些java程序,使用两个线程。第一个线程(生产者)从API(C库)读取数据,创建一个java对象,将该对象发送到另一个线程。C API正在传递一个事件流(无限)。线程使用LinkedBlockingQueue作为管道交换对象(put、poll)。第二个线程(使用者)正在处理对象。(我还发现,线程中的代码更易读。第一个线程处理C API的内容并生成适当的java对象,第二个线程不受C API处理的影响,而是处理数据)。

现在我感兴趣的是如何使用Java8中的新流API实现上面的场景。但是假设我想保留两个线程(生产者/消费者)!第一个线程正在写入流。第二个线程正在从流中读取。我还希望,我可以使用这种技术处理更好的显式并行(生产者/消费者),并在流中使用一些隐式并行(例如,stream.parallel())。

我对新的流API没有太多的经验。因此,我尝试了下面的代码,以解决上面的想法。

    null
    null

再说一次:这个想法是与线程有一个显式的并行性。但在内部,我可以处理新特性,并在可能的情况下使用并行处理

也感谢你对这个问题的关心。

package sandbox.test;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.LongStream;

public class MyStream {
    private volatile LongStream stream = null;
    private AtomicInteger producerCount = new AtomicInteger(0);
    private AtomicInteger consumerCount = new AtomicInteger(0);
    private AtomicInteger apiError = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
    MyStream appl = new MyStream();
    appl.create();
    }

    private static void sleep(long sleep) {
    try {
        Thread.sleep(sleep);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    }

    private static void apiError(final String pos, final int iteration) {
    RuntimeException apiException = new RuntimeException("API error pos=" + pos + " iteration=" + iteration);
    System.out.println(apiException.getMessage());
    throw apiException;
    }

    final private int simulateErrorAfter = 10;

    private Thread produce() {
    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
        System.out.println("Producer started");
        stream = LongStream.generate(() -> {
            int localCount;
            // Detect error, while using stream.parallel() processing
            int error = apiError.get();
            if ( error > 0 )
                apiError("1", error);
            // ----- Accessing the C API here -----
            localCount = producerCount.incrementAndGet(); // C API access; delegate for accessing the C API
            // ----- Accessing the C API here -----

            // Checking error code from C API
            if ( localCount > simulateErrorAfter ) { // Simulate an API error
                producerCount.decrementAndGet();
                stream.close();
                apiError("2", apiError.incrementAndGet());
            }
            System.out.println("P: " + localCount);
            sleep(200L);
            return localCount;
            });
        System.out.println("Producer terminated");
        }
    });
    thread.start();
    return thread;
    }

    private Thread consume() {
    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
        try {
            stream.onClose(new Runnable() {
            @Override
            public void run() {
                System.out.println("Close detected");
            }
            }).parallel().forEach(l -> {
            sleep(1000);
            System.out.println("C: " + l);
            consumerCount.incrementAndGet();
            });
        } catch (Exception e) {
            // Capturing the stream end
            System.out.println(e);
        }
        System.out.println("Consumer terminated");
        }
    });
    thread.start();
    return thread;
    }

    private void create() throws InterruptedException {
    Thread producer = produce();
    while ( stream == null )
        sleep(10);
    Thread consumer = consume();
    producer.join();
    consumer.join();
    System.out.println("Produced: " + producerCount);
    System.out.println("Consumed: " + consumerCount);

    }
}

共有1个答案

姬天逸
2023-03-14

您需要了解API的一些基本要点:

>

  • 在流上应用的所有操作都是惰性的,在应用终端操作之前不会执行任何操作。使用“生产者”线程创建流是没有意义的,因为这个线程不会做任何事情。所有操作都在您的“使用者”线程和由实现本身启动的后台线程中执行。创建实例的线程完全不相关

    关闭流与stream操作本身无关,即不会关闭线程。它意味着释放额外的资源,例如关闭与files.lines(…)返回的流相关联的文件。您可以使用onclose调度这样的清理操作,当您调用close时,将调用这些操作,但仅此而已。对于Stream类本身来说,它没有任何意义。

  •  类似资料:
    • 问题内容: 我在使用Hibernate生成无效SQL时遇到问题。具体来说,混合和匹配隐式和显式联接。这似乎是一个开放的错误。 但是,我不确定 为什么 这是无效的SQL。我想出了一个生成相同语法异常的小型玩具示例。 架构图 数据 工作SQL 这两个查询均有效。我意识到有笛卡尔积;那是故意的。 明确加入: 隐式加入: 无效的SQL 此查询不适用于MSSQL 2000/2008或MySQL: 在MS20

    • 为了编写所需的最少代码量,我尝试让ModelMapper生成其隐式映射,并且只为那些无法自动映射的属性编写显式属性映射。 如果我让ModelMapper使用以下命令生成隐式映射: 它抱怨具有多个可能的映射。然后,我尝试使用以下方法修复该问题: 然而,我发现ModelMapper仍在抱怨,因为实际上在上抛出了一个异常,所以它没有机会访问我的自定义映射代码。 如果我反转这两个语句,我会得到一个错误:

    • 我有两个地图 和 类型为

    • 问题内容: 我试图弄清楚如何在Java 8并行流中复制ThreadLocal值。 因此,如果我们考虑到这一点: 哪个输出 有没有办法将ThreadLocal从main()方法克隆到为每次并行执行生成的线程中? 这样,我的结果是: 而不是第一个? 问题答案: 正如Louis在评论中所述,您的示例可以简化为捕获lambda表达式中的局部变量的值 从您的示例中看不出完整的用例是什么。 如果您确切知道将从

    • 问题内容: 我正在阅读SeleniumHQ文档,并遇到以下声明。 “警告:不要混合使用隐式等待和显式等待。这样做可能会导致不可预知的等待时间。例如,将隐式等待设置为10s,将显式等待设置为15秒,则可能导致20秒后发生超时。” 由于某些原因,我无法理解这一点。对于我来说,总超时时间是20秒。谁能解释我是否缺少什么? 编辑 我的问题不是关于混合这些等待的实现/后果。 这完全与文档中的语句和超时计算有

    • 问题内容: 我在数组列表中有很多Slot类型的对象。 插槽类别如下所示- 让类型列表被调用。插槽根据开始时间进行排序。一个时隙的结束时间可以等于下一时隙的开始时间,但是它们永远不会重叠。 是否有任何可能的方式可以使用Java 8流在此列表上进行迭代,并且如果一个的结束时间与下一个的开始时间匹配,则将两个插槽合并并输出到一个插槽中? 问题答案: 我的免费StreamEx库完全支持这种情况,该库增强了