我跟随本文将一些数据发送到AWS ES,并使用了jar elasticsearch-hadoop。这是我的脚本:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
if __name__ == "__main__":
conf = SparkConf().setAppName("WriteToES")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
es_conf = {"es.nodes" : "https://search-elasticsearchdomaine.region.es.amazonaws.com/",
"es.port" : "9200","es.nodes.client.only" : "true","es.resource" : "sensor_counts/metrics"}
es_df_p = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("output/part-00000-c353bb29-f189-4189-b35b-f7f1af717355.csv")
es_df_pf= es_df_p.groupBy("network_key")
es_df_pf.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_conf)
然后运行以下命令行:
spark-submit --jars elasticsearch-spark-20_2.11-5.3.1.jar write_to_es.py
其中write_to_es.py是上面的脚本。
这是我得到的错误:
17/05/05 17:51:52 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
17/05/05 17:51:52 INFO HadoopRDD: Input split: file:/home/user/spark-2.1.0-bin-hadoop2.7/output/part-00000-c353bb29-f189-4189-b35b-f7f1af717355.csv:0+178633
17/05/05 17:51:52 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1143 bytes result sent to driver
17/05/05 17:51:52 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 11 ms on localhost (executor driver) (1/1)
17/05/05 17:51:52 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
17/05/05 17:51:52 INFO DAGScheduler: ResultStage 1 (load at NativeMethodAccessorImpl.java:0) finished in 0,011 s
17/05/05 17:51:52 INFO DAGScheduler: Job 1 finished: load at NativeMethodAccessorImpl.java:0, took 0,018727 s
17/05/05 17:51:52 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.1.26:39609 in memory (size: 2.1 KB, free: 366.3 MB)
17/05/05 17:51:52 INFO BlockManagerInfo: Removed broadcast_2_piece0 on 192.168.1.26:39609 in memory (size: 22.9 KB, free: 366.3 MB)
17/05/05 17:51:52 INFO BlockManagerInfo: Removed broadcast_3_piece0 on 192.168.1.26:39609 in memory (size: 2.1 KB, free: 366.3 MB)
Traceback (most recent call last):
File "/home/user/spark-2.1.0-bin-hadoop2.7/write_to_es.py", line 11, in <module>
es_df_pf.saveAsNewAPIHadoopFile(
File "/home/user/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 964, in __getattr__
AttributeError: 'DataFrame' object has no attribute 'saveAsNewAPIHadoopFile'
17/05/05 17:51:53 INFO SparkContext: Invoking stop() from shutdown hook
17/05/05 17:51:53 INFO SparkUI: Stopped Spark web UI at http://192.168.1.26:4040
17/05/05 17:51:53 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/05/05 17:51:53 INFO MemoryStore: MemoryStore cleared
17/05/05 17:51:53 INFO BlockManager: BlockManager stopped
17/05/05 17:51:53 INFO BlockManagerMaster: BlockManagerMaster stopped
17/05/05 17:51:53 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/05/05 17:51:53 INFO SparkContext: Successfully stopped SparkContext
17/05/05 17:51:53 INFO ShutdownHookManager: Shutdown hook called
17/05/05 17:51:53 INFO ShutdownHookManager: Deleting directory /tmp/spark-501c4efa-5402-430e-93c1-aaff4caddef0
17/05/05 17:51:53 INFO ShutdownHookManager: Deleting directory /tmp/spark-501c4efa-5402-430e-93c1-aaff4caddef0/pyspark-52406fa8-e8d1-4aca-bcb6-91748dc87507
如何解决这个问题:
AttributeError: 'DataFrame' object has no attribute 'saveAsNewAPIHadoopFile'
任何帮助或建议,我们将不胜感激。
我有同样的问题。
看完这篇文章,我找到了答案!!!
您必须PythonRDD
像这样转换为Type:
>>> type(df)
<class 'pyspark.sql.dataframe.DataFrame'>
>>> type(df.rdd)
<class 'pyspark.rdd.RDD'>
>>> df.rdd.saveAsNewAPIHadoopFile(...) # Got the same error message
>>> df.printSchema() # My schema
root
|-- id: string (nullable = true)
...
# Let's convert to PythonRDD
>>> python_rdd = df.map(lambda item: ('key', {
... 'id': item['id'],
...
... }))
>>> python_rdd
PythonRDD[42] at RDD at PythonRDD.scala:43
>>> python_rdd.saveAsNewAPIHadoopFile(...) # Now, success
我已经把麋鹿和Pyspark整合在一起了。 如果我手动完成,就可以写入数据 但是我想在弹性搜索中写过滤数据&托管数据。
问题内容: 我有一个看起来像这样: 两列都是String类型(StringType()),我想将其放入spark ml randomForest中。为此,我需要将要素列转换为包含浮点数的向量。有谁知道怎么做吗? 问题答案: 如果您使用的是 Spark 2.x ,我相信这就是您所需要的: 使用 Spark 1.6 并没有太大不同: 具有可以帮助您实现所要完成的功能的功能。
我正在尝试创建一个dataframe new\u df,并使用pyspark将数据帧加载到Kafka。然而,我很少有例外。不知道到底是什么问题。任何帮助都将不胜感激。 现在我正试图将数据框架与Kafka主题联系起来 异常(从错误中选取): 完全错误: Py4JJavaError:调用o1811时出错。保存:组织。阿帕奇。火花SparkException:作业因阶段失败而中止:阶段76.0中的任务8
我正在尝试使用以下代码将数据帧“df2”保存到文本文件中 代码:df2。写格式(“文本”)。模式(“覆盖”)。保存(“/tmp/hive/save\u text”) 错误: Py4JJavaError Traceback(最近一次调用) /databricks/spark/python/pyspark/sql/utils.py在deco(*a,**kw)62 try:--- /databricks
我想将一个数据帧保存到两个不同的csv文件中(拆分数据帧)-一个文件只包含标题,另一个文件包含其余行。 我想将这两个文件保存在同一个目录下,这样Spark处理所有逻辑将是最好的选择,如果可能的话,而不是使用pandas分割csv文件。 最有效的方法是什么? 谢谢你的帮助!
问题内容: 在我们的应用程序中,我们希望用户输入如下: 我想在单元测试中通过该部分,以便我可以恢复线程以执行其余代码。我如何从junit 写入内容? 问题答案: 你想要做的是使用的方法从。这将使您可以从junit 传递数据。