当前位置: 首页 > 面试题库 >

logstash 5.0.1:为多个kafka输入主题设置elasticsearch多个索引输出

汪阳飇
2023-03-14
问题内容

我有一个logstash输入设置为

input {
  kafka {
  bootstrap_servers => "zookeper_address"
  topics => ["topic1","topic2"]
  }
}

我需要将主题提供给Elasticsearch中的两个不同的索引。任何人都可以帮助我如何为此类任务设置输出。目前,我只能设置

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "my_index"
    codec => "json"
    document_id => "%{id}"
  }
}

我需要在同一elasticsearch例如两个指标说index1index2,这将在未来对信息供给topic1topic2


问题答案:

首先,您需要添加decorate_eventskafka输入中才能知道消息来自哪个主题

input {
  kafka {
    bootstrap_servers => "zookeper_address"
    topics => ["topic1","topic2"]
    decorate_events => true
  }
}

然后,您有两个选择,都涉及条件逻辑。首先是通过引入一个过滤器来根据主题名称添加正确的索引名称。为此,您需要添加

filter {
   if [kafka][topic] == "topic1" {
      mutate {
         add_field => {"[@metadata][index]" => "index1"}
      }
   } else {
      mutate {
         add_field => {"[@metadata][index]" => "index2"}
      }
   }
   # remove the field containing the decorations, unless you want them to land into ES
   mutate {
      remove_field => ["kafka"]
   }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "%{[@metadata][index]}"
    codec => "json"
    document_id => "%{id}"
  }
}

然后第二个选择是直接在输出部分执行if / else,就像这样(但是其他kafka字段将落入ES中):

output {
   if [@metadata][kafka][topic] == "topic1" {
     elasticsearch {
       hosts => ["localhost:9200"]
       index => "index1"
       codec => "json"
       document_id => "%{id}"
     }
   } else {
     elasticsearch {
       hosts => ["localhost:9200"]
       index => "index2"
       codec => "json"
       document_id => "%{id}"
     }
   }
}


 类似资料:
  • 我正在使用Spring Cloud Stream Kafka Binder。我有以下Kafka活页夹函数。 在yml中,我有: 如果我想从同一个功能向两个不同的主题发送数据,我需要做什么?

  • 问题内容: 我在同一台Ubuntu服务器上有一个Rails 3应用程序的暂存和生产实例(使用tyre gem)。看来这两个实例都共享相同的elasticsearch索引,这显然不是我想要的。 如何使生产和登台实例使用单独的实例? 问题答案: 您需要覆盖索引名称。假设您要绑定ActiveRecord,它将根据相关模型创建索引名称。您可以使用这样的前缀来调整名称; 然后会创建一个名为的索引,以此类推。

  • 问题内容: 我正在使用hadoop mapreduce,我想计算两个文件。我的第一个Map / Reduce迭代是给我一个文件,其文件具有ID号,如下所示: 我的目标是使用文件中的该ID与另一个文件相关联,并使用三重奏输出另一个:ID,Number,Name,如下所示: 但是我不确定使用Map Reduce是否是最好的方法。例如,使用文件读取器读取第二个输入文件并通过ID获得名称会更好吗?还是可以

  • Kafka流中是否内置了允许将单个输入流动态连接到多个输出流的功能?允许基于true/false谓词进行分支,但这不是我想要的。我希望每个传入日志都确定它将在运行时流到的主题,例如,日志将流到主题和日志将流到主题。 我可以在流中调用,然后写给Kafka制作人,但这似乎不是很好。在Streams框架中是否有更好的方法来实现这一点?

  • 我有一个要求加入3个Kafka主题。前两个主题A和B将使用inner join添加,因为消息键相同,并且生成一个POJO与B相同的新Kafka流。现在,使用这个累积的流,我需要加入另一个主题C,并且我需要根据C中存在的字段对输出进行分组。 到目前为止,我有以下方法: 前两个主题(A和B)的KStream-KStream inner join是否可以不发布任何主题的累积输出,并且仍然可以在下面使用它

  • 给定一个将消息发布到两个不同主题的Kafka流拓扑,是否可以保证在这两个分支中执行各个步骤的顺序,或者这些分支是完全分开并并行执行的? 在本例中,是否会在调用< code>mapTwo或向output-topic-two发布消息之前执行< code>mapOne并发布到output-topic-one?换句话说,能否保证在消息发布到output-topic-two之前完成< code>mapOne