我正在尝试使用Elasticsearchparallel_bulk导入大量数据。这是我的索引结构:
{
"_index" : "myindex",
"_type" : domain,
"_id" : md5(email),
"_score" : 1.0,
"_source" : {
"purchase_date" : purchase_date,
"amount" : amount,
}
}
这是我的python代码:
def insert(input_file):
paramL = []
with open(input_file) as f:
for line in f:
line = line.rstrip()
fields = line.split(',')
purchase_date = fields[0]
amount = fields[1]
email = fields[2]
id_email = getMD5(email)
doc = {
"email": email,
"purchase_date": purchase_date,
"amount": amount _date
}
ogg = {
'_op_type': 'index',
'_index': index_param,
'_type': doctype_param,
'_id': id_email,
'_source': doc
}
paramL.append(ogg)
if len(paramL) > 500000:
for success, info in helpers.parallel_bulk(client=es, actions=paramL, thread_count=4):
if not success:
print "Insert failed: ", info
# empty paramL if size > 5.000.000
del paramL[:]
该文件包含42.644.394行,我认为每次列表“paramL”大约是5.000.000个元素时都要插入数据。因此,当我运行该脚本时,它插入了大约436.226个值,直到它崩溃并出现以下错误:
回溯(最近一次调用): 文件 “test-2-0.py”,第 133 行,在 main() 文件 “test-2-0.py”,第 131 行,在 main insert(args.file) 文件 “test-2-0.py”,第 82 行,在插入中表示成功,在 helpers.parallel_bulk 中的信息(client=es,actions=paramL,thread_count=4):文件 “/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/init.py”,第 306 行,在 parallel_bulk _chunk_actions 中(actions, chunk_size, max_chunk_bytes, client.transport.serializer) 文件 “/usr/lib/python2.7/multiprocessing/pool.py”,第 668 行,在下一个提升值 elasticsearch.exceptions.ConnectionTimeout: ConnectionTimeout 由 - ReadTimeoutError(HTTPConnectionPool(host=u'127.0.0.1', port=9200) 引起:读取超时。(读取超时 = 10))
我还试图增加超时传递它在Elasticsearch构造函数
es = Elasticsearch(['127.0.0.1'], request_timeout=30)
但结果是一样的。
诚恳地说,我从来没有批量导入这么多要标记的文档。我不知道为什么会出现这个错误。在您的情况下,我建议不要创建列表-paramL,而是使用生成器函数来管理您的数据,正如弹性开发人员在弹性论坛中描述的大批量摄取的最佳实践:https://discuss.elastic.co/t/helpers-parallel-bulk-in-python-not-working/39498/3。大致如下:
def insert(input_file):
with open(input_file) as f:
for line in f:
line = line.rstrip()
fields = line.split(',')
purchase_date = fields[0]
amount = fields[1]
email = fields[2]
id_email = getMD5(email)
doc = {
"email": email,
"purchase_attack": purchase_date,
"amount _relevation": amount _date
}
yield {
'_op_type': 'index',
'_index': index_param,
'_type': doctype_param,
'_id': id_email,
'_source': doc
}
for success, info in helpers.parallel_bulk(client=es, actions=insert(input_file), thread_count=4):
if not success:
print "Insert failed: ", info
您可以在java虚拟机中增加专用于elastic的空间编辑此文件< code >/etc/elastic search/JVM . options 要分配2 GB的RAM,您应该更改——如果您的机器有4 GB,您应该为系统保留近1 GB,因此您最多可以分配3 gb:
# Xms represents the initial size of total heap space
# Xmx represents the maximum size of total heap space
-Xms2g
-Xmx2g
然后,您必须重新启动服务
sudo service elasticsearch restart
再试一次。祝你好运
我有一种情况,其中两个WebServer是用nginx作为loadbalancer设置的,它们本身是后端。发行版是Debian Wheezy。两台服务器上的配置相同(四核32GB RAM) TCP协议
问题内容: 在Windows上连接Jenkins从属代理失败,连接超时。 环境:Windows Server 2003 R2 Java6 掌握:Linux从属:Windows 我尝试将其同时运行为jnlp和java -jar cmd,但始终失败。 问题答案: 您没有提供太多信息,但是从错误消息中,您似乎暗示您的构建计算机正在Amazon EC2上运行。 在这种情况下,您的EC2实例安全组可能不允许
问题内容: 我在代码中使用了RMI: 这些是4个.java文件。 接下来,我编译所有这些文件。然后创建一个using 。之后,我使用来在服务器端启动rmi注册表。然后,我开始使用服务器,最后使用客户端。 但是什么也没发生 客户端抛出的异常是 原因是什么,我该如何解决? 在客户端计算机上,这些是以下.class文件,在服务器端 问题答案: 错误消息说明了一切:您的连接超时。这意味着您的请求在某个(默
代码片段如下所示: 如果有人有决议,请帮忙?
**dataframe2:从另一个来源获得的键的Dataframe(这些键是上表中ID列的分区键)-此表中不同键的数量约为0.15万** 现在,此代码总是导致“com.datastax.oss.driver.api.core.servererrors.ReadFailureException:在一致性LOCAL_ONE读取查询期间Cassandra失败(需要1个响应,但只有0个副本响应,1个失败)