使用Elasticsearch第三方包Bboss批量提交新增、修改、删除请求

赵英范
2023-12-01

思路

借助 Bboss 发送 HTTP 请求：底层调用的仍然是 Elasticsearch 原生的 `_bulk` HTTP 接口，只是不必自己封装 HTTP 客户端，直接使用 Bboss 已经封装好的 `executeHttp` 方法即可。

maven依赖

		<!-- Bboss Elasticsearch REST client; the version is pinned to 5.5.1. -->
		<dependency>
            <groupId>com.bbossgroups.plugins</groupId>
            <artifactId>bboss-elasticsearch-rest-jdbc</artifactId>
            <version>5.5.1</version>
            <!-- Drops the transitive slf4j-log4j12 binding. NOTE(review): presumably to avoid
                 clashing with the host project's own SLF4J binding; confirm. -->
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

代码

辅助类

/**
 * Helper value object describing a single action of a batched Elasticsearch request.
 *
 * <p>A plain mutable bean: all five properties are {@code String}s that stay
 * {@code null} until the matching setter is called. No validation is performed.
 *
 * @author yanyl
 * @date 2019-06-03
 */
public class BulkCommitVO {

    /** Action to perform. */
    private String action;

    /** Index name. */
    private String indexName;

    /** Type name. */
    private String typeName;

    /** Primary-key id. */
    private String id;

    /** Payload information. */
    private String info;

    // ---------------------------------------------------------------- getters

    /** @return the action, or {@code null} if never set */
    public String getAction() {
        return this.action;
    }

    /** @return the index name, or {@code null} if never set */
    public String getIndexName() {
        return this.indexName;
    }

    /** @return the type name, or {@code null} if never set */
    public String getTypeName() {
        return this.typeName;
    }

    /** @return the primary-key id, or {@code null} if never set */
    public String getId() {
        return this.id;
    }

    /** @return the payload information, or {@code null} if never set */
    public String getInfo() {
        return this.info;
    }

    // ---------------------------------------------------------------- setters

    /** @param action the action to perform */
    public void setAction(String action) {
        this.action = action;
    }

    /** @param indexName the index name */
    public void setIndexName(String indexName) {
        this.indexName = indexName;
    }

    /** @param typeName the type name */
    public void setTypeName(String typeName) {
        this.typeName = typeName;
    }

    /** @param id the primary-key id */
    public void setId(String id) {
        this.id = id;
    }

    /** @param info the payload information */
    public void setInfo(String info) {
        this.info = info;
    }
}

调用方法demo

批量写入(新增、修改)

/**
 * Builds one bulk {@code "index"} action per document and submits the batch through
 * {@code BbossUtil.bulkIndex} on {@code taskExecutor}.
 *
 * <p>Every document is first passed to {@code setFieldsValue}, and its content is
 * replaced by extracted attachment text plus plain text. Documents whose site id is
 * {@code null} are then left out of the batch. If the bulk call reports a status other
 * than {@code 1}, or throws, the batched documents are handed to
 * {@code esErrService.saveEsCreateErrInfos}.
 *
 * @param platformId index name to write to; when empty, each document's own platform
 *                   code is used instead
 * @param vos        documents to index; each one is modified in place
 * @return a fresh {@code ResultVO}, created without waiting for the submitted task
 *         (assuming {@code taskExecutor} runs tasks asynchronously), so it does not
 *         reflect the outcome of the bulk request
 */
public Object asynCreateIndex(String platformId, Set<SolrIndexVO> vos) {
    final Set<SolrIndexVO> voset = new HashSet<>();
    final List<BulkCommitVO> bulkCommitVOList = new ArrayList<>();
    for (SolrIndexVO vo : vos) {
        // Sets site name, column name, access link, publish status and sort time.
        setFieldsValue(vo);
        if (!StringUtils.isEmpty(vo.getContent())) {
            // Attachment text must be extracted BEFORE the content is overwritten
            // with its plain-text form on the next line.
            vo.setAttachContent(getAttachContent(vo.getContent()).toString());
            vo.setContent(HtmlUtil.getTextFromTHML(vo.getContent()));
        } else {
            // The VO carries no content: fall back to looking it up by id.
            Object content = getArticleContent(vo.getId());
            if (content != null) {
                vo.setAttachContent(getAttachContent(content.toString()).toString());
                vo.setContent(HtmlUtil.getTextFromTHML(content.toString()));
            }
        }
        if (null != vo.getSiteId()) {
            voset.add(vo);
            BulkCommitVO bulkCommitVO = new BulkCommitVO();
            bulkCommitVO.setAction("index");
            if (!AppUtil.isEmpty(platformId)) {
                bulkCommitVO.setIndexName(platformId);
            } else {
                bulkCommitVO.setIndexName(vo.getPlatformCode());
            }
            // The site id is used as the type name.
            bulkCommitVO.setTypeName(vo.getSiteId().toString());
            bulkCommitVO.setId(vo.getId());
            bulkCommitVO.setInfo(JSON.toJSONStringWithDateFormat(vo, "yyyy-MM-dd HH:mm:ss",
                    SerializerFeature.WriteDateUseDateFormat));
            bulkCommitVOList.add(bulkCommitVO);
        }
    }
    taskExecutor.execute(() -> {
        logger.info("========启动异步线程创建索引========");
        boolean failed;
        try {
            ResultVO resultVO = (ResultVO) BbossUtil.bulkIndex(bulkCommitVOList);
            failed = resultVO.getStatus() != 1;
            if (!failed) {
                logger.info("========索引创建完成========");
            }
        } catch (Exception e) {
            // Route the failure through the logger (with stack trace) rather than
            // printStackTrace(), so it ends up in the application log.
            logger.error("========索引创建异常========", e);
            failed = true;
        }
        if (failed) {
            // Record the documents whose index creation failed.
            esErrService.saveEsCreateErrInfos(new ArrayList<>(voset));
        }
    });
    return new ResultVO();
}

批量删除demo

    /**
     * Deletes the index entries of the given records in one bulk request.
     *
     * <p>One {@code "delete"} action is built per element, using the element's site id
     * as the type name and its content id as the document id. All actions target the
     * hard-coded index {@code "hefei_gova"}. The value returned by
     * {@code BbossUtil.bulkIndex} is ignored.
     *
     * @param data records to delete; every element is cast to {@code PublicContentVO}.
     *             A {@code null} or empty list is a no-op.
     */
    private static void deletePublicIndex(List<?> data){
        if (data == null || data.isEmpty()) {
            return;
        }
        List<BulkCommitVO> bulkCommitVOList = new ArrayList<>();
        String platformCode = "hefei_gova";
        for (Object o : data) {
            PublicContentVO eo = (PublicContentVO) o;
            // Build the bulk action for this record.
            BulkCommitVO bulkCommitVO = new BulkCommitVO();
            bulkCommitVO.setAction("delete");
            bulkCommitVO.setIndexName(platformCode);
            bulkCommitVO.setTypeName(eo.getSiteId() + "");
            bulkCommitVO.setId(eo.getContentId() + "");
            bulkCommitVOList.add(bulkCommitVO);
        }
        // Fixed: the original passed the undefined name `commitVOList` here.
        BbossUtil.bulkIndex(bulkCommitVOList);
    }

底层调用Bboss

	/**
     * Creates, updates or deletes documents in one Elasticsearch {@code _bulk} request.
     *
     * <p>For every entry an action line is written containing its action, index, type
     * and id; for every action other than {@code "delete"} the entry's {@code info} is
     * appended as the following line. The request is only sent when the list is
     * non-empty and the index of the FIRST entry exists; the indices of the other
     * entries are not checked.
     *
     * @param bulkCommitVOList actions to submit; {@code null} or empty results in no request
     * @return a {@code ResultVO} built from the raw response string of the bulk call, or
     *         an empty {@code ResultVO} when nothing was sent
     */
    public static Object bulkIndex(List<BulkCommitVO> bulkCommitVOList){
        if (bulkCommitVOList == null || bulkCommitVOList.isEmpty()) {
            return new ResultVO();
        }
        // Create the ES client.
        ClientInterface clientUtil = ElasticSearchHelper.getRestClientUtil();
        if (!clientUtil.existIndice(bulkCommitVOList.get(0).getIndexName())) {
            return new ResultVO();
        }

        // The buffer never leaves this method, so the unsynchronized builder suffices.
        StringBuilder requestStr = new StringBuilder();
        for (BulkCommitVO vo : bulkCommitVOList) {
            String action = vo.getAction();
            requestStr.append("{ \"").append(escapeJson(action))
                    .append("\" : { \"_index\" : \"").append(escapeJson(vo.getIndexName()))
                    .append("\", \"_type\" : \"").append(escapeJson(vo.getTypeName()))
                    .append("\", \"_id\" : \"").append(escapeJson(vo.getId()))
                    .append("\" } }\n");
            // Delete actions have no source line; every other action is followed by its document.
            if (!"delete".equals(action)) {
                requestStr.append(vo.getInfo()).append("\n");
            }
        }

        String body = requestStr.toString();
        String firstAction = bulkCommitVOList.get(0).getAction();
        if (firstAction != null && firstAction.contains("delete")) {
            // Delete batches are logged in full (previously printed to stdout).
            esLog.info("requestStr=\n" + body);
        }
        String bulkStr = clientUtil.executeHttp("/_bulk", body, "post");
        esLog.info(bulkStr);
        return new ResultVO(bulkStr);
    }

    /**
     * Escapes a value for use inside a double-quoted JSON string.
     *
     * @param value raw text; {@code null} is rendered as the text {@code null}, matching
     *              what plain string concatenation would produce
     * @return the text with quotes, backslashes and control characters escaped
     */
    private static String escapeJson(String value) {
        String text = value == null ? "null" : value;
        StringBuilder out = new StringBuilder(text.length() + 8);
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            switch (c) {
                case '"':
                    out.append("\\\"");
                    break;
                case '\\':
                    out.append("\\\\");
                    break;
                case '\n':
                    out.append("\\n");
                    break;
                case '\r':
                    out.append("\\r");
                    break;
                case '\t':
                    out.append("\\t");
                    break;
                default:
                    if (c < 0x20) {
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        return out.toString();
    }
 类似资料: