Question:

How do I write data to Elasticsearch from PySpark?

夏俊人
2023-03-14

I have already integrated ELK with PySpark.

rdd.saveAsTextFile("/tmp/ELKdata")
logData = sc.textFile('/tmp/ELKdata/*')
errors = logData.filter(lambda line: "raw1-VirtualBox" in line)
errors.count()
errors.first()
errors.saveAsNewAPIHadoopFile(
    path='-', 
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf={"es.resource": "logstash-2016.01.12/errors"})

org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used
    at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:113)
    at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:108)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:921)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:903)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
16/01/12 17:20:13 INFO TaskSetManager: Starting task 1.0 in stage 31.0 (TID 62, localhost, PROCESS_LOCAL, 1181 bytes)
16/01/12 17:20:13 WARN TaskSetManager: Lost task 0.0 in stage 31.0 (TID 61, localhost): org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used
        org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:113)
        org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:108)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:921)
        org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:903)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
16/01/12 17:20:13 ERROR TaskSetManager: Task 0 in stage 31.0 failed 1 times; aborting job
16/01/12 17:20:13 INFO TaskSchedulerImpl: Cancelling stage 31
16/01/12 17:20:13 INFO TaskSchedulerImpl: Stage 31 was cancelled
16/01/12 17:20:13 INFO Executor: Executor is trying to kill task 1.0 in stage 31.0 (TID 62)
16/01/12 17:20:13 INFO DAGScheduler: Failed to run saveAsNewAPIHadoopFile at PythonRDD.scala:665
Traceback (most recent call last):
  File "", line 6, in 
  File "/opt/spark/python/pyspark/rdd.py", line 1213, in saveAsNewAPIHadoopFile
    keyConverter, valueConverter, jconf)
  File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError16/01/12 17:20:13 INFO Executor: Running task 1.0 in stage 31.0 (TID 62)
16/01/12 17:20:13 ERROR Executor: Exception in task 1.0 in stage 31.0 (TID 62)
org.apache.spark.TaskKilledException
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:168)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
16/01/12 17:20:13 WARN TaskSetManager: Lost task 1.0 in stage 31.0 (TID 62, localhost): org.apache.spark.TaskKilledException: 
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:168)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
16/01/12 17:20:13 INFO TaskSchedulerImpl: Removed TaskSet 31.0, whose tasks have all completed, from pool 
: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 31.0 failed 1 times, most recent failure: Lost task 0.0 in stage 31.0 (TID 61, localhost): org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used
        org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:113)
        org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:108)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:921)
        org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:903)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


If I build the records by hand, I can write the data:

errors = logData.filter(lambda line: "raw1-VirtualBox" in line)
errors = errors.map(lambda item: ('AVI0UK0KZsowGuTwoQnP', {"host": "raw1-VirtualBox",
    "ident": "NetworkManager",
    "pid": "69",
    "message": " sucess  <info> (eth0): device state change: ip-config -> secondaries (reason 'none') [70 90 0]",
    "@timestamp": "2016-01-12T10:59:48+05:30"
  }))
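
For completeness, the write call I use with this hand-built (id, document) pair RDD is the same saveAsNewAPIHadoopFile call as before; a minimal sketch (the es.nodes / es.port values below are assumptions for a local cluster, adjust for yours):

errors.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={
        "es.nodes": "localhost",    # assumed Elasticsearch host
        "es.port": "9200",          # assumed Elasticsearch port
        "es.resource": "logstash-2016.01.12/errors"
    })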

But I want to write the filtered data & host data to Elasticsearch, not records I build by hand.

1 answer

韩景胜
2023-03-14

I had a similar problem, and here is how I solved it. The failure above happens because saveAsNewAPIHadoopFile with EsOutputFormat needs the RDD elements to be (key, document) pairs, while logData.filter() leaves plain strings; so instead of an RDD I used a DataFrame.

Once the data is in a DataFrame:

from pyspark.sql import SQLContext

# df is the DataFrame that holds the filtered records
df.write.format("org.elasticsearch.spark.sql").option("es.resource", "logstash-2016.01.12/errors").save()
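
Roughly, the end-to-end flow is sketched below. It assumes errors is the filtered line RDD from the question (the output of logData.filter(...)), keeps each raw line in a single "line" column as a placeholder, and uses assumed es.nodes / es.port values; adjust all of these for your cluster and mapping:

from pyspark.sql import SQLContext, Row

sqlContext = SQLContext(sc)

# Wrap each filtered log line in a Row; a single "line" column is just a
# placeholder -- split out host/ident/pid/message here if you want real fields.
rows = errors.map(lambda line: Row(line=line))
df = sqlContext.createDataFrame(rows)

(df.write
   .format("org.elasticsearch.spark.sql")
   .option("es.nodes", "localhost")   # assumed Elasticsearch host
   .option("es.port", "9200")         # assumed Elasticsearch port
   .option("es.resource", "logstash-2016.01.12/errors")
   .mode("append")
   .save())

Note that the elasticsearch-hadoop / elasticsearch-spark connector jar has to be on the Spark classpath (for example via spark-submit --jars) for the org.elasticsearch.spark.sql format to resolve.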