我使用过的es场景有:知识库、商品搜索、问答系统。
对于搜索内容的更新,一般情况,是使用命令行工具,然后做一个定时任务。比如每15分钟,跑一遍更新。
那就存在延时更新问题。
1,直接调用elasticsearch的接口
es本身就是 restful 形式。所以可以通过实时调用put,post,delete接口来实现更新。
但缺点就是要自己处理数据格式,无法方便地使用 haystack。
2,使用 haystack 原生代码
看源码:查看命令工具 update_index 命令,可以看到其中执行更新的代码,主要为如下这一段:
# Excerpt from haystack's `update_index` management command: index one batch
# of records, retrying with exponential backoff on failure.
# NOTE(review): the `break`/`retries` imply this sits inside a retry loop
# that is not shown here; `backend`, `index`, `current_qs`, `start`, `end`,
# `max_retries`, `verbosity`, `is_parent_process` and `LOG` all come from the
# surrounding command code.
try:
    # FIXME: Get the right backend.
    backend.update(index, current_qs, commit=commit)
    if verbosity >= 2 and retries:
        print('Completed indexing {} - {}, tried {}/{} times'.format(start + 1,
                                                                     end,
                                                                     retries + 1,
                                                                     max_retries))
    # Success: leave the enclosing retry loop.
    break
except Exception as exc:
    # Catch all exceptions which do not normally trigger a system exit, excluding SystemExit and
    # KeyboardInterrupt. This avoids needing to import the backend-specific exception subclasses
    # from pysolr, elasticsearch, whoosh, requests, etc.
    retries += 1
    # Context for %-style lazy interpolation in the LOG calls below.
    error_context = {'start': start + 1,
                     'end': end,
                     'retries': retries,
                     'max_retries': max_retries,
                     'pid': os.getpid(),
                     'exc': exc}
    error_msg = 'Failed indexing %(start)s - %(end)s (retry %(retries)s/%(max_retries)s): %(exc)s'
    if not is_parent_process:
        # NOTE: the base message already ends with %(exc)s, so in child
        # processes the exception text is interpolated twice.
        error_msg += ' (pid %(pid)s): %(exc)s'
    if retries >= max_retries:
        # Out of retries: log at ERROR with traceback and propagate.
        LOG.error(error_msg, error_context, exc_info=True)
        raise
    elif verbosity >= 2:
        LOG.warning(error_msg, error_context, exc_info=True)
    # If going to try again, sleep a bit before (exponential backoff: 2**retries seconds).
    time.sleep(2 ** retries)
再往下看,还有一部分
# Excerpt from haystack's `update_index` command: delete documents that are
# still in the search index but no longer present in the local database.
# NOTE(review): `qs`, `index`, `model`, `total`, `batch_size`, `backend` and
# `smart_bytes` come from the surrounding command code not shown here.
if self.remove:
    if self.start_date or self.end_date or total <= 0:
        # They're using a reduced set, which may not incorporate
        # all pks. Rebuild the list with everything.
        qs = index.index_queryset().values_list('pk', flat=True)
        database_pks = set(smart_bytes(pk) for pk in qs)
    else:
        # Full run: the pks of everything currently in the database.
        database_pks = set(smart_bytes(pk) for pk in qs.values_list('pk', flat=True))
    # Since records may still be in the search index but not the local database
    # we'll use that to create batches for processing.
    # See https://github.com/django-haystack/django-haystack/issues/1186
    index_total = SearchQuerySet(using=backend.connection_alias).models(model).count()
    # Retrieve PKs from the index. Note that this cannot be a numeric range query because although
    # pks are normally numeric they can be non-numeric UUIDs or other custom values. To reduce
    # load on the search engine, we only retrieve the pk field, which will be checked against the
    # full list obtained from the database, and the id field, which will be used to delete the
    # record should it be found to be stale.
    index_pks = SearchQuerySet(using=backend.connection_alias).models(model)
    index_pks = index_pks.values_list('pk', 'id')
    # We'll collect all of the record IDs which are no longer present in the database and delete
    # them after walking the entire index. This uses more memory than the incremental approach but
    # avoids needing the pagination logic below to account for both commit modes:
    stale_records = set()
    for start in range(0, index_total, batch_size):
        upper_bound = start + batch_size
        # If the database pk is no longer present, queue the index key for removal:
        for pk, rec_id in index_pks[start:upper_bound]:
            if smart_bytes(pk) not in database_pks:
                stale_records.add(rec_id)
    if stale_records:
        if self.verbosity >= 1:
            self.stdout.write(" removing %d stale records." % len(stale_records))
        for rec_id in stale_records:
            # Since the PK was not in the database list, we'll delete the record from the search
            # index:
            if self.verbosity >= 2:
                self.stdout.write(" removing %s." % rec_id)
            backend.remove(rec_id, commit=self.commit)
很明显,这是更新和删除分开了。
其中核心是这句
backend.update(index, current_qs, commit=commit)
...
backend.remove(rec_id, commit=self.commit)
其中,index 就是我们创建索引时用到的 search_index
current_qs 就是其中需要更新的每一个 索引对象(注意,是个Index对象)
参考我的更新、删除接口(对于我们的商品,只要每次更新、删除的时候调用):
文件 search_indexes.py
from haystack import indexes
class GoodsIndex(indexes.SearchIndex, indexes.Indexable):
    """Haystack search index for goods (``GoodsSpu``) records.

    ``text`` is the main document field rendered from a template; the
    remaining fields mirror model attributes so they can be filtered on
    and read back directly from the search engine.
    """

    text = indexes.CharField(document=True, use_template=True)
    goods_id = indexes.IntegerField(model_attr='goods_id')
    shop_id = indexes.IntegerField(model_attr='shop_id')
    spu_name = indexes.CharField(model_attr='spu_name')
    create_time = indexes.CharField(model_attr='create_time')
    spu_id = indexes.CharField(model_attr='spu_id')
    price = indexes.DecimalField(model_attr='price')
    shipping_id = indexes.IntegerField(model_attr='shipping_id')
    category_id = indexes.IntegerField(model_attr='category_id')
    goods_image = indexes.CharField()  # filled by prepare_goods_image()
    goods_comment = indexes.IntegerField(model_attr='goods_comment')
    search_field = indexes.CharField(model_attr='search_field')
    param_search = indexes.CharField(model_attr='attr_param_search')

    def prepare_goods_image(self, obj):
        """Return the goods thumbnail image, or None when no goods row exists.

        NOTE(review): this runs one extra query per indexed object (N+1);
        consider prefetching in index_queryset() if indexing is slow.
        """
        goods = GoodsGoods.objects.filter(goods_id=obj.goods_id).first()
        if not goods:
            return goods
        return goods.goods_thumb_image

    def get_model(self):
        """The Django model this index is built from."""
        return GoodsSpu

    def index_queryset(self, using=None):
        """Queryset of goods eligible for indexing, newest first.

        Keeps only goods belonging to shops with ``shop_close=1`` (assumed to
        mark shops eligible for search -- TODO confirm) and excludes deleted
        goods/SPU rows.
        """
        # Fetch the eligible shop ids in a single values_list query instead of
        # materializing model instances and looping over them.
        shop_ids = list(
            SellerBaseInfo.objects.using('shop')
            .filter(shop_close=1)
            .values_list('shop_id', flat=True)
        )
        return (
            self.get_model().objects
            .filter(goods__is_deleted=0, shop_id__in=shop_ids, is_delete=0)
            .order_by('-create_time')
        )
view.py文件
from search_indexes import GoodsIndex
from haystack import connections as haystack_connections
from haystack.query import SearchQuerySet
# Example: spu ids of the goods whose search documents need updating/removing.
spu_ids = [1, 2, 3]

# Walk every configured haystack connection so all backends stay in sync.
backends = haystack_connections.connections_info.keys()
for using in backends:
    backend = haystack_connections[using].get_backend()

    def update(backend=backend):
        """Re-index the documents whose spu_id is in ``spu_ids``.

        ``backend`` is bound as a default argument so the function keeps the
        backend of its own loop iteration (avoids the late-binding closure
        pitfall when functions are defined inside a loop).
        """
        index = GoodsIndex()
        qs = index.build_queryset()
        for spu_id in spu_ids:
            # Select the documents that need updating (materialized so the
            # backend receives a concrete list of model instances).
            current_qs = list(qs.filter(spu_id=spu_id))
            # Update the `current_qs` documents within the `index` index.
            backend.update(index, current_qs, commit=True)

    def remove(backend=backend):
        """Delete from the index the documents whose spu_id is in ``spu_ids``.

        Queries the SAME connection this backend writes to; the original code
        used SearchQuerySet() on the default connection, which is wrong when
        looping over multiple backends.
        """
        index_pks = SearchQuerySet(using=backend.connection_alias).filter(spu_id__in=spu_ids)
        # Only fetch pk plus the index document id needed for deletion.
        for pk, rec_id in index_pks.values_list('pk', 'id'):
            backend.remove(rec_id, commit=True)