当前位置: 首页 > 面试题库 >

Spring Batch Item Reader仅执行一次

岳浩
2023-03-14
问题内容

尝试实现Spring批处理,但遇到一个奇怪的问题,我们的 ItemReader类仅执行一次。

以下是详细信息。

  1. 如果我们在数据库中有1000行。
  2. 我们的项目读取器从数据库中获取1000行,并将列表传递给 ItemWriter
  3. ItemWriter 成功删除所有项目。
  4. 现在,ItemReader再次尝试从数据库获取数据,但是没有找到,因此返回NULL,因此执行停止。
  5. 但是我们已将批处理配置为使用Quartz调度程序执行,即每分钟执行一次。
  6. 现在,如果我们通过转储导入在数据库中插入1000行,那么批处理作业应在下一次执行时选择该数据,尽管JobLauncher执行中它甚至没有
    执行。


配置: -

1.我们有ItemReader,ItemWriter,提交间隔等于1。

<batch:job id="csrfTokenBatchJob">
    <batch:step id="step1">
      <tasklet>
        <chunk reader="csrfTokenReader" writer="csrfTokenWriter" commit-interval="1"></chunk>
      </tasklet>
    </batch:step>
  </batch:job>

2.Job计划每分钟触发一次。

<bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
    <property name="triggers">
      <bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
        <property name="jobDetail" ref="jobDetail" />
        <property name="cronExpression" value="0 0/1 * * * ?" />
      </bean>
    </property>
  </bean>

3,作业配置

<bean id="jobDetail" class="org.springframework.scheduling.quartz.JobDetailBean">
    <property name="jobClass" value="com.tavant.oauth.batch.job.CSRFTokenJobLauncher" />
    <property name="jobDataAsMap">
        <map>
            <entry key="jobName" value="csrfTokenCleanUpBatchJob" />
            <entry key="jobLocator" value-ref="jobRegistry" />
            <entry key="jobLauncher" value-ref="jobLauncher" />
        </map>
    </property>
</bean>

第一次成功执行,但是后来不执行,但是我可以在日志中看到JobLauncher正在执行。

@Component("csrfTokenReader")
@Scope(value="step")
public class CSRFTokenReader implements ItemReader<List<CSRFToken>> {

    private static final Logger logger = LoggerFactory.getLogger(CSRFTokenReader.class);

    @Autowired
    private CleanService cleanService;

    @Override
    public List<CSRFToken> read() {
        List<CSRFToken> csrfTokenList = null;
        try{

            int keepUpto = Integer.valueOf(PropertiesContext.getInstance().getProperties().getProperty("token.keep", "1"));

            Calendar calTime = Calendar.getInstance();
            calTime.add(Calendar.HOUR, -keepUpto);
            Date toKeep = calTime.getTime();

            csrfTokenList = cleanService.getCSRFTokenByTime(toKeep);
        }
        catch(Throwable th){
            logger.error("Exception in running job At " + new Date() + th);
        }
        if(CollectionUtils.isEmpty(csrfTokenList)){
            return null;
        }
        return csrfTokenList;
    }
}

编辑: -

public class CSRFTokenJobLauncher extends QuartzJobBean {
    static final String JOB_NAME = "jobName";
    private JobLocator jobLocator;
    private JobLauncher jobLauncher;
    public void setJobLocator(JobLocator jobLocator) {
        this.jobLocator = jobLocator;
    }
    public void setJobLauncher(JobLauncher jobLauncher) {
        this.jobLauncher = jobLauncher;
    }
    @Override
    protected void executeInternal(JobExecutionContext context) {
        Map<String, Object> jobDataMap = context.getMergedJobDataMap();
        String jobName = (String) jobDataMap.get(JOB_NAME);
        log.info("Quartz trigger firing with Spring Batch jobName="+jobName);
        JobParameters jobParameters = getJobParametersFromJobMap(jobDataMap);
        try {
            jobLauncher.run(jobLocator.getJob(jobName), jobParameters);
        }
        catch (JobExecutionException e) {
            log.error("Could not execute job.", e);
        }
    }
    private JobParameters getJobParametersFromJobMap(Map<String, Object> jobDataMap) {
        JobParametersBuilder builder = new JobParametersBuilder();
        for (Entry<String, Object> entry : jobDataMap.entrySet()) {
            String key = entry.getKey();
            Object value = entry.getValue();
            if (value instanceof String && !key.equals(JOB_NAME)) {
                builder.addString(key, (String) value);
            }
            else if (value instanceof Float || value instanceof Double) {
                builder.addDouble(key, ((Number) value).doubleValue());
            }
            else if (value instanceof Integer || value instanceof Long) {
                builder.addLong(key, ((Number)value).longValue());
            }
            else if (value instanceof Date) {
                builder.addDate(key, (Date) value);
            }
        }
        return builder.toJobParameters();
    }
}

问题答案:

经过数小时的时间浪费,问题似乎现在已经解决,我已allow-start-if- complete="true"在tasklet中进行了配置。现在,批处理项目读取器正在按计划执行。

<batch:job id="csrfTokenBatchJob">
    <batch:step id="step1">
      <batch:tasklet allow-start-if-complete="true">
        <batch:chunk reader="csrfTokenReader" writer="csrfTokenWriter" commit-interval="1"></batch:chunk>
      </batch:tasklet>
    </batch:step>
  </batch:job>


 类似资料:
  • 问题内容: 我很难弄清楚为什么while循环实际上不会循环。它运行一次并停止。 我正在尝试使其循环,以便用户能够多次转换单位。任何帮助都欢迎! 问题答案: 问题在于,当您呼叫时,它会占用该号码,但不会占用该号码之后的换行符。要解决此问题,只需在调用后放一行代码。 示例和完整说明: 假设您输入“ km”,按回车,“ 123”,按回车。从程序的角度来看,输入流为。 该代码获取值,并且使输入超出第一个。

  • 15.2 仅执行一次的工作调度 首先,我们先来谈谈单一工作调度的运行,那就是 at 这个指令的运行! 15.2.1 atd 的启动与 at 运行的方式 要使用单一工作调度时,我们的 Linux 系统上面必须要有负责这个调度的服务,那就是 atd 这个玩意儿。 不过并非所有的 Linux distributions 都默认会把他打开的,所以呢,某些时刻我们必须要手动将他启用才行。 启用的方法很简单,

  • 我需要创建一个具有架构列表的@Scheduled方法,对于每个架构,从2个表中删除行。 我将deleteFromCustomerTables定义为@Transactional(propagation=propagation.REQUIRES_NEW),并在其中使用EntityManager从2个表中删除行。 为了使其工作,我需要将@Transactional添加到计划FixedDelayTask,

  • 问题内容: 我正在编写一个Django中间件类,该类只想在启动时执行一次,以初始化一些其他人工代码。我遵循了sdolan 在此处发布的非常好的解决方案,但是“ Hello”消息两次输出到终端。例如 在我的Django设置文件中,该类已包含在列表中。 但是当我使用runserver运行Django并请求页面时,我进入了终端 有什么想法为什么要打印两次“ Hello world”?谢谢。 问题答案:

  • 我正在使用以下项目结构Project ---子项目1 ---子项目2 ---子项目3 |build.gradle |设置。格拉德尔 子项目2取决于子项目1 子项目3取决于子项目2 对于所有子项目,我需要有任务“构建”,这将是每个项目的自定义任务。但在“构建”之前必须执行一些初始化任务(对于所有子项目都相同)。问题是,这个初始化任务必须在构建期间执行一次,而且只能执行一次,无论我在构建什么-根项目或

  • 问题内容: 我正在编写一个Django中间件类,该类只想在启动时执行一次,以初始化一些其他人工代码。我遵循了sdolan 在此处发布的非常好的解决方案,但是“ Hello”消息两次输出到终端。例如 在我的Django设置文件中,该类已包含在列表中。 但是当我使用runserver运行Django并请求页面时,我进入了终端 有什么想法为什么要打印两次“ Hello world”?谢谢。 问题答案: