我正在使用pyspark和spark-cassandra-connector_2.11-2.3.0.jar与cassandra DB。我正在从一个密钥空间读取数据帧并写入另一个不同的密钥空间。这两个密钥空间具有不同的用户名和密码。
我使用以下方法创建了 sparkSession:
spark_session = None
def set_up_spark(sparkconf,config):
"""
sets up spark configuration and create a session
:return: None
"""
try:
logger.info("spark conf set up Started")
global spark_session
spark_conf = SparkConf()
for key, val in sparkconf.items():
spark_conf.set(key, val)
spark_session = SparkSession.builder.config(conf=spark_conf).getOrCreate()
logger.info("spark conf set up Completed")
except Exception as e:
raise e
我使用此 SparkSession 将数据作为数据帧读取为:
table_df = spark_session.read \
.format("org.apache.spark.sql.cassandra") \
.options(table=table_name, keyspace=keyspace_name) \
.load()
我可以使用上述会话读取数据。spark_session附在上述查询中。
现在我需要创建另一个会话,因为写表的凭据不同。我将查询写成:
table_df.write \
.format("org.apache.spark.sql.cassandra") \
.options(table=table_name, keyspace=keyspace_name) \
.mode("append") \
.save()
在cassandra中,我找不到如何为上面的写入操作附加一个新的spac的会话。
如何在带有spark-cassandra连接器的pyspark中为写入操作附加新的SparkSession?
您可以简单地将该信息作为选项传递给特定的read
或write
operation,这包括以下内容:,
请注意,您需要将这些选项放入字典中,并传递这个字典,而不是像留档中描述的那样直接传递。
read_options = { "table": "..", "keyspace": "..",
"spark.cassandra.connection.host": "IP1",
"spark.cassandra.auth.username": "username1",
"spark.cassandra.auth.password":"password1"}
table_df = spark_session.read \
.format("org.apache.spark.sql.cassandra") \
.options(**read_options) \
.load()
write_options = { "table": "..", "keyspace": "..",
"spark.cassandra.connection.host": "IP2",
"spark.cassandra.auth.username": "username2",
"spark.cassandra.auth.password":"password1"}
table_df.write \
.format("org.apache.spark.sql.cassandra") \
.options(**write_options) \
.mode("append") \
.save()
我的要求是尽可能的实时,这似乎离得很远。生产环境大约每3秒有400个事件。 是否需要对Cassandra中的YAML文件进行调优,或者对cassandra-connector本身进行任何更改
谁能告诉我为什么火花连接器要花这么多时间插入?我在代码中做了什么错误吗?或者使用spark-cassandra连接器进行插入操作是否不可取?
我正在尝试使用pysparkn和spack-csv使用以下代码将火花数据帧写入s3 但是,我得到的错误是“输出目录已经存在”,我确信输出目录在作业开始之前不存在,我尝试使用不同的输出目录名称运行,但写入仍然失败。 如果我在作业失败后查看s3桶,我发现很少有零件文件是由火花写入的,但当它尝试写入更多时它失败了,脚本在本地运行良好,我在aws集群上使用10个火花执行器。有人知道这段代码有什么问题吗?
问题-无法使用Spark Cassandra连接器1.5.0连接Cassandra 3.0 根据DataStax Spark Cassandra Connector文档,它说Spark Connector 1.5可以从Spark 1.5.0/1.6.0用于Cassandra 3.0。 你能告诉我我是不是漏掉了哪一步? 尝试的方法 在“pom.xml”中添加了单独的番石榴依赖项 提前谢了。
这是我运行strm.py文件的终端命令 $spark_home/bin/spark-submit--主本地--驱动程序-内存4G--num-executors 2--executor-memory 4G--包org.apache.spark:spark-sql-kafka-0-102.11:2.4.0 org.apache.spark:spark-cassandra-connector2.11:2
我们最近开始了使用Scala、Spark和Cassandra的大数据项目,我对所有这些技术都是新手。我试图做简单的任务写到和读从卡桑德拉表。如果将属性名和列名都保留为小写或snake大小写(unserscores)就可以实现这一点,但我希望在scala代码中使用camel大小写。在Scala中使用camel case格式,在Cassandra中使用snake case格式,有没有更好的方法来实现这