我曾经parallel_bulk
在python中向Elasticsearch插入数据,但parallel_bulk
无法插入数据。我的代码:
class CreateIndex(object):
def _gen_data(self, index, doc_type, chunk_size):
sql = """select * from tem_search_engine_1 where rownum <= 10000"""
self.cursor.execute(sql)
col_name_list = [col[0].lower() for col in self.cursor.description]
col_name_len = len(col_name_list)
actions = []
start = time.time()
for row in self.cursor:
source = {}
tbl_id = ""
for i in range(col_name_len):
source.update({col_name_list[i]: str(row[i])})
if col_name_list[i] == "tbl_id":
tbl_id = row[i]
action = {
"_index": index,
"_type": doc_type,
"_id": tbl_id,
"_source": source
}
actions.append(action)
if len(actions) == chunk_size:
print("actions time:", time.time()-start)
yield actions
actions = []
print("for time:", time.time()-start)
yield actions
def bulk_data(self, index, doc_type, chunk_size=1000, is_parallel=True, threads_counts=4):
t1 = time.time()
gen_action = self._gen_data(index, doc_type, chunk_size)
if is_parallel is None or is_parallel == True:
for success, info in helpers.parallel_bulk(client=self.es, actions=gen_action, thread_count=threads_counts):
if not success:
print("Insert failed: ", info)
if __name__ == "__main__":
createindex = CreateIndex()
createindex.create_index(index="se", doc_type="se_doc")
createindex.bulk_data(index="se", doc_type="se_doc")
当我使用bulk_data
,但无法插入任何数据时,该如何处理呢?错误是:
Traceback (most recent call last):
File "F:/programs/ElasticSearch/CreateIndex.py", line 287, in <module>
createindex.bulk_data(index="se", doc_type="se_doc")
File "F:/programs/ElasticSearch/CreateIndex.py", line 179, in bulk_data
thread_count=threads_counts, chunk_size=chunk_size):
File "F:\programs\ElasticSearch\lib\site-packages\elasticsearch\helpers\__init__.py", line 306, in parallel_bulk
_chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer)
File "D:\anacond\lib\multiprocessing\pool.py", line 735, in next
raise value
File "D:\anacond\lib\multiprocessing\pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "D:\anacond\lib\multiprocessing\pool.py", line 138, in _helper_reraises_exception
raise ex
File "D:\anacond\lib\multiprocessing\pool.py", line 290, in _guarded_task_generation
for i, x in enumerate(iterable):
File "F:\programs\ElasticSearch\lib\site-packages\elasticsearch\helpers\__init__.py", line 58, in _chunk_actions
for action, data in actions:
File "F:\programs\ElasticSearch\lib\site-packages\elasticsearch\helpers\__init__.py", line 37, in expand_action
op_type = data.pop('_op_type', 'index')
TypeError: pop() takes at most 1 argument (2 given)
使用parallel_bulk
method,您可以传递一个字典列表或一个生成器,生成一个字典。在这里解释。一个generator
在python用于未在RAM中加载变量,但如果你要通过你的elem
前一个清单-
在字典action
中的列表actions
,它没有更多的意义,因为建立一个列表,你应该在内存中加载所有里面的元素。在您的情况下,您要传递的生成器不会产生dict符action
--但会生成操作列表-
actions
。
因此,或者您的函数_gen_data返回一个列表,实际上是一个生成器的列表:
def _gen_data(self, index, doc_type, chunk_size):
sql = """select * from tem_search_engine_1 where rownum <= 10000"""
self.cursor.execute(sql)
col_name_list = [col[0].lower() for col in self.cursor.description]
col_name_len = len(col_name_list)
actions = []
start = time.time()
for row in self.cursor:
source = {}
tbl_id = ""
for i in range(col_name_len):
source.update({col_name_list[i]: str(row[i])})
if col_name_list[i] == "tbl_id":
tbl_id = row[i]
action = {
"_index": index,
"_type": doc_type,
"_id": tbl_id,
"_source": source
}
actions.append(action)
return actions
或者,您不创建actions
列表,而得出action
字典:
def _gen_data(self, index, doc_type, chunk_size):
sql = """select * from tem_search_engine_1 where rownum <= 10000"""
self.cursor.execute(sql)
col_name_list = [col[0].lower() for col in self.cursor.description]
col_name_len = len(col_name_list)
start = time.time()
for row in self.cursor:
source = {}
tbl_id = ""
for i in range(col_name_len):
source.update({col_name_list[i]: str(row[i])})
if col_name_list[i] == "tbl_id":
tbl_id = row[i]
yield {
"_index": index,
"_type": doc_type,
"_id": tbl_id,
"_source": source
}
事情是这样的 我不知道发生了什么,如果有人能帮忙,我会非常感激的。THX!
但是得到 jspException(jspservletwrapper.java:568)org.apache.jasper.servlet.jspservletwrapper.java:455)org.apache.jasper.server.jspserver.servicejjspfile(jspservlet.java:390)org.apache.jasper.server.jspser
我在将字符串数组插入mongo DB时遇到问题。在数组的特定大小之后,驱动程序挂起-不会抛出异常(超时或错误)。当我缩短长度时,一切正常。有人能解释这种奇怪的行为吗?很抱歉这个newbee问题,但我找不到一个现有的问题。 Mongo DB版本 非工作文档
我是laravel的新手,当我使用单击函数提交表单时出现了一些问题。ajax jquery controller不会将数据保存到数据库,每次响应时都会使用整个html文件。请帮帮我。 关于标题的一些信息 请求URL:http://akshay.laravel/patient1?fname=asdknkl 状态代码:200 OK 远程地址:[::1]:80 推荐人策略:降级时无推荐人 缓存控制:无缓
基本上我有两个简单的表…第一个叫做“user”,它是父表。PK是索引自动递增的。第二个表称为“useradvert”。“id”字段充当索引键,它不会自动递增。每当我尝试插入数据时,它都不会进入表(useradvert)。在我的PHP页面上没有任何错误。我打开了错误报告。我设法创建了一个没有错误的关系表。我已经试着解决这个问题好几天了,并在网上搜索答案,但仍然找不到和理解这个问题。问题是由于子表中的