当前位置: 首页 > 知识库问答 >
问题:

如何在列表中并行操作潜在的数据库交互

段弘和
2023-03-14

我试图使我的代码更有效地工作,因此我试图了解如何使其与Futures和ForkJoinPool一起工作。

现在我有这样的代码:

@RestController
@RequestMapping(SEND)
@Slf4j
public class InputMessageController {
private HandlerService service;
    @ApiResponses(value = {
            @ApiResponse(code = 200, message = "message sent"),
            @ApiResponse(code = 404, message = "channel not found or inactive"),
    })
    @RequestMapping(value = "/{channel}", method = RequestMethod.POST)
    @ResponseStatus(HttpStatus.OK)
    public List<ResponseDto> sendNotification(
            @Valid @RequestBody @NotNull List<PayloadInfoDto> requestDtoList,
            @PathVariable("channel") String url) throws InputChannelInactiveException {

        InputChannel channel = mapService.getChannelByUrl(url);

        if (channel != null){
            return service.processing(requestDtoList, channel);
        } else {
            loggerService.logChannelNotFound()
            throw new InputChannelInactiveException(url);
        }
    }
}
public class HandlerServiceImpl implements HandlerService {
public List<ResponseDto> processing(List<PayloadInfoDto> requestDtoList, InputChannel channel) {

        List<ResponseDto> responseDtoList = new ArrayList<>();

//this one takes quite long. would be better if it was done in multiple threads
        requestDtoList.forEach(inputRequestDto -> responseDtoList.add(processNorm(inputRequestDto, channel)));

        return responseDtoList;
    }
}

processNorm(PayloadInfoDto inputRequestDto, InputChannel channel) {
        RequestMessage msg;

        //call to multiple services which can throw an exception. Each exception is processed.
        //call to logger service which logs info to database

}

我的问题是:

  1. 对于使用数据库运行的logger服务,写操作是否比定期保存到db更好,特别是当我们不想等待db写操作并想继续时
private ExecutorService executor = Executors.newWorkStealingPool();
///....some code
public void exampleAsyncLog(){
//I don't wan't to wait while logger service writes to DB. It can take as long as it wants to write, 
//while I'll move on
executor.submit(() -> saveSomethingToDb()); //saveSomethingToDb() is trivial logRepository.save(newEntity)
}

共有1个答案

万高畅
2023-03-14

我会建议您使用CompletableFuture来完成此任务,因为您只希望在所有任务都保存在DB中时才能继续,这对您的任务来说是最安全的。因此,编写如下代码:

public class HandlerServiceImpl implements HandlerService {
public List<ResponseDto> processing(List<PayloadInfoDto> requestDtoList, InputChannel channel) {

        List<ResponseDto> responseDtoList = new ArrayList<>();

//this one takes quite long. would be better if it was done in multiple threads
CompletableFuture<List<ResponseDto>> future
  = CompletableFuture.supplyAsync(() -> requestDtoList.stream().map(inputRequestDto -> processNorm(inputRequestDto, channel)).collect(Collectors.toList())); //This code is not tested. It's the logic that I am showing you to implement that will work for you.

        return future.get();//wait till all the operation is completed.
    }
}
 类似资料:
  • 创建表 查看表结构 查看表详细结构 修改表名 修改字段的数据类型 修改字段名 增加字段 删除字段 删除关联表 (1)删除表的外键约束 (2)删除没有被关联的普通表 (3)删除被其他表关联的父表 创建表: CREATE TABLE 表名 (属性名 数据类型 [完整性约束条件], 属性名 数据类型 [完整性约束条件], 属性名 数据类型 [完整性约束条件]) “完整性约束条件”是指指定某些字段的某些特

  • 环境:Scala、spark、结构化流媒体、Kafka 我有一个来自Kafka流的DF,具有以下模式 DF: 我希望使用spark并行处理每一行,并使用 我需要从值列中提取值到它自己的数据框中进行处理。我有困难与Dataframe通用行对象... 是否有办法将每个执行器中的单行转换为自己的Dataframe(使用固定模式?)在固定的地点写字?有没有更好的方法来解决我的问题? 编辑澄清: DF im

  • 以下语句都可以直接在InfluxDB的Web管理界面中调用 # 创建数据库 CREATE DATABASE "db_name" # 显示所有数据库 SHOW DATABASES # 删除数据库 DROP DATABASE "db_name" # 使用数据库 USE mydb # 显示该数据库中的表 SHOW MEASUREMENTS # 创建表 # 直接在插入数据的时候指定表名(weathe

  • 本文向大家介绍如何在R中制作数据帧列表?,包括了如何在R中制作数据帧列表?的使用技巧和注意事项,需要的朋友参考一下 这可以通过使用列表功能来完成。 示例 创建上述数据帧的列表-

  • 本文向大家介绍node.js如何操作MySQL数据库,包括了node.js如何操作MySQL数据库的使用技巧和注意事项,需要的朋友参考一下 MySQL数据库作为最流行的开源数据库。基本上是每个web开发者必须要掌握的数据库程序之一了。 基本使用 node.js上,最受欢迎的mysql包就是mysql模块。 然后在js脚本里面直接引用进来 配置mysql的数据库连接。 这样就拿到了一个连接。 然后就

  • 问题内容: 我是脚本新手。我有一个表(),我需要创建另一个表,该表的Table1行按列排列,反之亦然。我已经找到了针对Perl和SQL而不是针对Python的解决方案。 我两天前才开始学习Python,所以据我所知: 这只是将列复制为列。我现在想做的是将最后一行写为,但是似乎没有这样的命令,而且我还没有找到将行写为列的另一种方法。 问题答案: 通常,转置可迭代序列的解决方案是:zip(* orig