当前位置: 首页 > 知识库问答 >
问题:

使用logstash和jdbc更新复杂嵌套的elasticsearch文档

贺运良
2023-03-14

假设Oracle模式有以下表和列:


    Country
        country_id; (Primary Key)
        country_name;

    Department
        department_id; (Primary Key)
        department_name;
        country_id; (Foreign key to Country:country_id)

    Employee
        employee_id; (Primary Key)
        employee_name;
        department_id; (Foreign key to Department:department_id)

我有我的Elasticsearch文档,其中根元素是一个国家,它包含该国家的所有部门,而这些部门又包含各自部门的所有员工。

因此,文档结构如下所示:


    {
      "mappings": {
        "country": {
          "properties": {
            "country_id": { "type": "string"},
            "country_name": { "type": "string"},        
            "department": {
              "type": "nested",
              "properties": {
                "department_id": { "type": "string"},
                "department_name": { "type": "string"},
                "employee": {
                  "type": "nested",
                  "properties": {
                    "employee_id": { "type": "string"},
                    "employee_name": { "type": "string"}
                  }
                }
              }
            }
          }
        }
      }
    }           

我希望能够在每个表上运行单独的输入jdbc查询,并且每当基表中的数据被添加/更新/删除时,它们应该在elasticsearch文档中创建/更新/删除数据。

这是一个示例问题,实际的表和数据结构更加复杂。所以我不是在寻找仅限于此的解决方案。

有没有办法做到这一点?

谢谢

共有1个答案

鄢修德
2023-03-14

对于第一级,它直接使用聚合过滤器。你需要在它们之间有一个共同的id来引用。

filter {    

  aggregate {
    task_id => "%{id}"

    code => "     
      map['id'] = event.get('id')
      map['department'] ||= []
      map['department'] << event.to_hash.each do |key,value| { key => value } end    
    "
    push_previous_map_as_event => true
    timeout => 150000
    timeout_tags => ['aggregated']    
  } 

   if "aggregated" not in [tags] {
    drop {}
  }
}

重要提示:输出操作应该更新

    output {
        elasticsearch {
            action => "update"
             ...
           }
        }

解决级别2的一种方法是查询已编制索引的文档并使用嵌套记录更新它。再次使用聚合过滤器;文档应该有一个公共id,以便您可以查找并插入到正确的文档中。

filter {    
    #get the document from elastic based on id and store it in 'emp'
    elasticsearch {
            hosts => ["${ELASTICSEARCH_HOST}/${INDEX_NAME}/${INDEX_TYPE}"]
            query => "id:%{id}" 
            fields => { "employee" => "emp" }
         }



  aggregate {
    task_id => "%{id}"  
    code => "       
                map['id'] = event.get('id')
                map['employee'] = []
                employeeArr = []
                temp_emp = {}   

                event.to_hash.each do |key,value|                       
                    temp_emp[key] = value
                end     

                #push the objects into an array
                employeeArr.push(temp_emp)

                empArr = event.get('emp')                   

                for emp in empArr
                    emp['employee'] = employeeArr                       
                    map['employee'].push(emp)
                end
    "
    push_previous_map_as_event => true
    timeout => 150000
    timeout_tags => ['aggregated']

  } 

   if "aggregated" not in [tags] {
    drop {}
  } 

}

output {

elasticsearch {
        action => "update"    #important
         ...
        }
 }  

此外,为了调试ruby代码,请在输出中使用以下内容

output{
    stdout { codec => dots }
}
 类似资料:
  • 问题内容: 我想在每个更新时间将一个对象添加到字段中。 例如,我有一个文档: 下次,我想在测试字段中添加一个对象并保存旧对象。结果是: 如何实现? 编辑 我使用脚本: 但是,我得到了例外: 编辑 现在,我想添加一个字段以确保更新或插入对象。例如: 当我更新字段时,当id存在时,我将更新对象。相反,我将插入对象。 问题答案: 我建议尝试这样的脚本,该脚本在参数中带有两个参数。它将检查任何嵌套对象是否

  • 问题内容: 我有大量(〜40000)嵌套的​​JSON对象,我想将它们插入elasticsearch中。 JSON对象的结构如下: 因此,JSON字段(此示例中为地址)可以具有JSON对象数组。 Logstash配置看起来像什么将这样的JSON文件/对象导入elasticsearch?该索引的elasticsearch映射应该看起来像JSON的结构。elasticsearch文档ID应设置为。 问

  • 我想在具有指定 URL 的相应文档中将嵌套的“已爬行”更新为 True。 我对mongodb相当陌生,我似乎无法弄清楚这一点,非常感谢任何帮助。

  • 问题内容: 有什么方法可以使用Logstash和csv文件从ElasticSearch删除文档?我阅读了Logstash文档,却一无所获,并尝试了一些配置,但是使用操作“删除”却没有任何反应 有人尝试过吗?我应该在配置的输入和过滤器部分添加一些特殊的东西吗?我使用文件插件作为输入,使用csv插件作为过滤器。 问题答案: 绝对可以按照您的建议去做,但是如果您使用的是Logstash 1.5,则需要使

  • 问题内容: 我的问题涉及到如何处理AngularJS应用程序中模板的复杂嵌套(也称为partials)。 如您所见,这有可能成为具有许多嵌套模型的相当复杂的应用程序。 该应用程序是单页的,因此它将加载index.html,该索引包含DOM中具有ng-view属性的div元素。 对于第1圈,您看到有一个主导航将相应的模板加载到中ng-view。我通过传递$routeParams给主应用程序模块来实现

  • 问题内容: 我正在使用Java API对Elasticsearch进行CRUD操作。 我有一个带有嵌套字段的类型,我想更新此字段。 这是我对类型的映射: 当然,我的最终用户类型将具有其他参数。 现在,我想将此文档添加到我的嵌套字段中: 我在文档中搜索有关如何更新嵌套文档的信息,但找不到任何东西。例如,我在字符串中具有先前的JSON对象(我们将此字符串称为json)。我尝试了以下代码,但似乎无法正常