我面临一个挑战,需要从SQL Server数据库中读取“未处理”的数据,处理数据,然后有选择地更新DB2数据库中的两到六个表,然后将该数据标记为在SQL Server上的原始数据库中已处理。在任何时候,如果出现任何故障,我希望所有更新都回滚。如果我有10个未处理的项目,9个良好,但有一个失败,我仍然希望9个良好的项目完成,第10个恢复到原始状态,直到我们可以研究问题并进行更正。
总体架构是,一个输入实例可能导致插入至少3个DB2表和多达7个表。一些DB2表最终可能会从一个输入中插入多个内容。我必须为每个表更新开发一个不同的编写器,并找出如何将该表所需的特定数据传递给每个编写器。我还需要利用2个数据源分别更新DB2和SQL Server。
我不是一个有经验的Spring Batch开发人员。而且我很少有可以“读取1、处理1、写入1”并重复的项目。通常我需要读取多个文件/数据库,处理该数据,然后写入一个或多个报告、文件和/或数据库。我看到哪里为此类应用程序提供了支持,但它更复杂,需要更多研究,可以找到的示例有限。
在我尝试实施解决方案的过程中,我走了一条轻松的道路。我开发了一个实现Tasklet的类,并按照实时进程的工作方式编写代码。它使用JDBCTemplate从SQL中获取输入数据,然后将数据传递给处理数据并确定需要更新的内容的代码。我有一个事务管理器类,它实现了@Transactional with REQUIRES\u NEW和rollback,用于自定义未检查的异常。事务类捕获所有DataAccessException事件,并将引发自定义异常。目前,我只使用DB2数据源,以免使情况过于复杂。
在我的测试中,我在更新过程的末尾添加了代码,这会引发未经检查的异常。我希望更新能够回滚。但这并没有发生。如果重新运行该进程,DB2上会出现803个错误。
最后一件事。在我们的商店中,我们需要在DB2上使用存储过程来进行所有访问。所以我使用SimpleJdbcCall来执行SP。
这是我的代码:
Tasklet的主要java类:
public class SynchronizeDB2WithSQL implements Tasklet
{
private static final BatchLogger logger = BatchLogger.getLogger();
private Db2UpdateTranManager tranMgr;
public void setTranMgr(Db2UpdateTranManager tranMgr) {
this.tranMgr = tranMgr;
}
private AccessPaymentIntegrationDAO pmtIntDAO;
public void setPmtIntDAO(AccessPaymentIntegrationDAO pmtIntDAO) {
this.pmtIntDAO = pmtIntDAO;
}
@Override
public RepeatStatus execute(StepContribution arg0, ChunkContext arg1) throws Exception {
logger.logInfoMessage("=============================================");
logger.logInfoMessage(" EB0255IA - Synchronize DB2 with SQL");
logger.logInfoMessage("=============================================");
List<UnprocessedPaymentDataBean> orderList = this.pmtIntDAO.fetchUnprocessedEntries();
if(CollectionUtils.isNotEmpty(orderList)) {
for(UnprocessedPaymentDataBean ent: orderList) {
logger.logDebugMessage(" Processing payment ");
logger.logDebugMessage(ent.toString());
Map<String, List<PaymentTransactionDetailsBean>> paymentDetails = arrangePayments(this.pmtIntDAO.getDetailsByOrder(ent.getOrderNbr()));
try {
this.tranMgr.createNewAuthorizedPayment(ent, paymentDetails);
} catch (DataException e) {
logger.logErrorMessage("Encountered a Data Exception: "+e);
}
}
} else {
logger.logInfoMessage("=============================================");
logger.logInfoMessage("No data was encountered that needed to be processed");
logger.logInfoMessage("=============================================");
}
return RepeatStatus.FINISHED;
}
和Spring批处理xml:
<job id="EB0255IA" parent="baseJob" job-repository="jobRepository"
xmlns="http://www.springframework.org/schema/batch" restartable="true"
incrementer="parameterIncrementer">
<description>Job to maintain the DB2 updates for payment activity</description>
<step id="SynchronizeDB2WithSQL">
<tasklet ref="synchronizeTasklet" />
</step>
</job>
<bean id="synchronizeTasklet" class="com.ins.pmtint.synchdb2.SynchronizeDB2WithSQL" >
<property name="pmtIntDAO" ref="pmtIntDAO" />
<property name="tranMgr" ref="db2TranMgr" />
</bean>
<bean id="jdbcUpdateDB2" class="com.ins.pmtint.db.JDBCUpdateDB2">
<property name="dataSource" ref="dataSourceBnkDB2" />
</bean>
<bean id="updateDB2DataDAO" class="com.ins.pmtint.db.dao.UpdateDB2DataDAOImpl">
<property name="jdbcUpdateDB2" ref="jdbcUpdateDB2" />
</bean>
<bean id="db2TranMgr" class="com.ins.pmtint.db.tranmgr.Db2UpdateTranManagerImpl">
<property name="updateDB2DataDAO" ref="updateDB2DataDAO" />
</bean>
<bean id="jdbcPaymentIntegration" class="com.ins.pmtint.db.JDBCPaymentIntegration" >
<property name="dataSource" ref="dataSourcePmtIntegration" />
</bean>
<bean id="pmtIntDAO" class="com.ins.pmtint.db.dao.AccessPaymentIntegrationDAOImpl">
<property name="jdbcPaymentIntegration" ref="jdbcPaymentIntegration" />
</bean>
事务管理器实现的一部分。
public class Db2UpdateTranManagerImpl implements Db2UpdateTranManager, DB2FieldNames {
private static final BatchLogger logger = BatchLogger.getLogger();
UpdateDB2DataDAO updateDB2DataDAO;
public void setUpdateDB2DataDAO(UpdateDB2DataDAO updateDB2DataDAO) {
this.updateDB2DataDAO = updateDB2DataDAO;
}
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = false, rollbackFor = DataException.class)
public void createNewAuthorizedPayment(UnprocessedPaymentDataBean dataBean, Map<String, List<PaymentTransactionDetailsBean>> paymentDetails) {
logger.logDebugMessage("At Db2UpdateTranManagerImpl.createNewAuthorizedPayment(");
logger.logDebugMessage(dataBean.toString());
String orderNbr = String.valueOf(dataBean.getOrderNbr());
String eventCode = TranTypeCode.fromValue(dataBean.getTransactionTypeCode()).getDB2Event();
if(eventCode == null) {
try {
KFBDistBatchEMail.createAndSendMessage("There is no event code for current entry\n\nOrder: "+orderNbr+" Tran type: "+dataBean.getTransactionTypeCode(), "EB0255IA - Database error" ,EnhancedPropertyPlaceholderConfigurer.getEmailFrom(), EnhancedPropertyPlaceholderConfigurer.getEmailTo(), null);
throw new DataException("Update failed: No event code to apply");
} catch (EMailExcpetion e2) {
logger.logErrorMessage("Generating email", e2);
}
}
String orginatingSystemId;
if (dataBean.getPaymentTypeCode().equalsIgnoreCase("EFT"))
orginatingSystemId = "FS";
else
orginatingSystemId = "IN";
try {
if(dataBean.getTransactionTypeCode().equalsIgnoreCase("A")) {
this.updateDB2DataDAO.updatePaymentDetails(orderNbr, DB_INITIAL_EVENT_CODE, "", dataBean.getTransactionAmt(), orginatingSystemId);
}
**** FOR TESTING - AT THE END I HAVE ADDED ****
throw new DataException("I finished processing and backed out. \n\n"+dataBean);
}
这是JDBC代码的一部分:
public class JDBCUpdateDB2 extends JdbcDaoSupport
implements DB2FieldNames
{
private static final BatchLogger logger = KFBBatchLogger.getLogger();
public void updatePaymentDetails(String orderNbr, String eventCd, String authnbr, Double amount, String orginatingSystemId) {
SimpleJdbcCall jdbcCall = new SimpleJdbcCall(getDataSource()).withSchemaName(EnhancedPropertyPlaceholderConfigurer.getDB2Schema()).withProcedureName(UPDATE_PAYMENT_TRANSACTION_DB2_PROC);
MapSqlParameterSource sqlIn = new MapSqlParameterSource();
sqlIn.addValue(SP_BNKCRD_PMT_ORD_NBR, orderNbr);
sqlIn.addValue(SP_CLUSTERING_NBR_2, new StringBuilder(orderNbr.substring(Math.max(orderNbr.length() - 2, 0))).reverse().toString());
sqlIn.addValue(SP_BNKCRD_EVNT_CD, eventCd);
sqlIn.addValue(SP_CCTRAN_ERR_CD, "N");
sqlIn.addValue(SP_BNKCRD_PROC_RET_CD, "");
sqlIn.addValue(SP_BNKCRD_AUTH_CD, "G");
sqlIn.addValue(SP_ORIG_SYS_ID_TXT, orginatingSystemId);
sqlIn.addValue(SP_BNKCRD_TRAN_AMT, amount);
try {
jdbcCall.execute(sqlIn);
} catch (DataAccessException e) {
logger.logErrorMessage("Database error in updatePaymentDetails", e);
throw e;
}
}
在我的研究中,我发现了ChainedTransactionManager类。我在spring配置中html" target="_blank">实例化了它,并使用以下方法将其添加到我的应用程序中:
<tx:annotation-driven proxy-target-class="true" transaction-manager="transactionManager" />
<bean id="transactionManager" class="org.springframework.data.transaction.ChainedTransactionManager">
<constructor-arg>
<list>
<bean class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="SqlServerDataSource" />
</bean>
<bean class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="DB2DataSource" />
</bean>
</list>
然后在代码中我添加了事务注释。
@Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = false, rollbackFor = DataException.class)
public int createOrder(PaymentTransactionBean paymentTransaction) {
logger.logDebugMessage("PmtIntTransactionManager.createOrder");
int orderNbr = -1;
try {
orderNbr = this.pmtIntSqlDao.createPaymentTransaction(paymentTransaction);
} catch (DataAccessException e) {
logger.logDebugMessage(LogHelper.LOG_SEPARATOR_LINE);
logger.logDebugMessage("Caught a DataAccessException", e);
PmtUtility.notifySysUser("Database error", "A database error was encountered and rolled back", e);
throw new DataException("Update failed", e.getCause());
}
return orderNbr;
}
在此级别以下执行的任何代码都可以引发自定义DataException,该DataException扩展RunTimeException,并且所有已执行的SQL更新都将回滚。当控件退出CreateOrder方法时,该代码中发生的任何更新都将自动提交。
我在测试中发现的一件事是,我无法捕获DataException,然后重新抛出它并期望回滚。我这样做的目的是生成日志条目。最后,我不得不抛出一个已检查的异常,创建日志条目,然后通过DataException启动回滚。
由于您需要写入多个表,因此您可以使用CompositeItemWriter
为每个表设置一个委托项写入器。在这种情况下,委托应在步骤中注册为流。您还可以创建一个单项写入器,该写入器向不同的表发出3个(或更多)插入语句(但我不建议这样做)。
如果我有10个未处理的项目和9个是好的但一个失败我仍然希望9个好的完成和第十个返回到它的原始状态
如果使用容错步骤,并且在写入区块期间引发可跳过的异常,Spring Batch将扫描区块以查找错误项(因为它无法知道是哪个项导致了错误)。从技术上讲,Spring Batch将区块大小设置为1,并对每个项目使用一个事务,因此仅回滚错误的项目。这允许您实现上述要求。下面是一个自包含的示例,向您展示其工作原理:
import java.util.Arrays;
import java.util.List;
import javax.sql.DataSource;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.jdbc.JdbcTestUtils;
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = ChunkScanningTest.JobConfiguration.class)
public class ChunkScanningTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Autowired
private JdbcTemplate jdbcTemplate;
@Before
public void setUp() {
jdbcTemplate.update("CREATE TABLE people (id INT IDENTITY NOT NULL PRIMARY KEY, name VARCHAR(20));");
}
@Test
public void testChunkScanningWhenSkippableExceptionInWrite() throws Exception {
// given
int peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
Assert.assertEquals(0, peopleCount);
// when
JobExecution jobExecution = jobLauncherTestUtils.launchJob();
// then
peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
int fooCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 1 and name = 'foo'");
int bazCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 3 and name = 'baz'");
Assert.assertEquals(1, fooCount); // foo is inserted
Assert.assertEquals(1, bazCount); // baz is inserted
Assert.assertEquals(2, peopleCount); // bar is not inserted
Assert.assertEquals(ExitStatus.COMPLETED.getExitCode(), jobExecution.getExitStatus().getExitCode());
StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
Assert.assertEquals(3, stepExecution.getCommitCount()); // one commit for foo + one commit for baz + one commit for the last (empty) chunk
Assert.assertEquals(2, stepExecution.getRollbackCount()); // initial rollback for whole chunk + one rollback for bar
Assert.assertEquals(2, stepExecution.getWriteCount()); // only foo and baz have been written
}
@Configuration
@EnableBatchProcessing
public static class JobConfiguration {
@Bean
public DataSource dataSource() {
return new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.HSQL)
.addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql")
.addScript("/org/springframework/batch/core/schema-hsqldb.sql")
.build();
}
@Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
@Bean
public ItemReader<Person> itemReader() {
Person foo = new Person(1, "foo");
Person bar = new Person(2, "bar");
Person baz = new Person(3, "baz");
return new ListItemReader<>(Arrays.asList(foo, bar, baz));
}
@Bean
public ItemWriter<Person> itemWriter() {
return new PersonItemWriter(dataSource());
}
@Bean
public Job job(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
return jobBuilderFactory.get("job")
.start(stepBuilderFactory.get("step")
.<Person, Person>chunk(3)
.reader(itemReader())
.writer(itemWriter())
.faultTolerant()
.skip(IllegalStateException.class)
.skipLimit(10)
.build())
.build();
}
@Bean
public JobLauncherTestUtils jobLauncherTestUtils() {
return new JobLauncherTestUtils();
}
}
public static class PersonItemWriter implements ItemWriter<Person> {
private JdbcTemplate jdbcTemplate;
PersonItemWriter(DataSource dataSource) {
this.jdbcTemplate = new JdbcTemplate(dataSource);
}
@Override
public void write(List<? extends Person> items) {
System.out.println("Writing items: "); items.forEach(System.out::println);
for (Person person : items) {
if ("bar".equalsIgnoreCase(person.getName())) {
System.out.println("Throwing exception: No bars here!");
throw new IllegalStateException("No bars here!");
}
jdbcTemplate.update("INSERT INTO people (id, name) VALUES (?, ?)", person.getId(), person.getName());
}
}
}
public static class Person {
private long id;
private String name;
public Person() {
}
Person(long id, String name) {
this.id = id;
this.name = name;
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Person{" +
"id=" + id +
", name='" + name + '\'' +
'}';
}
}
}
此示例打印:
Writing items:
Person{id=1, name='foo'}
Person{id=2, name='bar'}
Person{id=3, name='baz'}
Throwing exception: No bars here!
Writing items:
Person{id=1, name='foo'}
Writing items:
Person{id=2, name='bar'}
Throwing exception: No bars here!
Writing items:
Person{id=3, name='baz'}
如您所见,在抛出可跳过项之后,每个块只包含一个项(Spring Batch正在逐个扫描项以确定错误项),并且只写入有效项。
只有有限的例子可以找到
我希望这个示例能够清楚地说明这个特性。如果您想以复合项编写器为例,请查看以下问题/答案:Spring Batch CompositeItemWriter如何管理委托编写器的事务?
希望这有帮助。
该步骤的writer的逻辑非常简单:它尝试从数据库中读取一行,一旦找到该行,就更新该行。我能够通过在find方法之后设置断点来重现,手动为数据库中的行颠簸列并提交它,然后恢复。 但是,在我的步骤中取消注释重试定义后,没有尝试重试。经过一些调试后,Spring重试逻辑似乎在块的事务中;但是由于不是由编写器中的代码引发的,而是由Spring的块事务提交逻辑引发的,因此根本没有尝试重试。 当我试图在编写
我在Spring Boot应用程序中有一个Javers实现。Mongo4.4被用作数据库。从MongoDB4.4开始,您可以在事务中创建文档。 我在创建对象时模拟了一个异常。如预期的那样,对象没有在数据库中创建,但是一个新的快照被添加到jv_snapshots集合中。 控制器:
public void A()抛出ApplicationException{ } 这是方法B(): } 显然,如果删除方法B()中的catch块,就不会出现这种行为。现在,我想知道是否有一种方法可以回滚我的事务,即使我捕捉到方法B()中的异常。谢谢!!!!
在我的一般问题之后,我有一个使用Spring的特定问题,我想在下面每次执行DAO方法后回滚特定的测试方法。 添加和未能回滚插入 此外,在之前/之后获取连接并回滚也不会产生影响 我应该如何使用TestNG框架回滚单元测试?大多数答案使用JUnit的 我未能使用TestNG自动接线: 但是成功地使用了和include配置类includes jdbcTemplate/DataStource TestNG
我们有一个Spring事务回滚问题,其中回滚似乎不起作用 在用注释的服务层方法中,我调用三个不同的类来插入3条记录 中间插入从第四个表执行get以填充描述字段,但此get失败。我希望第一次插入会回滚,但它似乎没有发生 几点: 获取方法抛出运行时异常 我们使用和中定义的。Bean是在中创建的,它被导入到 在层 中没有 注释 我们已经使用了