当前位置: 首页 > 知识库问答 >
问题:

如何使用Pyspark将结构化流数据写入Cassandra表?

魏誉
2023-03-14

这是我运行strm.py文件的终端命令

$spark_home/bin/spark-submit--主本地--驱动程序-内存4G--num-executors 2--executor-memory 4G--包org.apache.spark:spark-sql-kafka-0-102.11:2.4.0 org.apache.spark:spark-cassandra-connector2.11:2.4.0strm.py

错误:

无法从JAR org.apache.spark加载主类:spark-cassandra-connector_2.11:2.4.0,URI为org.apache.spark。请通过--class指定一个类。在org.apache.spark.deploy.sparkSubmitArguments.error(sparkSubmitArguments.657)在org.apache.spark.deploy.sparkSubmitArguments.LoadEnvironmentArguments(sparkSubmitArguments.scala:224)在org.apache.spark.deploy.sparkSubmitArguments(sparkSubmitArguments.scala:116)在org.apache.spark.deploy.sparkSubmitArguments(

有没有人能帮我解决这个问题,为什么它不能加载。

共有1个答案

扈俊健
2023-03-14

您有两个问题:

>

  • 您提交的应用程序不正确-org.apache.spark:spark-sql-kafka-0-102.11:2.4.0org.apache.spark:spark-cassandra-connector2.11:2.4.0之间没有逗号,因此spark-submit将cassandra连接器视为一个jar,而不是使用您的python文件。

    Spark Cassandra Connector的当前版本不支持Spark结构化流数据的直接写入-该功能仅在DSE Analytics中可用。但是您可以通过使用foreachbatch来解决这一问题,如下所示(未经测试,工作的Scala代码可在此处获得):

    def foreach_batch_function(df, epoch_id):
        df.format("org.apache.spark.sql.cassandra").option("keyspace","test")\
           .option("table", "my_tables").mode('append').save()
    
    query.writeStream.foreachBatch(foreach_batch_function).start()  
    

  •  类似资料:
    • 我正在使用pyspark和spark-cassandra-connector_2.11-2.3.0.jar与cassandra DB。我正在从一个密钥空间读取数据帧并写入另一个不同的密钥空间。这两个密钥空间具有不同的用户名和密码。 我使用以下方法创建了 sparkSession: 我使用此 SparkSession 将数据作为数据帧读取为: 我可以使用上述会话读取数据。spark_session附

    • 问题内容: 我跟随本文将一些数据发送到AWS ES,并使用了jar elasticsearch-hadoop。这是我的脚本: 然后运行以下命令行: 其中write_to_es.py是上面的脚本。 这是我得到的错误: 如何解决这个问题: 任何帮助或建议,我们将不胜感激。 问题答案: 我有同样的问题。 看完这篇文章,我找到了答案!!! 您必须像这样转换为Type:

    • 我有一个带有MongoDB的Nodejs应用程序。现在我想使用Elasticsearch将数据从mongo复制到Elasticsearch。我正在使用npm软件包“ElasticSearch”。例如,对于收藏“帖子”,我有这样的: 正如您所看到的,我的数据是非结构化的,并且Elasticsearch在我添加这些项时显示了错误。我想要一个把戏关闭弹性搜索限制,允许我添加这些数据。我不能改变我的数据它

    • 问题内容: 背景 我想通过python客户端API将换行格式的JSON插入/添加到表中。 例如: 问题是,一行中的所有字段都是可选的,并且没有针对数据的固定定义模式。 询问 我已经读过我们可以使用支持自动模式检测的联合表。 但是,我正在寻找一种功能,该功能将自动从数据中检测模式,相应地创建表,甚至在数据中出现任何额外的列/键而不是创建新表时甚至调整表模式。 使用python客户端API是否有可能。

    • 我已经把麋鹿和Pyspark整合在一起了。 如果我手动完成,就可以写入数据 但是我想在弹性搜索中写过滤数据&托管数据。

    • 我正在尝试创建一个dataframe new\u df,并使用pyspark将数据帧加载到Kafka。然而,我很少有例外。不知道到底是什么问题。任何帮助都将不胜感激。 现在我正试图将数据框架与Kafka主题联系起来 异常(从错误中选取): 完全错误: Py4JJavaError:调用o1811时出错。保存:组织。阿帕奇。火花SparkException:作业因阶段失败而中止:阶段76.0中的任务8