I have integrated ELK (Elasticsearch, Logstash, Kibana) with PySpark.
rdd.saveAsTextFile("/tmp/ELKdata")
logData = sc.textFile('/tmp/ELKdata/*')
errors = logData.filter(lambda line: "raw1-VirtualBox" in line)
errors.count()
errors.first()
errors.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf= {"es.resource" : "logstash-2016.01.12/errors})
org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used
    at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:113)
    at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:108)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:921)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:903)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
16/01/12 17:20:13 INFO TaskSetManager: Starting task 1.0 in stage 31.0 (TID 62, localhost, PROCESS_LOCAL, 1181 bytes)
16/01/12 17:20:13 WARN TaskSetManager: Lost task 0.0 in stage 31.0 (TID 61, localhost): org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used (same stack trace as above)
16/01/12 17:20:13 ERROR TaskSetManager: Task 0 in stage 31.0 failed 1 times; aborting job
16/01/12 17:20:13 INFO TaskSchedulerImpl: Cancelling stage 31
16/01/12 17:20:13 INFO TaskSchedulerImpl: Stage 31 was cancelled
16/01/12 17:20:13 INFO Executor: Executor is trying to kill task 1.0 in stage 31.0 (TID 62)
16/01/12 17:20:13 INFO DAGScheduler: Failed to run saveAsNewAPIHadoopFile at PythonRDD.scala:665
16/01/12 17:20:13 INFO Executor: Running task 1.0 in stage 31.0 (TID 62)
16/01/12 17:20:13 ERROR Executor: Exception in task 1.0 in stage 31.0 (TID 62)
org.apache.spark.TaskKilledException
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:168)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
16/01/12 17:20:13 WARN TaskSetManager: Lost task 1.0 in stage 31.0 (TID 62, localhost): org.apache.spark.TaskKilledException
16/01/12 17:20:13 INFO TaskSchedulerImpl: Removed TaskSet 31.0, whose tasks have all completed, from pool
Traceback (most recent call last):
  File "", line 6, in
  File "/opt/spark/python/pyspark/rdd.py", line 1213, in saveAsNewAPIHadoopFile
    keyConverter, valueConverter, jconf)
  File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 31.0 failed 1 times, most recent failure: Lost task 0.0 in stage 31.0 (TID 61, localhost): org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used (same stack trace as above)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
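The decisive line in this output is "RDD element of type java.lang.String cannot be used": saveAsNewAPIHadoopFile passes the RDD through SerDeUtil.pythonToPairRDD, which only accepts (key, value) pairs, while the errors RDD holds bare strings. A quick check of what the RDD actually contains, as a sketch using only the objects defined above:

# Inspect an element of the RDD that was passed to saveAsNewAPIHadoopFile.
sample = errors.first()
print(type(sample))               # a bare string (the raw log line)
print(isinstance(sample, tuple))  # False: not the (key, value) pair the Hadoop output API expects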
If I build the records manually, the data is written successfully:
errors = logData.filter(lambda line: "raw1-VirtualBox" in line)
errors = errors.map(lambda item: ('AVI0UK0KZsowGuTwoQnP',{"host": "raw1-VirtualBox",
"ident": "NetworkManager",
"pid": "69",
"message": " sucess <info> (eth0): device state change: ip-config -> secondaries (reason 'none') [70 90 0]",
"@timestamp": "2016-01-12T10:59:48+05:30"
}))
但是我想在弹性搜索中写过滤数据&托管数据。
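A minimal sketch of generalizing the manual workaround: map each filtered line into a (key, dict) pair and reuse the exact output classes and conf from the failing call above. The parsing below is a placeholder (a real syslog line would be split into host, ident, pid, @timestamp and so on), and the fixed key simply mirrors the manual example; neither is specified in the original post.

def to_doc(line):
    # Placeholder parsing: keep the whole line as the message field.
    # A real implementation would extract host, ident, pid, @timestamp, etc.
    return ('log-entry', {"host": "raw1-VirtualBox", "message": line})

docs = errors.map(to_doc)
docs.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={"es.resource": "logstash-2016.01.12/errors"})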
I had a similar problem, and here is how I solved it: I used a DataFrame instead of an RDD.
Once the data is in a DataFrame:
from pyspark.sql import SQLContext
df.write.format("org.elasticsearch.spark.sql").option("es.resource", "logstash-2016.01.12/errors").save()
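Put together with the filtered RDD from the question, a minimal sketch of the DataFrame route might look like the code below. The column layout and the es.nodes/es.port values (a local Elasticsearch node) are assumptions, not something given in the original answer, and the elasticsearch-hadoop (es-spark) jar must be on the Spark classpath.

from pyspark.sql import SQLContext, Row

sqlContext = SQLContext(sc)

# Turn each filtered log line into a Row; real code would parse out host, pid, etc.
rows = errors.map(lambda line: Row(host="raw1-VirtualBox", message=line))
df = sqlContext.createDataFrame(rows)

(df.write
   .format("org.elasticsearch.spark.sql")
   .option("es.resource", "logstash-2016.01.12/errors")
   .option("es.nodes", "localhost")   # assumed Elasticsearch host
   .option("es.port", "9200")         # assumed Elasticsearch HTTP port
   .save())

The connector converts each Row into a JSON document itself, so no pair-RDD conversion is needed on the PySpark side.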