public class DocumentItemReader implements ItemReader<Document> {
public List<Document> documents = new ArrayList<>();
@Override
public Document read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if(documents.isEmpty()) {
getDocuments(); // This method retrieve 100 documents and store them in "documents" list.
if(documents.isEmpty()) return null;
}
Document doc = documents.get(0);
documents.remove(0);
return doc;
}
}
但是,如果我想使用几个线程,问题就出现了。在本例中,我开始使用分区器方法而不是多线程。这样做的原因是因为我从同一个数据库读取,所以如果我用几个线程重复整个步骤,所有线程都将找到相同的记录,我不能使用分页(见下文)。
另一个问题是数据库记录是动态更新的,所以我不能使用分页。例如,让我们假设我有200条记录,并且所有记录都将很快过期,因此流程将检索它们。现在,假设我用一个线程检索了10个,在其他任何事情之前,这个线程处理了一个并在同一个数据库中更新它。下一个线程不能检索11到20条记录,因为第一条记录不会出现在搜索中(因为它已经被处理,它的日期已经更新,然后它与查询不匹配)。
有点难以理解,有些事情听起来可能很奇怪,但在我的项目中:
谢谢你。
也许您可以在您的步骤中添加一个分区器,该分区器将:
那么您的阅读器将不再从数据库中读取,而是从分区文件中读取。
public class JdbcToFilePartitioner implements Partitioner {
/** number of records by database fetch */
private int fetchSize = 100;
/** working directory */
private File tmpDir;
/** limit the number of item to select */
private Long nbItemMax;
@Override
public Map<String, ExecutionContext> partition(final int gridSize) {
// Create contexts for each parttion
Map<String, ExecutionContext> executionsContexte = createExecutionsContext(gridSize);
// Fill partition with ids to handle
getIdsAndFillPartitionFiles(executionsContexte);
return executionsContexte;
}
/**
* @param gridSize number of partitions
* @return map of execution context, one for each partition
*/
private Map<String, ExecutionContext> createExecutionsContext(final int gridSize) {
final Map<String, ExecutionContext> map = new HashMap<>();
for (int partitionId = 0; partitionId < gridSize; partitionId++) {
map.put(String.valueOf(partitionId), createContext(partitionId));
}
return map;
}
/**
* @param partitionId id of the partition to create context
* @return created executionContext
*/
private ExecutionContext createContext(final int partitionId) {
final ExecutionContext context = new ExecutionContext();
String fileName = tmpDir + File.separator + "partition_" + partitionId + ".txt";
context.put(PartitionerConstantes.ID_GRID.getCode(), partitionId);
context.put(PartitionerConstantes.FILE_NAME.getCode(), fileName);
if (contextParameters != null) {
for (Entry<String, Object> entry : contextParameters.entrySet()) {
context.put(entry.getKey(), entry.getValue());
}
}
return context;
}
private void getIdsAndFillPartitionFiles(final Map<String, ExecutionContext> executionsContexte) {
List<BufferedWriter> fileWriters = new ArrayList<>();
try {
// BufferedWriter for each partition
for (int i = 0; i < executionsContexte.size(); i++) {
BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(executionsContexte.get(String.valueOf(i)).getString(
PartitionerConstantes.FILE_NAME.getCode())));
fileWriters.add(bufferedWriter);
}
// Fetching the datas
ScrollableResults results = runQuery();
// Get the result and fill the files
int currentPartition = 0;
int nbWriting = 0;
while (results.next()) {
fileWriters.get(currentPartition).write(results.get(0).toString());
fileWriters.get(currentPartition).newLine();
currentPartition++;
nbWriting++;
// If we already write on all partitions, we start again
if (currentPartition >= executionsContexte.size()) {
currentPartition = 0;
}
// If we reach the max item to read we stop
if (nbItemMax != null && nbItemMax != 0 && nbWriting >= nbItemMax) {
break;
}
}
// closing
results.close();
session.close();
for (BufferedWriter bufferedWriter : fileWriters) {
bufferedWriter.close();
}
} catch (IOException | SQLException e) {
throw new UnexpectedJobExecutionException("Error writing partition file", e);
}
}
private ScrollableResults runQuery() {
...
}
}
如果新的分区数据被添加到HDFS(没有alter table添加分区命令执行)。然后,我们可以通过执行命令'MSCK修复‘来同步元数据。 如果从HDFS中删除了许多分区数据,该怎么办(不执行alter table drop partition commad执行)。 如何同步配置单元元数据?
我正在使用dataflow处理存储在GCS中的文件,并写入Bigquery表。以下是我的要求: 输入文件包含events记录,每个记录属于一个EventType; 需要按EventType对记录进行分区; 对于每个eventType输出/写入记录到相应的Bigquery表,每个eventType一个表。 每个批处理输入文件中的事件各不相同; 我正在考虑应用诸如“GroupByKey”和“Parti
如何根据列中项数的计数来分区DataFrame。假设我们有一个包含100人的DataFrame(列是和),我们希望为一个国家中的每10个人创建一个分区。 如果我们的数据集包含来自中国的80人,来自法国的15人,来自古巴的5人,那么我们需要8个分区用于中国,2个分区用于法国,1个分区用于古巴。 下面是无法工作的代码: null 有什么方法可以动态设置每个列的分区数吗?这将使创建分区数据集变得更加容易
用例:步骤1:ItemReader:从数据库中读取1000个ItemProcessor块中的数据:处理这些数据。ItemWriter:将数据写入地图,以便下一步使用 步骤2:ItemReader:读取地图ItemProcessor:处理地图数据并获取新对象。ItemWriter:将新的进程对象持久化到数据库中。 现在我希望Map在整个作业中保持不变,目前我已经为Map创建了一个不同的POJO类,并
作为卡桑德拉数据分区的后续,我得到了vNodes的想法。感谢“西蒙·丰塔纳·奥斯卡森” 当我尝试使用vNodes进行数据分区时,我有几个问题, 我尝试观察2节点中的分区分布() 因此,根据我在两个节点中的观察,随着一个范围的扩展,节点61的值从-9207297847862311651到-9185516104965672922。。。 注意:分区范围从9039572936575206977到90199
鉴于以下Mockito语句: 假设mock.method()语句将返回值传递给when(),Mockito如何为mock创建代理?我想这使用了一些CGLib的东西,但我想知道这是如何在技术上完成的。
我目前正在开发一个Android应用程序,我使用Firebase(实时功能)作为后端服务。此外,我开发了这个功能,如下所示。 扩展代码工作得很好,它检查现有数据并显示Toast消息,但在数据仍进入数据库后,我想消除重复数据。
当我使用Spark从S3读取多个文件时(例如,一个包含许多Parquet文件的目录)- 逻辑分区是在开始时发生,然后每个执行器直接下载数据(在worker节点上)吗?< br >还是驱动程序下载数据(部分或全部),然后进行分区并将数据发送给执行器? 此外,分区是否默认为用于写入的相同分区(即每个文件= 1个分区)?