当前位置: 首页 > 面试题库 >

ElasticSearch在另一个聚合结果上使用聚合

孟光耀
2023-03-14
问题内容

有一个对话列表,每个对话都有一个消息列表。每个消息都有一个不同的字段和一个action字段。我们需要考虑的是,在对话的第一条消息中使用了动作A,在几条消息中使用了动作A.1之后,过了一会儿A.1.1,依此类推(有一个聊天机器人意图列表)。

将对话的消息动作分组将类似于: A > A > A > A.1 > A > A.1 > A.1.1 ...

问题:

我需要使用ElasticSearch创建一个报告,该报告将返回actions group每次会话的;接下来,我需要对类似的东西进行分组并actions groups添加一个计数;最终将导致Map<actionsGroup, count>as 'A > A.1 > A > A.1 > A.1.1', 3

构建actions groupI需要消除每组重复项;而不是A > A > A > A.1 > A > A.1 > A.1.1我需要拥有A > A.1 > A > A.1 > A.1.1

我开始做的步骤

{
   "collapse":{
      "field":"context.conversationId",
      "inner_hits":{
         "name":"logs",
         "size": 10000,
         "sort":[
            {
               "@timestamp":"asc"
            }
         ]
      }
   },
   "aggs":{
   },
}

接下来我需要什么:

  1. 我需要将崩溃的结果映射到单个结果中A > A.1 > A > A.1 > A.1.1。我已经看到,在这种情况下,或者aggr有可能在结果中使用脚本,并且有可能创建一个我需要aggr执行的动作列表,但它会对所有消息进行操作,而不仅仅是对我组合的消息进行操作陷入崩溃。是否可以使用aggr内部塌陷或类似的解决方案?
  2. 我需要对A > A.1 > A > A.1 > A.1.1所有折叠的结果values()进行分组,添加一个计数并得出Map<actionsGroup, count>

要么:

  1. conversationId使用字段将对话消息分组aggr(我不知道该怎么做)
  2. 使用脚本来迭代所有值并actions group为每个对话创建。(不确定是否可行)
  3. aggr在所有值上使用另一个,并将重复项分组,返回Map<actionsGroup, count>

更新1: 添加一些其他详细信息

对应:

"mappings":{
  "properties":{
     "@timestamp":{
        "type":"date",
        "format": "epoch_millis"
     }
     "context":{
        "properties":{
           "action":{
              "type":"keyword"
           },
           "conversationId":{
              "type":"keyword"
           }
        }
     }
  }
}

对话的样本html" target="_blank">文件:

Conversation 1.
{
    "@timestamp": 1579632745000,
    "context": {
        "action": "A",
        "conversationId": "conv_id1",
    }
},
{
    "@timestamp": 1579632745001,
    "context": {
        "action": "A.1",
        "conversationId": "conv_id1",
    }
},
{
    "@timestamp": 1579632745002,
    "context": {
        "action": "A.1.1",
        "conversationId": "conv_id1",
    }
}

Conversation 2.
{
    "@timestamp": 1579632745000,
    "context": {
        "action": "A",
        "conversationId": "conv_id2",
    }
},
{
    "@timestamp": 1579632745001,
    "context": {
        "action": "A.1",
        "conversationId": "conv_id2",
    }
},
{
    "@timestamp": 1579632745002,
    "context": {
        "action": "A.1.1",
        "conversationId": "conv_id2",
    }
}

Conversation 3.
{
    "@timestamp": 1579632745000,
    "context": {
        "action": "B",
        "conversationId": "conv_id3",
    }
},
{
    "@timestamp": 1579632745001,
    "context": {
        "action": "B.1",
        "conversationId": "conv_id3",
    }
}

预期结果:

{
    "A -> A.1 -> A.1.1": 2,
    "B -> B.1": 1
}
Something similar, having this or any other format.

由于我是Elasticsearch的新手,所以每个提示都值得欢迎。


问题答案:

我用scripted_metric弹性的解决了。而且,的index状态已从初始状态更改。

剧本:

{
   "size": 0,
   "aggs": {
        "intentPathsCountAgg": {
            "scripted_metric": {
                "init_script": "state.messagesList = new ArrayList();",
                "map_script": "long currentMessageTime = doc['messageReceivedEvent.context.timestamp'].value.millis; Map currentMessage = ['conversationId': doc['messageReceivedEvent.context.conversationId.keyword'], 'time': currentMessageTime, 'intentsPath': doc['brainQueryRequestEvent.brainQueryRequest.user_data.intentsHistoryPath.keyword'].value]; state.messagesList.add(currentMessage);",  
                "combine_script": "return state",
                "reduce_script": "List messages = new ArrayList(); Map conversationsMap = new HashMap(); Map intentsMap = new HashMap(); String[] ifElseWorkaround = new String[1]; for (state in states) { messages.addAll(state.messagesList);} messages.stream().forEach((message) -> { Map existingMessage = conversationsMap.get(message.conversationId); if(existingMessage == null || message.time > existingMessage.time) { conversationsMap.put(message.conversationId, ['time': message.time, 'intentsPath': message.intentsPath]); } else { ifElseWorkaround[0] = ''; } }); conversationsMap.entrySet().forEach(conversation -> { if (intentsMap.containsKey(conversation.getValue().intentsPath)) { long intentsCount = intentsMap.get(conversation.getValue().intentsPath) + 1; intentsMap.put(conversation.getValue().intentsPath, intentsCount); } else {intentsMap.put(conversation.getValue().intentsPath, 1L);} }); return intentsMap.entrySet().stream().map(intentPath -> [intentPath.getKey().toString(): intentPath.getValue()]).collect(Collectors.toSet()) "
            }
        }
    }
}

格式化脚本(为了提高可读性-使用.ts):

scripted_metric: {
  init_script: 'state.messagesList = new ArrayList();',
  map_script: `
    long currentMessageTime = doc['messageReceivedEvent.context.timestamp'].value.millis;
    Map currentMessage = [
      'conversationId': doc['messageReceivedEvent.context.conversationId.keyword'],
      'time': currentMessageTime,
      'intentsPath': doc['brainQueryRequestEvent.brainQueryRequest.user_data.intentsHistoryPath.keyword'].value
    ];
    state.messagesList.add(currentMessage);`,
  combine_script: 'return state',
  reduce_script: `
    List messages = new ArrayList();
    Map conversationsMap = new HashMap();
    Map intentsMap = new HashMap();
    boolean[] ifElseWorkaround = new boolean[1];

    for (state in states) {
      messages.addAll(state.messagesList);
    }

    messages.stream().forEach(message -> {
      Map existingMessage = conversationsMap.get(message.conversationId);
      if(existingMessage == null || message.time > existingMessage.time) {
        conversationsMap.put(message.conversationId, ['time': message.time, 'intentsPath': message.intentsPath]);
      } else {
        ifElseWorkaround[0] = true;
      }
    });

    conversationsMap.entrySet().forEach(conversation -> {
      if (intentsMap.containsKey(conversation.getValue().intentsPath)) {
        long intentsCount = intentsMap.get(conversation.getValue().intentsPath) + 1;
        intentsMap.put(conversation.getValue().intentsPath, intentsCount);
      } else {
        intentsMap.put(conversation.getValue().intentsPath, 1L);
      }
    });

    return intentsMap.entrySet().stream().map(intentPath -> [
      'path': intentPath.getKey().toString(),
      'count': intentPath.getValue()
    ]).collect(Collectors.toSet())`

答案:

{
    "took": 2,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 11,
            "relation": "eq"
        },
        "max_score": null,
        "hits": []
    },
    "aggregations": {
        "intentPathsCountAgg": {
            "value": [
                {
                    "smallTalk.greet -> smallTalk.greet2 -> smallTalk.greet3": 2
                },
                {
                    "smallTalk.greet -> smallTalk.greet2 -> smallTalk.greet3  -> smallTalk.greet4": 1
                },
                {
                    "smallTalk.greet -> smallTalk.greet2": 1
                }
            ]
        }
    }
}


 类似资料:
  • 问题内容: 想象一下,我有两种记录:一个存储桶和一个项目,其中存储在存储桶中的项目,而存储桶中的项目可能相对较少(通常不超过4个,从不超过10个)。这些记录被压缩为一个(具有更多存储桶信息的项目),并放置在Elasticsearch中。我要解决的任务是通过依赖项属性的过滤查询一次找到500个存储桶(最大),其中包含所有相关项,而我受困于限制/抵消聚合。我该如何执行此类任务?我看到聚合使我可以控制相

  • 我有以下JPA实体(getter、setter和非相关字段省略): 我的目标是使用JPQL或criteriaAPI实现查询,它将返回每天的平均事务量和最大事务量。 产生预期结果的原生SQL查询(MySQL数据库)如下所示: 遗憾的是,不鼓励使用本机 SQL 查询,并且 JPQL 不允许在 where 子句中使用子查询。 提前谢谢你。 附加: 我从以下Spring数据查询开始: 但显然没有用: 我可

  • 我使用聚合从嵌套字段收集数据并卡住了一点 文件示例: ES允许通过rectangle.attributes._id来分组数据,但是有没有办法让一些“其他”桶把没有添加到任何组中的文档放在那里?或者,也许有一种方法可以通过创建查询来为文档创建桶。我认为桶将是完美的,因为我需要使用“其他”文档进行进一步的聚合。或者也许有一些很酷的解决方法 我使用这样的查询进行聚合 然后得到这个结果 这样的结果将是完美

  • 问题内容: 我想过滤出字段“ A”等于“ a”的文档,并且我想同时考虑字段“ A”,当然不包括先前的过滤器。我知道您可以将过滤器“置于查询之外”,以便在不应用该过滤器的情况下获得构面,例如: elasticsearch 单反 这非常好,但是如果我有多个滤镜和构面,每个滤镜和构面应该互相排斥,会发生什么?例: 也就是说,对于方面AI,希望保留除A:a以外的所有过滤器,对于方面B希望保留除B:b以外的

  • 我想过滤掉字段'a'等于'a'的文档,同时我想对字段'a'进行刻面处理,当然不包括前面的过滤器。我知道您可以将筛选器放在查询的“外部”,以便在不应用该筛选器的情况下获得方面,例如: 弹性搜索 索尔尔 也就是说,对于方面A,我希望保留除A:A以外的所有过滤器,对于方面B,我希望保留除B:B以外的所有过滤器,以此类推。最明显的方法是执行n个查询(n个方面中的每一个),但我不想这样做。

  • 我一直在尝试在elasticsearch术语聚合中添加分页。在查询中,我们可以添加分页,如, 这很清楚,但当我想向聚合添加分页时,我读了很多关于它的内容,但找不到任何内容,我的代码如下所示, 是否有任何方法可以使用函数或任何其他建议创建分页?