package proj.zoie.solr;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.longs.LongList;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Future;
import org.apache.log4j.Logger;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.MultiReader;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.store.Directory;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrInfoMBean.Category;
import org.apache.solr.search.QueryParsing;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.MergeIndexesCommand;
import org.apache.solr.update.RollbackUpdateCommand;
import org.apache.solr.update.UpdateHandler;
import proj.zoie.api.DataConsumer.DataEvent;
import proj.zoie.api.DocIDMapper;
import proj.zoie.api.Zoie;
import proj.zoie.api.ZoieException;
import proj.zoie.api.ZoieIndexReader;
import proj.zoie.api.indexing.IndexingEventListener;
import proj.zoie.impl.indexing.ZoieSystem;
public class ZoieUpdateHandler extends UpdateHandler {
private static Logger log = Logger.getLogger(ZoieUpdateHandler.class);
private SolrCore _core;
/**
* 间隔多长时间重新打开searcher,默认值是一分钟
*/
private static long intervalTime = 1000 * 60;// 一分钟
// private Directory indexDir;
private Zoie<IndexReader, DocumentWithID> zoie;
/**
* 记录最后一个版本
*/
private String lastVersion;
public ZoieUpdateHandler(SolrCore core) {
super(core);
_core = core;
SolrConfig solrConfig = _core.getSolrConfig();
// try {
// indexDir = _core.getDirectoryFactory().open(solrConfig.getDataDir());
// } catch (Exception e) {
// log.error("", e);
// }
long interval = solrConfig.getInt("solrIndexSearcher.reopen.batchDelay", (int) intervalTime);
intervalTime = interval;
log.info("intervalTime:" + intervalTime + " ,间隔时间重新打开SolrIndexSearcher");
ZoieSystemHome zoieHome = ZoieSystemHome.getInstance(_core);
if (zoieHome == null) {
log.error("zoie home is not setup");
throw new RuntimeException("zoie is not setup");
}
zoie = zoieHome.getZoieSystem();
// zoie.start();
if (zoie == null) {
log.error("zoie is not setup");
throw new RuntimeException("zoie is not setup");
}
lastVersion = zoie.getCurrentReaderVersion();
log.info("###########################################################################" + lastVersion);
runOpenSolrIndexSearcher();
log.info("初始化结束");
ZoieSystem _zoie = (ZoieSystem) zoie;
_zoie.addIndexingEventListener(new IndexingEventListener() {
@Override
public void handleIndexingEvent(IndexingEvent evt) {
log.info(" ###########刷数据到硬盘事件 ###########");
}
@Override
public void handleUpdatedDiskVersion(String version) {
log.info("indexingEventListener update version###########");
}
});
log.info(" core getLogId:"+core.getLogId()+",core getStartTime"+core.getStartTime());
}
/**
* 监控是否重新打开searcher
*/
private void runOpenSolrIndexSearcher() {
log.info("准备启动监控重新打开solrIndexSearcher的线程。。");
Thread thread = new Thread() {
public void run() {
try {
log.info("休息10秒。。");
Thread.sleep(10000);
} catch (Exception e) {
log.error("", e);
}
try {
while (true) {
log.info("休息" + intervalTime + " 毫秒。。");
Thread.sleep(intervalTime);
boolean hasChange = readerHasChange();
log.info("reader是否有修改过############################:" + hasChange);
if (hasChange) {
log.info("重新打开solrIndexSearcher");
try {
updateReader(false);
} catch (Exception e) {
log.error("", e);
}
}
log.info("休息");
}
} catch (Exception e) {
log.error("", e);
}
}
};
thread.start();
log.info("启动监听线程");
}
/**
* 检测reader是否有修改,如果有修改重新new searcher
*
* @return
* @throws Exception
*/
private boolean readerHasChange() throws Exception {
String version = zoie.getCurrentReaderVersion();
log.info("###########################################version:" + version);
if (version == null) {
return false;
}
if ((version != null && lastVersion == null) || version.compareTo(lastVersion) > 0) {
lastVersion = version;
return true;
} else {
return false;
}
}
@Override
public int addDoc(AddUpdateCommand cmd) throws IOException {
ZoieSystemHome zoieHome = ZoieSystemHome.getInstance(_core);
if (zoieHome == null) {
log.error("zoie home is not setup");
throw new RuntimeException("zoie is not setup");
}
zoie = zoieHome.getZoieSystem();
if (zoie == null) {
log.error("zoie is not setup");
throw new RuntimeException("zoie is not setup");
}
// String id = cmd.getIndexedId(_core.getSchema());
String uid2 = (String) cmd.solrDoc.getFieldValue(idField.getName());
long zoieUid;
try {
zoieUid = Long.parseLong(uid2);
} catch (Exception e) {
throw new IOException("index uid must exist and of type long: " + uid2);
}
Document doc = cmd.doc;
long time = System.currentTimeMillis();
String version = String.valueOf(time);
DataEvent<DocumentWithID> event = new DataEvent<DocumentWithID>(new DocumentWithID(zoieUid, doc), version);
try {
zoie.consume(Arrays.asList(event));
return 1;
} catch (ZoieException e) {
log.error("", e);
throw new IOException(e.toString());
}
}
@Override
public void close() throws IOException {
log.info("关闭zoieUpdateHandler ....close and ZoieSystemHome shutdown");
ZoieSystemHome zoieHome = ZoieSystemHome.getInstance(_core);
if (zoieHome != null) {
zoieHome.shutdown();
}
}
private void updateReader(boolean waitForSearcher) throws IOException {
callPostCommitCallbacks();
Future[] waitSearcher = null;
if (waitForSearcher) {
waitSearcher = new Future[1];
}
core.getSearcher(true, false, waitSearcher);
}
@Override
public void commit(CommitUpdateCommand cmd) throws IOException {
if (zoie != null) {
try {
zoie.flushEvents(10000);
} catch (ZoieException e) {
log.error("", e);
}
}
updateReader(cmd.waitSearcher);
}
@Override
public void delete(DeleteUpdateCommand cmd) throws IOException {
String id = cmd.id;
long zoieUid;
try {
zoieUid = Long.parseLong(id);
} catch (Exception e) {
throw new IOException("index uid must exist and of type long: " + id);
}
ZoieSystemHome zoieHome = ZoieSystemHome.getInstance(_core);
if (zoieHome == null) {
log.error("zoie home is not setup");
throw new RuntimeException("zoie is not setup");
}
zoie = zoieHome.getZoieSystem();
if (zoie == null) {
log.error("zoie is not setup");
throw new RuntimeException("zoie is not setup");
}
// String version = zoie.getVersion();
long time = System.currentTimeMillis();
String version = String.valueOf(time);
DataEvent<DocumentWithID> event = new DataEvent<DocumentWithID>(new DocumentWithID(zoieUid, true), version);
try {
zoie.consume(Arrays.asList(event));
} catch (ZoieException e) {
log.error(e.getMessage(), e);
throw new IOException(e.toString());
}
}
@Override
public void deleteByQuery(DeleteUpdateCommand cmd) throws IOException {
// log.info("删除操作");
Query q = QueryParsing.parseQuery(cmd.query, schema);
ZoieSystemHome zoieHome = ZoieSystemHome.getInstance(_core);
if (zoieHome == null) {
log.error("zoie home is not setup");
throw new RuntimeException("zoie is not setup");
}
zoie = zoieHome.getZoieSystem();
if (zoie == null) {
log.error("zoie is not setup");
throw new RuntimeException("zoie is not setup");
}
final LongList delList = new LongArrayList();
final int[] count = new int[1];
count[0] = 0;
List<ZoieIndexReader<IndexReader>> readerList = null;
IndexSearcher searcher = null;
try {
readerList = zoie.getIndexReaders();
MultiReader reader = new MultiReader(readerList.toArray(new IndexReader[readerList.size()]), false);
searcher = new IndexSearcher(reader);
searcher.search(q, new Collector() {
ZoieIndexReader<IndexReader> zoieReader = null;
int base = 0;
@Override
public boolean acceptsDocsOutOfOrder() {
return true;
}
@Override
public void collect(int doc) throws IOException {
try {
// long uid = zoieReader.getUID(doc + base);
long uid = zoieReader.getUID(doc);
if (uid != DocIDMapper.NOT_FOUND) {
delList.add(uid);
}
} catch (Exception e) {
count[0] += 1;
}
}
@Override
public void setNextReader(IndexReader reader, int base) throws IOException {
zoieReader = (ZoieIndexReader<IndexReader>) reader;
this.base = base;
}
@Override
public void setScorer(Scorer scorer) throws IOException {
}
});
} catch (Exception e) {
log.error("", e);
} finally {
try {
if (searcher != null) {
searcher.close();
}
} finally {
if (readerList != null) {
zoie.returnIndexReaders(readerList);
}
}
}
log.info("删除条目:" + delList.size() + ",获取失败:" + count[0]);
if (delList.size() > 0) {
ArrayList<DataEvent<DocumentWithID>> eventList = new ArrayList<DataEvent<DocumentWithID>>(delList.size());
for (long val : delList) {
long time = System.currentTimeMillis();
String version = String.valueOf(time);
eventList.add(new DataEvent<DocumentWithID>(new DocumentWithID(val, true), version));
}
try {
zoie.consume(eventList);
} catch (ZoieException e) {
log.error(e.getMessage(), e);
throw new IOException(e.toString());
}
}
}
@Override
public int mergeIndexes(MergeIndexesCommand cmd) throws IOException {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, ZoieUpdateHandler.class + " doesn't support mergeIndexes.");
}
@Override
public void rollback(RollbackUpdateCommand cmd) throws IOException {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, ZoieUpdateHandler.class + " doesn't support rollback.");
}
// ///
// SolrInfoMBean stuff: Statistics and Module Info
// ///
public String getName() {
return ZoieUpdateHandler.class.getName();
}
public String getVersion() {
return SolrCore.version;
}
public String getDescription() {
return "Update handler builds on Zoie system";
}
public Category getCategory() {
return Category.UPDATEHANDLER;
}
public String getSourceId() {
return "$Id{1}quot;;
}
public String getSource() {
return "$URL{1}quot;;
}
public URL[] getDocs() {
return null;
}
public NamedList getStatistics() {
NamedList lst = new SimpleOrderedMap();
lst.add("interval.open.searcher", intervalTime);
lst.add("lastversion",lastVersion);
return lst;
}
}