django-haystack: updating the search index in real time when the indexed content changes

韶弘壮
2023-12-01

The Elasticsearch scenarios I have worked on include knowledge bases, product search, and Q&A systems.
Index updates are usually handled by running the management command from a scheduled task, e.g. every 15 minutes.
That inevitably leaves a window during which the index is stale.

There are two ways to get real-time updates:

1. Call the Elasticsearch API directly
Elasticsearch exposes a RESTful API, so you can update documents in real time with PUT, POST and DELETE requests.
The downside is that you have to build the document payloads yourself and lose the convenience haystack provides.
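A rough sketch of this direct-API route, using only the standard library (the host and the "goods" index name are assumptions for illustration; the per-document PUT/DELETE endpoints are standard Elasticsearch):

```python
import json
import urllib.request

ES = "http://localhost:9200"  # assumed local node; adjust for your cluster

def doc_url(index_name, doc_id):
    # Elasticsearch addresses a single document as /<index>/_doc/<id>
    return f"{ES}/{index_name}/_doc/{doc_id}"

def upsert_doc(index_name, doc_id, doc):
    # PUT creates or fully replaces one document.
    req = urllib.request.Request(
        doc_url(index_name, doc_id),
        data=json.dumps(doc).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    return urllib.request.urlopen(req)

def delete_doc(index_name, doc_id):
    # DELETE removes one document.
    req = urllib.request.Request(doc_url(index_name, doc_id), method="DELETE")
    return urllib.request.urlopen(req)
```

You can see the cost immediately: the `doc` payload has to match the index mapping field by field, which is exactly the bookkeeping haystack normally does for you.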
2. Reuse haystack's own code
Reading the source of the update_index management command, the code that performs the update is mainly the following block:

try:
    # FIXME: Get the right backend.
    backend.update(index, current_qs, commit=commit)
    if verbosity >= 2 and retries:
        print('Completed indexing {} - {}, tried {}/{} times'.format(start + 1,
                                                                     end,
                                                                     retries + 1,
                                                                     max_retries))
    break
except Exception as exc:
    # Catch all exceptions which do not normally trigger a system exit, excluding SystemExit and
    # KeyboardInterrupt. This avoids needing to import the backend-specific exception subclasses
    # from pysolr, elasticsearch, whoosh, requests, etc.
    retries += 1

    error_context = {'start': start + 1,
                     'end': end,
                     'retries': retries,
                     'max_retries': max_retries,
                     'pid': os.getpid(),
                     'exc': exc}

    error_msg = 'Failed indexing %(start)s - %(end)s (retry %(retries)s/%(max_retries)s): %(exc)s'
    if not is_parent_process:
        error_msg += ' (pid %(pid)s): %(exc)s'

    if retries >= max_retries:
        LOG.error(error_msg, error_context, exc_info=True)
        raise
    elif verbosity >= 2:
        LOG.warning(error_msg, error_context, exc_info=True)

    # If going to try again, sleep a bit before
    time.sleep(2 ** retries)
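The retry behaviour in this block — exponential backoff between attempts, re-raising once max_retries is exhausted — can be isolated into a small standalone helper (a sketch, not haystack's API; `update_with_retries` is a made-up name):

```python
import time

def update_with_retries(do_update, max_retries=5, sleep=time.sleep):
    """Call do_update(); on failure, back off 2**n seconds and retry.

    Re-raises the last exception once max_retries attempts have failed,
    mirroring the loop in haystack's update_index command.
    """
    retries = 0
    while True:
        try:
            return do_update()
        except Exception:
            retries += 1
            if retries >= max_retries:
                raise
            sleep(2 ** retries)
```

The `sleep` parameter is injectable purely so the backoff schedule can be tested without real delays.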

Further down in the same command, there is another section:

if self.remove:
    if self.start_date or self.end_date or total <= 0:
        # They're using a reduced set, which may not incorporate
        # all pks. Rebuild the list with everything.
        qs = index.index_queryset().values_list('pk', flat=True)
        database_pks = set(smart_bytes(pk) for pk in qs)
    else:
        database_pks = set(smart_bytes(pk) for pk in qs.values_list('pk', flat=True))

    # Since records may still be in the search index but not the local database
    # we'll use that to create batches for processing.
    # See https://github.com/django-haystack/django-haystack/issues/1186
    index_total = SearchQuerySet(using=backend.connection_alias).models(model).count()

    # Retrieve PKs from the index. Note that this cannot be a numeric range query because although
    # pks are normally numeric they can be non-numeric UUIDs or other custom values. To reduce
    # load on the search engine, we only retrieve the pk field, which will be checked against the
    # full list obtained from the database, and the id field, which will be used to delete the
    # record should it be found to be stale.
    index_pks = SearchQuerySet(using=backend.connection_alias).models(model)
    index_pks = index_pks.values_list('pk', 'id')

    # We'll collect all of the record IDs which are no longer present in the database and delete
    # them after walking the entire index. This uses more memory than the incremental approach but
    # avoids needing the pagination logic below to account for both commit modes:
    stale_records = set()

    for start in range(0, index_total, batch_size):
        upper_bound = start + batch_size

        # If the database pk is no longer present, queue the index key for removal:
        for pk, rec_id in index_pks[start:upper_bound]:
            if smart_bytes(pk) not in database_pks:
                stale_records.add(rec_id)

    if stale_records:
        if self.verbosity >= 1:
            self.stdout.write("  removing %d stale records." % len(stale_records))

        for rec_id in stale_records:
            # Since the PK was not in the database list, we'll delete the record from the search
            # index:
            if self.verbosity >= 2:
                self.stdout.write("  removing %s." % rec_id)

            backend.remove(rec_id, commit=self.commit)
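Stripped of the batching, the removal branch above is a set-difference check: any record whose pk is still in the index but no longer in the database is stale. A minimal standalone version (`find_stale` is a made-up name for illustration):

```python
def find_stale(index_pairs, database_pks):
    """index_pairs: (pk, rec_id) tuples read from the search index;
    database_pks: the set of pks still present in the database.
    Returns the rec_ids that should be removed from the index."""
    return {rec_id for pk, rec_id in index_pairs if pk not in database_pks}
```

Haystack keeps both fields because `pk` is the database key while `id` (here `rec_id`) is the index-internal document identifier that `backend.remove` needs.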

Clearly, updating and removal are handled as two separate paths.
The core calls are these two:

backend.update(index, current_qs, commit=commit)
...
backend.remove(rec_id, commit=self.commit)

Here, index is the SearchIndex we defined in search_indexes.py when building the index, and current_qs is the iterable of model instances whose documents need updating (note: the Index object is passed as the first argument, the instances as the second).
For reference, here are my own update and removal helpers (for our goods, they just have to be called on every update or delete):
File: search_indexes.py

from haystack import indexes

# GoodsSpu, GoodsGoods and SellerBaseInfo are this project's own models;
# import them from wherever your apps define them.


class GoodsIndex(indexes.SearchIndex, indexes.Indexable):
    text = indexes.CharField(document=True, use_template=True)
    goods_id = indexes.IntegerField(model_attr='goods_id')
    shop_id = indexes.IntegerField(model_attr='shop_id')
    spu_name = indexes.CharField(model_attr='spu_name')
    create_time = indexes.CharField(model_attr='create_time')
    spu_id = indexes.CharField(model_attr='spu_id')
    price = indexes.DecimalField(model_attr='price')
    shipping_id = indexes.IntegerField(model_attr='shipping_id')
    category_id = indexes.IntegerField(model_attr='category_id')
    goods_image = indexes.CharField()
    goods_comment = indexes.IntegerField(model_attr='goods_comment')
    search_field = indexes.CharField(model_attr='search_field')
    param_search = indexes.CharField(model_attr='attr_param_search')

    def prepare_goods_image(self, obj):
        goods = GoodsGoods.objects.filter(goods_id=obj.goods_id).first()
        if not goods:
            return goods
        return goods.goods_thumb_image

    def get_model(self):
        return GoodsSpu

    def index_queryset(self, using=None):
        # Exclude goods whose shop does not qualify.
        shop_ids = list(SellerBaseInfo.objects.using('shop')
                        .filter(shop_close=1)
                        .values_list('shop_id', flat=True))
        return (self.get_model().objects
                .filter(goods__is_deleted=0, shop_id__in=shop_ids, is_delete=0)
                .order_by('-create_time'))

File: view.py

from haystack import connections as haystack_connections
from haystack.query import SearchQuerySet

from search_indexes import GoodsIndex

spu_ids = [1, 2, 3]


def update():
    index = GoodsIndex()
    qs = index.build_queryset()
    for using in haystack_connections.connections_info.keys():
        backend = haystack_connections[using].get_backend()
        for spu_id in spu_ids:
            # Select the model instances whose documents need refreshing.
            current_qs = list(qs.filter(spu_id=spu_id))
            # Rewrite those documents in the search index.
            backend.update(index, current_qs, commit=True)


def remove():
    for using in haystack_connections.connections_info.keys():
        backend = haystack_connections[using].get_backend()
        # Look up the stale index records: pk is the database key,
        # id is the index-internal document id that remove() needs.
        index_pks = SearchQuerySet(using=using).filter(spu_id__in=spu_ids).values_list('pk', 'id')
        for pk, rec_id in index_pks:
            backend.remove(rec_id, commit=True)
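Worth noting: if you would rather not call helpers like these at every save/delete site yourself, haystack also ships a signal processor that hooks Django's post_save/post_delete and updates the index in real time. Enabling it is one settings line:

```python
# settings.py
# Have haystack update the index on every model save/delete.
HAYSTACK_SIGNAL_PROCESSOR = 'haystack.signals.RealtimeSignalProcessor'
```

Manual helpers remain useful where signals don't fire, e.g. bulk `queryset.update()` calls or changes made outside Django's ORM.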
