我需要从数据库读取数据,并使用PIG分析数据。我用java编写了一个UDF,引用了下面的链接
register /tmp/UDFJars/CassandraUDF_1-0.0.1-SNAPSHOT-jar-with-dependencies.jar;
A = Load '/user/sampleFile.txt' using udf.DBLoader('10.xx.xxx.4','username','password','select * from customer limit 10') as (f1 : chararray);
DUMP A;
package udf;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import com.data.ConnectionCassandra;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
public class DBLoader extends LoadFunc {
private final Log log = LogFactory.getLog(getClass());
Session session;
private ArrayList mProtoTuple = null;
private String jdbcURL;
private String user;
private String pass;
private int count = 0;
private String query;
ResultSet result;
List<Row> rows;
int colSize;
protected TupleFactory mTupleFactory = TupleFactory.getInstance();
public DBLoader() {
}
public DBLoader(String jdbcURL, String user, String pass, String query) {
this.jdbcURL = jdbcURL;
this.user = user;
this.pass = pass;
this.query = query;
}
@Override
public InputFormat getInputFormat() throws IOException {
log.info("Inside InputFormat");
// TODO Auto-generated method stub
try {
return new TextInputFormat();
} catch (Exception exception) {
log.error(exception.getMessage());
log.error(exception.fillInStackTrace());
throw new IOException();
}
}
@Override
public Tuple getNext() throws IOException {
log.info("Inside get Next");
Row row = rows.get(count);
if (row != null) {
mProtoTuple = new ArrayList<Object>();
for (int colNum = 0; colNum < colSize; colNum++) {
mProtoTuple.add(row.getObject(colNum));
}
} else {
return null;
}
Tuple t = mTupleFactory.newTuple(mProtoTuple);
mProtoTuple.clear();
return t;
}
@Override
public void prepareToRead(RecordReader arg0, PigSplit arg1) throws IOException {
log.info("Inside Prepare to Read");
session = null;
if (query == null) {
throw new IOException("SQL Insert command not specified");
}
if (user == null || pass == null) {
log.info("Creating Session with user name and password as: " + user + " : " + pass);
session = ConnectionCassandra.connectToCassandra1(jdbcURL, user, pass);
log.info("Session Created");
} else {
session = ConnectionCassandra.connectToCassandra1(jdbcURL, user, pass);
}
log.info("Executing Query " + query);
result = session.execute(query);
log.info("Query Executed :" + query);
rows = result.all();
count = 0;
colSize = result.getColumnDefinitions().asList().size();
}
@Override
public void setLocation(String location, Job job) throws IOException {
log.info("Inside Set Location");
try {
FileInputFormat.setInputPaths(job, location);
} catch (Exception exception) {
log.info("Some thing went wrong : " + exception.getMessage());
log.debug(exception);
}
}
}
org.apache.pig.impl.logicallayer.frontendException:错误1066:无法在org.apache.pig.pig.tools.grunt.gruntparser.openiterator(pigserver.java:892)在org.apache.pig.tools.gruntparser.processdump(gruntparser.java:774)在org.apache.pig.tools.pigscript.parse(pigscriptparser.java:372)在.parseStoponError(gruntParser.java:173)在org.apache.pig.tools.grunt.grunt.exec(grunt.java:84)在org.apache.pig.main.run(main.java:484)在org.apache.pig.main.run(main.java:484)在org.apache.pig.main.main(main.java:158)在sun.reflect.nativeMethodAccessorImpl.Invoke(原生方法)在606)在org.apache.hadoop.util.runjar.run(runjar.java:221)在org.apache.hadoop.util.runjar.main(runjar.java:136)原因:html" target="_blank">java.io.ioException:在org.apache.pig.pigserver.openiterator(pigserver.java:884)上,以异常状态终止的作业失败
维韦克!你有准备读吗?(我看到您做了一些日志记录,所以知道日志中实际有什么会很好)此外,提供完整的stacktrace也是非常好的,因为我看到您没有完整的底层异常。只是一些想法--我从来没有尝试过在编写LoadFunc时不实现自己的InputFormat和recordReader-textInputFormat检查文件的存在和大小(并根据文件大小创建许多InputSplits),所以如果虚拟文件为空,很有可能不会产生InputSplits,或者产生零长度的InputSplits。因为它的长度为零,所以可能会导致pig抛出异常。因此,最好的建议是实现自己的InputFormat(实际上很简单)。也只是一个快速的尝试-尝试
set pig.splitCombination false
可能不会有帮助,但很容易尝试。
我正在尝试读取Mac上pig shell上的csv文件。我所做的只是文件到变量中,然后变量。我是这样做的: 我使用的数据是从这里提供的github下载的 此文件在我的Mac上的本地安装的hdfs中可用。当我执行时,我得到一个错误: org.apache.pig.impl.logicallayer.FrontendException:错误1066:无法打开别名影片的迭代器 在org.apache.p
我将使用Apache Camel编写一个CRUD应用程序,非常像下面的示例:http://java.dzone.com/articles/rest-apache-camel 但我想使用JPA,而不仅仅是JDBC。 我看过Camel JPA组件,认为我可以使用它。但要从数据库中读取数据,它需要我定义一个消费者endpoint。 我想从JDBC示例中执行以下操作: 即调用JPA组件作为生产者。 这可能
问题内容: 我有一个MS-Access数据库,我正在使用JDBC(我认为是JDBC-ODBC桥)在Java中连接到该数据库。我的访问数据库有一些希伯来语值。 当我尝试使用String str = rs.getString(1)(rs是RowSet)读取这些值时,我得到的字符串只是一个问号字符串。 我还有希伯来语中的其他字符串,这些字符串是我在Java代码中使用字符串文字设置的,它们可以正常工作。所
我在数据库中有数据:Firebase中的数据库 我想读取所有子项(Belgia、Czechy、Polska…),并将其显示在文本字段中,但单击按钮后(我不会更改数据库中的数据)。按钮分配了以下功能: 在我写代码之前: 不幸的是,当我按下按钮时,什么也没有发生。 我很感激你的帮助问候,马辛
我试图从Firebase数据库中读取数据,我已经到处阅读和查找,但我已经走到了死胡同。 这就是我所做的一切。 依赖项: 实现'com.google.firebase: Firebase存储: 9.2.1' 实现'com。谷歌。firebase:firebase数据库:9.2。1' 实现'com。谷歌。firebase:firebase授权:9.2。1' 实现'com。谷歌。火基:火基核心:9.2。
问题内容: 我有2种情况,我要在codeigniter中提取同一表的全部数据和行总数,我想知道那是一种方法,可以从中获取行总数,整个数据和3个最新插入的记录通过一个代码在同一张桌子上 两种情况的控制器代码如下(尽管我分别使用不同的参数将其应用于每种情况) 1)从codeigniter中的表中获取全部数据 型号代码 查看代码 2)在Codeigniter中从表中获取行数 查看代码 问题答案: 您只能