初学者ES问题在这里
将Spark数据框推送到Elastic Search的工作流程或步骤是什么?
通过研究,我相信我需要使用spark.newAPIHadoopFile()方法。
但是,在研究ElasticSearch文档和其他StackQ / A时,我仍然对参数需要采用的格式以及为什么使用它感到困惑
请注意,我正在使用pyspark,这是ES的新表(尚无索引),并且df是5列(2个字符串类型,2个长类型和1个整数列表),具有约350万行。
设法找到答案,所以我会分享。SparkDF(来自pyspark.sql)目前不支持该newAPIHadoopFile()
方法;但是,这df.rdd.saveAsNewAPIHadoopFile()
也给了我错误。诀窍是通过以下功能将df转换为字符串
def transform(doc):
import json
import hashlib
_json = json.dumps(doc)
keys = doc.keys()
for key in keys:
if doc[key] == 'null' or doc[key] == 'None':
del doc[key]
if not doc.has_key('id'):
id = hashlib.sha224(_json).hexdigest()
doc['id'] = id
else:
id = doc['id']
_json = json.dumps(doc)
return (id, _json)
所以我的JSON工作流程是:
1: df = spark.read.json('XXX.json')
2: rdd_mapped = df.rdd.map(lambda y: y.asDict())
3: final_rdd = rdd_mapped.map(transform)
4:
final_rdd.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf={ "es.resource" : "<INDEX> / <INDEX>", "es.mapping.id":"id",
"es.input.json": "true", "es.net.http.auth.user":"elastic",
"es.write.operation":"index", "es.nodes.wan.only":"false",
"es.net.http.auth.pass":"changeme", "es.nodes":"<NODE1>, <NODE2>, <NODE3>...",
"es.port":"9200" })
有关ES参数的更多信息,请参见此处(滚动到“配置”)
问题内容: 我有一个Spark数据框,我试图将其推送到AWS Elasticsearch,但是在此之前,我正在测试此示例代码段以推送到ES, 我收到一个错误消息, java.lang.ClassNotFoundException:无法找到数据源:org.elasticsearch.spark.sql。 请在http://spark.apache.org/third-party- projects.
我是新来的反应本地人。我需要,如何推setState数组到新的数据?
我想使用heroku pg:push命令将本地postgresql数据库推送到heroku。命令如下所示: 。 我的应用程序的名称是。我尝试了。输出是: 我很惊讶我没有在数据库中输入任何内容。但我仍然运行heroku pg:reset DATABASE来重置我的数据库。之后,我再次尝试了heroku pg:推送mysitedb数据库——app secure-gorge-4090,但输出仍然相同。
我是docker和所有这些日志记录的新手,所以可能我犯了一个stuipd错误,所以提前感谢您的帮助。我让麋鹿通过Dockerfile行运行docker容器(6.2.2): 在一个单独的容器中,我通过folling Dockerfile行安装并运行Filebeat: 我的Filebeat配置是: 正如你所看到的,我试图在elasticsearch中创建一个自定义索引,但现在我只是想先让默认索引正常工
我正在使用Python语言。我有csv文件,我需要转换成json并发送到kafka,然后发送到ElasticSearch。 我能够将Csv转换为Json并发送给Kafka消费者。如何从Kafka Consumer向ElasticSearch获取数据
这是当前的代码:从'@angull/core'导入{Component,OnInit,OnDestroy};从'AngularFire2/FireStore'导入{AngularFirestore,AngularFirestoreCollection};从'AngularFire2/Database'导入{AngularFireDatabase};从'AngularFire2/Auth'导入{An