当前位置: 首页 > 知识库问答 >
问题:

将RDD转换为DataFrame Spark Streaming时的ClassCastException

东门楚
2023-03-14

嗨,伙计们,我有下一个问题。我正在使用Java的Apache Spark Streaming v1.6.0来获取来自IBMMQ的一些消息。我为MQ制作了自定义接收器,但我遇到的问题是我需要将RDD从JavaDStream转换为DataFrame。为此,我使用foreachRDD迭代JavaDStream,并定义了DataFrame的模式,但当我运行作业时,第一条消息会引发下一个异常:

Java语言lang.ClassCastException:组织。阿帕奇。火花rdd。BlockRDDPartition无法转换为组织。阿帕奇。火花rdd。组织上的ParallelCollectionPartition。阿帕奇。火花rdd。并行集合RDD。计算(ParallelCollectionRDD.scala:102)。阿帕奇。火花rdd。RDD。位于组织的computeOrReadCheckpoint(RDD.scala:306)。阿帕奇。火花rdd。RDD。org上的迭代器(RDD.scala:270)。阿帕奇。火花调度程序。结果任务。在组织上运行任务(ResultTask.scala:66)。阿帕奇。火花调度程序。任务在组织上运行(Task.scala:89)。阿帕奇。火花执行人。执行者$TaskRunner。在java上运行(Executor.scala:213)。util。同时发生的线程池执行器。java上的runWorker(ThreadPoolExecutor.java:1149)。util。同时发生的ThreadPoolExecutor$工作者。在java上运行(ThreadPoolExecutor.java:624)。lang.Thread。run(Thread.java:748)19/03/28 12:53:26警告TaskSetManager:在0.0阶段(TID 0,localhost)丢失任务0.0:java。lang.ClassCastException:组织。阿帕奇。火花rdd。BlockRDDPartition无法转换为组织。阿帕奇。火花rdd。组织上的ParallelCollectionPartition。阿帕奇。火花rdd。并行集合RDD。计算(ParallelCollectionRDD.scala:102)。阿帕奇。火花rdd。RDD。位于组织的computeOrReadCheckpoint(RDD.scala:306)。阿帕奇。火花rdd。RDD。org上的迭代器(RDD.scala:270)。阿帕奇。火花调度程序。结果任务。在组织上运行任务(ResultTask.scala:66)。阿帕奇。火花调度程序。任务在组织上运行(Task.scala:89)。阿帕奇。火花执行人。执行者$TaskRunner。在java上运行(Executor.scala:213)。util。同时发生的线程池执行器。java上的runWorker(ThreadPoolExecutor.java:1149)。util。同时发生的ThreadPoolExecutor$工作者。在java上运行(ThreadPoolExecutor.java:624)。lang.Thread。运行(Thread.java:748)

然后代码执行得很好。即使我在MQ中没有任何消息,也只是我运行作业时的第一条消息。

这是我的CustomMQReceiver

public CustomMQReceiver() {

        super(StorageLevel.MEMORY_ONLY_2());

    }

    @Override
    public void onStart() {

        new Thread() {
            @Override
            public void run() {
                try {
                    initConnection();
                    receive();
                } catch (JMSException ex) {
                    ex.printStackTrace();
                }
            }
        }.start();

    }

    @Override
    public void onStop() {

    }

    private void receive() {

        System.out.print("Started receiving messages from MQ");

        try {

            Message receivedMessage = null;

            while (!isStopped() && (receivedMessage = consumer.receiveNoWait()) != null) {

                String userInput = convertStreamToString(receivedMessage);
                System.out.println("Received data :'" + userInput + "'");
                store(userInput);
            }

            stop("No More Messages To read !");
            qCon.close();
            System.out.println("Queue Connection is Closed");

        } catch (Exception e) {
            e.printStackTrace();
            restart("Trying to connect again");
        } catch (Throwable t) {

            restart("Error receiving data", t);
        }

    }

    public void initConnection() throws JMSException {

        MQQueueConnectionFactory conFactory = new MQQueueConnectionFactory();
        conFactory.setHostName(HOST);
        conFactory.setPort(PORT);
        conFactory.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
        conFactory.setQueueManager(QMGR);
        conFactory.setChannel(CHANNEL);
        conFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, true);
        conFactory.setStringProperty(WMQConstants.USERID, APP_USER);
        conFactory.setStringProperty(WMQConstants.PASSWORD, APP_PASSWORD);

        qCon = (MQQueueConnection) conFactory.createConnection();
        MQQueueSession qSession = (MQQueueSession) qCon.createQueueSession(false, 1);
        MQQueue queue = (MQQueue) qSession.createQueue(QUEUE_NAME);
        consumer = (MQMessageConsumer) qSession.createConsumer(queue);
        qCon.start();

    }

    @Override
    public StorageLevel storageLevel() {
        return StorageLevel.MEMORY_ONLY_2();
    }

    private static String convertStreamToString(final Message jmsMsg) throws Exception {

        String stringMessage = "";
        JMSTextMessage msg = (JMSTextMessage) jmsMsg;
        stringMessage = msg.getText();

        return stringMessage;
    }

这是我的spark代码

SparkConf sparkConf = new SparkConf()
                    .setAppName("MQStreaming")
                    .set("spark.driver.allowMultipleContexts", "true")
                    .setMaster("local[*]");

            JavaSparkContext jsc = new JavaSparkContext(sparkConf);
            final SQLContext sqlContext = new SQLContext(jsc);
            JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(Long.parseLong(propertiesConf.getProperty("duration"))));

            JavaDStream<String> customReceiverStream = ssc.receiverStream(new CustomMQReceiver());

            customReceiverStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {

                @Override
                public void call(JavaRDD<String> rdd) throws Exception {

                    JavaRDD<Row> rddRow = rdd.map(new Function<String, Row>() {

                        @Override
                        public Row call(String v1) throws Exception {

                            return RowFactory.create(v1);

                        }

                    });

                    try {

                        StructType schema = new StructType(new StructField[]{
                            new StructField("trama", DataTypes.StringType, true, Metadata.empty())
                        });

                        DataFrame frame = sqlContext.createDataFrame(rddRow, schema);

                        if (frame.count() > 0) {
                            //Here is where the first messages throw the exception
                            frame.show();
                            frame.write().mode(SaveMode.Append).json("file:///C:/tmp/");

                        }

                    } catch (Exception ex) {

                        System.out.println(" INFO " + ex.getMessage());

                    }

                }

            });

            ssc.start();
            ssc.awaitTermination();

我无法更改spark的版本,因为此作业将在使用spark 1.6的旧cloudera集群中运行。我不知道我是做错了什么,还是只是个虫子。帮助

共有1个答案

何禄
2023-03-14

我解决了自己的问题,这个异常是由我如何创建SQLContext引发的,正确的方法是使用JavaStreamingContext创建SQLContext

//JavaStreamingContext jsc = ...
SQLContext sqlContext = new SQLContext(jsc.sparkContext());
 类似资料:
  • 我对Spark和Scala相对较新。 我从以下数据帧开始(由密集的双倍向量组成的单列): 直接转换为RDD将生成一个org实例。阿帕奇。火花rdd。RDD[org.apache.spark.sql.Row]: 有人知道如何将此DF转换为org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.向量]的实例吗?到目前为止,我的各种尝试都没有成功。

  • 我试图将JDBC的ResultSet转换成Spark RDD,并寻找一种有效的方法来使用Spark的并行特性。 以下是我按照这个https://stackoverflow.com/a/32073423/6064131实现的 现在的主要问题是它需要更多的时间,我知道所有数据集都是通过一根针提取的eye.But有没有更好的方法来实现这一点? 有些人可能想知道为什么我没有使用内置功能sqlContext

  • 我尝试使用以下代码获取数据帧的分区数量: 按照我的理解,dataframe通过元数据给rdd增加了一个结构层。那么,为什么在转换成rdd时要花这么多时间呢?

  • 我正在尝试将RDD转换为数据帧,但失败并出现错误: org.apache.spark.SparkException:由于阶段失败而中止作业:阶段2.0中的任务0失败4次,最近一次失败:阶段2.0中丢失任务0.3(TID 11,10.139.64.5,执行器0) 这是我的代码:

  • 我使用的是Apache Spark 1.6.2 我有一个。csv数据,它包含大约800万行,我想把它转换成DataFrame 映射RDD可以很好地工作,但是当涉及到将RDD转换为DataFrame时,Spark引发了一个错误 以下是我的代码: 有超过800万行,但是当我将这些行减到只有<500行时,程序就可以正常工作了 数据很乱,每行中的总列经常不同,这就是为什么我需要首先映射它。但是,我想要的数

  • 有人能分享一下如何将转换为吗?