提前道谢!
我的第一个问题是CompositeItemWriter是否适合上面的要求?
是的,CompositeItemWriter
是一种可行的方法。
如果是,则引出关于事务的第二个问题。例如,如果第一次更新成功,而第二次插入失败。是否会自动回滚第一个更新事务?否则,如何手动拉取同一事务中的两个更新?
好问题!是的,如果在第一个writer中更新成功,然后在第二个writer中insert失败,则所有语句都将自动回滚。您需要知道的是,事务是围绕着面向块的tasklet步骤的执行(因此围绕着复合项编写器的write
方法)。因此,此方法中所有sql语句的执行(在委托编写器中执行)将是原子的。
为了说明这个用例,我编写了以下测试:
id
和name
的表people
,其中只有一条记录:1,'foo'
1,'foo'
、2,'bar'
)并尝试将foo
更新为foo!!
,然后在表中插入2,'bar'
。这是通过具有两个项目编写器的CompositeItemWriter
和InsertItemWriter
UpdateItemWriter
成功,但InsertItemWriter
失败(引发异常)foo
未更新为foo!!
并且bar
未插入到表中(由于insertitemwriter
中的异常,这两条sql语句都回滚了)下面是代码(它是自包含的,所以您可以尝试它,看看它是如何工作的,它使用了一个嵌入式的hsqldb数据库,它应该在您的类路径中):
import java.util.Arrays;
import java.util.List;
import javax.sql.DataSource;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.jdbc.JdbcTestUtils;
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = TransactionWithCompositeWriterTest.JobConfiguration.class)
public class TransactionWithCompositeWriterTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Autowired
private JdbcTemplate jdbcTemplate;
@Before
public void setUp() {
jdbcTemplate.update("CREATE TABLE people (id INT IDENTITY NOT NULL PRIMARY KEY, name VARCHAR(20));");
jdbcTemplate.update("INSERT INTO people (id, name) VALUES (1, 'foo');");
}
@Test
public void testTransactionRollbackWithCompositeWriter() throws Exception {
// given
int peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
int fooCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 1 and name = 'foo'");
int barCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 2 and name = 'bar'");
Assert.assertEquals(1, peopleCount);
Assert.assertEquals(1, fooCount);
Assert.assertEquals(0, barCount);
// when
JobExecution jobExecution = jobLauncherTestUtils.launchJob();
// then
Assert.assertEquals(ExitStatus.FAILED.getExitCode(), jobExecution.getExitStatus().getExitCode());
Assert.assertEquals("Something went wrong!", jobExecution.getAllFailureExceptions().get(0).getMessage());
StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
Assert.assertEquals(0, stepExecution.getCommitCount());
Assert.assertEquals(1, stepExecution.getRollbackCount());
Assert.assertEquals(0, stepExecution.getWriteCount());
peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
fooCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 1 and name = 'foo'");
barCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 2 and name = 'bar'");
Assert.assertEquals(1, peopleCount); // bar is not inserted
Assert.assertEquals(0, barCount); // bar is not inserted
Assert.assertEquals(1, fooCount); // foo is not updated to "foo!!"
}
@Configuration
@EnableBatchProcessing
public static class JobConfiguration {
@Bean
public DataSource dataSource() {
return new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.HSQL)
.addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql")
.addScript("/org/springframework/batch/core/schema-hsqldb.sql")
.build();
}
@Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
@Bean
public ItemReader<Person> itemReader() {
Person foo = new Person(1, "foo");
Person bar = new Person(2, "bar");
return new ListItemReader<>(Arrays.asList(foo, bar));
}
@Bean
public ItemWriter<Person> updateItemWriter() {
return new UpdateItemWriter(dataSource());
}
@Bean
public ItemWriter<Person> insertItemWriter() {
return new InsertItemWriter(dataSource());
}
@Bean
public ItemWriter<Person> itemWriter() {
CompositeItemWriter<Person> compositeItemWriter = new CompositeItemWriter<>();
compositeItemWriter.setDelegates(Arrays.asList(updateItemWriter(), insertItemWriter()));
return compositeItemWriter;
}
@Bean
public Job job(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
return jobBuilderFactory.get("job")
.start(stepBuilderFactory
.get("step").<Person, Person>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.build())
.build();
}
@Bean
public JobLauncherTestUtils jobLauncherTestUtils() {
return new JobLauncherTestUtils();
}
}
public static class UpdateItemWriter implements ItemWriter<Person> {
private JdbcTemplate jdbcTemplate;
public UpdateItemWriter(DataSource dataSource) {
this.jdbcTemplate = new JdbcTemplate(dataSource);
}
@Override
public void write(List<? extends Person> items) {
for (Person person : items) {
if ("foo".equalsIgnoreCase(person.getName())) {
jdbcTemplate.update("UPDATE people SET name = 'foo!!' WHERE id = 1");
}
}
}
}
public static class InsertItemWriter implements ItemWriter<Person> {
private JdbcTemplate jdbcTemplate;
public InsertItemWriter(DataSource dataSource) {
this.jdbcTemplate = new JdbcTemplate(dataSource);
}
@Override
public void write(List<? extends Person> items) {
for (Person person : items) {
if ("bar".equalsIgnoreCase(person.getName())) {
jdbcTemplate.update("INSERT INTO people (id, name) VALUES (?, ?)", person.getId(), person.getName());
throw new IllegalStateException("Something went wrong!");
}
}
}
}
public static class Person {
private long id;
private String name;
public Person() {
}
public Person(long id, String name) {
this.id = id;
this.name = name;
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}
我的示例使用自定义项编写器,但这也应该适用于两个JDBCBatchItemWriter
。
希望这有帮助!
我正在扩展这个Spring批处理CompositeItemWriter如何为委托编写器管理事务?这里的问题: 在我的例子中,下面有一个,它将数据写入同一个数据库的多个表中,在写入数据之前,它通过实现各种业务规则来转换数据。在这里,一个记录可能满足不同的业务规则等,因此一个编写器可能获得比其他编写器更多的数据。 场景-假设第一个writer工作得很好,第二个writer生成异常,然后第三个和第四个w
关于文章Spring批处理CompositeItemWriter如何管理委托编写器的事务?,对于复合项编写器事务管理,难道不应该将数据源包装在如下所示的事务管理器中吗?没有下面的bean定义,事务管理就无法与Oracle和Hikari CP一起工作。不确定帖子中提供的示例是如何工作的。请澄清
我目前有以下spring批处理(v2.2.4)作业,它从单个数据源读取,然后创建三种不同的输出格式。我有三个步骤,每个步骤都有一个标准的读取器、处理器和写入器bean。我使用下面的括号来表示每个处理器或写入器使用的格式。本例中的每个处理器都向writer bean返回一个SqlParameterSource对象。 我不喜欢这样一个事实,即我要三次读取相同的数据,所以我打算在新的工作中使用复合处理器
我有一个compositeItemWriter,它有2个代理编写器:1。HeaderWriter将一些字段从我的对象写入头表2。DetailWriter将文件写入详细表。 context.xml:
在我的Spring boot和Spring batch应用程序中,我有这样一个步骤: 我的作家是一个空的,如下所示: 现在,在我的处理器中,我有: 问题:由于所有对象都传递给处理器,我可以在处理器本身中处理它们,而不是使用任何转换等,因为我的目的通过使用处理器来解决,这是一个好的做法吗?或者我必须使用作家/自定义作家来完成工作?
我想了解Spring Batch是如何进行事务管理的。这不是一个技术问题,而是一个概念性的问题:Spring Batch使用什么方法?这种方法的后果是什么? 让我试着澄清一下这个问题。例如,在TaskletStep中,我看到步骤执行通常如下所示: 准备步骤元数据的几个JobRepository事务 每一块要处理的业务事务 更多JobRepository事务,用区块处理的结果更新步骤元数据 这似乎是