我正在Springboot中编写一个应用程序,从数据库中提取记录,然后调用外部rest api将记录更新到其他表中。此代码已完成并按预期工作。因为我也需要提高html" target="_blank">性能。我正在尝试在调用API时实现mulithreading,以便一次可以发送多个记录。
结构:
Fetch records from a table and Store it in a list ---> Loop over list ---> multi threaded call to API
ProvRecordProcessing。java:这个调用将从数据库中提取记录,创建一个列表并调用ProvRecordService。java ProvRecordService。java:此调用将处理所有API逻辑。。
经过一些研究,我尝试在下面实现使其多线程:
ProvRecordProcessing.java:
我已经从代码中删除了其他业务逻辑,只保留了调用API方法的部分。。
@Component
public class ProvRecordProcessing {
.....Code to fetch records from database....
List<UpdateProvider> provRecords = jdbcTemplate.query(sqlApiSelectQuery, new ProvRecordMapper());
//added for multithreading
ExecutorService executorService = Executors.newFixedThreadPool(2);
//looping over list records and calling API to process records
for(UpdateProvider record : provRecords) {
executorService.execute(new ProvRecordService(record));
}
executorService.shutdown();
}
}
ProvRecordService.java
为了使其具有多线程功能,我在下面的代码中添加了几个部分,并添加了注释://用于多线程
package com.emerald.paymentengineapi.service;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Service
public class ProvRecordService implements IFxiProviderService, Runnable {
@Autowired
RestSslException restSslTemplate;
@Autowired
DbConfig dbConfig;
@Autowired
UpdateProvider updateProvider; // added for multithreading
@Autowired
JdbcTemplate jdbcTemplate;
@Autowired
TokenService tokenService;
@Value("${SHIELD_API_URL}")
private String SHIELD_API_URL;
@Value("${token_expire_time}")
private String token_expire;
RestTemplate restTemplate;
DataSource dataSource;
UpdateProvider record; // added for multithreading
Logger logger = LoggerFactory.getLogger(ProvRecordService.class);
private static String FETCH_OPTIONS_SQL = "select OPTION_NAME, OPTION_VALUE from FSG.FSG_PRCB_PE_API_REQ_CONFIG";
public ProvRecordService(UpdateProvider record) { // added for multithreading
// TODO Auto-generated constructor stub
this.record = record;
}
@Override
public void run() { // added for multithreading
updateProvider(record);
}
@Scheduled(fixedRateString = "token_expire")
public ResponseEntity<String> runTokenScheduler() throws KeyManagementException, KeyStoreException, NoSuchAlgorithmException {
logger.info("Fetching Token..." + token_expire);
ResponseEntity<String> response = tokenService.getOauth2Token();
return response;
}
@Override
public ResponseEntity<String> updateProvider(UpdateProvider updateProviderRequest) {
dataSource = dbConfig.dataSource();
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
try {
restTemplate = restSslTemplate.restTemplate();
} catch (KeyManagementException | KeyStoreException | NoSuchAlgorithmException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
ResponseEntity<String> response = null;
try {
if (null == TokenService.TOKEN_VALUE.get(ConfigConstants.ACCESS_TOKEN))
runTokenScheduler();
HttpHeaders headers = new HttpHeaders();
headers.setAccept(Arrays.asList(MediaType.APPLICATION_JSON));
System.out.println("value :" + TokenService.TOKEN_VALUE.get(ConfigConstants.TOKEN_TYPE));
System.out.println("access_token :" + TokenService.TOKEN_VALUE.get(ConfigConstants.ACCESS_TOKEN));
headers.add(ConfigConstants.AUTHORIZATION, TokenService.TOKEN_VALUE.get(ConfigConstants.TOKEN_TYPE) + " "
+ TokenService.TOKEN_VALUE.get(ConfigConstants.ACCESS_TOKEN));
headers.add(ConfigConstants.CLIENT_CODE, ConfigConstants.CSP_PROVIDER_BATCH);
List<RequestOptions> customers = jdbcTemplate.query(FETCH_OPTIONS_SQL,new BeanPropertyRowMapper(RequestOptions.class));
updateProviderRequest.getXpfRequestData().setRequestOptions(customers);
HttpEntity<UpdateProvider> entity = new HttpEntity<UpdateProvider>(updateProviderRequest, headers);
response = restTemplate.exchange(SHIELD_API_URL, HttpMethod.PUT, entity, String.class);
if (response.getStatusCode() == HttpStatus.NO_CONTENT) {
logger.info(updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId());
logger.info(updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
updateStatusInDB(String.valueOf(response.getStatusCodeValue()), "NO_CONTENT",
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
logger.info("Provider has been updated successfully");
} else if (response.getStatusCode() == HttpStatus.INTERNAL_SERVER_ERROR) {
updateStatusInDB(String.valueOf(response.getStatusCodeValue()), "INTERNAL_SERVER_ERROR",
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
logger.error("Internal Server error occures");
} else if (response.getStatusCode() == HttpStatus.NOT_FOUND) {
updateStatusInDB(String.valueOf(response.getStatusCodeValue()), "NOT_FOUND",
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
logger.error("Provider not found");
}
} catch (TokenServiceException ex) {
logger.error("Exception occures in calling Token API");
updateStatusInDB(ex.getMessage(), ex.getLocalizedMessage(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
//throw new RuntimeException("Exception occures in API " + ex);
} catch (HttpClientErrorException ex) {
logger.error("HttpClientErrorException occures in calling API");
updateStatusInDB(ex.getStatusText(), ex.getStatusText(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
//throw new HttpClientErrorException(ex.getStatusCode(), ex.getStatusText());
} catch (Exception ex) {
logger.error("Exception occures in calling API");
updateStatusInDB(ex.getMessage(), ex.getMessage(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
//throw new RuntimeException("Exception occures in API " + ex);
}
return response;
}
private int updateStatusInDB(String errorCode, String errorMessage, String taxId, String providerId) {
return jdbcTemplate.update(
"update FSG_WRK.FSG_PRCB_PE_API_REQUEST set ERRORCODE = ?, ERRORMESSAGE = ? where TAXID = ? and PROVIDERID= ?",
errorCode, errorMessage, taxId, providerId);
}
}
我调试了这段代码,它将失效,run方法和record也将被填充,但在此之后,它不会进入updateProvider方法进行处理,我得到以下错误:
Exception in thread "pool-2-thread-1" java.lang.NullPointerException
at com.emerald.paymentengineapi.service.ProvRecordService.updateProvider(ProvRecordService.java:92)
at com.emerald.paymentengineapi.service.ProvRecordService.run(ProvRecordService.java:78)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Exception in thread "pool-2-thread-2" java.lang.NullPointerException
at com.emerald.paymentengineapi.service.ProvRecordService.updateProvider(ProvRecordService.java:92)
at com.emerald.paymentengineapi.service.ProvRecordService.run(ProvRecordService.java:78)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Exception in thread "pool-2-thread-3" java.lang.NullPointerException
at com.emerald.paymentengineapi.service.ProvRecordService.updateProvider(ProvRecordService.java:92)
at com.emerald.paymentengineapi.service.ProvRecordService.run(ProvRecordService.java:78)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Exception in thread "pool-2-thread-5" java.lang.NullPointerException
at com.emerald.paymentengineapi.service.ProvRecordService.updateProvider(ProvRecordService.java:92)
at com.emerald.paymentengineapi.service.ProvRecordService.run(ProvRecordService.java:78)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
更新:
经过进一步调试,我了解到,问题出现在以下行:
dataSource = dbConfig.dataSource();
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
我正在尝试在这里设置dataSource,当我没有添加多线程代码时,这很好。我无法得到原因。请建议。
此处的代码错误:
更好的方法是只创建一次并将其保留为ProvRecordProcess
组件的数据栏。在您的方法中创建线程很昂贵,您不知道可以同时创建多少线程(如果此方法被许多用户并行调用怎么办-如果每个用户都创建线程池,则可能非常昂贵)。
此外,如果使用线程池执行器,最好在应用程序关闭时将其关闭,所以不要忘记在predestroy或其他时候调用close。
for(UpdateProvider record : provRecords) {
executorService.execute(new ProvRecordService(record));
}
相反,将服务作为一个单例注入到ProvRecordProcessing组件中,并调用其负责从runnable/callable发送http请求的方法。以下是我的意思的示意图示例:
@Component
class ProvRecordProcessing {
@Autowired
private ProvRecordService provRecordService;
....
for(UpdateProvider record : provRecords) {
executorService.execute(() -> provRecordService.updateHttpOrWhatever(record));
}
}
通过这种方法,ProvRecordService成为一个常规的spring管理bean。
对此有更高级的解决方案,即使用异步方法,无需“手动”维护线程池。例如,请参见本教程。。。既然你没有在问题中展示这些,我想这超出了你所问的范围,所以请记住它也是存在的。当然,如果您正确地实现了代码,它会很好。
我需要在Springboot中实现多线程,同时使用POST方法调用API。我根据一个SELECT查询从oracle数据库中提取记录,然后使用行映射器逐个遍历每个记录。在下一步中,我只调用一个方法将这些记录发送到API,以postmapping的形式发送这些记录并取回记录。 因为select查询一次可以返回10、20或100条记录。逐个调用每条记录并不理想。我在想我是否可以一次发送多个记录。我不知道
问题内容: 我需要在一个请求中进行多次更新。 在我有: 因此需要进行更改。 这是我的序列化器代码: 我试图添加: 和 但这不起作用。如何更改此代码以进行多次更新。我的json请求 问题答案: 这是您请求的CreateMixins或UpdateMixins的示例。 ======================查看========================== ====== ==========
本文向大家介绍C#获取变更过的DataTable记录的实现方法,包括了C#获取变更过的DataTable记录的实现方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了C#获取变更过的DataTable记录的实现方法,是一个非常实用的功能!具体实现方法如下: 首先DataTable可以看做是一个物理表的内存式存储,每一个DataRow都有一个属性叫做RowState。因此任意一行中某一个字段
本文向大家介绍Android 中通过实现线程更新Progressdialog (对话进度条),包括了Android 中通过实现线程更新Progressdialog (对话进度条)的使用技巧和注意事项,需要的朋友参考一下 作为开发者我们需要经常站在用户角度考虑问题,比如在应用商城下载软件时,当用户点击下载按钮,则会有下载进度提示页面出现,现在我们通过线程休眠的方式模拟下载进度更新的演示,如图(这里为
我知道以前有人问过这个问题,我已经阅读了很多答案,现在正在研究其中一个答案,但是我需要以下代码的帮助。 我在执行时没有收到任何错误,也没有任何内容提交到mysql表,如果你发现了什么,请告诉我,或者如果有更好的方法,请你给我指一个教程,我还没有太多的PDO或多行更新。 提前谢谢。 致山姆: 产出为:
我正在编写一个小应用程序,现在我发现了一个问题。我需要调用一个(稍后可能是两个)方法(这个方法加载一些东西并返回结果),而不会滞后于应用程序的窗口。 我找到了像Executor或Callable这样的类,但我不知道如何使用这些类。 你能张贴任何解决方案,这对我有帮助吗? 谢谢你的建议。 编辑:方法必须返回结果。此结果取决于参数。类似这样: 此方法大约工作8-10秒。执行此方法后,可以停止线程。但我