在此博客中,他为回调地狱提供了此示例(复制/粘贴以下代码)。但是,没有提到如何通过使用Reactive
Extensions来解决此问题。
因此,此处F3取决于F1完成,而F4和F5取决于F2完成。
注意: 我目前正在尝试绕过Rx,所以在提出这个问题之前,我没有尝试解决此示例。
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class CallbackB {
/**
* Demonstration of nested callbacks which then need to composes their responses together.
* <p>
* Various different approaches for composition can be done but eventually they end up relying upon
* synchronization techniques such as the CountDownLatch used here or converge on callback design
* changes similar to <a href="https://github.com/Netflix/RxJava">Rx</a>.
*/
public static void run() throws Exception {
final ExecutorService executor = new ThreadPoolExecutor(4, 4, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
/* the following are used to synchronize and compose the asynchronous callbacks */
final CountDownLatch latch = new CountDownLatch(3);
final AtomicReference<String> f3Value = new AtomicReference<String>();
final AtomicReference<Integer> f4Value = new AtomicReference<Integer>();
final AtomicReference<Integer> f5Value = new AtomicReference<Integer>();
try {
// get f3 with dependent result from f1
executor.execute(new CallToRemoteServiceA(new Callback<String>() {
@Override
public void call(String f1) {
executor.execute(new CallToRemoteServiceC(new Callback<String>() {
@Override
public void call(String f3) {
// we have f1 and f3 now need to compose with others
System.out.println("intermediate callback: " + f3 + " => " + ("f4 * f5"));
// set to thread-safe variable accessible by external scope
f3Value.set(f3);
latch.countDown();
}
}, f1));
}
}));
// get f4/f5 after dependency f2 completes
executor.execute(new CallToRemoteServiceB(new Callback<Integer>() {
@Override
public void call(Integer f2) {
executor.execute(new CallToRemoteServiceD(new Callback<Integer>() {
@Override
public void call(Integer f4) {
// we have f2 and f4 now need to compose with others
System.out.println("intermediate callback: f3" + " => " + (f4 + " * f5"));
// set to thread-safe variable accessible by external scope
f4Value.set(f4);
latch.countDown();
}
}, f2));
executor.execute(new CallToRemoteServiceE(new Callback<Integer>() {
@Override
public void call(Integer f5) {
// we have f2 and f5 now need to compose with others
System.out.println("intermediate callback: f3" + " => " + ("f4 * " + f5));
// set to thread-safe variable accessible by external scope
f5Value.set(f5);
latch.countDown();
}
}, f2));
}
}));
/* we must wait for all callbacks to complete */
latch.await();
System.out.println(f3Value.get() + " => " + (f4Value.get() * f5Value.get()));
} finally {
executor.shutdownNow();
}
}
public static void main(String[] args) {
try {
run();
} catch (Exception e) {
e.printStackTrace();
}
}
private static final class CallToRemoteServiceA implements Runnable {
private final Callback<String> callback;
private CallToRemoteServiceA(Callback<String> callback) {
this.callback = callback;
}
@Override
public void run() {
// simulate fetching data from remote service
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
callback.call("responseA");
}
}
private static final class CallToRemoteServiceB implements Runnable {
private final Callback<Integer> callback;
private CallToRemoteServiceB(Callback<Integer> callback) {
this.callback = callback;
}
@Override
public void run() {
// simulate fetching data from remote service
try {
Thread.sleep(40);
} catch (InterruptedException e) {
e.printStackTrace();
}
callback.call(100);
}
}
private static final class CallToRemoteServiceC implements Runnable {
private final Callback<String> callback;
private final String dependencyFromA;
private CallToRemoteServiceC(Callback<String> callback, String dependencyFromA) {
this.callback = callback;
this.dependencyFromA = dependencyFromA;
}
@Override
public void run() {
// simulate fetching data from remote service
try {
Thread.sleep(60);
} catch (InterruptedException e) {
e.printStackTrace();
}
callback.call("responseB_" + dependencyFromA);
}
}
private static final class CallToRemoteServiceD implements Runnable {
private final Callback<Integer> callback;
private final Integer dependencyFromB;
private CallToRemoteServiceD(Callback<Integer> callback, Integer dependencyFromB) {
this.callback = callback;
this.dependencyFromB = dependencyFromB;
}
@Override
public void run() {
// simulate fetching data from remote service
try {
Thread.sleep(140);
} catch (InterruptedException e) {
e.printStackTrace();
}
callback.call(40 + dependencyFromB);
}
}
private static final class CallToRemoteServiceE implements Runnable {
private final Callback<Integer> callback;
private final Integer dependencyFromB;
private CallToRemoteServiceE(Callback<Integer> callback, Integer dependencyFromB) {
this.callback = callback;
this.dependencyFromB = dependencyFromB;
}
@Override
public void run() {
// simulate fetching data from remote service
try {
Thread.sleep(55);
} catch (InterruptedException e) {
e.printStackTrace();
}
callback.call(5000 + dependencyFromB);
}
}
private static interface Callback<T> {
public void call(T value);
}
}
我是所引用的有关回调和Java Futures的博客文章的原始作者。这是使用flatMap,zip和merge异步进行服务组合的示例。
它获取一个User对象,然后同时获取Social和PersonalizedCatalog数据,然后针对PersonalizedCatalog中的每个视频同时获取一个Bookmark,Rating和Metadata,将它们压缩在一起,并将所有响应合并为一个渐进流,作为服务器发送的事件输出。
。
return getUser(userId).flatMap(user -> {
Observable<Map<String, Object>> catalog = getPersonalizedCatalog(user)
.flatMap(catalogList -> catalogList.videos().<Map<String, Object>> flatMap(
video -> {
Observable<Bookmark> bookmark = getBookmark(video);
Observable<Rating> rating = getRatings(video);
Observable<VideoMetadata> metadata = getVideoMetadata(video);
return Observable.zip(bookmark, rating, metadata, (b, r, m) -> combineVideoData(video, b, r, m));
}));
Observable<Map<String, Object>> social = getSocial(user).map(s -> {
return s.getDataAsMap();
});
return Observable.merge(catalog, social);
}).flatMap(data -> {
String json = SimpleJson.mapToJson(data);
return response.writeStringAndFlush("data: " + json + "\n");
});
可以在https://github.com/Netflix/ReactiveLab/blob/952362b89a4d4115ae0eecf0e73f273ecb27ba98/reactive-
lab-
gateway/src/main/java/io/reactivex/lab/gateway/routes/
的功能正常的应用程序上下文中查看此示例RouteForDeviceHome.java#L33
由于我无法在此处提供所有信息,因此您也可以在演示文稿形式中找到解释(带有视频链接),网址为https://speakerdeck.com/benjchristensen/reactive-
streams-with-rx-at-javaone- 2014?slide =
32。
在这个博客中,他给出了这个(复制/粘贴以下代码)回调地狱的例子。但是,没有提到如何使用反应式扩展来消除问题。 所以这里F3取决于F1的完成,而F4和F5取决于F2的完成。 想知道Rx中的功能等效物是什么 如何在Rx中表示F1、F2、F3、F4和F5都应异步拉取 注意:我目前正在尝试将我的头围绕Rx,所以在问这个问题之前我没有尝试解决这个例子。
我正在获取一个网页,我需要得到所有元素的列表与给定的类名。问题是,我的数组还包含嵌套元素(从父元素获取),这些元素也具有相同的类名。 示例: 在这种情况下,我将接收5个元素,而不是4个-我想避免得到嵌套的元素。有什么解决方案可以达到我的目的吗?
问题内容: 避免嵌套查询有多重要。 我一直学会避免像瘟疫一样躲避它们。但是对我来说,它们是最自然的事情。在设计查询时,我首先写的是嵌套查询。然后,我将其转换为联接,有时需要花费很多时间才能正确。而且很少会带来很大的性能提升(有时确实会提高) 他们真的那么糟糕吗?有没有一种方法可以使用没有临时表和文件排序的嵌套查询 问题答案: 这确实取决于我遇到的情况,我可以通过使用子查询来改进一些查询。 我知道的
在Java,有没有一种方法可以避免在调用的每个级别上嵌套null检查,以确保沿途没有阻止下一次调用的null。有没有一个优雅的方法来做这件事? 例如: Objone.ObjTwo.ObjTree.ObjFour.ObjF
问题内容: 我需要在功能中锁定两个对象,当前代码看起来像这样; 如您所见,如果另一个线程使用obj1和两个相反的代码运行这段代码,则这是死锁的简单明了的配方。 有没有一种方法可以使用concurrency-utils锁来避免这种情况? 我当时正在考虑维护一个对象及其锁的地图,并验证它们是否可供使用,但是似乎无法提出一种可以预测锁顺序的干净方法。 问题答案: 尽管您保留了锁定顺序,但是如果将obj1
问题内容: 我需要确保许多并发用户能够访问数据库。虽然在每次提交后我都关闭了会话,但是有时我的代码遇到以下错误,但是当我几次执行相同的操作时,它会超过错误并可以正常工作。 我的hibernate状态是4.2.1。 我的密码 HibernateUtil 组态 问题答案: 在您的“我的代码”代码段中,可能存在一些问题: 发生异常时,没有阻止关闭会话的块 您正在打电话,但这与有所不同。因此,不会清除。