当前位置: 首页 > 知识库问答 >
问题:

如何在spring batch中使用MongoItemReader的聚合查询

鄂曦之
2023-03-14

在setQuery()中,我必须使用聚合查询而不是基本查询。这可能吗?请建议我怎么做?我的聚合查询已经就绪,但不确定如何在spring批处理中使用它

public ItemReader<ProfileCollection> searchMongoItemReader() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {

        MongoItemReader<MyCollection> mongoItemReader = new MongoItemReader<>();
        mongoItemReader.setTemplate(myMongoTemplate);
        mongoItemReader.setCollection(myMongoCollection);

        mongoItemReader.setQuery(" Some Simple Query - Basic");

        mongoItemReader.setTargetType(MyCollection.class);
        Map<String, Sort.Direction> sort = new HashMap<>();
        sort.put("field4", Sort.Direction.ASC);
        mongoItemReader.setSort(sort);
        return mongoItemReader;

    }

共有2个答案

童化
2023-03-14

为了能够在作业中使用聚合,利用Spring批处理的所有功能,您必须创建一个自定义的ItemReader。扩展AbstractPaginatedDateItemReader我们可以使用可分页操作中的所有元素。这里有一个简单的自定义类:

public class CustomAggreagationPaginatedItemReader<T> extends AbstractPaginatedDataItemReader<T> implements InitializingBean {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\?(\\d+)");
    private MongoOperations template;
    private Class<? extends T> type;
    private Sort sort;
    private String collection;

    public CustomAggreagationPaginatedItemReader() {
        super();
        setName(ClassUtils.getShortName(CustomAggreagationPaginatedItemReader.class));
    }

    public void setTemplate(MongoOperations template) {
        this.template = template;
    }

    public void setTargetType(Class<? extends T> type) {
        this.type = type;
    }

    public void setSort(Map<String, Sort.Direction> sorts) {
        this.sort = convertToSort(sorts);
    }

    public void setCollection(String collection) {
        this.collection = collection;
    }

    @Override
    @SuppressWarnings("unchecked")
    protected Iterator<T> doPageRead() {
        Pageable pageRequest = new PageRequest(page, pageSize, sort);

        BasicDBObject cursor = new BasicDBObject();
        cursor.append("batchSize", 100);

        SkipOperation skipOperation = skip(Long.valueOf(pageRequest.getPageNumber()) * Long.valueOf(pageRequest.getPageSize()));

        Aggregation aggregation = newAggregation(
                //Include here all your aggreationOperations,
                skipOperation,
                limit(pageRequest.getPageSize())
            ).withOptions(newAggregationOptions().cursor(cursor).build());

        return (Iterator<T>) template.aggregate(aggregation, collection, type).iterator();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.state(template != null, "An implementation of MongoOperations is required.");
        Assert.state(type != null, "A type to convert the input into is required.");
        Assert.state(collection != null, "A collection is required.");
    }

    private String replacePlaceholders(String input, List<Object> values) {
        Matcher matcher = PLACEHOLDER.matcher(input);
        String result = input;

        while (matcher.find()) {
            String group = matcher.group();
            int index = Integer.parseInt(matcher.group(1));
            result = result.replace(group, getParameterWithIndex(values, index));
        }

        return result;
    }

    private String getParameterWithIndex(List<Object> values, int index) {
        return JSON.serialize(values.get(index));
    }

    private Sort convertToSort(Map<String, Sort.Direction> sorts) {
        List<Sort.Order> sortValues = new ArrayList<Sort.Order>();

        for (Map.Entry<String, Sort.Direction> curSort : sorts.entrySet()) {
            sortValues.add(new Sort.Order(curSort.getValue(), curSort.getKey()));
        }

        return new Sort(sortValues);
    }
}

如果你仔细观察,你会发现它是使用Spring框架中的MongoItemReader创建的,你可以在org上看到这个类。springframework。一批项目数据MongoItemReader,有一种方法可以创建一个扩展AbstractPaginatedDaitaItemReader的全新类,如果你看一看“doPageRead”方法,你应该能够看到它只使用MongoTemplate的find操作,因此不可能在其中使用聚合操作。

以下是它应该如何使用它我们的CustomReader:

@Bean
public ItemReader<YourDataClass> reader(MongoTemplate mongoTemplate) {
    CustomAggreagationPaginatedItemReader<YourDataClass> customAggreagationPaginatedItemReader = new CustomAggreagationPaginatedItemReader<>();

    Map<String, Direction> sort = new HashMap<String, Direction>();
    sort.put("id", Direction.ASC);

    customAggreagationPaginatedItemReader.setTemplate(mongoTemplate);
    customAggreagationPaginatedItemReader.setCollection("collectionName");
    customAggreagationPaginatedItemReader.setTargetType(YourDataClass.class);
    customAggreagationPaginatedItemReader.setSort(sort);

    return customAggreagationPaginatedItemReader;
}

正如您可能注意到的,您还需要一个MongoTemplate实例,下面是它的工作原理:

@Bean
public MongoTemplate mongoTemplate(MongoDbFactory mongoDbFactory) {
    return new MongoTemplate(mongoDbFactory);
}

其中,MongoDbFactory是spring framework的一个自动连接对象。

希望这足以帮助你。

葛炯
2023-03-14

扩展MongoItemReader,并为doPageRead()方法提供自己的实现。通过这种方式,您将获得完整的分页支持,阅读文档将是一个步骤的一部分。

public class CustomMongoItemReader<T, O> extends MongoItemReader<T> {
private MongoTemplate template;
private Class<? extends T> inputType;
private Class<O> outputType
private MatchOperation match;
private ProjectionOperation projection;
private String collection;

@Override
protected Iterator<T> doPageRead() {
    Pageable page = PageRequest.of(page, pageSize) //page and page size are coming from the class that MongoItemReader extends
    Aggregation agg = newAggregation(match, projection, skip(page.getPageNumber() * page.getPageSize()), limit(page.getPageSize()));
    return (Iterator<T>) template.aggregate(agg, collection, outputType).iterator();

}
}

以及其他的获取者和设置者以及其他方法。请看一下MongoItemReader的源代码。我还从中删除了查询支持。你也可以用同样的方法从MongoItemReader复制粘贴它。Sort也一样。

在你有读者的课堂上,你会做这样的事情:

public MongoItemReader<T> reader() {
    CustomMongoItemReader reader = new CustomMongoItemReader();
    reader.setTemplate(mongoTemplate);
    reader.setName("abc");
    reader.setTargetType(input.class);
    reader.setOutputType(output.class);
    reader.setCollection(myMongoCollection);
    reader.setMatch(Aggregation.match(new Criteria()....)));
    reader.setProjection(Aggregation.project("..","..");
    return reader;
}
 类似资料:
  • 问题内容: 我正在尝试使用内部,而不返回匹配的文件。 谁能告诉我哪里出了问题或正确的语法? 我什至尝试更换 问题答案: 就像您链接到的文档中所说的那样,执行此操作的两种方法是: 要么 但是我也尝试了您的第二次尝试,并且效果很好。 顺便说一句,在这种情况下,正则表达式选项没有意义,因为无论如何您只需要一个匹配项。请注意,它甚至没有在文档中列出。

  • 我不熟悉Mongo中的聚合查询,并且一直在努力产生我想要的输出。我有以下聚合查询: 返回以下结果: 如何修改聚合查询,以便只返回2个文档而不是3个文档?将两个“ABC-123”结果合并为一个结果,并使用带有“bu”和“count”字段的新计数数组,即。 非常感谢

  • 问题内容: 我有一个非常庞大的查询,其最简单的形式如下所示: 我需要再添加一个条件,该条件可以让我获得每个代表的应用程序日期不为空的用户数(例如:rep 1具有3个用户的应用程序日期已填写),并将其分配给类别(由于3个用户,rep是某个状态类别)。看起来像这样: 但是,如果我只是将其添加到select语句中,则所有代表将变为status1,因为sum()是在所有使用申请日期的顾问程序上完成的: 您

  • 我有一套文件。每个文档有两个字段—“代码”和“状态”。我的mongodb集合包含以下文档: 我想按每个代码的状态查找计数。我想要的输出如下所示: 如何使用spring data mongodb实现这一点?我对mongodb很陌生。 更新我已成功编写mongodb查询。这是: 有人能帮助您在spring data mongodb中编写此查询吗?

  • 问题内容: 这是我第一次在Java中使用Mongo,并且此聚合查询存在一些问题。我可以在Mongo for Spring中执行一些简单的查询,并在我的Repository接口中扩展注解。知道在Spring-Data中进行长时间聚合时采用哪种方法会很有帮助。 问题答案: 您可以实现AggregationOperation 并编写自定义聚合操作查询,然后用于执行您在mongo shell中执行的任何m

  • 我是Spring Data MongoDB的新手,我正在尝试用Spring Data MongoDBJava实现聚合查询。我尝试过从这个问题中搜索,并使用进行搜索,但仍然没有结果。 我的数据格式: 我的查询: 这是我在Javascript后端使用的查询,我可以用Mongoose很容易地做到这一点。然而,我对它的Java实现有一些困难。 当我尝试运行此程序时,会出现以下错误: 当我删除从组聚合中,我