当前位置: 首页 > 面试题库 >

如何将Spark数据框推送到elasticsearch(Pyspark)

朱炜
2023-03-14
问题内容

初学者ES问题在这里

将Spark数据框推送到Elastic Search的工作流程或步骤是什么?

通过研究,我相信我需要使用spark.newAPIHadoopFile()方法。

但是,在研究ElasticSearch文档和其他StackQ / A时,我仍然对参数需要采用的格式以及为什么使用它感到困惑

请注意,我正在使用pyspark,这是ES的新表(尚无索引),并且df是5列(2个字符串类型,2个长类型和1个整数列表),具有约350万行。


问题答案:

设法找到答案,所以我会分享。SparkDF(来自pyspark.sql)目前不支持该newAPIHadoopFile()方法;但是,这df.rdd.saveAsNewAPIHadoopFile()也给了我错误。诀窍是通过以下功能将df转换为字符串

def transform(doc):
    import json
    import hashlib

    _json = json.dumps(doc)
    keys = doc.keys()
    for key in keys:
        if doc[key] == 'null' or doc[key] == 'None':
            del doc[key]
    if not doc.has_key('id'):
        id = hashlib.sha224(_json).hexdigest()
        doc['id'] = id
    else:
        id = doc['id']
    _json = json.dumps(doc)
    return (id, _json)

所以我的JSON工作流程是:

1: df = spark.read.json('XXX.json')

2: rdd_mapped = df.rdd.map(lambda y: y.asDict())

3: final_rdd = rdd_mapped.map(transform)

4:

final_rdd.saveAsNewAPIHadoopFile(
     path='-', 
     outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
     keyClass="org.apache.hadoop.io.NullWritable",  
     valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
     conf={ "es.resource" : "<INDEX> / <INDEX>", "es.mapping.id":"id", 
         "es.input.json": "true", "es.net.http.auth.user":"elastic",
         "es.write.operation":"index", "es.nodes.wan.only":"false",
         "es.net.http.auth.pass":"changeme", "es.nodes":"<NODE1>, <NODE2>, <NODE3>...",
         "es.port":"9200" })

有关ES参数的更多信息,请参见此处(滚动到“配置”)



 类似资料:
  • 问题内容: 我有一个Spark数据框,我试图将其推送到AWS Elasticsearch,但是在此之前,我正在测试此示例代码段以推送到ES, 我收到一个错误消息, java.lang.ClassNotFoundException:无法找到数据源:org.elasticsearch.spark.sql。 请在http://spark.apache.org/third-party- projects.

  • 我是新来的反应本地人。我需要,如何推setState数组到新的数据?

  • 我想使用heroku pg:push命令将本地postgresql数据库推送到heroku。命令如下所示: 。 我的应用程序的名称是。我尝试了。输出是: 我很惊讶我没有在数据库中输入任何内容。但我仍然运行heroku pg:reset DATABASE来重置我的数据库。之后,我再次尝试了heroku pg:推送mysitedb数据库——app secure-gorge-4090,但输出仍然相同。

  • 我是docker和所有这些日志记录的新手,所以可能我犯了一个stuipd错误,所以提前感谢您的帮助。我让麋鹿通过Dockerfile行运行docker容器(6.2.2): 在一个单独的容器中,我通过folling Dockerfile行安装并运行Filebeat: 我的Filebeat配置是: 正如你所看到的,我试图在elasticsearch中创建一个自定义索引,但现在我只是想先让默认索引正常工

  • 我正在使用Python语言。我有csv文件,我需要转换成json并发送到kafka,然后发送到ElasticSearch。 我能够将Csv转换为Json并发送给Kafka消费者。如何从Kafka Consumer向ElasticSearch获取数据

  • 这是当前的代码:从'@angull/core'导入{Component,OnInit,OnDestroy};从'AngularFire2/FireStore'导入{AngularFirestore,AngularFirestoreCollection};从'AngularFire2/Database'导入{AngularFireDatabase};从'AngularFire2/Auth'导入{An