当编写器抛出异常时,我希望能够将步骤和作业状态设置为失败。在做了一些调试和检查Spring批处理源代码后,我注意到RepeatTemboard
配置了一个SimpleRetryExceptionHandler
,它认为BatchRuntimeExctive
是一个致命的异常,因此将作业状态设置为FAILED,所以我将代码包装在我的编写器中的一个try-get中,将RuntimeExctive
包装在BatchRuntimeExctive
中,现在作业和步骤状态设置为FAILED,这是我想要的。我不确定这是否是正确的方法,因为我在任何地方都找不到它的文档,BatchRuntimeExctive
的留档也没有提到它。所以,问题是:这是正确的方法吗?
我想到的另一件事是,如果写操作失败,那么让作业失败是否有意义,但考虑到我的用例,我认为这是有意义的,就像这样:使用流读取器从数据库读取条目(它将查询配置为针对数据库运行),然后使用流编写器通过电子邮件或http发送这些条目(在stream open方法中检索要发送项目的配置)。如果一切都成功,则使用“已发送”状态更新数据库条目,如果在doOpen/open/write方法期间发生错误,则调用StepExecutionListener
发送通知电子邮件,告知作业失败。这就是我需要将作业状态设置为FAILED的原因之一,以便正确执行StepExecutionListener
(它检查ExitCode
)。第二个原因是,我想使用spring Batch Admin应用程序来管理作业,如果作业显示为已完成,尽管写入失败,但这似乎有误导性,因为作业的全部目的是发送项目。这样,在更改配置(例如正确配置收件人电子邮件地址)后,如果作业失败,可以重新启动作业。
此外,如果写调用失败,那么数据库中的条目应该将其状态更新为FAILED,这是我计划在新事务中的ItemWriteListener
的onWriteError中执行的操作,因为当前事务将被回滚。
我发布了这么长的描述,只是为了确保我没有违背这里框架的意图,试图从作者那里将作业状态设置为失败。
期待你对此的想法。
你好克丽丝蒂
P. S.:工作是这样配置的:
<batch:job id="job">
<batch:step id="step">
<batch:tasklet>
<batch:chunk reader="reader" writer="writer" reader-transactional-queue="true" commit-interval="#{properties['export.page.size']}"/>
</batch:tasklet>
<batch:listeners>
<batch:listener ref="failedStepListener"/>
</batch:listeners>
</batch:step>
</batch:job>
后期编辑:读取器和写入器配置如下:
<bean name="reader" class="...LeadsReader" scope="step">
<property name="campaignId" value="#{jobParameters[campaignId]}" />
<property name="partnerId" value="#{jobParameters[partnerId]}" />
<property name="from" value="#{jobParameters[from]}" />
<property name="to" value="#{jobParameters[to]}" />
<property name="saveState" value="false" /> <!-- we use a database flag to indicate processed records -->
</bean>
<bean name="writer" class="...LeadsItemWriter" scope="step">
<property name="campaignId" value="#{jobParameters[campaignId]}" />
</bean>
作者的代码是:
public class LeadsItemWriter extends AbstractItemStreamItemWriter<Object[]> {
//fields and setters omitted
public LeadsItemWriter() {
setName(ClassUtils.getShortName(LeadsItemWriter.class));
}
@Override
public void open(ExecutionContext executionContext) {
super.open(executionContext);
PartnerCommunicationDTO partnerCommunicationDTO = this.leadableService.getByCampaignId(this.campaignId)
.getPartnerCommDTO();
this.transportConfig = partnerCommunicationDTO != null ? partnerCommunicationDTO.getTransportConfig() : null;
this.encoding = partnerCommunicationDTO != null ? partnerCommunicationDTO.getEnconding() : null;
if (this.transportConfig == null) {
throw new ItemStreamException ("Failed to retrieve the transport configuration for campaign id: "
+ this.campaignId);
}
PageRequestDTO pageRequestDTO = this.pageRequestMapper.map(partnerCommunicationDTO);
if (pageRequestDTO == null) {
throw new ItemStreamException("Wrong transport mapping configured for campaign id: " + this.campaignId);
}
this.columnNames = new ArrayList<>();
for (LeadColumnDTO leadColumnDTO : pageRequestDTO.getColumns()) {
this.columnNames.add(leadColumnDTO.getName());
}
}
@Override
public void write(List<? extends Object[]> items) throws Exception {
try {
if (this.transportConfig.getTransportType() == TransportConfigEnum.EMAIL) {
this.leadExporterService.sendLeads(items, this.columnNames, this.transportConfig, this.encoding);
} else {
for (Object[] lead : items) {
this.leadExporterService.sendLead(lead, this.columnNames, this.transportConfig, this.encoding);
}
}
} catch (RuntimeException e) {
LOGGER.error("Encountered exception while sending leads.", e);
//wrap exception so that the job fails and the notification listener gets called
throw new BatchRuntimeException(e);
}
}
}
给读者的代码:
public class LeadsReader extends AbstractPagingItemReader<Object[]> {
//fields and setters omitted
public LeadsReader() {
setName(ClassUtils.getShortName(LeadsReader.class));
}
@Override
protected void doOpen() throws Exception {
this.pageRequestDTO = this.pageRequestMapper.map(this.leadableService.getByCampaignId(this.campaignId)
.getPartnerCommDTO());
if (pageRequestDTO == null) {
throw new ItemStreamException("Wrong transport mapping configured for campaign id: " + this.campaignId);
}
this.timeInterval = new LeadTimeIntervalDTO(this.from != null ? of(this.from,
LeadQueryFilterParam.Comparison.GT) : null,
this.to != null ? of(this.to, LeadQueryFilterParam.Comparison.LE) : null);
super.doOpen();
}
private LeadFilterDTO of(Date date, LeadQueryFilterParam.Comparison comparison) {
LeadFilterDTO filterDTO = new LeadFilterDTO();
filterDTO.setColumn(CREATION_DATE);
filterDTO.setSqlType(DATE);
filterDTO.setComparison(comparison.name());
filterDTO.setValue(DateUtil.format(date, Validator.DATE_FORMAT));
return filterDTO;
}
@Override
protected void doReadPage() {
if (results == null) {
results = new CopyOnWriteArrayList<>();
} else {
results.clear();
}
if (this.pageRequestDTO != null) {
results.addAll(LeadsReader.this.leadStorageService.listLeads(
LeadsReader.this.pageRequestDTO.getColumns(),
LeadsReader.this.getFilters(),
LeadsReader.this.pageRequestDTO.getQueryOrderByParams(),
LeadsReader.this.pageRequestDTO.isUniqueByEmail(), LeadsReader.this.timeInterval,
(long) getPage() + 1, (long) getPageSize()).getExportedLeadsRows());
}
}
private List<LeadFilterDTO> getFilters() {
List<LeadFilterDTO> filtersList = new ArrayList<>();
LeadFilterDTO campaignFilter = new LeadFilterDTO();
campaignFilter.setColumn(CAMPAIGN_ID);
campaignFilter.setValue(Long.toString(campaignId));
campaignFilter.setSqlType(BIGINTEGER);
filtersList.add(campaignFilter);
LeadFilterDTO partnerFilter = new LeadFilterDTO();
partnerFilter.setColumn(PARTNER_ID);
partnerFilter.setValue(Long.toString(partnerId));
partnerFilter.setSqlType(BIGINTEGER);
filtersList.add(partnerFilter);
LeadFilterDTO statusFilter = new LeadFilterDTO();
statusFilter.setColumn(STATUS);
statusFilter.setValue("VALID");
statusFilter.setSqlType(CHAR);
filtersList.add(statusFilter);
return filtersList;
}
@Override
protected void doJumpToPage(int itemIndex) {
}
}
如果框架内的组件(特别是ItemReader
、ItemProcessor
、ItemWriter
、或Tasklet
)引发异常,且未被捕获,则执行该组件的步骤将被标记为失败,而无需执行任何其他操作。如果一个步骤失败,作业也会被标记为失败(这就是允许重新启动作业的原因)。
简而言之,当抛出异常时,您不需要做任何额外的事情来让作业失败。
我在我的JAVA应用程序中配置了Spring批处理作业,该应用程序在集群中运行。因此,相同的作业被执行两次,这是我不想要的。 所以我想在作业中配置一个步骤,它将检查CREATE_DATE是否在BATCH_JOB_EXECUTION表中存在,并将继续或故障转移。 如何在spring批处理步骤中进行配置?
是否可以在Spring批处理中动态配置作业? 这是我想做的。我创建了几个不同的,如下所示: FlatFileItemReader 我希望能够在创建批处理作业时动态混合和匹配它们。例如,假设我需要一个有2个步骤的作业。第一步包含一个用于预处理的。第二步将有一个,用于使用我的阅读器/写入器进行基于块的数据处理......类似这样的东西: 在XML中,我可以执行以下操作: 但是我如何像上面一样以编程方式
我有以下工作要处理在一定的时间间隔或特别的基础上。 作业中的步骤如下: 我也想要用户界面,在那里我可以触发一个特别的基础上的工作,而且我应该能够提供参数从用户界面。 我想用Spring batch来完成这个任务,但它更多的是用于读->处理->写之类的工作。这里,在第一步中,我正在生成由第二步读取的数据。我不确定我是否还可以使用Spring batch来实现这个,或者有更好的方法来实现这个。
我最近使用。我对DB表进行了必要的更改,并对一些与参数API相关的微小代码进行了更改。 现在,当我运行应用程序时,它正在工作,但是如果一个步骤的退出状态为失败,则作业的存在状态设置为完成。这会导致一些问题,因为我们的应用程序代码将其视为成功执行。我通过在中添加一个代码片段来解决这个问题,在这里我检查列表并手动设置作业退出状态,但是Spring批处理框架不应该处理退出状态吗?
我有一个作业,它有一个并行执行的块步骤(8个分区): 阅读器:jdbcCursorItemReader 处理器:使用jdbcTemplate调用数据库(每个分区1个线程) Writer:写入文件 我使用一个JdbcCursorItemReader从共享的Postgres数据库(V9.2)读取数百万数据。(其他用户同时使用数据库) 谢谢你的帮助
当应用程序在单个实例中运行时也是如此。 有人可以帮助如果有任何额外的步骤,我必须介绍更新步骤和工作完成?