当前位置: 首页 > 知识库问答 >
问题:

当Spring Batch中的数据是动态的时,如何用分区器重复作业?

涂羽
2023-03-14
public class DocumentItemReader implements ItemReader<Document> {

    public List<Document> documents = new ArrayList<>();

    @Override
    public Document read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {

        if(documents.isEmpty()) {
            getDocuments(); // This method retrieve 100 documents and store them in "documents" list.
            if(documents.isEmpty()) return null;
        }

        Document doc = documents.get(0);
        documents.remove(0);
        return doc;
    }
}

但是,如果我想使用几个线程,问题就出现了。在本例中,我开始使用分区器方法而不是多线程。这样做的原因是因为我从同一个数据库读取,所以如果我用几个线程重复整个步骤,所有线程都将找到相同的记录,我不能使用分页(见下文)。

另一个问题是数据库记录是动态更新的,所以我不能使用分页。例如,让我们假设我有200条记录,并且所有记录都将很快过期,因此流程将检索它们。现在,假设我用一个线程检索了10个,在其他任何事情之前,这个线程处理了一个并在同一个数据库中更新它。下一个线程不能检索11到20条记录,因为第一条记录不会出现在搜索中(因为它已经被处理,它的日期已经更新,然后它与查询不匹配)。

有点难以理解,有些事情听起来可能很奇怪,但在我的项目中:

    null

谢谢你。

共有1个答案

东郭宏深
2023-03-14

也许您可以在您的步骤中添加一个分区器,该分区器将:

  1. 选择需要更新的数据的所有ID(如果需要,请选择其他列)
  2. 将它们拆分到x个(x=gridSize参数)分区中,并将它们写入临时文件(按分区1)。
  3. 在ExecutionContext中注册要读取的文件名

那么您的阅读器将不再从数据库中读取,而是从分区文件中读取。

public class JdbcToFilePartitioner implements Partitioner {

    /** number of records by database fetch  */
    private int fetchSize = 100;

    /** working directory */
    private File tmpDir;

    /** limit the number of item to select */
    private Long nbItemMax;

    @Override
    public Map<String, ExecutionContext> partition(final int gridSize) {

        // Create contexts for each parttion
        Map<String, ExecutionContext> executionsContexte = createExecutionsContext(gridSize);

        // Fill partition with ids to handle
        getIdsAndFillPartitionFiles(executionsContexte);

        return executionsContexte;
    }

    /**
     * @param gridSize number of partitions
     * @return map of execution context, one for each partition
     */
    private Map<String, ExecutionContext> createExecutionsContext(final int gridSize) {

        final Map<String, ExecutionContext> map = new HashMap<>();

        for (int partitionId = 0; partitionId < gridSize; partitionId++) {
            map.put(String.valueOf(partitionId), createContext(partitionId));
        }

        return map;
    }

    /**
     * @param partitionId id of the partition to create context
     * @return created executionContext
     */
    private ExecutionContext createContext(final int partitionId) {

        final ExecutionContext context = new ExecutionContext();

        String fileName = tmpDir + File.separator + "partition_" + partitionId + ".txt";

        context.put(PartitionerConstantes.ID_GRID.getCode(), partitionId);
        context.put(PartitionerConstantes.FILE_NAME.getCode(), fileName);

        if (contextParameters != null) {
            for (Entry<String, Object> entry : contextParameters.entrySet()) {
                context.put(entry.getKey(), entry.getValue());
            }
        }

        return context;
    }

    private void getIdsAndFillPartitionFiles(final Map<String, ExecutionContext> executionsContexte) {

        List<BufferedWriter> fileWriters = new ArrayList<>();
        try {

            // BufferedWriter for each partition
            for (int i = 0; i < executionsContexte.size(); i++) {
                BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(executionsContexte.get(String.valueOf(i)).getString(
                        PartitionerConstantes.FILE_NAME.getCode())));
                fileWriters.add(bufferedWriter);
            }

            // Fetching the datas
            ScrollableResults results = runQuery();

            // Get the result and fill the files
            int currentPartition = 0;
            int nbWriting = 0;
            while (results.next()) {
                fileWriters.get(currentPartition).write(results.get(0).toString());
                fileWriters.get(currentPartition).newLine();
                currentPartition++;
                nbWriting++;

                // If we already write on all partitions, we start again
                if (currentPartition >= executionsContexte.size()) {
                    currentPartition = 0;
                }

                // If we reach the max item to read we stop
                if (nbItemMax != null && nbItemMax != 0 && nbWriting >= nbItemMax) {
                    break;
                }
            }

            // closing
            results.close();
            session.close();
            for (BufferedWriter bufferedWriter : fileWriters) {
                bufferedWriter.close();
            }
        } catch (IOException | SQLException e) {
            throw new UnexpectedJobExecutionException("Error writing partition file", e);
        }
    }

    private ScrollableResults runQuery() {
        ...
    }
}
 类似资料:
  • 如果新的分区数据被添加到HDFS(没有alter table添加分区命令执行)。然后,我们可以通过执行命令'MSCK修复‘来同步元数据。 如果从HDFS中删除了许多分区数据,该怎么办(不执行alter table drop partition commad执行)。 如何同步配置单元元数据?

  • 我正在使用dataflow处理存储在GCS中的文件,并写入Bigquery表。以下是我的要求: 输入文件包含events记录,每个记录属于一个EventType; 需要按EventType对记录进行分区; 对于每个eventType输出/写入记录到相应的Bigquery表,每个eventType一个表。 每个批处理输入文件中的事件各不相同; 我正在考虑应用诸如“GroupByKey”和“Parti

  • 如何根据列中项数的计数来分区DataFrame。假设我们有一个包含100人的DataFrame(列是和),我们希望为一个国家中的每10个人创建一个分区。 如果我们的数据集包含来自中国的80人,来自法国的15人,来自古巴的5人,那么我们需要8个分区用于中国,2个分区用于法国,1个分区用于古巴。 下面是无法工作的代码: null 有什么方法可以动态设置每个列的分区数吗?这将使创建分区数据集变得更加容易

  • 用例:步骤1:ItemReader:从数据库中读取1000个ItemProcessor块中的数据:处理这些数据。ItemWriter:将数据写入地图,以便下一步使用 步骤2:ItemReader:读取地图ItemProcessor:处理地图数据并获取新对象。ItemWriter:将新的进程对象持久化到数据库中。 现在我希望Map在整个作业中保持不变,目前我已经为Map创建了一个不同的POJO类,并

  • 作为卡桑德拉数据分区的后续,我得到了vNodes的想法。感谢“西蒙·丰塔纳·奥斯卡森” 当我尝试使用vNodes进行数据分区时,我有几个问题, 我尝试观察2节点中的分区分布() 因此,根据我在两个节点中的观察,随着一个范围的扩展,节点61的值从-9207297847862311651到-9185516104965672922。。。 注意:分区范围从9039572936575206977到90199

  • 鉴于以下Mockito语句: 假设mock.method()语句将返回值传递给when(),Mockito如何为mock创建代理?我想这使用了一些CGLib的东西,但我想知道这是如何在技术上完成的。

  • 我目前正在开发一个Android应用程序,我使用Firebase(实时功能)作为后端服务。此外,我开发了这个功能,如下所示。 扩展代码工作得很好,它检查现有数据并显示Toast消息,但在数据仍进入数据库后,我想消除重复数据。

  • 当我使用Spark从S3读取多个文件时(例如,一个包含许多Parquet文件的目录)- 逻辑分区是在开始时发生,然后每个执行器直接下载数据(在worker节点上)吗?< br >还是驱动程序下载数据(部分或全部),然后进行分区并将数据发送给执行器? 此外,分区是否默认为用于写入的相同分区(即每个文件= 1个分区)?