我已经用RxJava成功地完成了一个小型Java程序。代码为:
public static void main( String[] args ) {
int threadCt = Runtime.getRuntime().availableProcessors() + 1;
//multi-threading
ExecutorService executor = Executors.newFixedThreadPool(threadCt);
Scheduler scheduler = Schedulers.from(executor);
final AtomicInteger batch = new AtomicInteger(0);
Observable.range(1,80)
.groupBy(i -> batch.getAndIncrement() % threadCt )
.flatMap(g -> g.observeOn(scheduler)
.map(i -> intenseCalculation(i))
).subscribe(System.out::println);
}
public static int intenseCalculation(int i) {
try {
System.out.println("Calculating " + i +
" on " + Thread.currentThread().getName());
Thread.sleep(500);
return i;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
使用此代码,一切正常。现在我正在尝试将此代码传递给Android:
Scheduler scheduler = Schedulers.from(executor);
final AtomicInteger batch = new AtomicInteger(0);
Observable.range(0, copiedCategories.size() - 1)
.groupBy(i -> batch.getAndIncrement() % threadCt)
.flatMap(g -> g.observeOn(scheduler))
.map(i -> intenseCalculation(i))
.subscribe(finishedListener::finished);
在finished()方法中,我正在更新GUI(finishedListener是当前活动正在实现的接口)。
我在使用map(I)的线路上遇到错误-
no instance(s) of type variable(s) exist so that void conforms to R
内置。gradle(用于应用程序)我正在使用:
compile 'io.reactivex:rxjava:1.2.9'
我如何解决这个问题?
在java代码中,您有:
.flatMap(g -> g.observeOn(scheduler)
.map(i -> intenseCalculation(i))
)
但在Android代码中,您正在主流上执行map()
:
.flatMap(g -> g.observeOn(scheduler))
.map(i -> intenseCalculation(i))
所以,这些不是同一件事,Android代码中有额外的括号,这完全改变了流逻辑。
注意,您应该在UI线程上执行与UI相关的操作,因此您必须在订阅流之前执行observeOn(AndroidSchedulers.mainThread()),否则会导致崩溃。
我正在编写一个需要处理大量URL的java程序 每个URL将按顺序运行以下作业:下载、分析、压缩 我希望每个作业都有固定数量的线程,这样所有作业在任何给定时间都有并发运行的线程,而不是每个URL一次使用一个线程来完成所有作业。 例如,下载作业将有多个线程来获取和下载URL,一旦其中一个URL被下载,它就会将其传递给分析作业中的一个线程,一旦完成,它就会传递给压缩作业中的一个线程,等等。 我正在考虑
我使用一个Runnable对象来运行processCommand并执行一些需要一定时间的处理(我们称之为insideprocess)。在insideprocess的末尾,它将向文本文件写入一些内容。其思想是,如果在指定的时间内insideprocess仍未完成,则必须将其杀死,因此我使用ExecutorService来处理它。但是如果insideprocess早于指定时间完成,它将中断Execut
问题内容: 我有一个测试,可以从数据提供者那里接收数据。我希望此测试与数据提供者的不同值并行运行。 我尝试了类似的方法: } 我收到的输出是: 你好:10 你好:12 你好:17 你好:11 你好:16 你好:14 你好:13 你好:15 产生10个线程,而我在线程池大小中指定5个。您能否告诉我们必须在上述代码段中添加哪些内容来控制数据提供程序线程池的大小。 问题答案: 您需要使用。在和不需要的值
问题内容: 我有一堆JSON数组文件(准确地说是AVRO),每个文件都产生多个样本来训练Keras模型。通过使用@GPhilo和@jsimsa的想法,我能够想到这一点来并行化我的输入管道。无法弄清楚如何设计来划分处理文件的工作。代码内部失败,因为该函数需要一个字符串文件路径而不是一个, 这里正确的设计方法是什么 这是使用和设计输入管道的优化方法吗? 问题答案: 在我看来,发电机不必要地使您的生活变
当我使用spark API运行类似的代码时,它在许多不同的(分布式)作业中运行,并且成功运行。当我运行它时,我的代码(应该做与Spark代码相同的事情),我得到一个堆栈溢出错误。知道为什么吗? 代码如下: 我相信我正在使用与spark相同的所有并行化工作,但它对我不起作用。任何关于使我的代码分发/帮助了解为什么在我的代码中发生内存溢出的建议都将是非常有帮助的
问题内容: 我试图登录到http://www.magickartenmarkt.de网站,并在会员区域(https://www.magickartenmarkt.de/?mainPage=showWants)进行一些分析。我看到了其他示例,但是我不明白为什么我的方法行不通。我为第一种方法确定了正确的形式,但尚不清楚它是否有效。在第二种方法中,重播网页向我显示我无权访问会员区。 我很乐意提供任何帮助