统计实体店下【上月、本月、前日、今日】的【已下单、已接单、已完成】的订单
实时同步流程:
mysql —>flinkEtl —>MongoDB
mongoDB查询应该返回的数据:
【0528-已下单】:50000单
【0528-已接单】:20000单
【0529-已下单】:30000单
【0529-已接单】:10000单
{
"_id": {
"$oid": "60aeee3a63936082b9abc647"
},
"service_type": 40,
"status": 0,
"order_id": 52152552,
"date_day": "2021-05-24",
"order_create_time": {
"$date": "2021-05-24T00:00:00.000Z"
},
"start_point": {
"type": "Point",
"coordinates": [116.4913, 40.002]
}
}
db.orderStatusLog.insert({status:0,order_id:55552,date_day:"2021-05-26",order_update_time:new ISODate("2021-05-26"),start_point:{type:"Point",coordinates:[40.55,40.002]}});
db.orderStatusLog.insert({status:10,order_id:55552,date_day:"2021-05-26",order_update_time:new ISODate("2021-05-26"),start_point:{type:"Point",coordinates:[40.55,40.002]}});
db.orderStatusLog.insert({status:40,order_id:24214,date_day:"2021-04-26",order_update_time:new ISODate("2021-04-26"),start_point:{type:"Point",coordinates:[40.55,40.002]}});
db.orderStatusLog.insert({status:0,order_id:512125,date_day:"2021-04-26",order_update_time:new ISODate("2021-04-26"),start_point:{type:"Point",coordinates:[40.55,40.002]}});
db.orderStatusLog.insert({status:10,order_id:125152,date_day:"2021-05-25",order_update_time:new ISODate("2021-05-25"),start_point:{type:"Point",coordinates:[40.55,40.002]}});
db.orderStatusLog.insert({status:10,order_id:125152,date_day:"2021-05-25",order_update_time:new ISODate("2021-05-25"),start_point:{type:"Point",coordinates:[40.55,40.002]}});
db.orderStatusLog.insert({status:40,order_id:24214,date_day:"2021-04-26",order_update_time:new ISODate("2021-04-26"),start_point:{type:"Point",coordinates:[40.55,40.002]}});
db.orderStatusLog.insert({status:0,order_id:25125,date_day:"2021-04-26",order_update_time:new ISODate("2021-04-26"),start_point:{type:"Point",coordinates:[40.55,40.002]}});
db.orderStatusLog.insert({status:40,order_id:12512,date_day:"2021-04-26",order_update_time:new ISODate("2021-04-26"),start_point:{type:"Point",coordinates:[40.55,40.002]}});
db.orderStatusLog.aggregate([
{
"$geoNear" : {
"near" : { "type" : "Point" , "coordinates" : [ 39.11 , 40.51]} ,
"query" : { "order_create_time" : { "$gte" : new ISODate("2021-04-01") }} ,
"distanceField" : "dist.calculated" ,
"maxDistance" : 100000000.0 ,
"spherical" : true
}},
{
$group:{
_id:{dateDay:"$date_day",status:"$status",orderId:"$order_id"}
}
},
{
$group:{
_id:{dateDay:"$_id.dateDay",status:"$_id.status"},
count:{$sum:1}
}
}
])
{ _id: { dateDay: '2021-04-25', status: 10 }, count: 1 }
{ _id: { dateDay: '2021-04-25', status: 0 }, count: 2 }
{ _id: { dateDay: '2021-05-24', status: 0 }, count: 1 }
{ _id: { dateDay: '2021-05-24', status: 10 }, count: 1 }
{ _id: { dateDay: '2021-04-25', status: 40 }, count: 1 }
{ _id: { dateDay: '2021-05-25', status: 40 }, count: 1 }
{ _id: { dateDay: '2021-04-26', status: 40 }, count: 1 }
{ _id: { dateDay: '2021-05-26', status: 40 }, count: 2 }
{ _id: { dateDay: '2021-05-24', status: 40 }, count: 2 }
// 日期筛选条件,【2021-04-01 00:00:00】——【2021-05-31 23:59:59】
Query<OrderStatusLog> query = datastore.createQuery(getEntityClass())
.field("order_update_time").greaterThanOrEq(bo.getDayStart())
.field("order_update_time").lessThan(bo.getDayEnd());
// 设置筛选条件、实体店坐标、有效距离,"calcDist"是计算订单开始地点到实体店的距离,这边我们只统计数量,没有用到
GeoNear geoNear = GeoNear.builder("calcDist")
.setNear(GeoJson.point(bo.getLat(), bo.getLng()))
.setMaxDistance(Double.parseDouble(String.valueOf(bo.getVisibleDistance())))
.setSpherical(true)
.setQuery(query)
.build();
Iterator<RestaurantOrderMongoVo> resultIterator = datastore.createAggregation(getEntityClass())
// 设置过滤文档条件
.geoNear(geoNear)
// 先分组去重
.group(
Group.id(Group.grouping("dateDay", "date_day")
, Group.grouping("orderStatus", "status")
, Group.grouping("orderId", "order_id"))
)
// 分组统计订单数据
.group(
Group.id(Group.grouping("dateDay", "_id.dateDay"), Group.grouping("orderStatus", "_id.orderStatus"))
, Group.grouping("orderSum", new Accumulator("$sum", 1))
)
.aggregate(RestaurantOrderMongoVo.class);
// 遍历返回结果(返回结果中"_id"存储着分组的字段,需要解析出来)
List<RestaurantOrderSumVo> list = new ArrayList<>();
while (resultIterator.hasNext()) {
RestaurantOrderMongoVo mongoVo = resultIterator.next();
RestaurantOrderMongoVo.Result resultId = JSONObject.parseObject(mongoVo.getResultId(), RestaurantOrderMongoVo.Result.class);
RestaurantOrderSumVo vo = new RestaurantOrderSumVo();
vo.setOrderSum(mongoVo.getOrderSum());
vo.setDateDay(resultId.getDateDay());
vo.setOrderStatus(String.valueOf(resultId.getOrderStatus()));
list.add(vo);
}