当前位置: 首页 > 知识库问答 >
问题:

在pig中使用UDF从数据库中读取数据

慕铭
2023-03-14

我需要从数据库读取数据,并使用PIG分析数据。我用java编写了一个UDF,引用了下面的链接

register /tmp/UDFJars/CassandraUDF_1-0.0.1-SNAPSHOT-jar-with-dependencies.jar;
A = Load '/user/sampleFile.txt' using udf.DBLoader('10.xx.xxx.4','username','password','select * from customer limit 10') as (f1 : chararray);
DUMP A;


package udf;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

import com.data.ConnectionCassandra;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public class DBLoader extends LoadFunc {
    private final Log log = LogFactory.getLog(getClass());
    Session session;
    private ArrayList mProtoTuple = null;
    private String jdbcURL;
    private String user;
    private String pass;
    private int count = 0;
    private String query;
    ResultSet result;
    List<Row> rows;
    int colSize;
    protected TupleFactory mTupleFactory = TupleFactory.getInstance();

    public DBLoader() {
    }

    public DBLoader(String jdbcURL, String user, String pass, String query) {

        this.jdbcURL = jdbcURL;
        this.user = user;
        this.pass = pass;
        this.query = query;

    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        log.info("Inside InputFormat");
        // TODO Auto-generated method stub
        try {
            return new TextInputFormat();
        } catch (Exception exception) {
            log.error(exception.getMessage());
            log.error(exception.fillInStackTrace());
            throw new IOException();
        }
    }

    @Override
    public Tuple getNext() throws IOException {
        log.info("Inside get Next");
        Row row = rows.get(count);
        if (row != null) {
            mProtoTuple = new ArrayList<Object>();
            for (int colNum = 0; colNum < colSize; colNum++) {
                mProtoTuple.add(row.getObject(colNum));
            }
        } else {
            return null;
        }
        Tuple t = mTupleFactory.newTuple(mProtoTuple);
        mProtoTuple.clear();
        return t;

    }

    @Override
    public void prepareToRead(RecordReader arg0, PigSplit arg1) throws IOException {
        log.info("Inside Prepare to Read");
        session = null;
        if (query == null) {
            throw new IOException("SQL Insert command not specified");
        }
        if (user == null || pass == null) {
            log.info("Creating Session with user name and password as: " + user + " : " + pass);
            session = ConnectionCassandra.connectToCassandra1(jdbcURL, user, pass);
            log.info("Session Created");
        } else {
            session = ConnectionCassandra.connectToCassandra1(jdbcURL, user, pass);
        }
        log.info("Executing Query " + query);
        result = session.execute(query);
        log.info("Query Executed :" + query);
        rows = result.all();
        count = 0;
        colSize = result.getColumnDefinitions().asList().size();
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        log.info("Inside Set Location");
        try {
            FileInputFormat.setInputPaths(job, location);
        } catch (Exception exception) {
            log.info("Some thing went wrong : " + exception.getMessage());
            log.debug(exception);
        }

    }
}

org.apache.pig.impl.logicallayer.frontendException:错误1066:无法在org.apache.pig.pig.tools.grunt.gruntparser.openiterator(pigserver.java:892)在org.apache.pig.tools.gruntparser.processdump(gruntparser.java:774)在org.apache.pig.tools.pigscript.parse(pigscriptparser.java:372)在.parseStoponError(gruntParser.java:173)在org.apache.pig.tools.grunt.grunt.exec(grunt.java:84)在org.apache.pig.main.run(main.java:484)在org.apache.pig.main.run(main.java:484)在org.apache.pig.main.main(main.java:158)在sun.reflect.nativeMethodAccessorImpl.Invoke(原生方法)在606)在org.apache.hadoop.util.runjar.run(runjar.java:221)在org.apache.hadoop.util.runjar.main(runjar.java:136)原因:html" target="_blank">java.io.ioException:在org.apache.pig.pigserver.openiterator(pigserver.java:884)上,以异常状态终止的作业失败

共有1个答案

单于轶
2023-03-14

维韦克!你有准备读吗?(我看到您做了一些日志记录,所以知道日志中实际有什么会很好)此外,提供完整的stacktrace也是非常好的,因为我看到您没有完整的底层异常。只是一些想法--我从来没有尝试过在编写LoadFunc时不实现自己的InputFormat和recordReader-textInputFormat检查文件的存在和大小(并根据文件大小创建许多InputSplits),所以如果虚拟文件为空,很有可能不会产生InputSplits,或者产生零长度的InputSplits。因为它的长度为零,所以可能会导致pig抛出异常。因此,最好的建议是实现自己的InputFormat(实际上很简单)。也只是一个快速的尝试-尝试

set pig.splitCombination false

可能不会有帮助,但很容易尝试。

 类似资料:
  • 我正在尝试读取Mac上pig shell上的csv文件。我所做的只是文件到变量中,然后变量。我是这样做的: 我使用的数据是从这里提供的github下载的 此文件在我的Mac上的本地安装的hdfs中可用。当我执行时,我得到一个错误: org.apache.pig.impl.logicallayer.FrontendException:错误1066:无法打开别名影片的迭代器 在org.apache.p

  • 我将使用Apache Camel编写一个CRUD应用程序,非常像下面的示例:http://java.dzone.com/articles/rest-apache-camel 但我想使用JPA,而不仅仅是JDBC。 我看过Camel JPA组件,认为我可以使用它。但要从数据库中读取数据,它需要我定义一个消费者endpoint。 我想从JDBC示例中执行以下操作: 即调用JPA组件作为生产者。 这可能

  • 问题内容: 我有一个MS-Access数据库,我正在使用JDBC(我认为是JDBC-ODBC桥)在Java中连接到该数据库。我的访问数据库有一些希伯来语值。 当我尝试使用String str = rs.getString(1)(rs是RowSet)读取这些值时,我得到的字符串只是一个问号字符串。 我还有希伯来语中的其他字符串,这些字符串是我在Java代码中使用字符串文字设置的,它们可以正常工作。所

  • 我在数据库中有数据:Firebase中的数据库 我想读取所有子项(Belgia、Czechy、Polska…),并将其显示在文本字段中,但单击按钮后(我不会更改数据库中的数据)。按钮分配了以下功能: 在我写代码之前: 不幸的是,当我按下按钮时,什么也没有发生。 我很感激你的帮助问候,马辛

  • 我试图从Firebase数据库中读取数据,我已经到处阅读和查找,但我已经走到了死胡同。 这就是我所做的一切。 依赖项: 实现'com.google.firebase: Firebase存储: 9.2.1' 实现'com。谷歌。firebase:firebase数据库:9.2。1' 实现'com。谷歌。firebase:firebase授权:9.2。1' 实现'com。谷歌。火基:火基核心:9.2。

  • 问题内容: 我有2种情况,我要在codeigniter中提取同一表的全部数据和行总数,我想知道那是一种方法,可以从中获取行总数,整个数据和3个最新插入的记录通过一个代码在同一张桌子上 两种情况的控制器代码如下(尽管我分别使用不同的参数将其应用于每种情况) 1)从codeigniter中的表中获取全部数据 型号代码 查看代码 2)在Codeigniter中从表中获取行数 查看代码 问题答案: 您只能