当前位置: 首页 > 知识库问答 >
问题:

Spring批处理CompositeItemWriter如何为委托编写器管理事务?

松霖
2023-03-14

提前道谢!

共有1个答案

贺功
2023-03-14

我的第一个问题是CompositeItemWriter是否适合上面的要求?

是的,CompositeItemWriter是一种可行的方法。

如果是,则引出关于事务的第二个问题。例如,如果第一次更新成功,而第二次插入失败。是否会自动回滚第一个更新事务?否则,如何手动拉取同一事务中的两个更新?

好问题!是的,如果在第一个writer中更新成功,然后在第二个writer中insert失败,则所有语句都将自动回滚。您需要知道的是,事务是围绕着面向块的tasklet步骤的执行(因此围绕着复合项编写器的write方法)。因此,此方法中所有sql语句的执行(在委托编写器中执行)将是原子的。

为了说明这个用例,我编写了以下测试:

  • 给定具有两列idname的表people,其中只有一条记录:1,'foo'
  • 让我们假设一个作业读取两个记录(1,'foo'2,'bar')并尝试将foo更新为foo!!,然后在表中插入2,'bar'。这是通过具有两个项目编写器的CompositeItemWriterInsertItemWriter
  • 来完成的
  • 使用情况是UpdateItemWriter成功,但InsertItemWriter失败(引发异常)
  • 预期的结果是foo未更新为foo!!并且bar未插入到表中(由于insertitemwriter中的异常,这两条sql语句都回滚了)

下面是代码(它是自包含的,所以您可以尝试它,看看它是如何工作的,它使用了一个嵌入式的hsqldb数据库,它应该在您的类路径中):

import java.util.Arrays;
import java.util.List;
import javax.sql.DataSource;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.jdbc.JdbcTestUtils;

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = TransactionWithCompositeWriterTest.JobConfiguration.class)
public class TransactionWithCompositeWriterTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Before
    public void setUp() {
        jdbcTemplate.update("CREATE TABLE people (id INT IDENTITY NOT NULL PRIMARY KEY, name VARCHAR(20));");
        jdbcTemplate.update("INSERT INTO people (id, name) VALUES (1, 'foo');");
    }

    @Test
    public void testTransactionRollbackWithCompositeWriter() throws Exception {
        // given
        int peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
        int fooCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 1 and name = 'foo'");
        int barCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 2 and name = 'bar'");
        Assert.assertEquals(1, peopleCount);
        Assert.assertEquals(1, fooCount);
        Assert.assertEquals(0, barCount);

        // when
        JobExecution jobExecution = jobLauncherTestUtils.launchJob();

        // then
        Assert.assertEquals(ExitStatus.FAILED.getExitCode(), jobExecution.getExitStatus().getExitCode());
        Assert.assertEquals("Something went wrong!", jobExecution.getAllFailureExceptions().get(0).getMessage());
        StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
        Assert.assertEquals(0, stepExecution.getCommitCount());
        Assert.assertEquals(1, stepExecution.getRollbackCount());
        Assert.assertEquals(0, stepExecution.getWriteCount());

        peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
        fooCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 1 and name = 'foo'");
        barCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 2 and name = 'bar'");
        Assert.assertEquals(1, peopleCount); // bar is not inserted
        Assert.assertEquals(0, barCount); // bar is not inserted
        Assert.assertEquals(1, fooCount); // foo is not updated to "foo!!"
    }

    @Configuration
    @EnableBatchProcessing
    public static class JobConfiguration {

        @Bean
        public DataSource dataSource() {
            return new EmbeddedDatabaseBuilder()
                    .setType(EmbeddedDatabaseType.HSQL)
                    .addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql")
                    .addScript("/org/springframework/batch/core/schema-hsqldb.sql")
                    .build();
        }

        @Bean
        public JdbcTemplate jdbcTemplate(DataSource dataSource) {
            return new JdbcTemplate(dataSource);
        }

        @Bean
        public ItemReader<Person> itemReader() {
            Person foo = new Person(1, "foo");
            Person bar = new Person(2, "bar");
            return new ListItemReader<>(Arrays.asList(foo, bar));
        }

        @Bean
        public ItemWriter<Person> updateItemWriter() {
            return new UpdateItemWriter(dataSource());
        }

        @Bean
        public ItemWriter<Person> insertItemWriter() {
            return new InsertItemWriter(dataSource());
        }

        @Bean
        public ItemWriter<Person> itemWriter() {
            CompositeItemWriter<Person> compositeItemWriter = new CompositeItemWriter<>();
            compositeItemWriter.setDelegates(Arrays.asList(updateItemWriter(), insertItemWriter()));
            return compositeItemWriter;
        }

        @Bean
        public Job job(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
            return jobBuilderFactory.get("job")
                    .start(stepBuilderFactory
                            .get("step").<Person, Person>chunk(2)
                            .reader(itemReader())
                            .writer(itemWriter())
                            .build())
                    .build();
        }

        @Bean
        public JobLauncherTestUtils jobLauncherTestUtils() {
            return new JobLauncherTestUtils();
        }
    }

    public static class UpdateItemWriter implements ItemWriter<Person> {

        private JdbcTemplate jdbcTemplate;

        public UpdateItemWriter(DataSource dataSource) {
            this.jdbcTemplate = new JdbcTemplate(dataSource);
        }

        @Override
        public void write(List<? extends Person> items) {
            for (Person person : items) {
                if ("foo".equalsIgnoreCase(person.getName())) {
                    jdbcTemplate.update("UPDATE people SET name = 'foo!!' WHERE id = 1");
                }
            }
        }
    }

    public static class InsertItemWriter implements ItemWriter<Person> {

        private JdbcTemplate jdbcTemplate;

        public InsertItemWriter(DataSource dataSource) {
            this.jdbcTemplate = new JdbcTemplate(dataSource);
        }

        @Override
        public void write(List<? extends Person> items) {
            for (Person person : items) {
                if ("bar".equalsIgnoreCase(person.getName())) {
                    jdbcTemplate.update("INSERT INTO people (id, name) VALUES (?, ?)", person.getId(), person.getName());
                    throw new IllegalStateException("Something went wrong!");
                }
            }
        }
    }

    public static class Person {

        private long id;

        private String name;

        public Person() {
        }

        public Person(long id, String name) {
            this.id = id;
            this.name = name;
        }

        public long getId() {
            return id;
        }

        public void setId(long id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }
    }
}

我的示例使用自定义项编写器,但这也应该适用于两个JDBCBatchItemWriter

希望这有帮助!

 类似资料:
  • 我正在扩展这个Spring批处理CompositeItemWriter如何为委托编写器管理事务?这里的问题: 在我的例子中,下面有一个,它将数据写入同一个数据库的多个表中,在写入数据之前,它通过实现各种业务规则来转换数据。在这里,一个记录可能满足不同的业务规则等,因此一个编写器可能获得比其他编写器更多的数据。 场景-假设第一个writer工作得很好,第二个writer生成异常,然后第三个和第四个w

  • 关于文章Spring批处理CompositeItemWriter如何管理委托编写器的事务?,对于复合项编写器事务管理,难道不应该将数据源包装在如下所示的事务管理器中吗?没有下面的bean定义,事务管理就无法与Oracle和Hikari CP一起工作。不确定帖子中提供的示例是如何工作的。请澄清

  • 我目前有以下spring批处理(v2.2.4)作业,它从单个数据源读取,然后创建三种不同的输出格式。我有三个步骤,每个步骤都有一个标准的读取器、处理器和写入器bean。我使用下面的括号来表示每个处理器或写入器使用的格式。本例中的每个处理器都向writer bean返回一个SqlParameterSource对象。 我不喜欢这样一个事实,即我要三次读取相同的数据,所以我打算在新的工作中使用复合处理器

  • 我有一个compositeItemWriter,它有2个代理编写器:1。HeaderWriter将一些字段从我的对象写入头表2。DetailWriter将文件写入详细表。 context.xml:

  • 在我的Spring boot和Spring batch应用程序中,我有这样一个步骤: 我的作家是一个空的,如下所示: 现在,在我的处理器中,我有: 问题:由于所有对象都传递给处理器,我可以在处理器本身中处理它们,而不是使用任何转换等,因为我的目的通过使用处理器来解决,这是一个好的做法吗?或者我必须使用作家/自定义作家来完成工作?

  • 我想了解Spring Batch是如何进行事务管理的。这不是一个技术问题,而是一个概念性的问题:Spring Batch使用什么方法?这种方法的后果是什么? 让我试着澄清一下这个问题。例如,在TaskletStep中,我看到步骤执行通常如下所示: 准备步骤元数据的几个JobRepository事务 每一块要处理的业务事务 更多JobRepository事务,用区块处理的结果更新步骤元数据 这似乎是