当前位置: 首页 > 知识库问答 >
问题:

为什么我的spark工作停留在Kafka流媒体上

慕河
2023-03-14

spark作业提交到minicube创建的kubernetes集群中的spark集群后的输出:

----------------- RUNNING ----------------------
[Stage 0:>                                                          (0 + 0) / 2]17/06/16 16:08:15 INFO VerifiableProperties: Verifying properties
17/06/16 16:08:15 INFO VerifiableProperties: Property group.id is overridden to xxx
17/06/16 16:08:15 INFO VerifiableProperties: Property zookeeper.connect is overridden to 
xxxxxxxxxxxxxxxxxxxxx
[Stage 0:>                                                          (0 + 0) / 2]

来自spark web ui的信息:

我的文件中的foreachRDD。scala:49详细信息

组织。阿帕奇。火花流动。数据流。数据流。foreachRDD(DStream.scala:625)myfile。运行(myfile.scala:49)Myjob$。main(Myjob.scala:100)Myjob。main(Myjob.scala)sun。反映NativeMethodAccessorImpl。invoke0(本机方法)sun。反映NativeMethodAccessorImpl。调用(NativeMethodAccessorImpl.java:62)sun。反映DelegatingMethodAccessorImpl。调用(DelegatingMethodAccessorImpl.java:43)java。朗。反思。方法调用(Method.java:498)org。阿帕奇。火花部署SparkSubmit$。org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)org。阿帕奇。火花部署SparkSubmit$。doRunMain$1(SparkSubmit.scala:187)org。阿帕奇。火花部署SparkSubmit$。提交(SparkSubmit.scala:212)org。阿帕奇。火花部署SparkSubmit$。main(SparkSubmit.scala:126)org。阿帕奇。火花部署SparkSubmit。main(SparkSubmit.scala)

我的代码:

  println("----------------- RUNNING ----------------------");
    eventsStream.foreachRDD { rdd =>
        println("xxxxxxxxxxxxxxxxxxxxx")
        //println(rdd.count());
    if( !rdd.isEmpty )
    {
      println("yyyyyyyyyyyyyyyyyyyyyyy")
        val df = sqlContext.read.json(rdd);
        df.registerTempTable("data");

        val rules = rulesSource.rules();
        var resultsRDD : RDD[(String,String,Long,Long,Long,Long,Long,Long)]= sc.emptyRDD;
        rules.foreach { rule =>
        ...
        }

        sqlContext.dropTempTable("data")
    }
    else
    {
        println("-------");
        println("NO DATA");
        println("-------");
    }
}

有什么想法吗?谢谢

更新

我的spark作业在独立spark的docker容器中运行良好。但若提交到kubernetes集群中的spark集群,它将被卡在Kafka流中。不知道为什么?

spark master的yaml文件来自https://github.com/phatak-dev/kubernetes-spark/blob/master/spark-master.yaml

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  labels:
    name: spark-master
  name: spark-master
spec:
  replicas: 1
  template:
    metadata:
      labels:
        name: spark-master
    spec:
      containers:
      - name : spark-master
        image: spark-2.1.0-bin-hadoop2.6 
        imagePullPolicy: "IfNotPresent"
        name: spark-master
        ports:
        - containerPort: 7077
          protocol: TCP
        command:
         - "/bin/bash"
         - "-c"
         - "--"
        args :
- './start-master.sh ; sleep infinity'

共有1个答案

诸葛砚
2023-03-14

日志将有助于诊断问题。

本质上,你不能在RDD操作中创建另一个RDD。即rdd1.map{rdd2.count()}无效

查看导入隐式sqlContext后RDD如何转换为数据帧。

        import sqlContext.implicits._
        eventsStream.foreachRDD { rdd =>

            println("yyyyyyyyyyyyyyyyyyyyyyy")

            val df = rdd.toDF(); 
            df.registerTempTable("data");
            .... //Your logic here.
            sqlContext.dropTempTable("data")
        }
 类似资料:
  • 问题内容: 我正在使用Maven 我添加了以下依赖项 我还在代码中添加了jar 它完全可以正常工作,没有任何错误,在通过spark-submit提交时出现以下错误,非常感谢您的帮助。谢谢你的时间。 线程“主要” java.lang.NoClassDefFoundError中的异常:sun.reflect处的KafkaSparkStreaming.sparkStreamingTest(KafkaSp

  • 我正在读这篇博文: http://blog.jaceklaskowski.pl/2015/07/20/real-time-data-processing-using-apache-kafka-and-spark-streaming.html 它讨论了如何使用Spark Streaming和Apache Kafka进行一些近实时处理。我完全理解这篇文章。它确实展示了我如何使用Spark Stream

  • 我想在Apache Flink中做流媒体工作来做Kafka- 这应该是流式处理。

  • 我第一次使用pyspark。Spark版本:2.3.0Kafka版本:2.2.0 我有一个Kafka制作人,它以avro格式发送嵌套数据,我正试图在pyspark中编写spark流/结构化流的代码,它将来自Kafka的avro反序列化为数据帧,并进行转换,将其以拼花格式写入s3。我在spark/scala中找到了avro转换器,但pyspark中的支持尚未添加。如何在pyspark中转换相同的值。

  • 我有一个用于结构化流媒体的Kafka和Spark应用程序。特别是我的KafkaProducer具有以下配置: 然后我创建了一个ProducerRecord,如下所示: 其中,json。toString()表示一个JSON格式的字符串,这是我想在Spark中处理的值。现在,我主要做的是将Spark与Kafka主题联系起来,正如官方Spark结构化流媒体指南中所报道的那样: 然后 我有以下输出和异常:

  • 我的火花流应用程序从Kafka获取数据并对其进行处理。 如果应用程序失败,大量数据存储在Kafka中,并且在Spark Streaming应用程序的下一次启动时,它会崩溃,因为一次消耗了太多数据。由于我的应用程序不关心过去的数据,因此只消耗当前(最新)数据完全没关系。 我找到了“auto.reset.offest”选项,它在Spark中的行为几乎没有什么不同。如果配置了zookeeper,它会删除