用xapian跟mmseg实现中文搜索

赫连法
2023-12-01

http://outofmemory.cn/code-snippet/133/use-xapian-gen-mmseg-achieve-zhongwensousuo


Xapian是一个开源的信息检索项目,类似于Lucene。

安装:用apt-get可以在ubuntu里安装如下模块:

apt-xapian-index - maintenance tools for a Xapian index of Debian packages
libxapian15 - Search engine library
python-xapian - Xapian search engine interface for Python

如果是用django,可以安装:python-django-djapian - Search API for Django using Xapian

mmseg是一个国人写的中文分词模块.

用mmseg加上xapian就可以实现中文搜索的功能了。

好了,在我的项目中,原始数据是保存在数据库中的,所以需要从数据库中读出数据,再来建立索引,所以建立索引的代码如下:

Python代码

class Index(object):
    """Build a Xapian full-text index from book rows stored in MySQL.

    Every row read from the database becomes one Xapian document. The
    document's data payload is the row serialised as JSON, and the document
    is keyed by the unique term ``I<id>`` so that re-indexing a row replaces
    the previous document instead of adding a duplicate.
    """

    # Fields whose text is segmented and added to the document as terms.
    INDEXED_FIELDS = ('name', 'author', 'isbn')

    # Names given to the selected columns, in SELECT order.
    ROW_KEYS = ('id', 'name', 'isbn', 'price', 'publisher', 'publish_date',
                'img_url', 'description', 'author')

    # Rows fetched per query, and the maximum number of pages read in one run.
    PAGE_SIZE = 1000
    MAX_PAGES = 1000

    def __init__(self, DBPATH, host='localhost', user='dbuser', passwd='dbpasswd', dbname='pjname'):
        """Open (or create) the Xapian database and connect to MySQL.

        :param DBPATH: path handed to ``xapian.WritableDatabase``.
        :param host: MySQL host.
        :param user: MySQL user name.
        :param passwd: MySQL password.
        :param dbname: MySQL database name.
        """
        self.SEARCH_DB = xapian.WritableDatabase(DBPATH, xapian.DB_CREATE_OR_OPEN)
        self.conn = MySQLdb.connect(host=host, user=user, passwd=passwd, db=dbname, charset='utf8')

    def _add_hanzi(self, doc, data):
        """Segment ``data`` and add each resulting term to ``doc``.

        Falsy ``data`` (``None``, empty string) is ignored.
        """
        if not data:
            return
        # seg_txt_2_dict is defined elsewhere; presumably it maps each
        # segmented word to its occurrence count -- TODO confirm.
        for word, value in seg_txt_2_dict(data).items():
            doc.add_term(word, value)

    def modify_data(self, data_dict):
        """Normalise the values of ``data_dict`` in place to plain strings.

        * ``datetime.date`` values become ``YYYYMMDD``.
        * ``int`` and ``Decimal`` values (including zero) become ``str(v)``.
        * any other falsy value (``None``, ``''``, ...) becomes ``''``.
        * everything else is left untouched.

        :returns: the same ``data_dict`` object, for convenience.
        """
        # Iterate over a snapshot because values are reassigned in the loop.
        for k, v in list(data_dict.items()):
            if isinstance(v, datetime.date):
                data_dict[k] = u'%04d%02d%02d' % (v.year, v.month, v.day)
            elif isinstance(v, (int, Decimal)):
                # Checked before the falsy test so 0 is kept as '0'.
                data_dict[k] = str(v)
            elif not v:
                data_dict[k] = ''
        return data_dict

    def _update_index(self, data_dict):
        """Index one row, replacing any document already stored for its id.

        :param data_dict: mapping with at least an ``'id'`` key.
        """
        data_dict = self.modify_data(data_dict)
        doc = xapian.Document()
        for field in self.INDEXED_FIELDS:
            # Missing or empty fields are skipped inside _add_hanzi.
            self._add_hanzi(doc, data_dict.get(field))
        # The unique id term lets replace_document find the old document.
        key = 'I%s' % data_dict['id']
        doc.add_term(key)
        doc.set_data(simplejson.dumps(data_dict, encoding='utf8'))
        self.SEARCH_DB.replace_document(key, doc)

    def update_index(self):
        """Index every row yielded by :meth:`get_database_value`."""
        for row in self.get_database_value():
            self._update_index(row)

    def get_database_value(self):
        """Yield book rows as dicts keyed by :attr:`ROW_KEYS`.

        Rows are read ``PAGE_SIZE`` at a time, for at most ``MAX_PAGES``
        pages; reading stops early at the first empty page.
        """
        cursor = self.conn.cursor()
        try:
            for page in range(self.MAX_PAGES):
                # ORDER BY makes the LIMIT windows deterministic, so paging
                # never skips or repeats a book between queries.
                cursor.execute('select bb.id as id, bb.name as name, isbn, price, publisher, publish_date, img_url, description, group_concat(ba.name) as author  from book_book as bb left join book_book_author as bba on bb.id=bba.book_id left join book_author as ba on bba.author_id=ba.id group by bb.id order by bb.id limit %s,%s', (page * self.PAGE_SIZE, self.PAGE_SIZE))
                rows = cursor.fetchall()
                if not rows:
                    break
                for row in rows:
                    yield dict(zip(self.ROW_KEYS, row))
        finally:
            # Also runs when the consumer abandons the generator early.
            cursor.close()

索引建立后就可以搜索了,下面是搜索的代码(还实现了结果分页的功能):

Python代码

```python
class Search(object):
    """Query a Xapian index whose documents carry JSON data payloads."""

    def __init__(self, DBPATH):
        """Open the Xapian database at ``DBPATH`` and create an Enquire on it."""
        self.SEARCH_DB = xapian.Database(DBPATH)
        self.SEARCH_ENQUIRE = xapian.Enquire(self.SEARCH_DB)

    def _get_enquire_mset(self, start_offset, end_offset):
        """Run the current query, retrying once if the database was modified.

        :param start_offset: index of the first match to return.
        :param end_offset: passed as the second argument of ``get_mset``,
            i.e. the maximum number of matches to return (not an end index).
        """
        try:
            return self.SEARCH_ENQUIRE.get_mset(start_offset, end_offset)
        except xapian.DatabaseModifiedError:
            # Reopen the database and retry once.
            self.SEARCH_DB.reopen()
            return self.SEARCH_ENQUIRE.get_mset(start_offset, end_offset)

    def _get_document_data(self, document):
        """Return ``document.get_data()``, retrying once after a reopen."""
        try:
            return document.get_data()
        except xapian.DatabaseModifiedError:
            self.SEARCH_DB.reopen()
            return document.get_data()

    def _get_hit_count(self):
        """Return the total number of matches for the current query."""
        # Asking for as many items as there are documents returns every match.
        return self._get_enquire_mset(0, self.SEARCH_DB.get_doccount()).size()

    def search(self, keywords, start_offset=0, end_offset=None):
        """Search for documents containing every segmented term of ``keywords``.

        :param keywords: query text (byte string or ``unicode``).
        :param start_offset: index of the first match to return.
        :param end_offset: maximum number of matches to return; when falsy,
            everything from ``start_offset`` onwards is returned.
        :returns: ``{'count': total_hits, 'object_list': [decoded JSON, ...]}``.
            Empty keywords, or keywords producing no terms, give a count of
            0 and an empty list without touching the index.
        """
        if not keywords:
            return {'count': 0, 'object_list': []}
        if isinstance(keywords, unicode):
            keywords = keywords.encode('utf8')

        # seg_txt_2_dict is defined elsewhere; presumably it maps each
        # segmented word to its occurrence count -- TODO confirm.
        query_list = [xapian.Query(word, value)
                      for word, value in seg_txt_2_dict(keywords).items()]
        if not query_list:
            # Nothing to search for; avoid building an AND over zero terms.
            return {'count': 0, 'object_list': []}
        if len(query_list) == 1:
            query = query_list[0]
        else:
            query = xapian.Query(xapian.Query.OP_AND, query_list)

        self.SEARCH_ENQUIRE.set_query(query)
        if not end_offset:
            # Never request a negative number of items when start_offset
            # lies beyond the end of the database.
            end_offset = max(self.SEARCH_DB.get_doccount() - start_offset, 0)

        matches = self._get_enquire_mset(start_offset, end_offset)

        results = []
        for match in matches:
            raw = self._get_document_data(match.document)
            results.append(simplejson.loads(raw, encoding='utf8'))

        return {'count': self._get_hit_count(), 'object_list': results}

    def search_by_page(self, keywords, pagenum=1, num_per_page=20):
        """Return one page of results plus paginator-style metadata.

        :param keywords: query text, forwarded to :meth:`search`.
        :param pagenum: 1-based page number; values below 1 are treated as 1.
        :param num_per_page: number of results per page.
        :returns: the dict from :meth:`search` extended with ``has_previous``,
            ``previous_page_number``, ``number``, ``has_next``,
            ``next_page_number`` and ``paginator`` (``{'num_pages': int}``).
        """
        if pagenum < 1:
            pagenum = 1
        start_offset = (pagenum - 1) * num_per_page
        data = self.search(keywords, start_offset, num_per_page)
        data['has_previous'] = pagenum > 1
        data['previous_page_number'] = max(pagenum - 1, 1)
        data['number'] = pagenum
        data['has_next'] = pagenum * num_per_page < data['count']
        data['next_page_number'] = pagenum + 1
        # Ceiling division; // keeps the result an integer on every Python version.
        data['paginator'] = {'num_pages': (data['count'] + num_per_page - 1) // num_per_page}
        return data
 ```

在django使用搜索功能:

Python代码

```python
def search_book(request):  
    """Django view: run a paged book search and render the result template.

    Reads the query text from the ``search_words`` request parameter and the
    page number from the ``page`` GET parameter (falling back to 1 when it is
    not an integer). A POST request always shows page 1.

    Returns an ``HttpResponse`` containing ``book/search_result.html``
    rendered with the view's local variables as template context.
    """

    # NOTE(review): this is None when the parameter is absent, and None is
    # passed straight to search_by_page -- confirm that is handled.
    search_words = request.REQUEST.get('search_words')  
    try:  
        page = int(request.GET.get('page', '1'))  
    except ValueError:  
        page = 1  
    # A POST request ignores any page number and starts at page 1.
    if request.method == 'POST':  
        page = 1  
    # SEARCH is defined elsewhere; presumably a module-level Search
    # instance -- TODO confirm.
    books = SEARCH.search_by_page(search_words, page)  

    t=get_template('book/search_result.html')  
    # locals() exposes request, search_words, page, books and t to the
    # template, so renaming any local changes what the template can see.
    c=RequestContext(request,locals())  
    return HttpResponse(t.render(c))  
```

类似资料: