Question:

How to use Jupyter, PySpark and Cassandra together on a Google Cloud Dataproc cluster

窦志新
2023-03-14

I am trying to get these three tools working together on Google Cloud Platform, so I created a Spark cluster with Dataproc, using an initialization script to install Cassandra and Jupyter.

When I SSH into the cluster and launch "pyspark --packages datastax:spark-cassandra-connector:2.3.0-s_2.11", everything seems to work.

Edit: actually, spark-shell works, but pyspark does not.

I don't know how to launch Jupyter with the PySpark kernel and the Cassandra connector. Edit: the problem seems to be more with pyspark than with Jupyter.

I tried to modify kernel.json:

    {
     "argv": [
        "bash",
        "-c",
        "PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS='kernel -f {connection_file}' pyspark"],
     "env": {
        "PYSPARK_SUBMIT_ARGS": "--master local[*] pyspark-shell --packages datastax:spark-cassandra-connector:2.3.0-s_2.11"
     },
     "display_name": "PySpark",
     "language": "python"
    }

But this doesn't seem to work. From Jupyter I can't reach Cassandra at all; I get exceptions like the following:

java.lang.ClassNotFoundException: Failed to find data source: pyspark.sql.cassandra

(I tried other PYSPARK_SUBMIT_ARGS, and adding --packages to PYSPARK_DRIVER_PYTHON_OPTS, but nothing worked.)

Edit: when I launch pyspark I get some warnings. I don't see anything related to my problem, but maybe I'm wrong, so here are pyspark's startup messages:

    myuserhome@spark-cluster-m:~$ pyspark --packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.0
    Python 2.7.9 (default, Jun 29 2016, 13:08:31) 
    [GCC 4.9.2] on linux2
    Type "help", "copyright", "credits" or "license" for more information.
    Ivy Default Cache set to: /home/myuserhome/.ivy2/cache
    The jars for the packages stored in: /home/myuserhome/.ivy2/jars
    :: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
    com.datastax.spark#spark-cassandra-connector_2.11 added as a dependency
    :: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
            confs: [default]
            found com.datastax.spark#spark-cassandra-connector_2.11;2.3.0 in central
            found com.twitter#jsr166e;1.1.0 in central
            found commons-beanutils#commons-beanutils;1.9.3 in central
            found commons-collections#commons-collections;3.2.2 in central
            found joda-time#joda-time;2.3 in central
            found org.joda#joda-convert;1.2 in central
            found io.netty#netty-all;4.0.33.Final in central
            found org.scala-lang#scala-reflect;2.11.8 in central
    :: resolution report :: resolve 2615ms :: artifacts dl 86ms
            :: modules in use:
            com.datastax.spark#spark-cassandra-connector_2.11;2.3.0 from central in [default]
            com.twitter#jsr166e;1.1.0 from central in [default]
            commons-beanutils#commons-beanutils;1.9.3 from central in [default]
            commons-collections#commons-collections;3.2.2 from central in [default]
            io.netty#netty-all;4.0.33.Final from central in [default]
            joda-time#joda-time;2.3 from central in [default]
            org.joda#joda-convert;1.2 from central in [default]
            org.scala-lang#scala-reflect;2.11.8 from central in [default]
            ---------------------------------------------------------------------
            |                  |            modules            ||   artifacts   |
            |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
            ---------------------------------------------------------------------
            |      default     |   8   |   0   |   0   |   0   ||   8   |   0   |
            ---------------------------------------------------------------------
    :: retrieving :: org.apache.spark#spark-submit-parent
            confs: [default]
            0 artifacts copied, 8 already retrieved (0kB/76ms)
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    18/06/17 11:08:22 WARN org.apache.hadoop.hdfs.DataStreamer: Caught exception
    java.lang.InterruptedException
            at java.lang.Object.wait(Native Method)
            at java.lang.Thread.join(Thread.java:1252)
            at java.lang.Thread.join(Thread.java:1326)
            at org.apache.hadoop.hdfs.DataStreamer.closeResponder(DataStreamer.java:973)
            at org.apache.hadoop.hdfs.DataStreamer.endBlock(DataStreamer.java:624)
            at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:801)
    18/06/17 11:08:23 WARN org.apache.hadoop.hdfs.DataStreamer: Caught exception
    java.lang.InterruptedException
            at java.lang.Object.wait(Native Method)
            at java.lang.Thread.join(Thread.java:1252)
            at java.lang.Thread.join(Thread.java:1326)
            at org.apache.hadoop.hdfs.DataStreamer.closeResponder(DataStreamer.java:973)
            at org.apache.hadoop.hdfs.DataStreamer.endBlock(DataStreamer.java:624)
            at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:801)
    18/06/17 11:08:23 WARN org.apache.spark.deploy.yarn.Client: Same path resource file:/home/myuserhome/.ivy2/jars/com.datastax.spark_spark-cassandra-connector_2.11-2.3.0.jar added multiple times to distributed cache.
    18/06/17 11:08:23 WARN org.apache.spark.deploy.yarn.Client: Same path resource file:/home/myuserhome/.ivy2/jars/com.twitter_jsr166e-1.1.0.jar added multiple times to distributed cache.
    18/06/17 11:08:23 WARN org.apache.spark.deploy.yarn.Client: Same path resource file:/home/myuserhome/.ivy2/jars/commons-beanutils_commons-beanutils-1.9.3.jar added multiple times to distributed cache.
    18/06/17 11:08:23 WARN org.apache.spark.deploy.yarn.Client: Same path resource file:/home/myuserhome/.ivy2/jars/joda-time_joda-time-2.3.jar added multiple times to distributed cache.
    18/06/17 11:08:23 WARN org.apache.spark.deploy.yarn.Client: Same path resource file:/home/myuserhome/.ivy2/jars/org.joda_joda-convert-1.2.jar added multiple times to distributed cache.
    18/06/17 11:08:23 WARN org.apache.spark.deploy.yarn.Client: Same path resource file:/home/myuserhome/.ivy2/jars/io.netty_netty-all-4.0.33.Final.jar added multiple times to distributed cache.
    18/06/17 11:08:23 WARN org.apache.spark.deploy.yarn.Client: Same path resource file:/home/myuserhome/.ivy2/jars/org.scala-lang_scala-reflect-2.11.8.jar added multiple times to distributed cache.
    18/06/17 11:08:23 WARN org.apache.spark.deploy.yarn.Client: Same path resource file:/home/myuserhome/.ivy2/jars/commons-collections_commons-collections-3.2.2.jar added multiple times to distributed cache.
    18/06/17 11:08:24 WARN org.apache.hadoop.hdfs.DataStreamer: Caught exception
    java.lang.InterruptedException
            at java.lang.Object.wait(Native Method)
            at java.lang.Thread.join(Thread.java:1252)
            at java.lang.Thread.join(Thread.java:1326)
            at org.apache.hadoop.hdfs.DataStreamer.closeResponder(DataStreamer.java:973)
            at org.apache.hadoop.hdfs.DataStreamer.endBlock(DataStreamer.java:624)
            at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:801)
    ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/hive/conf.dist/ivysettings.xml will be used
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 2.2.1
          /_/

    Using Python version 2.7.9 (default, Jun 29 2016 13:08:31)
    SparkSession available as 'spark'.
    >>> import org.apache.spark.sql.cassandra
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    ImportError: No module named org.apache.spark.sql.cassandra
    >>> import pyspark.sql.cassandra
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    ImportError: No module named cassandra

Edit, about trying to import the Java package in pyspark: it was simply the shortest command I found that raises the exception I'm facing. Here is another one:

    dfout.write.format("pyspark.sql.cassandra").mode("overwrite").option("confirm.truncate","true").option("spark.cassandra.connection.host","10.142.0.4").option("spark.cassandra.connection.port","9042").option("keyspace","uasb03").option("table","activite").save()

    An error occurred while calling o113.save.
    : java.lang.ClassNotFoundException: Failed to find data source: pyspark.sql.cassandra.

I think I also tried org.apache.spark.sql.cassandra, but I'll have to retry: your answer clears up many things I was trying blindly (--master=local[*] was one of those attempts).

About the cluster: apart from --properties, it was created the way you suggest (regarding Jupyter). Jupyter works fine, except that I cannot use the Cassandra connector.

Edit: following Karthik Palaniappan's advice:

Now pyspark works when I use it over SSH. But with Jupyter I still get an error:

    df=spark.read.format("csv").option("header","true").option("inferSchema","true").option("nullValue","NA").option("timestampFormat","ddMMMyyyy:HH:mm:ss").option("quote", "\"").option("delimiter", ";").option("mode","failfast").load("gs://tidy-centaur-b1/data/myfile.csv")

    import pyspark.sql.functions as F

    dfi = df.withColumn("id", F.monotonically_increasing_id()).withColumnRenamed("NUMANO", "numano")

    dfi.createOrReplaceTempView("pathologie")

    dfi.write.format("org.apache.spark.sql.cassandra").mode("overwrite").option("confirm.truncate","true").option("spark.cassandra.connection.host","10.142.0.3").option("spark.cassandra.connection.port","9042").option("keyspace","mykeyspace").option("table","mytable").save()

    Py4JJavaError: An error occurred while calling o115.save.
    : java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.cassandra. Please find packages at http://spark.apache.org/third-party-projects.html

I recreated the cluster the way you suggested:

    gcloud dataproc clusters create spark-cluster \
         --async \
         --project=tidy-centaur-205516 \
         --region=us-east1 \
         --zone=us-east1-b \
         --bucket=tidy-centaur-b1 \
         --image-version=1.2 \
         --num-masters=1 \
         --master-boot-disk-size=10GB \
         --master-machine-type=n1-standard-2 \
         --num-workers=2 \
         --worker-boot-disk-size=10GB \
         --worker-machine-type=n1-standard-1 \
         --metadata 'CONDA_PACKAGES="numpy pandas scipy matplotlib",PIP_PACKAGES=pandas-gbq' \
         --properties spark:spark.packages=com.datastax.spark:spark-cassandra-connector_2.11:2.3.0 \
         --initialization-actions=gs://tidy-centaur-b1/init-cluster.sh,gs://dataproc-initialization-actions/jupyter2/jupyter2.sh

init-cluster.sh installs Cassandra.

I ran jupyter notebook --generate-config and modified the PySpark kernel.json:

    {
     "argv": [
        "bash",
        "-c",
        "PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS='kernel -f {connection_file}' pyspark"],
     "env": {
        "PYSPARK_SUBMIT_ARGS": "pyspark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.0"
     },
     "display_name": "PySpark",
     "language": "python"
    }
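
For reference, a kernel spec like the one above can also be generated with a short script. This is a minimal sketch: the target path (~/.local/share/jupyter/kernels/pyspark/kernel.json) is an assumption (check jupyter kernelspec list for the actual location on your image), and note that pyspark-shell is conventionally the last token of PYSPARK_SUBMIT_ARGS:

```python
import json
import os

# Hypothetical kernel-spec location -- verify with `jupyter kernelspec list`.
kernel_json = os.path.expanduser(
    "~/.local/share/jupyter/kernels/pyspark/kernel.json")

spec = {
    "argv": [
        "bash",
        "-c",
        "PYSPARK_DRIVER_PYTHON=ipython "
        "PYSPARK_DRIVER_PYTHON_OPTS='kernel -f {connection_file}' pyspark",
    ],
    "env": {
        # --packages options first; the pyspark-shell token goes last.
        "PYSPARK_SUBMIT_ARGS": (
            "--packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.0 "
            "pyspark-shell"
        ),
    },
    "display_name": "PySpark",
    "language": "python",
}

# Write the spec where Jupyter will pick it up.
os.makedirs(os.path.dirname(kernel_json), exist_ok=True)
with open(kernel_json, "w") as f:
    json.dump(spec, f, indent=2)
```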

1 Answer

袁成化
2023-03-14

Per the spark-cassandra-connector documentation, you should use the Data Source API in PySpark, e.g. spark.read.format("org.apache.spark.sql.cassandra").... Under the hood, this uses the Java/Scala package you added. I'm not sure why you are trying to import a Java package in pyspark.

Please use the Jupyter (Python 3 + Conda) or Jupyter2 (Python 2 + pip) initialization action to install Jupyter and PySpark correctly. Importantly, you do not want to use --master=local[*], since that only uses the master node.

Also, the --packages flag is the same as the Spark property spark.jars.packages. You can set Spark properties when creating the cluster with --properties spark:spark.jars.packages=

So I think you want something like this:

gcloud dataproc clusters create <cluster-name> \
    --initialization-actions gs://dataproc-initialization-actions/jupyter/jupyter.sh \
    --properties spark:spark.jars.packages=datastax:spark-cassandra-connector:2.3.0-s_2.11
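
On an already-running cluster, the equivalent setting can instead go into the Spark defaults file. This is a config fragment, assuming Dataproc's standard /etc/spark/conf layout; restart the shell or kernel afterwards so the new property is picked up:

```
# /etc/spark/conf/spark-defaults.conf
spark.jars.packages  datastax:spark-cassandra-connector:2.3.0-s_2.11
```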

Then follow the connector's PySpark documentation, e.g.:

 spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="kv", keyspace="test") \
    .load().show()
