import os
import time
from os import walk
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
class ElasticTest:
    """Small demo wrapper around an Elasticsearch client for one index.

    Each method performs one operation against ``self.index_name`` /
    ``self.index_type`` (create the index, index sample documents, bulk
    index, delete by id, get by id, phrase search), prints the outcome and
    also returns it so callers can use the result programmatically.

    NOTE(review): the code passes ``doc_type`` and nests the mapping under a
    type name, which looks like the pre-7.x Elasticsearch API -- confirm the
    client/server versions this is meant to run against.
    """

    def __init__(self, index_name, index_type, ip_port,
                 http_auth=('elastic', '123')):
        """Store the index coordinates and open the client connection.

        Args:
            index_name: Name of the index every method operates on.
            index_type: Mapping/document type passed as ``doc_type``.
            ip_port: Host string handed to ``Elasticsearch`` (the script at
                the bottom of the file passes an ``"ip:port"`` string).
            http_auth: ``(user, password)`` tuple forwarded to the client.
                Defaults to the credentials that used to be hard-coded here,
                so existing three-argument callers are unaffected.
                NOTE(review): presumably ``None`` gives an unauthenticated
                connection -- confirm against the installed client.
        """
        self.index_name = index_name
        self.index_type = index_type
        self.es = Elasticsearch([ip_port], http_auth=http_auth)

    def create_index(self):
        """Create the index with its field mapping unless it already exists.

        Returns:
            The response of ``indices.create`` when the index was created,
            otherwise ``None``.
        """
        index_mappings = {
            "mappings": {
                self.index_type: {
                    "properties": {
                        "name": {"type": "keyword"},
                        "age": {"type": "long"},
                        "sex": {"type": "keyword"},
                        "date": {"type": "date"},
                        "like": {"type": "text"},
                    }
                }
            }
        }
        # Plain truthiness instead of ``is not True``: an identity check
        # against True treats any truthy non-bool result as "missing" and
        # would then try to create an index that already exists.
        if self.es.indices.exists(index=self.index_name):
            return None
        res = self.es.indices.create(index=self.index_name,
                                     body=index_mappings)
        print(res)
        return res

    def Index_Data(self):
        """Index two sample documents one request at a time.

        No ``id`` is passed to ``index``, so ids are left to the server.

        Returns:
            List with the response of each ``index`` call, in order.
        """
        documents = [
            {"name": "evol",
             "age": "18",
             "sex": "女",
             "date": "2000-01-01",
             "like": ["java", "scala"]},
            {"name": "jack",
             "age": "18",
             "sex": "男",
             "date": "2000-02-02",
             "like": ["python", "js"]},
        ]
        responses = []
        for document in documents:
            res = self.es.index(index=self.index_name,
                                doc_type=self.index_type, body=document)
            print(res)
            responses.append(res)
        return responses

    def bulk_Index_Data(self):
        """Index four sample documents in a single bulk request.

        Documents get the explicit ids 1..4 (``_id`` may also be omitted to
        let it be generated, as the original comment notes).

        Returns:
            The first element of the tuple returned by ``helpers.bulk``,
            which is printed as the number of performed actions.
        """
        documents = [
            {"name": "alian",
             "age": "18",
             "sex": "女",
             "date": "2000-03-03",
             "like": "java"},
            {"name": "jakson",
             "age": "18",
             "sex": "男",
             "date": "2000-04-01",
             "like": "scala"},
            {"name": "loyi",
             "age": "18",
             "sex": "女",
             "date": "2000-05-01",
             "like": "python"},
            {"name": "simith",
             "age": "18",
             "sex": "男",
             "date": "2000-06-01",
             "like": "js"},
        ]
        actions = [
            {
                "_index": self.index_name,
                "_type": self.index_type,
                "_id": doc_id,
                # Each sample record holds exactly the five source fields,
                # so a shallow copy is the same as copying them one by one.
                "_source": dict(document),
            }
            for doc_id, document in enumerate(documents, start=1)
        ]
        success, _ = bulk(self.es, actions, index=self.index_name,
                          raise_on_error=True)
        print('Performed %d actions' % success)
        return success

    def Delete_Index_Data(self, id):
        """Delete the document with the given id and return the response.

        Args:
            id: Document id (name kept for keyword-argument callers even
                though it shadows the builtin).
        """
        res = self.es.delete(index=self.index_name,
                             doc_type=self.index_type, id=id)
        print(res)
        return res

    def Get_Data_Id(self, id):
        """Fetch one document by id; print and return its ``_source``.

        Args:
            id: Document id (name kept for keyword-argument callers even
                though it shadows the builtin).
        """
        res = self.es.get(index=self.index_name,
                          doc_type=self.index_type, id=id)
        source = res['_source']
        print(source)
        return source

    def Get_Data_By_Body(self):
        """Run a phrase query on ``like`` and print every hit.

        The query is a ``bool``/``must`` of two ``match_phrase`` clauses, so
        a document has to match both "java" and "scala" in ``like``.

        Returns:
            The list under ``hits.hits`` of the search response.
        """
        query = {
            "query": {
                "bool": {
                    "must": [
                        {"match_phrase": {"like": "java"}},
                        {"match_phrase": {"like": "scala"}},
                    ]
                }
            }
        }
        searched = self.es.search(index=self.index_name,
                                  doc_type=self.index_type, body=query)
        print(searched['hits']['total'])
        hits = searched['hits']['hits']
        for hit in hits:
            source = hit['_source']
            print(source['name'], source['age'], source['sex'],
                  source['date'], source['like'])
        return hits
if __name__ == '__main__':
    # Demo run against the test cluster: every step below talks to the
    # Elasticsearch server, and the steps are executed in this fixed order.
    client = ElasticTest("test", "test", '192.168.205.169:8200')

    client.create_index()        # create the index
    client.Index_Data()          # index documents one by one
    client.bulk_Index_Data()     # index documents in bulk
    client.Delete_Index_Data(1)  # delete the document with id 1
    client.Get_Data_Id(2)        # fetch the document with id 2
    client.Get_Data_By_Body()    # phrase query