elasticsearch-py

孟鸿德
2023-12-01
import os
import time
from os import walk
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

class ElasticTest:

    def __init__(self,index_name,index_type,ip_port):
        self.index_name = index_name
        self.index_type = index_type

        # 无用户名密码状态
        # self.es = Elasticsearch([ip:port])
        # 用户名密码状态
        self.es = Elasticsearch([ip_port], http_auth=('elastic', '123'))

    def create_index(self):
        # 创建index
        _index_mappings = {
            "mappings": {
                self.index_type: {
                    "properties": {
                        "name": {
                            "type": "keyword"
                        },
                        "age": {
                            "type": "long"
                        },
                        "sex": {
                            "type": "keyword"
                        },
                        "date": {
                            "type": "date"
                        },
                        'like':{
                            "type": "text"
                        }
                    }
                }
            }
        }
        if self.es.indices.exists(index=self.index_name) is not True:
            res = self.es.indices.create(index=self.index_name, body=_index_mappings)
            print(res)

    def Index_Data(self):
        # 导入数据
        list = [
            {"name": "evol",
             "age": "18",
             "sex": "女",
             "date": "2000-01-01",
             "like": ["java","scala"]
             },
            {"name": "jack",
             "age": "18",
             "sex": "男",
             "date": "2000-02-02",
             "like": ["python", "js"]
             }
        ]
        for item in list:
            res = self.es.index(index=self.index_name, doc_type=self.index_type, body=item)
            print(res)

    def bulk_Index_Data(self):
        # 批量导入数据
        list = [
            {"name": "alian",
             "age": "18",
             "sex": "女",
             "date": "2000-03-03",
             "like": "java"
             },
            {"name": "jakson",
             "age": "18",
             "sex": "男",
             "date": "2000-04-01",
             "like": "scala"
             },
            {"name": "loyi",
             "age": "18",
             "sex": "女",
             "date": "2000-05-01",
             "like": "python"
             },
            {"name": "simith",
             "age": "18",
             "sex": "男",
             "date": "2000-06-01",
             "like": "js"
             }
        ]
        ACTIONS = []
        i = 1
        for line in list:
            action = {
                "_index": self.index_name,
                "_type": self.index_type,
                "_id": i,  # _id 也可以默认生成,不赋值
                "_source": {
                    "name": line['name'],
                    "age": line['age'],
                    "sex": line['sex'],
                    "date": line['date'],
                    "like": line['like']}
            }
            i += 1
            ACTIONS.append(action)
            # 批量处理
        success, _ = bulk(self.es, ACTIONS, index=self.index_name, raise_on_error=True)
        print('Performed %d actions' % success)

    def Delete_Index_Data(self, id):
        # 通过id删除某条数据
        res = self.es.delete(index=self.index_name, doc_type=self.index_type, id=id)
        print(res)

    def Get_Data_Id(self, id):
        # 通过id查询某条数据
        res = self.es.get(index=self.index_name, doc_type=self.index_type, id=id)
        print(res['_source'])

    def Get_Data_By_Body(self):
        # 短语查询
        doc = {
            "query": {
                "bool": {
                  "must": [
                    {
                      "match_phrase": {
                        "like": "java"
                      }
                    },
                    {
                      "match_phrase": {
                        "like": "scala"
                      }
                    }
                  ]
                }
            }
        }
        _searched = self.es.search(index=self.index_name, doc_type=self.index_type, body=doc)

        print(_searched['hits']['total'])
        for hit in _searched['hits']['hits']:
            # print(hit['_source'])
            print(hit['_source']['name'], hit['_source']['age'], hit['_source']['sex'], hit['_source']['date'], \
            hit['_source']['like'])

if __name__ == '__main__':
    es = ElasticTest("test", "test",'192.168.205.169:8200')
    # 创建index
    es.create_index()
    # 导入数据
    es.Index_Data()
    # 批量导入数据
    es.bulk_Index_Data()
    # 通过id删除某条数据
    es.Delete_Index_Data(1)
    # 通过id查询某条数据
    es.Get_Data_Id(2)
    # 短语查询
    es.Get_Data_By_Body()
 类似资料: