当前位置: 首页 > 工具软件 > Zoie > 使用案例 >

zoie-solr插件修改:ZoieUpdateHandler

伯君浩
2023-12-01
package proj.zoie.solr;

import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.longs.LongList;

import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Future;

import org.apache.log4j.Logger;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.MultiReader;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.store.Directory;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrInfoMBean.Category;
import org.apache.solr.search.QueryParsing;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.MergeIndexesCommand;
import org.apache.solr.update.RollbackUpdateCommand;
import org.apache.solr.update.UpdateHandler;

import proj.zoie.api.DataConsumer.DataEvent;
import proj.zoie.api.DocIDMapper;
import proj.zoie.api.Zoie;
import proj.zoie.api.ZoieException;
import proj.zoie.api.ZoieIndexReader;
import proj.zoie.api.indexing.IndexingEventListener;
import proj.zoie.impl.indexing.ZoieSystem;

public class ZoieUpdateHandler extends UpdateHandler {
	private static Logger log = Logger.getLogger(ZoieUpdateHandler.class);

	private SolrCore _core;

	/**
	 * 间隔多长时间重新打开searcher,默认值是一分钟
	 */
	private static long intervalTime = 1000 * 60;// 一分钟

//	private Directory indexDir;

	private Zoie<IndexReader, DocumentWithID> zoie;
	/**
	 * 记录最后一个版本
	 */
	private String lastVersion;

	public ZoieUpdateHandler(SolrCore core) {
		super(core);
		_core = core;
		SolrConfig solrConfig = _core.getSolrConfig();
//		try {
//			indexDir = _core.getDirectoryFactory().open(solrConfig.getDataDir());
//		} catch (Exception e) {
//			log.error("", e);
//		}
		long interval = solrConfig.getInt("solrIndexSearcher.reopen.batchDelay", (int) intervalTime);
		intervalTime = interval;
		log.info("intervalTime:" + intervalTime + "    ,间隔时间重新打开SolrIndexSearcher");
		ZoieSystemHome zoieHome = ZoieSystemHome.getInstance(_core);
		if (zoieHome == null) {
			log.error("zoie home is not setup");
			throw new RuntimeException("zoie is not setup");
		}
		zoie = zoieHome.getZoieSystem();
//		zoie.start();
		if (zoie == null) {
			log.error("zoie is not setup");
			throw new RuntimeException("zoie is not setup");
		}
		lastVersion = zoie.getCurrentReaderVersion();
		
		log.info("###########################################################################" + lastVersion);
		runOpenSolrIndexSearcher();
		log.info("初始化结束");

		ZoieSystem _zoie = (ZoieSystem) zoie;
		_zoie.addIndexingEventListener(new IndexingEventListener() {
			@Override
			public void handleIndexingEvent(IndexingEvent evt) {
				log.info(" ###########刷数据到硬盘事件 ###########");
			}

			@Override
			public void handleUpdatedDiskVersion(String version) {
				log.info("indexingEventListener update version###########");
			}
		});
		
		log.info(" core getLogId:"+core.getLogId()+",core getStartTime"+core.getStartTime());
	}

	/**
	 * 监控是否重新打开searcher
	 */
	private void runOpenSolrIndexSearcher() {
		log.info("准备启动监控重新打开solrIndexSearcher的线程。。");
		Thread thread = new Thread() {
			public void run() {
				try {
					log.info("休息10秒。。");
					Thread.sleep(10000);
				} catch (Exception e) {
					log.error("", e);
				}
				try {
					while (true) {
						log.info("休息" + intervalTime + "  毫秒。。");
						Thread.sleep(intervalTime);
						boolean hasChange = readerHasChange();
						log.info("reader是否有修改过############################:" + hasChange);
						if (hasChange) {
							log.info("重新打开solrIndexSearcher");
							try {
								updateReader(false);
							} catch (Exception e) {
								log.error("", e);
							}
						}
						log.info("休息");
					}
				} catch (Exception e) {
					log.error("", e);
				}
			}
		};
		thread.start();
		log.info("启动监听线程");
	}

	/**
	 * 检测reader是否有修改,如果有修改重新new searcher
	 * 
	 * @return
	 * @throws Exception
	 */
	private boolean readerHasChange() throws Exception {
		String version = zoie.getCurrentReaderVersion();
		log.info("###########################################version:" + version);
		if (version == null) {
			return false;
		}
		if ((version != null && lastVersion == null) || version.compareTo(lastVersion) > 0) {
			lastVersion = version;
			return true;
		} else {
			return false;
		}
	}

	@Override
	public int addDoc(AddUpdateCommand cmd) throws IOException {
		
		ZoieSystemHome zoieHome = ZoieSystemHome.getInstance(_core);
		if (zoieHome == null) {
			log.error("zoie home is not setup");
			throw new RuntimeException("zoie is not setup");
		}
		zoie = zoieHome.getZoieSystem();
		if (zoie == null) {
			log.error("zoie is not setup");
			throw new RuntimeException("zoie is not setup");
		}
		// String id = cmd.getIndexedId(_core.getSchema());
		String uid2 = (String) cmd.solrDoc.getFieldValue(idField.getName());
		long zoieUid;
		try {
			zoieUid = Long.parseLong(uid2);
		} catch (Exception e) {
			throw new IOException("index uid must exist and of type long: " + uid2);
		}
		Document doc = cmd.doc;
		long time = System.currentTimeMillis();
		String version = String.valueOf(time);
		DataEvent<DocumentWithID> event = new DataEvent<DocumentWithID>(new DocumentWithID(zoieUid, doc), version);
		try {
			zoie.consume(Arrays.asList(event));
			return 1;
		} catch (ZoieException e) {
			log.error("", e);
			throw new IOException(e.toString());
		}
	}

	@Override
	public void close() throws IOException {
		log.info("关闭zoieUpdateHandler ....close and ZoieSystemHome shutdown");
		ZoieSystemHome zoieHome = ZoieSystemHome.getInstance(_core);
		if (zoieHome != null) {
			zoieHome.shutdown();
		}
	}

	private void updateReader(boolean waitForSearcher) throws IOException {
		callPostCommitCallbacks();
		Future[] waitSearcher = null;
		if (waitForSearcher) {
			waitSearcher = new Future[1];
		}
		core.getSearcher(true, false, waitSearcher);
	}

	@Override
	public void commit(CommitUpdateCommand cmd) throws IOException {
		if (zoie != null) {
			try {
				zoie.flushEvents(10000);
			} catch (ZoieException e) {
				log.error("", e);
			}
		}
		updateReader(cmd.waitSearcher);
	}

	@Override
	public void delete(DeleteUpdateCommand cmd) throws IOException {
		String id = cmd.id;
		long zoieUid;
		try {
			zoieUid = Long.parseLong(id);
		} catch (Exception e) {
			throw new IOException("index uid must exist and of type long: " + id);
		}
		
		ZoieSystemHome zoieHome = ZoieSystemHome.getInstance(_core);
		if (zoieHome == null) {
			log.error("zoie home is not setup");
			throw new RuntimeException("zoie is not setup");
		}
		zoie = zoieHome.getZoieSystem();
		if (zoie == null) {
			log.error("zoie is not setup");
			throw new RuntimeException("zoie is not setup");
		}
		// String version = zoie.getVersion();
		long time = System.currentTimeMillis();
		String version = String.valueOf(time);
		DataEvent<DocumentWithID> event = new DataEvent<DocumentWithID>(new DocumentWithID(zoieUid, true), version);
		try {
			zoie.consume(Arrays.asList(event));
		} catch (ZoieException e) {
			log.error(e.getMessage(), e);
			throw new IOException(e.toString());
		}
	}

	@Override
	public void deleteByQuery(DeleteUpdateCommand cmd) throws IOException {
//		log.info("删除操作");
		Query q = QueryParsing.parseQuery(cmd.query, schema);
		ZoieSystemHome zoieHome = ZoieSystemHome.getInstance(_core);
		if (zoieHome == null) {
			log.error("zoie home is not setup");
			throw new RuntimeException("zoie is not setup");
		}
		zoie = zoieHome.getZoieSystem();
		if (zoie == null) {
			log.error("zoie is not setup");
			throw new RuntimeException("zoie is not setup");
		}
		final LongList delList = new LongArrayList();
		final int[] count = new int[1];
		count[0] = 0;
		List<ZoieIndexReader<IndexReader>> readerList = null;
		IndexSearcher searcher = null;
		try {
			readerList = zoie.getIndexReaders();
			MultiReader reader = new MultiReader(readerList.toArray(new IndexReader[readerList.size()]), false);
			searcher = new IndexSearcher(reader);
			searcher.search(q, new Collector() {
				ZoieIndexReader<IndexReader> zoieReader = null;
				int base = 0;

				@Override
				public boolean acceptsDocsOutOfOrder() {
					return true;
				}

				@Override
				public void collect(int doc) throws IOException {
					try {
						// long uid = zoieReader.getUID(doc + base);
						long uid = zoieReader.getUID(doc);
						if (uid != DocIDMapper.NOT_FOUND) {
							delList.add(uid);
						}
					} catch (Exception e) {
						count[0] += 1;
					}
				}

				@Override
				public void setNextReader(IndexReader reader, int base) throws IOException {
					zoieReader = (ZoieIndexReader<IndexReader>) reader;
					this.base = base;
				}

				@Override
				public void setScorer(Scorer scorer) throws IOException {
				}
			});

		} catch (Exception e) {
			log.error("", e);
		} finally {
			try {
				if (searcher != null) {
					searcher.close();
				}
			} finally {
				if (readerList != null) {
					zoie.returnIndexReaders(readerList);
				}
			}
		}

		log.info("删除条目:" + delList.size() + ",获取失败:" + count[0]);

		if (delList.size() > 0) {
			ArrayList<DataEvent<DocumentWithID>> eventList = new ArrayList<DataEvent<DocumentWithID>>(delList.size());
			for (long val : delList) {
				long time = System.currentTimeMillis();
				String version = String.valueOf(time);
				eventList.add(new DataEvent<DocumentWithID>(new DocumentWithID(val, true), version));
			}
			try {
				zoie.consume(eventList);
			} catch (ZoieException e) {
				log.error(e.getMessage(), e);
				throw new IOException(e.toString());
			}

		}
	}

	@Override
	public int mergeIndexes(MergeIndexesCommand cmd) throws IOException {
		throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, ZoieUpdateHandler.class + " doesn't support mergeIndexes.");
	}

	@Override
	public void rollback(RollbackUpdateCommand cmd) throws IOException {
		throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, ZoieUpdateHandler.class + " doesn't support rollback.");
	}

	// ///
	// SolrInfoMBean stuff: Statistics and Module Info
	// ///

	public String getName() {
		return ZoieUpdateHandler.class.getName();
	}

	public String getVersion() {
		return SolrCore.version;
	}

	public String getDescription() {
		return "Update handler builds on Zoie system";
	}

	public Category getCategory() {
		return Category.UPDATEHANDLER;
	}

	public String getSourceId() {
		return "$Id{1}quot;;
	}

	public String getSource() {
		return "$URL{1}quot;;
	}

	public URL[] getDocs() {
		return null;
	}

	public NamedList getStatistics() {
		NamedList lst = new SimpleOrderedMap();
		lst.add("interval.open.searcher", intervalTime);
		lst.add("lastversion",lastVersion);
		return lst;
	}

}


 类似资料: