在过去,我写过一些java程序,使用两个线程。第一个线程(生产者)从API(C库)读取数据,创建一个java对象,将该对象发送到另一个线程。C API正在传递一个事件流(无限)。线程使用LinkedBlockingQueue作为管道交换对象(put、poll)。第二个线程(使用者)正在处理对象。(我还发现,线程中的代码更易读。第一个线程处理C API的内容并生成适当的java对象,第二个线程不受C API处理的影响,而是处理数据)。
现在我感兴趣的是如何使用Java8中的新流API实现上面的场景。但是假设我想保留两个线程(生产者/消费者)!第一个线程正在写入流。第二个线程正在从流中读取。我还希望,我可以使用这种技术处理更好的显式并行(生产者/消费者),并在流中使用一些隐式并行(例如,stream.parallel())。
我对新的流API没有太多的经验。因此,我尝试了下面的代码,以解决上面的想法。
再说一次:这个想法是与线程有一个显式的并行性。但在内部,我可以处理新特性,并在可能的情况下使用并行处理
也感谢你对这个问题的关心。
package sandbox.test;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.LongStream;
public class MyStream {
private volatile LongStream stream = null;
private AtomicInteger producerCount = new AtomicInteger(0);
private AtomicInteger consumerCount = new AtomicInteger(0);
private AtomicInteger apiError = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
MyStream appl = new MyStream();
appl.create();
}
private static void sleep(long sleep) {
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private static void apiError(final String pos, final int iteration) {
RuntimeException apiException = new RuntimeException("API error pos=" + pos + " iteration=" + iteration);
System.out.println(apiException.getMessage());
throw apiException;
}
final private int simulateErrorAfter = 10;
private Thread produce() {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("Producer started");
stream = LongStream.generate(() -> {
int localCount;
// Detect error, while using stream.parallel() processing
int error = apiError.get();
if ( error > 0 )
apiError("1", error);
// ----- Accessing the C API here -----
localCount = producerCount.incrementAndGet(); // C API access; delegate for accessing the C API
// ----- Accessing the C API here -----
// Checking error code from C API
if ( localCount > simulateErrorAfter ) { // Simulate an API error
producerCount.decrementAndGet();
stream.close();
apiError("2", apiError.incrementAndGet());
}
System.out.println("P: " + localCount);
sleep(200L);
return localCount;
});
System.out.println("Producer terminated");
}
});
thread.start();
return thread;
}
private Thread consume() {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
stream.onClose(new Runnable() {
@Override
public void run() {
System.out.println("Close detected");
}
}).parallel().forEach(l -> {
sleep(1000);
System.out.println("C: " + l);
consumerCount.incrementAndGet();
});
} catch (Exception e) {
// Capturing the stream end
System.out.println(e);
}
System.out.println("Consumer terminated");
}
});
thread.start();
return thread;
}
private void create() throws InterruptedException {
Thread producer = produce();
while ( stream == null )
sleep(10);
Thread consumer = consume();
producer.join();
consumer.join();
System.out.println("Produced: " + producerCount);
System.out.println("Consumed: " + consumerCount);
}
}
您需要了解流
API的一些基本要点:
>
在流上应用的所有操作都是惰性的,在应用终端操作之前不会执行任何操作。使用“生产者”线程创建流是没有意义的,因为这个线程不会做任何事情。所有操作都在您的“使用者”线程和由流
实现本身启动的后台线程中执行。创建流
实例的线程完全不相关
关闭流与stream
操作本身无关,即不会关闭线程。它意味着释放额外的资源,例如关闭与files.lines(…)
返回的流相关联的文件。您可以使用onclose
调度这样的清理操作,当您调用close
时,流
将调用这些操作,但仅此而已。对于Stream
类本身来说,它没有任何意义。
问题内容: 我在使用Hibernate生成无效SQL时遇到问题。具体来说,混合和匹配隐式和显式联接。这似乎是一个开放的错误。 但是,我不确定 为什么 这是无效的SQL。我想出了一个生成相同语法异常的小型玩具示例。 架构图 数据 工作SQL 这两个查询均有效。我意识到有笛卡尔积;那是故意的。 明确加入: 隐式加入: 无效的SQL 此查询不适用于MSSQL 2000/2008或MySQL: 在MS20
为了编写所需的最少代码量,我尝试让ModelMapper生成其隐式映射,并且只为那些无法自动映射的属性编写显式属性映射。 如果我让ModelMapper使用以下命令生成隐式映射: 它抱怨具有多个可能的映射。然后,我尝试使用以下方法修复该问题: 然而,我发现ModelMapper仍在抱怨,因为实际上在上抛出了一个异常,所以它没有机会访问我的自定义映射代码。 如果我反转这两个语句,我会得到一个错误:
我有两个地图 和 类型为
问题内容: 我试图弄清楚如何在Java 8并行流中复制ThreadLocal值。 因此,如果我们考虑到这一点: 哪个输出 有没有办法将ThreadLocal从main()方法克隆到为每次并行执行生成的线程中? 这样,我的结果是: 而不是第一个? 问题答案: 正如Louis在评论中所述,您的示例可以简化为捕获lambda表达式中的局部变量的值 从您的示例中看不出完整的用例是什么。 如果您确切知道将从
问题内容: 我正在阅读SeleniumHQ文档,并遇到以下声明。 “警告:不要混合使用隐式等待和显式等待。这样做可能会导致不可预知的等待时间。例如,将隐式等待设置为10s,将显式等待设置为15秒,则可能导致20秒后发生超时。” 由于某些原因,我无法理解这一点。对于我来说,总超时时间是20秒。谁能解释我是否缺少什么? 编辑 我的问题不是关于混合这些等待的实现/后果。 这完全与文档中的语句和超时计算有
问题内容: 我在数组列表中有很多Slot类型的对象。 插槽类别如下所示- 让类型列表被调用。插槽根据开始时间进行排序。一个时隙的结束时间可以等于下一时隙的开始时间,但是它们永远不会重叠。 是否有任何可能的方式可以使用Java 8流在此列表上进行迭代,并且如果一个的结束时间与下一个的开始时间匹配,则将两个插槽合并并输出到一个插槽中? 问题答案: 我的免费StreamEx库完全支持这种情况,该库增强了