我有一个logstash输入设置为
input {
kafka {
bootstrap_servers => "zookeper_address"
topics => ["topic1","topic2"]
}
}
我需要将主题提供给Elasticsearch中的两个不同的索引。任何人都可以帮助我如何为此类任务设置输出。目前,我只能设置
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "my_index"
codec => "json"
document_id => "%{id}"
}
}
我需要在同一elasticsearch例如两个指标说index1
和index2
,这将在未来对信息供给topic1
和topic2
首先,您需要添加decorate_events
到kafka
输入中才能知道消息来自哪个主题
input {
kafka {
bootstrap_servers => "zookeper_address"
topics => ["topic1","topic2"]
decorate_events => true
}
}
然后,您有两个选择,都涉及条件逻辑。首先是通过引入一个过滤器来根据主题名称添加正确的索引名称。为此,您需要添加
filter {
if [kafka][topic] == "topic1" {
mutate {
add_field => {"[@metadata][index]" => "index1"}
}
} else {
mutate {
add_field => {"[@metadata][index]" => "index2"}
}
}
# remove the field containing the decorations, unless you want them to land into ES
mutate {
remove_field => ["kafka"]
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "%{[@metadata][index]}"
codec => "json"
document_id => "%{id}"
}
}
然后第二个选择是直接在输出部分执行if / else,就像这样(但是其他kafka
字段将落入ES中):
output {
if [@metadata][kafka][topic] == "topic1" {
elasticsearch {
hosts => ["localhost:9200"]
index => "index1"
codec => "json"
document_id => "%{id}"
}
} else {
elasticsearch {
hosts => ["localhost:9200"]
index => "index2"
codec => "json"
document_id => "%{id}"
}
}
}
我正在使用Spring Cloud Stream Kafka Binder。我有以下Kafka活页夹函数。 在yml中,我有: 如果我想从同一个功能向两个不同的主题发送数据,我需要做什么?
问题内容: 我在同一台Ubuntu服务器上有一个Rails 3应用程序的暂存和生产实例(使用tyre gem)。看来这两个实例都共享相同的elasticsearch索引,这显然不是我想要的。 如何使生产和登台实例使用单独的实例? 问题答案: 您需要覆盖索引名称。假设您要绑定ActiveRecord,它将根据相关模型创建索引名称。您可以使用这样的前缀来调整名称; 然后会创建一个名为的索引,以此类推。
问题内容: 我正在使用hadoop mapreduce,我想计算两个文件。我的第一个Map / Reduce迭代是给我一个文件,其文件具有ID号,如下所示: 我的目标是使用文件中的该ID与另一个文件相关联,并使用三重奏输出另一个:ID,Number,Name,如下所示: 但是我不确定使用Map Reduce是否是最好的方法。例如,使用文件读取器读取第二个输入文件并通过ID获得名称会更好吗?还是可以
Kafka流中是否内置了允许将单个输入流动态连接到多个输出流的功能?允许基于true/false谓词进行分支,但这不是我想要的。我希望每个传入日志都确定它将在运行时流到的主题,例如,日志将流到主题和日志将流到主题。 我可以在流中调用,然后写给Kafka制作人,但这似乎不是很好。在Streams框架中是否有更好的方法来实现这一点?
我有一个要求加入3个Kafka主题。前两个主题A和B将使用inner join添加,因为消息键相同,并且生成一个POJO与B相同的新Kafka流。现在,使用这个累积的流,我需要加入另一个主题C,并且我需要根据C中存在的字段对输出进行分组。 到目前为止,我有以下方法: 前两个主题(A和B)的KStream-KStream inner join是否可以不发布任何主题的累积输出,并且仍然可以在下面使用它
给定一个将消息发布到两个不同主题的Kafka流拓扑,是否可以保证在这两个分支中执行各个步骤的顺序,或者这些分支是完全分开并并行执行的? 在本例中,是否会在调用< code>mapTwo或向output-topic-two发布消息之前执行< code>mapOne并发布到output-topic-one?换句话说,能否保证在消息发布到output-topic-two之前完成< code>mapOne