我的源表中有10条记录,并且项目计数为3。
我有2个分区来处理这10条记录(即,前5个记录将在第一个分区中处理,其余记录将在第2个分区中处理,而在第2个分区中处理记录时,我抛出异常,因此作业将在第2个分区的第2个块上失败。当我重新启动作业时,失败的分区将再次处理所有记录(即第一个块和第二个块)。重新启动作业应仅从最后一个失败的块记录开始处理,而不是该分区中的所有记录。请您指导我如何做到这一点?
我的JSL如下所示:
<?xml version="1.0" encoding="UTF-8"?>
<job xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
id="readingfrom-db" restartable="true" version="1.0" >
<properties >
<property name="numRec" value="#{jobParameters['numRec']}?:5;"/>
<property name="chunkSize" value="#{jobParameters['chunkSize']}?:3;"/>
<property name="whereclauseFrom" value="#{jobParameters['whereclauseFrom']}?:5;"/>
<property name="whereclauseTo" value="#{jobParameters['whereclauseTo']}?:6;"/>
<property name="dsJNDI" value="#{jobParameters['dsJNDI']}?:jdbc/db2;"/>
<property name="dsJNDI1" value="#{jobParameters['dsJNDI1']}?:jdbc/db2;"/>
<property name="tableName" value="#{jobParameters['tableName']}?:CISDW.AIF1_CH;"/>
<property name="ProcesstableName" value="#{jobParameters['ProcesstableName']}?:CISDW.PROC_AIF1_CH;"/>
</properties>
<step id="runcache" next="readFromDB">
<batchlet ref="com.cdc.runcache.CacheRunnerBatchlet" />
</step>
<step id="readFromDB">
<listeners>
<listener ref="com.cdc.dbreader.LogExceptionListener"/>
</listeners>
<chunk item-count="3" checkpoint-policy="item">
<reader ref="com.cdc.dbreader.DBItemReader">
<properties >
<property name="dsJNDI" value="#{jobProperties['dsJNDI']}"/>
<property name="tableName" value="#{jobProperties['tableName']}"/>
<property name="whereclauseFrom" value="#{partitionPlan['modrec']}"/>
</properties>
</reader>
<processor ref="com.cdc.dbreader.DBItemProcessor" />
<writer ref="com.cdc.dbreader.DBItemWriter">
<properties >
<property name="dsJNDI" value="#{jobProperties['dsJNDI1']}"/>
<property name="tableName" value="#{jobProperties['ProcesstableName']}"/>
</properties>
</writer>
</chunk>
<partition>
<plan partitions="2" threads="2">
<properties partition="0">
<property name="modrec" value="#{jobProperties['whereclauseFrom']}"/>
</properties>
<properties partition="1">
<property name="modrec" value="#{jobProperties['whereclauseTo']}"/>
</properties>
</plan>
</partition>
</step>
</job>
我的物品阅读器如下所示:
public class DBItemReader implements ItemReader {
@Inject
@BatchProperty
private String dsJNDI;
@Inject
@BatchProperty
private String whereclauseFrom;
@Inject
@BatchProperty
private String tableName;
private Connection conn =null;
private int totalRecords=0;
private DataSource ds = null;
List<RecObj> listRecObj=new ArrayList<RecObj>();
@Override
public Object readItem() throws SQLException {
if (listRecObj.size() == 0) {
return null;
} else {
RecObj rec =null;
Iterator<RecObj> iter =listRecObj.iterator();
while (iter.hasNext()) {
rec = iter.next();
if (Integer.parseInt(rec.getRec()) == 7) {
throw new IllegalStateException("Thrown Error");
}
iter.remove();
return rec;
}
return rec;
}
@Override
public void open(Serializable arg0) throws NamingException, SQLException {
ds = DataSource.class.cast(new InitialContext().lookup(dsJNDI));
// System.out.println("whereclauseFrom: " + whereclauseFrom);
conn = ds.getConnection();
String sql ="";
if(Integer.parseInt(whereclauseFrom) == 5){
sql = "SELECT * FROM " + tableName + " WHERE CAST(REC AS INTEGER) <= "+ whereclauseFrom;
}else if(Integer.parseInt(whereclauseFrom) == 6){
sql = "SELECT * FROM " + tableName + " WHERE CAST(REC AS INTEGER) >= "+ whereclauseFrom;
}
PreparedStatement ps = conn.prepareStatement(sql);
ResultSet rs=ps.executeQuery();
while(rs.next()){
totalRecords++;
String rec=rs.getString("REC");
if(rec != null)
listRecObj.add(new RecObj(rec));
}
rs.close();
}
@Override
public void close() throws SQLException {
conn.close();
}
@Override
public Serializable checkpointInfo() {
return null;
}
}
}
我的作家类如下:
public class DBItemWriter extends AbstractItemWriter implements ItemWriter {
@Inject
@BatchProperty
private String dsJNDI;
@Inject
@BatchProperty
private String tableName;
private DataSource ds = null;
@Override
public void open(Serializable arg0) throws NamingException {
ds = DataSource.class.cast(new InitialContext().lookup(dsJNDI));
}
@Override
public void writeItems(List<java.lang.Object> items) throws BatchUpdateException,SQLException{
Connection conn = ds.getConnection();
String sql = "INSERT INTO "+tableName+ "(MOD_REC) VALUES(?) ";
PreparedStatement ps = conn.prepareStatement(sql);
for (Object obj : items) {
RecObj v = (RecObj)obj;
System.out.println("=======Writer values===="+v.getRec());
ps.setString(1, v.getRec());
ps.addBatch();
}
ps.executeBatch();
ps.clearBatch();
ps.close();
conn.close();
}
}
以下是我的处理器:
public class DBItemProcessor implements ItemProcessor {
Integer count=0;
@Override
public Object processItem(Object arg0) {
count++;
RecObj v=(RecObj)arg0;
String vname=v.getRec();
System.out.println("=========Processer Values==="+vname);
return new RecObj(vname+count);
}
}
下面是我的Bean类
public class RecObj {
private String rec;
public RecObj(String rec) {
this.rec=rec;
}
您需要在阅读器中返回一个检查点值,该值 checkpointInfo()
将 open()
在重新启动时传递到阅读器的方法中。阅读器和批处理容器通过这种方式协调以在重新启动时提供检查点。
因此,您可能会遇到类似(查找 CHECKPOINT 注释)的情况:
public class DBItemReader implements ItemReader {
// ...
// CHECKPOINT field defined
private String checkpoint = null;
@Override
public void open(Serializable checkpoint) throws NamingException, SQLException {
// CHECKPOINT-based positioning through query value.
// Initial position = whereclauseFrom, on restart set to checkpoint
String queryVal = (String)(checkpoint == null ? whereclauseFrom : checkpoint);
if(Integer.parseInt(whereclauseFrom) == 5){
sql = "SELECT * FROM " + tableName + " WHERE CAST(REC AS INTEGER) <= "+ queryVal;
}else if(Integer.parseInt(whereclauseFrom) == 6){
sql = "SELECT * FROM " + tableName + " WHERE CAST(REC AS INTEGER) >= "+ queryVal;
}
// ..
}
@Override
public Object readItem() throws SQLException {
if (listRecObj.size() == 0) {
return null;
} else {
RecObj rec =null;
Iterator<RecObj> iter =listRecObj.iterator();
while (iter.hasNext()) {
rec = iter.next();
// CHECKPOINT updated
checkpoint = rec.getRec();
if (Integer.parseInt(rec.getRec()) == 7) {
throw new IllegalStateException("Thrown Error");
}
}
}
// ...
}
@Override
public Serializable checkpointInfo() {
// CHECKPOINT returned at end of chunk
return checkpoint;
}
}
我有一个Spring批处理程序,它从数据库中读取数据并处理它,并将(使用ItemWriter)插入到数据库中的其他表中。在这里,我使用了一系列针对ItemReader、Item处理机和ItemWriter的SQL查询。 我的要求是将所有这些查询存储在一个具有参数和值格式的表中,并通过一个数据库调用来检索它,然后将其传递给ItemReader或Item处理机或ItemrWriter。因此,如果将来查
问题内容: 我正在将SQLAlchemy用作python项目的ORM。我创建了很少的模型/架构,并且工作正常。现在,我需要查询现有的MySQL数据库,而不仅仅是select语句,不能插入/更新。 如何围绕该现有数据库的表创建包装器?我已经简短地阅读了sqlalchemy文档和SO,但是找不到任何相关的内容。所有人都建议执行方法,在这里我需要编写原始sql查询,而我想以与使用SA模型相同的方式使用S
我有一个firebase数据库,员工数据的结构如下。我还有一个用户表,它具有'Company ID'属性。每次用户登录时,我都希望获取与用户“公司ID”匹配的所有员工。是否有可能实现这使用firebase数据库规则或我必须写一些查询。
问题内容: 是否有任何工具可以检查asp.net或sql server并报告针对数据库运行的所有查询?我问的原因是我正在使用Linq进行项目,并想仔细检查其对每个页面的实际作用。 理想情况下,我想在浏览器中查看页面,并拥有创建该页面所运行的所有查询的报告。 我知道我可以使用调试/断点查看运行在单个查询中的SQL,并且我了解LinqPad,但是恐怕Linq会自己再进行几个查询,以获得我可能不直接知道
问题内容: 这有点像“鸡还是蛋”查询,但是有人可以梦想一个查询,该查询可以返回执行查询的当前数据库实例的名称吗?当我说我理解这个悖论时,请相信我:如果您已经连接执行查询,为什么需要知道数据库实例的名称?在多数据库环境中进行审核。 我已经看过联机丛书中的所有接近,但是我想要数据库实例而不是服务器的名称。 问题答案: 返回数据库名称。