当前位置: 首页 > 知识库问答 >
问题:

在springboot中通过API调用方法更新记录时实现多线程

苏宾鸿
2023-03-14

我正在Springboot中编写一个应用程序,从数据库中提取记录,然后调用外部rest api将记录更新到其他表中。此代码已完成并按预期工作。因为我也需要提高html" target="_blank">性能。我正在尝试在调用API时实现mulithreading,以便一次可以发送多个记录。

结构:

Fetch records from a table and Store it in a list ---> Loop over list ---> multi threaded call to API

ProvRecordProcessing。java:这个调用将从数据库中提取记录,创建一个列表并调用ProvRecordService。java ProvRecordService。java:此调用将处理所有API逻辑。。

经过一些研究,我尝试在下面实现使其多线程:

  1. 使ProvRecordService类实现Runnable并覆盖ulul run方法
  2. 而不是调用方法,调用执行器Service.execute(新的ProvRecordService(记录));

ProvRecordProcessing.java:

我已经从代码中删除了其他业务逻辑,只保留了调用API方法的部分。。

  @Component 
  public class ProvRecordProcessing {
  
  .....Code to fetch records from database....
      
      List<UpdateProvider> provRecords = jdbcTemplate.query(sqlApiSelectQuery, new ProvRecordMapper());
      
      //added for multithreading
      ExecutorService executorService = Executors.newFixedThreadPool(2);
 
     //looping over list records and calling API to process records
     for(UpdateProvider record : provRecords) {
            
            executorService.execute(new ProvRecordService(record));
     }  
     executorService.shutdown();        
    }
}

ProvRecordService.java

为了使其具有多线程功能,我在下面的代码中添加了几个部分,并添加了注释://用于多线程

package com.emerald.paymentengineapi.service;

import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


@Service
public class ProvRecordService implements IFxiProviderService, Runnable {

    @Autowired
    RestSslException restSslTemplate;
    
    @Autowired
    DbConfig dbConfig;
    
    @Autowired
    UpdateProvider updateProvider; // added for multithreading

    @Autowired
    JdbcTemplate jdbcTemplate;

    @Autowired
    TokenService tokenService;
    
    @Value("${SHIELD_API_URL}")
    private String SHIELD_API_URL;

    @Value("${token_expire_time}")
    private String token_expire;
    
    RestTemplate restTemplate;
    DataSource dataSource;
    
    UpdateProvider record; // added for multithreading

    Logger logger = LoggerFactory.getLogger(ProvRecordService.class);
    

    private static String FETCH_OPTIONS_SQL = "select OPTION_NAME, OPTION_VALUE from FSG.FSG_PRCB_PE_API_REQ_CONFIG";
    
    public ProvRecordService(UpdateProvider record) { // added for multithreading
        // TODO Auto-generated constructor stub
        this.record = record;
    }

    @Override
    public void run() { // added for multithreading
        updateProvider(record);
    }

    @Scheduled(fixedRateString = "token_expire")
    public ResponseEntity<String> runTokenScheduler() throws KeyManagementException, KeyStoreException, NoSuchAlgorithmException {
        logger.info("Fetching Token..." + token_expire);

        ResponseEntity<String> response = tokenService.getOauth2Token();
        return response;
    }

    @Override
    public ResponseEntity<String> updateProvider(UpdateProvider updateProviderRequest) {
        
        dataSource = dbConfig.dataSource();
        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
        
        try {
            restTemplate = restSslTemplate.restTemplate();
        } catch (KeyManagementException | KeyStoreException | NoSuchAlgorithmException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        
        ResponseEntity<String> response = null;
        
        try {

            if (null == TokenService.TOKEN_VALUE.get(ConfigConstants.ACCESS_TOKEN))
                runTokenScheduler();

            HttpHeaders headers = new HttpHeaders();
            headers.setAccept(Arrays.asList(MediaType.APPLICATION_JSON));
            System.out.println("value :" + TokenService.TOKEN_VALUE.get(ConfigConstants.TOKEN_TYPE));
            System.out.println("access_token :" + TokenService.TOKEN_VALUE.get(ConfigConstants.ACCESS_TOKEN));

            headers.add(ConfigConstants.AUTHORIZATION, TokenService.TOKEN_VALUE.get(ConfigConstants.TOKEN_TYPE) + " "
                    + TokenService.TOKEN_VALUE.get(ConfigConstants.ACCESS_TOKEN));
            headers.add(ConfigConstants.CLIENT_CODE, ConfigConstants.CSP_PROVIDER_BATCH);
            

            List<RequestOptions> customers = jdbcTemplate.query(FETCH_OPTIONS_SQL,new BeanPropertyRowMapper(RequestOptions.class));

            updateProviderRequest.getXpfRequestData().setRequestOptions(customers);

            HttpEntity<UpdateProvider> entity = new HttpEntity<UpdateProvider>(updateProviderRequest, headers);

            response = restTemplate.exchange(SHIELD_API_URL, HttpMethod.PUT, entity, String.class);
            
            if (response.getStatusCode() == HttpStatus.NO_CONTENT) {
                logger.info(updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId());
                logger.info(updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
                
                updateStatusInDB(String.valueOf(response.getStatusCodeValue()), "NO_CONTENT",
                        updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
                        updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
                logger.info("Provider has been updated successfully");

            } else if (response.getStatusCode() == HttpStatus.INTERNAL_SERVER_ERROR) {
                updateStatusInDB(String.valueOf(response.getStatusCodeValue()), "INTERNAL_SERVER_ERROR",
                        updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
                        updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
                logger.error("Internal Server error occures");
                
            } else if (response.getStatusCode() == HttpStatus.NOT_FOUND) {
                updateStatusInDB(String.valueOf(response.getStatusCodeValue()), "NOT_FOUND",
                        updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
                        updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
                logger.error("Provider not found");

            }

        } catch (TokenServiceException ex) {
            logger.error("Exception occures in calling Token API");
            updateStatusInDB(ex.getMessage(), ex.getLocalizedMessage(),
                    updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
                    updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
            //throw new RuntimeException("Exception occures in API " + ex);
            
        } catch (HttpClientErrorException ex) {
            logger.error("HttpClientErrorException occures in calling API");
            updateStatusInDB(ex.getStatusText(), ex.getStatusText(),
                    updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
                    updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
            //throw new HttpClientErrorException(ex.getStatusCode(), ex.getStatusText());
            
        } catch (Exception ex) {
            logger.error("Exception occures in calling API");
            updateStatusInDB(ex.getMessage(), ex.getMessage(),
                    updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
                    updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());

            //throw new RuntimeException("Exception occures in API " + ex);
        }
        return response;

    }

    private int updateStatusInDB(String errorCode, String errorMessage, String taxId, String providerId) {
        return jdbcTemplate.update(
                "update FSG_WRK.FSG_PRCB_PE_API_REQUEST set ERRORCODE = ?, ERRORMESSAGE = ? where TAXID = ? and PROVIDERID= ?",
                errorCode, errorMessage, taxId, providerId);

    }

}

我调试了这段代码,它将失效,run方法和record也将被填充,但在此之后,它不会进入updateProvider方法进行处理,我得到以下错误:

 Exception in thread "pool-2-thread-1" java.lang.NullPointerException
    at com.emerald.paymentengineapi.service.ProvRecordService.updateProvider(ProvRecordService.java:92)
    at com.emerald.paymentengineapi.service.ProvRecordService.run(ProvRecordService.java:78)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Exception in thread "pool-2-thread-2" java.lang.NullPointerException
    at com.emerald.paymentengineapi.service.ProvRecordService.updateProvider(ProvRecordService.java:92)
    at com.emerald.paymentengineapi.service.ProvRecordService.run(ProvRecordService.java:78)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Exception in thread "pool-2-thread-3" java.lang.NullPointerException
    at com.emerald.paymentengineapi.service.ProvRecordService.updateProvider(ProvRecordService.java:92)
    at com.emerald.paymentengineapi.service.ProvRecordService.run(ProvRecordService.java:78)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Exception in thread "pool-2-thread-5" java.lang.NullPointerException
    at com.emerald.paymentengineapi.service.ProvRecordService.updateProvider(ProvRecordService.java:92)
    at com.emerald.paymentengineapi.service.ProvRecordService.run(ProvRecordService.java:78)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

更新:

经过进一步调试,我了解到,问题出现在以下行:

dataSource = dbConfig.dataSource();
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);

我正在尝试在这里设置dataSource,当我没有添加多线程代码时,这很好。我无法得到原因。请建议。

共有1个答案

班凌
2023-03-14

此处的代码错误:

  1. 没有必要在每次请求执行时创建新的ExecutorService

更好的方法是只创建一次并将其保留为ProvRecordProcess组件的数据栏。在您的方法中创建线程很昂贵,您不知道可以同时创建多少线程(如果此方法被许多用户并行调用怎么办-如果每个用户都创建线程池,则可能非常昂贵)。

此外,如果使用线程池执行器,最好在应用程序关闭时将其关闭,所以不要忘记在predestroy或其他时候调用close。

     for(UpdateProvider record : provRecords) {
            
            executorService.execute(new ProvRecordService(record));
     }  

相反,将服务作为一个单例注入到ProvRecordProcessing组件中,并调用其负责从runnable/callable发送http请求的方法。以下是我的意思的示意图示例:

@Component
class ProvRecordProcessing {

   @Autowired
   private ProvRecordService provRecordService;

   ....

    for(UpdateProvider record : provRecords) {
            
            executorService.execute(() -> provRecordService.updateHttpOrWhatever(record));
     }
} 

通过这种方法,ProvRecordService成为一个常规的spring管理bean

对此有更高级的解决方案,即使用异步方法,无需“手动”维护线程池。例如,请参见本教程。。。既然你没有在问题中展示这些,我想这超出了你所问的范围,所以请记住它也是存在的。当然,如果您正确地实现了代码,它会很好。

 类似资料:
  • 我需要在Springboot中实现多线程,同时使用POST方法调用API。我根据一个SELECT查询从oracle数据库中提取记录,然后使用行映射器逐个遍历每个记录。在下一步中,我只调用一个方法将这些记录发送到API,以postmapping的形式发送这些记录并取回记录。 因为select查询一次可以返回10、20或100条记录。逐个调用每条记录并不理想。我在想我是否可以一次发送多个记录。我不知道

  • 问题内容: 我需要在一个请求中进行多次更新。 在我有: 因此需要进行更改。 这是我的序列化器代码: 我试图添加: 和 但这不起作用。如何更改此代码以进行多次更新。我的json请求 问题答案: 这是您请求的CreateMixins或UpdateMixins的示例。 ======================查看========================== ====== ==========

  • 本文向大家介绍C#获取变更过的DataTable记录的实现方法,包括了C#获取变更过的DataTable记录的实现方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了C#获取变更过的DataTable记录的实现方法,是一个非常实用的功能!具体实现方法如下: 首先DataTable可以看做是一个物理表的内存式存储,每一个DataRow都有一个属性叫做RowState。因此任意一行中某一个字段

  • 本文向大家介绍Android 中通过实现线程更新Progressdialog (对话进度条),包括了Android 中通过实现线程更新Progressdialog (对话进度条)的使用技巧和注意事项,需要的朋友参考一下 作为开发者我们需要经常站在用户角度考虑问题,比如在应用商城下载软件时,当用户点击下载按钮,则会有下载进度提示页面出现,现在我们通过线程休眠的方式模拟下载进度更新的演示,如图(这里为

  • 我知道以前有人问过这个问题,我已经阅读了很多答案,现在正在研究其中一个答案,但是我需要以下代码的帮助。 我在执行时没有收到任何错误,也没有任何内容提交到mysql表,如果你发现了什么,请告诉我,或者如果有更好的方法,请你给我指一个教程,我还没有太多的PDO或多行更新。 提前谢谢。 致山姆: 产出为:

  • 我正在编写一个小应用程序,现在我发现了一个问题。我需要调用一个(稍后可能是两个)方法(这个方法加载一些东西并返回结果),而不会滞后于应用程序的窗口。 我找到了像Executor或Callable这样的类,但我不知道如何使用这些类。 你能张贴任何解决方案,这对我有帮助吗? 谢谢你的建议。 编辑:方法必须返回结果。此结果取决于参数。类似这样: 此方法大约工作8-10秒。执行此方法后,可以停止线程。但我