当前位置: 首页 > 知识库问答 >
问题:

重构旧版SOA系统以使用非阻塞异步微服务

卢雅惠
2023-03-14

我刚刚开始尝试了解RxJava,以便我可以使用项目反应器重构旧版SOA系统来使用非阻塞异步微服务。

目前,我正在做一项可行性研究,并考虑使用类似spoon的东西来转换遗留服务代码(但这与这个问题无关)

我想知道如何使用reactor bus Request/Reply语法来替换这个同步服务代码。或者即使我应该使用完全不同的Reactor结构。

这里是一个遗留soa服务的示例,它是人为设计的,因此可能没有完美的意义,但基本上每个服务都依赖于最后一个服务的结果。

 public static Map<String, Object> createAccount(DispatchContext dctx, Map<String, Object> context) {
    LocalDispatcher dispatcher = dctx.getDispatcher();

    String accountPartyId = (String) context.get("partyId");

    Map<String, Object> input = UtilMisc.toMap("groupName", context.get("accountName"), "groupNameLocal", context.get("groupNameLocal"), "officeSiteName", context.get("officeSiteName"), "description", context.get("description"), "partyId", accountPartyId);

    Map<String, Object> serviceResults1 = dispatcher.runSync("createPartyGroup", input);

    Map<String, Object> serviceResults2 = dispatcher.runSync("createPartyRole", UtilMisc.toMap("partyId", (String) serviceResults1.get("partyId"), "roleTypeId", "ACCOUNT"));

    String dataSourceId = (String) context.get("dataSourceId");
    Map<String, Object> serviceResults3 = null;
    if (dataSourceId != null) {
        serviceResults3 = dispatcher.runSync("crmsfa.addAccountDataSource", UtilMisc.toMap("partyId", (String) serviceResults2.get("partyId"), "dataSourceId", dataSourceId));
    }

    String marketingCampaignId = (String) context.get("marketingCampaignId");
    Map<String, Object> serviceResults4 = null;
    if (marketingCampaignId != null) {
        serviceResults4 = dispatcher.runSync("crmsfa.addAccountMarketingCampaign", UtilMisc.toMap("partyId", (String) serviceResults3.get("partyId"), "marketingCampaignId", marketingCampaignId));

    }

    String initialTeamPartyId = (String) context.get("initialTeamPartyId");
    Map<String, Object> serviceResults5 = null;
    if (initialTeamPartyId != null) {
        serviceResults5 = dispatcher.runSync("crmsfa.assignTeamToAccount", UtilMisc.toMap("accountPartyId", (String) serviceResults4.get("partyId"), "teamPartyId", initialTeamPartyId, "userLogin", userLogin));
    }

    Map<String, Object> results = ServiceUtil.returnSuccess();
    results.put("groupId", (String) serviceResults1.get("groupId"));
    results.put("roleId", (String) serviceResults2.get("roleId"));
    results.put("dataSourceId", (String) serviceResults3.get("dataSourceId"));
    results.put("marketingCampaignId", (String) serviceResults4.get("marketingCampaignId"));
    results.put("teamPartyId", (String) serviceResults5.get("teamPartyId"));
    return results;
}

基本上,这是一个使用dispatcher调用其他服务的服务。运行同步。。。我只是在寻找一个起点,研究如何可能使用reactor或其他库将这种语法转换为异步非阻塞代码。

在这一点上,我用非常模糊的术语来思考回调/某种promise类型的结构。

就像第一次呼叫另一个服务是

Map<String, Object> serviceResults = dispatcher.runSync("createPartyGroup", input);

如果它返回了一个包含serviceResults映射的Promise对象,那么该方法的其余部分可以被移动到Promise onComplete块中,结果将是构成该服务方法的一组深度嵌套的onComplete代码块。

Promise p = task {
    // createPartyGroup service call
}
p.onComplete { result ->

Promise p2 = task {
    // createPartyRole sevice call
}

p2.onComplete { result ->
//next service call
}   
}  
}

或者看看Reactor总线文档,比如下面的内容,在很多层面上都没有意义,我只是对Reactor的了解不够,不知道为什么它没有意义,或者我接下来要学习什么来理解为什么它没有意义

bus.send("service.createPartyGroup", Event.wrap(input, "reply.service.createPartyGroup")); 
bus.receive($("reply.service.createPartyGroup"), ev -> {
Map<?> input2 = UtilMisc.toMap("partyId", (String) ev.get("partyId"), "roleTypeId", "ACCOUNT")
  bus.send("service.createPartyRole", Event.wrap(input2, "reply.service.createPartyRole")); 
}); 

我意识到开始研究反应式编程范式是一个相当奇怪的地方。但是,替换这个同步服务代码是我的最终目标,如果我至少理解了语法,我就可以从中逆向工作。

共有1个答案

孟凯泽
2023-03-14

您只需要使用Observable,在您的流中,一个已发射的项目通过流。检查文档https://github.com/ReactiveX/RxJava

这将是一个连续的流程

  Observable.just(methodThatCallFirstServiceAndReturnObservable(params))
            .flatMap(resul1 -> methodThatCallSecondAndReturnObservable(resul1))
            .flatMap(resul2 -> methodThatCallThirdAndReturnObservable(resul2))
            .subscribe(result3->"Last value emmited here:");

您可以并行运行这三个服务调用,并使用Observable将所有值汇总在一起。压缩或合并。但我相信这不是你需要的。

 类似资料:
  • 从进程调度谈起 现代操作系统(如 Windows、Linux 等)都是分时系统。分时系统允许同时允许多个任务,但实际上,由于一台计算机通常只有一个 CPU,所以不可能真正地同时运行多个任务。这些进程实际上是轮番运行,每个进程运行一个时间片。由于时间片通常很短,用户不会感觉到,所以这些进程看起来就像是同时运行。 每个进程的时间片由操作系统完成初始化,所有进程轮番地执行相应的时间。具体下一个时间片轮到

  • 我有一个使用Spring Boot设计的RestFul Webservice。 web服务相当繁重,因为它必须在启动时进行大量的数据库调用,并且有些端到其他端进行大量的IO操作来提供结果。 我想让Restful Api成为异步的,这样它就可以更有伸缩性,而且它花时间来提供它的结果。 我甚至实现了这一点,但我无法测试这是否是异步的。 如果我想要 如果向url/all发出请求

  • 我认为下面的流量链将通过事件循环放置/执行(像JS)。因此,运行下面的代码将首先打印阻塞循环&然后将执行通量链。 但是,整个通量总是先执行,然后才移动到循环。[我确实有一些语句正在阻塞。但是有两个阶段] 当我们使用reactor时,通过使用一些调度程序来实现异步/非阻塞行为的唯一方法? 如果我不使用任何调度器,并让代码使用当前线程执行,那么即使对于IO密集型应用程序,使用WebFlux而不是Spr

  • 本文向大家介绍请你说一下阻塞,非阻塞,同步,异步相关面试题,主要包含被问及请你说一下阻塞,非阻塞,同步,异步时的应答技巧和注意事项,需要的朋友参考一下 参考回答: 阻塞和非阻塞:调用者在事件没有发生的时候,一直在等待事件发生,不能去处理别的任务这是阻塞。调用者在事件没有发生的时候,可以去处理别的任务这是非阻塞。 同步和异步:调用者必须循环自去查看事件有没有发生,这种情况是同步。调用者不用自己去查看

  • 实时的web特性通常需要为每个用户一个大部分时间都处于空闲的长连接. 在传统的同步web服务器中,这意味着需要给每个用户分配一个专用的线程,这样的开销是十分巨大的. 为了减小对于并发连接需要的开销,Tornado使用了一种单线程事件循环的方式. 这意味着所有应用程序代码都应该是异步和非阻塞的,因为在同一时刻只有一个操作是有效的. 异步和非阻塞这两个属于联系十分紧密而且通常交换使用,但是它们并不完全

  • 本文向大家介绍java 中同步、异步、阻塞和非阻塞区别详解,包括了java 中同步、异步、阻塞和非阻塞区别详解的使用技巧和注意事项,需要的朋友参考一下 java 中同步、异步、阻塞和非阻塞区别详解 简单点说: 阻塞就是干不完不准回来,一直处于等待中,直到事情处理完成才返回; 非阻塞就是你先干,我先看看有其他事没有,一发现事情被卡住,马上报告领导。 我们拿最常用的send和recv两个函数来说吧..