我正在使用spring JPA
和< code>HTTP post请求,逐行获取数据,然后将数据发送到API的HTTP请求中,这对我来说很好,但这里我使用的是大量数据,所以我必须使用多线程,但我是java和spring的新手,我如何实现使用10个线程,每个线程每次并行读取1k的数据?
我读过关于10个线程的多线程,其中每个线程每次读取1k行,我的数据库中有大约1000万条记录
访问DataJpaApplication类:
@SpringBootApplication
public class AccessingDataJpaApplication implements CommandLineRunner {
private static final Logger logger = LoggerFactory.getLogger(AccessingDataJpaApplication.class);
@Autowired
private Bulk_repositoryRepository bulk_repositoryRepository;
public static void main(String[] args) {
SpringApplication.run(AccessingDataJpaApplication.class);
}
Date currentDate = new Date();
@Override
public void run(String... args) throws Exception {
RestTemplate restTemplate = new RestTemplate();
HttpHeaders headers = new HttpHeaders();
headers.setAccept(Arrays.asList(MediaType.APPLICATION_JSON));
headers.setBasicAuth("user", "pass");
while(true) {
Date currentDate = new Date();
logger.info("Just Started");
for (Bulk_repository churnss : bulk_repositoryRepository.findAllByStatusAndCampTypeAndCampStartDateLessThanEqualAndCampEndDateGreaterThanEqual(0,2,currentDate,currentDate)) {
System.out.print(churnss);
logger.info(churnss.toString());
AddOfferRequest AddOffer = new AddOfferRequest("113", churnss.getMsisdn(),churnss.getParam1());
logger.info(AddOffer.toString());
HttpEntity<AddOfferRequest> entity = new HttpEntity<AddOfferRequest>(AddOffer,headers);
ResponseEntity<String> responseEntity = restTemplate.exchange(
"api link", HttpMethod.POST, entity, String.class);
if(responseEntity.getStatusCode() == HttpStatus.OK){
String response = responseEntity.getBody();
churnss.setStatus(1);
churnss.setProcessDate(new Date());
churnss.setFulfilment_status(response);
logger.info(churnss.toString() + ", Response: " + response);
bulk_repositoryRepository.save(churnss);
}else {
logger.warn("Record Id: " + churnss.getId() + ", Http Failed Response: " + responseEntity.getStatusCode());
}
}
Thread.sleep(1000);
}
}
}
Bulk_repository类:
@Entity
@Table(name = "BULK_REPOSITORY")
public class Bulk_repository {
@Id
@GeneratedValue(strategy=GenerationType.AUTO)
@Column(name = "id")
private long id;
@Column(name = "msisdn")
private String msisdn;
@Column(name = "camp_start_date")
private Date campStartDate;
@Column(name = "camp_end_date")
private Date campEndDate;
@Column(name = "camp_type")
private int campType;
@Column(name = "camp_cd")
private String camp_cd;
@Column(name = "status")
private int status;
@Column(name = "process_date")
private Date processDate;
@Column(name = "entry_date")
private Date entryDate;
@Column(name = "entry_user")
private String entry_user;
@Column(name = "param1")
private String param1;
@Column(name = "param2")
private String param2;
@Column(name = "param3")
private String param3;
@Column(name = "param4")
private String param4;
@Column(name = "param5")
private String param5;
@Column(name = "error_desc")
private String error_desc;
@Column(name = "fulfilment_status")
private int fulfilment_status;
##then getter and setters and tostring
Bulk_repositoryRepository类 :
public interface Bulk_repositoryRepository extends CrudRepository<Bulk_repository, Long> {
Date today = new Date();
List<Bulk_repository>findAllByStatusAndCampTypeAndCampStartDateLessThanEqualAndCampEndDateGreaterThanEqual(int status, int campType,Date today0, Date today1);
Bulk_repository findById(long id);
}
AddOfferRequest类:
public class AddOfferRequest {
private String ChannelID="113";
private String MSISDN;
private String ServiceID;
public AddOfferRequest() {
}
public AddOfferRequest(String channelID,String mSISDN,String serviceID ) {
this.MSISDN = mSISDN;
this.ServiceID = serviceID;
}
## then getter and setter and tostring
我已经创建了Async配置类:
package com.example.accessingdatajpa;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotathtml" target="_blank">ion.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
@EnableAsync
public class AsyncConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncConfiguration.class);
@Bean (name = "taskExecutor")
public Executor taskExecutor() {
LOGGER.debug("Creating Async Task Executor");
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(2);
executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("CarThread-");
executor.initialize();
return executor;
}
}
但是到现在我还不明白如何用多线程来结合findby和http post
尝试使用Hibernate/JPA 进行批量插入/更新。这是一个不错的教程
spring.jpa.properties.hibernate.jdbc.batch_size=500
使用@Async注解在Spring中实现多线程。它可以帮助你。https://spring.io/guides/gs/async-method/https://docs.spring.io/spring-data/rest/docs/2.0.0.M1/reference/html/paging-chapter.html
重写代码。而不是列表
然后使用
TaskExecutor
执行每个线程的不同请求,只需向其提供一个任务,它就会在有空闲线程时执行。
@SpringBootApplication
public class AccessingDataJpaApplication implements CommandLineRunner {
private static final Logger logger = LoggerFactory.getLogger(AccessingDataJpaApplication.class);
@Autowired
private Bulk_repositoryRepository bulk_repositoryRepository;
@Autowired
private AsyncTaskExecutor executor;
@Autowired
private RestTemplate rest;
public static void main(String[] args) {
SpringApplication.run(AccessingDataJpaApplication.class);
}
@Override
public void run(String... args) throws Exception {
Date currentDate = new Date();
Stream< Bulk_repository> results = Bulk_repository churnss : bulk_repositoryRepository.findAllByStatusAndCampTypeAndCampStartDateLessThanEqualAndCampEndDateGreaterThanEqual(0,2,currentDate,currentDate);
results.forEach(it -> executor.submit(this.process(it)));
Thread.sleep(1000);
}
private void process(RestTemplate rest, Bulk_repository churnss) {
AddOfferRequest AddOffer = new AddOfferRequest("113", churnss.getMsisdn(),churnss.getParam1());
HttpEntity<AddOfferRequest> entity = new HttpEntity<AddOfferRequest>(AddOffer,headers);
try {
ResponseEntity<String> responseEntity = restTemplate.exchange(
"api link", HttpMethod.POST, entity, String.class);
if(responseEntity.getStatusCode() == HttpStatus.OK){
String response = responseEntity.getBody();
churnss.setStatus(1);
churnss.setProcessDate(new Date());
churnss.setFulfilment_status(response);
bulk_repositoryRepository.save(churnss);
}else {
logger.warn("Record Id: {}, Http Failed Response: {}",churnss.getId(), responseEntity.getStatusCode());
}
} catch (RestClientException rce) {
logger.warn("Record Id: {} Http Failed. ", churnss.getId(), rce);
}
}
}
注意:这是从我的头顶键入的,未经测试。但是应该提供一些指导。
问题内容: 我过去两天一直在尝试构建具有多线程功能的刮板。不知何故我仍然无法管理它。最初,我尝试使用带有线程模块的常规多线程方法,但这并不比使用单个线程快。后来我了解到请求正在阻塞,并且多线程方法并没有真正起作用。因此,我不断研究并发现有关grequests和gevent的信息。现在,我正在使用gevent运行测试,它仍然没有比使用单个线程快。我的编码有误吗? 这是我课程的相关部分: 问题答案:
javax.net.ssl.SSLHandShakeException:>java.security.cert.certPathValidatorException:未找到证书的信任锚>路径。 这条信息是什么意思?
问题内容: 我想使用Spring MVC发布带有一些JSON数据的文件。因此,我开发了一项休息服务 当我从其他客户端发送请求时 ,出现下一个异常: 谁能帮助我解决这个问题? 我可以同时将Multipart和JSON发送到服务器吗? 问题答案: 这就是我使用JSON数据实现Spring MVC Multipart Request的方式。 带有JSON数据的分段请求(也称为混合分段): 基于Sprin
我的Java应用程序使用各种代理服务器从特定域收集数据。特别申请需要下列程序: 通过特定代理加载URL 等待5秒 通过同一个代理加载下一个url 为了使信息的加载(由于5秒的暂停)不需要永远,我总共使用400个线程。每个线程都使用自己的代理服务器,也就是使用自己的OKHTTP客户端: 每个线程必须使用自己的代理,因此每个线程都有自己的OKHTTP客户端。总共有400个OKHTTP客户端。 我做了一
问题内容: 我尝试了python 请求库文档中提供的示例。 使用,我得到了响应代码,但是我想获得所请求的每个页面的内容。例如,这不起作用: 问题答案: 注意 下面的答案是不适用于请求v0.13.0 +。编写此问题后,异步功能已移至。但是,你可以将其替换为下面的内容,它应该可以工作。 我已经留下了这个答案,以反映原始问题,即有关使用请求的问题。 要异步执行多个任务,你必须: 为每个对象定义一个函数(
问题内容: 我试图了解如何在使用Spring进行事务管理的Java应用程序中实现线程。我已经在Spring文档中找到TaskExecutor部分,并且ThreadPoolTaskExecutor看起来很适合我的需求。 ThreadPoolTaskExecutor 该实现只能在Java 5环境中使用,也是该环境中最常用的一种。它公开了用于配置java.util.concurrent.Thre