当前位置: 首页 > 知识库问答 >
问题:

如何使用spark-cassandra连接器在pyspark中写入数据帧中使用sparkSession

郦兴德
2023-03-14

我正在使用pyspark和spark-cassandra-connector_2.11-2.3.0.jar与cassandra DB。我正在从一个密钥空间读取数据帧并写入另一个不同的密钥空间。这两个密钥空间具有不同的用户名和密码。

我使用以下方法创建了 sparkSession:

spark_session = None

def set_up_spark(sparkconf,config):
    """
    sets up spark configuration and create a session
    :return: None
    """
    try:
        logger.info("spark conf set up Started")
        global spark_session
        spark_conf = SparkConf()
        for key, val in sparkconf.items():
            spark_conf.set(key, val)
        spark_session = SparkSession.builder.config(conf=spark_conf).getOrCreate()
        logger.info("spark conf set up Completed")
    except Exception as e:
        raise e

我使用此 SparkSession 将数据作为数据帧读取为:

table_df = spark_session.read \
            .format("org.apache.spark.sql.cassandra") \
            .options(table=table_name, keyspace=keyspace_name) \
            .load()

我可以使用上述会话读取数据。spark_session附在上述查询中。

现在我需要创建另一个会话,因为写表的凭据不同。我将查询写成:

table_df.write \
            .format("org.apache.spark.sql.cassandra") \
            .options(table=table_name, keyspace=keyspace_name) \
            .mode("append") \
            .save()

在cassandra中,我找不到如何为上面的写入操作附加一个新的spac的会话。

如何在带有spark-cassandra连接器的pyspark中为写入操作附加新的SparkSession?

共有1个答案

裴兴言
2023-03-14

您可以简单地将该信息作为选项传递给特定的readwriteoperation,这包括以下内容:,

请注意,您需要将这些选项放入字典中,并传递这个字典,而不是像留档中描述的那样直接传递。

read_options = { "table": "..", "keyspace": "..", 
  "spark.cassandra.connection.host": "IP1", 
  "spark.cassandra.auth.username": "username1", 
  "spark.cassandra.auth.password":"password1"}
table_df = spark_session.read \
            .format("org.apache.spark.sql.cassandra") \
            .options(**read_options) \
            .load()

write_options = { "table": "..", "keyspace": "..", 
  "spark.cassandra.connection.host": "IP2", 
  "spark.cassandra.auth.username": "username2", 
  "spark.cassandra.auth.password":"password1"}
table_df.write \
            .format("org.apache.spark.sql.cassandra") \
            .options(**write_options) \
            .mode("append") \
            .save()
 类似资料:
  • 我的要求是尽可能的实时,这似乎离得很远。生产环境大约每3秒有400个事件。 是否需要对Cassandra中的YAML文件进行调优,或者对cassandra-connector本身进行任何更改

  • 谁能告诉我为什么火花连接器要花这么多时间插入?我在代码中做了什么错误吗?或者使用spark-cassandra连接器进行插入操作是否不可取?

  • 我正在尝试使用pysparkn和spack-csv使用以下代码将火花数据帧写入s3 但是,我得到的错误是“输出目录已经存在”,我确信输出目录在作业开始之前不存在,我尝试使用不同的输出目录名称运行,但写入仍然失败。 如果我在作业失败后查看s3桶,我发现很少有零件文件是由火花写入的,但当它尝试写入更多时它失败了,脚本在本地运行良好,我在aws集群上使用10个火花执行器。有人知道这段代码有什么问题吗?

  • 问题-无法使用Spark Cassandra连接器1.5.0连接Cassandra 3.0 根据DataStax Spark Cassandra Connector文档,它说Spark Connector 1.5可以从Spark 1.5.0/1.6.0用于Cassandra 3.0。 你能告诉我我是不是漏掉了哪一步? 尝试的方法 在“pom.xml”中添加了单独的番石榴依赖项 提前谢了。

  • 这是我运行strm.py文件的终端命令 $spark_home/bin/spark-submit--主本地--驱动程序-内存4G--num-executors 2--executor-memory 4G--包org.apache.spark:spark-sql-kafka-0-102.11:2.4.0 org.apache.spark:spark-cassandra-connector2.11:2

  • 我们最近开始了使用Scala、Spark和Cassandra的大数据项目,我对所有这些技术都是新手。我试图做简单的任务写到和读从卡桑德拉表。如果将属性名和列名都保留为小写或snake大小写(unserscores)就可以实现这一点,但我希望在scala代码中使用camel大小写。在Scala中使用camel case格式,在Cassandra中使用snake case格式,有没有更好的方法来实现这