当前位置: 首页 > 知识库问答 >
问题:

带有https请求多线程Spring的JPA

孙帅
2023-03-14

我正在使用spring JPA和< code>HTTP post请求,逐行获取数据,然后将数据发送到API的HTTP请求中,这对我来说很好,但这里我使用的是大量数据,所以我必须使用多线程,但我是java和spring的新手,我如何实现使用10个线程,每个线程每次并行读取1k的数据?

我读过关于10个线程的多线程,其中每个线程每次读取1k行,我的数据库中有大约1000万条记录

访问DataJpaApplication类:

@SpringBootApplication
public class AccessingDataJpaApplication implements CommandLineRunner {

    private static final Logger logger = LoggerFactory.getLogger(AccessingDataJpaApplication.class);
    @Autowired

    private Bulk_repositoryRepository bulk_repositoryRepository;


    public static void main(String[] args) {
        SpringApplication.run(AccessingDataJpaApplication.class);
    }
    Date currentDate = new Date();

    @Override
    public void run(String... args) throws Exception {
        RestTemplate restTemplate = new RestTemplate();
        HttpHeaders headers = new HttpHeaders();
        headers.setAccept(Arrays.asList(MediaType.APPLICATION_JSON));
        headers.setBasicAuth("user", "pass");

        while(true) {
            Date currentDate = new Date();
            logger.info("Just Started"); 
            for (Bulk_repository churnss : bulk_repositoryRepository.findAllByStatusAndCampTypeAndCampStartDateLessThanEqualAndCampEndDateGreaterThanEqual(0,2,currentDate,currentDate)) {
                System.out.print(churnss);
                logger.info(churnss.toString());
                AddOfferRequest AddOffer = new AddOfferRequest("113", churnss.getMsisdn(),churnss.getParam1());

                logger.info(AddOffer.toString());
                HttpEntity<AddOfferRequest> entity = new HttpEntity<AddOfferRequest>(AddOffer,headers);

                ResponseEntity<String> responseEntity = restTemplate.exchange(
                        "api link", HttpMethod.POST, entity, String.class);

                if(responseEntity.getStatusCode() == HttpStatus.OK){
                    String response = responseEntity.getBody();
                    churnss.setStatus(1);
                    churnss.setProcessDate(new Date());
                    churnss.setFulfilment_status(response);
                    logger.info(churnss.toString() + ", Response: " + response);
                    bulk_repositoryRepository.save(churnss);
                }else {
                    logger.warn("Record Id: " + churnss.getId() + ", Http Failed Response: " + responseEntity.getStatusCode());
                }
            }
            Thread.sleep(1000);
        }
    }

}

Bulk_repository类:

@Entity
@Table(name = "BULK_REPOSITORY")
public class Bulk_repository {

   @Id
   @GeneratedValue(strategy=GenerationType.AUTO)
   @Column(name = "id")
   private long id;

   @Column(name = "msisdn")
   private String msisdn;

   @Column(name = "camp_start_date")   
   private Date campStartDate;

   @Column(name = "camp_end_date")
   private Date campEndDate;

   @Column(name = "camp_type")
   private int campType;

   @Column(name = "camp_cd")
   private String camp_cd;

   @Column(name = "status")
   private int status;

   @Column(name = "process_date")
   private Date processDate;

   @Column(name = "entry_date")
   private Date entryDate;

   @Column(name = "entry_user")
   private String entry_user;

   @Column(name = "param1")
   private String param1;

   @Column(name = "param2")
   private String param2;

   @Column(name = "param3")
   private String param3;

   @Column(name = "param4")
   private String param4;

   @Column(name = "param5")
   private String param5;

   @Column(name = "error_desc")
   private String error_desc;

   @Column(name = "fulfilment_status")
   private int fulfilment_status;
   ##then getter and setters and tostring

Bulk_repositoryRepository类 :

public interface Bulk_repositoryRepository extends CrudRepository<Bulk_repository, Long> {

      Date today = new Date();
      List<Bulk_repository>findAllByStatusAndCampTypeAndCampStartDateLessThanEqualAndCampEndDateGreaterThanEqual(int status, int campType,Date today0, Date today1);
      Bulk_repository findById(long id);
}

AddOfferRequest类:

public class AddOfferRequest {

    private String ChannelID="113";
    private String MSISDN;
    private String ServiceID;

    public AddOfferRequest() {
    }
    public AddOfferRequest(String channelID,String mSISDN,String serviceID ) {
        this.MSISDN = mSISDN;
        this.ServiceID = serviceID;

    }
    ## then getter and setter and tostring

我已经创建了Async配置类:

package com.example.accessingdatajpa;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotathtml" target="_blank">ion.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;


@Configuration
@EnableAsync
public class AsyncConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncConfiguration.class);
    @Bean (name = "taskExecutor")
    public Executor taskExecutor() {
        LOGGER.debug("Creating Async Task Executor");
        final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(1000);
        executor.setThreadNamePrefix("CarThread-");
        executor.initialize();
        return executor;
    }
}

但是到现在我还不明白如何用多线程来结合findby和http post

共有3个答案

拓拔骁
2023-03-14

尝试使用Hibernate/JPA 进行批量插入/更新。这是一个不错的教程

spring.jpa.properties.hibernate.jdbc.batch_size=500

太叔昊穹
2023-03-14

使用@Async注解在Spring中实现多线程。它可以帮助你。https://spring.io/guides/gs/async-method/https://docs.spring.io/spring-data/rest/docs/2.0.0.M1/reference/html/paging-chapter.html

霍建柏
2023-03-14

重写代码。而不是列表

然后使用 TaskExecutor 执行每个线程的不同请求,只需向其提供一个任务,它就会在有空闲线程时执行。

@SpringBootApplication
public class AccessingDataJpaApplication implements CommandLineRunner {

    private static final Logger logger = LoggerFactory.getLogger(AccessingDataJpaApplication.class);

    @Autowired
    private Bulk_repositoryRepository bulk_repositoryRepository;

    @Autowired
    private AsyncTaskExecutor executor;

    @Autowired
    private RestTemplate rest;

    public static void main(String[] args) {
        SpringApplication.run(AccessingDataJpaApplication.class);
    }

    @Override
    public void run(String... args) throws Exception {
        Date currentDate = new Date();

        Stream< Bulk_repository> results = Bulk_repository churnss : bulk_repositoryRepository.findAllByStatusAndCampTypeAndCampStartDateLessThanEqualAndCampEndDateGreaterThanEqual(0,2,currentDate,currentDate);

        results.forEach(it -> executor.submit(this.process(it)));
        Thread.sleep(1000);
    }

    private void process(RestTemplate rest, Bulk_repository churnss) {
      AddOfferRequest AddOffer = new AddOfferRequest("113", churnss.getMsisdn(),churnss.getParam1());

      HttpEntity<AddOfferRequest> entity = new HttpEntity<AddOfferRequest>(AddOffer,headers);

      try {
        ResponseEntity<String> responseEntity = restTemplate.exchange(
                        "api link", HttpMethod.POST, entity, String.class);
         if(responseEntity.getStatusCode() == HttpStatus.OK){
           String response = responseEntity.getBody();
           churnss.setStatus(1);
           churnss.setProcessDate(new Date());
           churnss.setFulfilment_status(response);
           bulk_repositoryRepository.save(churnss);
         }else {
           logger.warn("Record Id: {}, Http Failed Response: {}",churnss.getId(), responseEntity.getStatusCode());
                }
      } catch (RestClientException rce) {
          logger.warn("Record Id: {} Http Failed. ", churnss.getId(), rce);
      }               
    }

}

注意:这是从我的头顶键入的,未经测试。但是应该提供一些指导。

 类似资料:
  • 问题内容: 我过去两天一直在尝试构建具有多线程功能的刮板。不知何故我仍然无法管理它。最初,我尝试使用带有线程模块的常规多线程方法,但这并不比使用单个线程快。后来我了解到请求正在阻塞,并且多线程方法并没有真正起作用。因此,我不断研究并发现有关grequests和gevent的信息。现在,我正在使用gevent运行测试,它仍然没有比使用单个线程快。我的编码有误吗? 这是我课程的相关部分: 问题答案:

  • javax.net.ssl.SSLHandShakeException:>java.security.cert.certPathValidatorException:未找到证书的信任锚>路径。 这条信息是什么意思?

  • 问题内容: 我想使用Spring MVC发布带有一些JSON数据的文件。因此,我开发了一项休息服务 当我从其他客户端发送请求时 ,出现下一个异常: 谁能帮助我解决这个问题? 我可以同时将Multipart和JSON发送到服务器吗? 问题答案: 这就是我使用JSON数据实现Spring MVC Multipart Request的方式。 带有JSON数据的分段请求(也称为混合分段): 基于Sprin

  • 我的Java应用程序使用各种代理服务器从特定域收集数据。特别申请需要下列程序: 通过特定代理加载URL 等待5秒 通过同一个代理加载下一个url 为了使信息的加载(由于5秒的暂停)不需要永远,我总共使用400个线程。每个线程都使用自己的代理服务器,也就是使用自己的OKHTTP客户端: 每个线程必须使用自己的代理,因此每个线程都有自己的OKHTTP客户端。总共有400个OKHTTP客户端。 我做了一

  • 问题内容: 我尝试了python 请求库文档中提供的示例。 使用,我得到了响应代码,但是我想获得所请求的每个页面的内容。例如,这不起作用: 问题答案: 注意 下面的答案是不适用于请求v0.13.0 +。编写此问题后,异步功能已移至。但是,你可以将其替换为下面的内容,它应该可以工作。 我已经留下了这个答案,以反映原始问题,即有关使用请求的问题。 要异步执行多个任务,你必须: 为每个对象定义一个函数(

  • 问题内容: 我试图了解如何在使用Spring进行事务管理的Java应用程序中实现线程。我已经在Spring文档中找到TaskExecutor部分,并且ThreadPoolTask​​Executor看起来很适合我的需求。 ThreadPoolTask​​Executor 该实现只能在Java 5环境中使用,也是该环境中最常用的一种。它公开了用于配置java.util.concurrent.Thre