我创建了https://github.com/mongodb/mongo-kafka的构建
但是这个运行如何与我正在运行的kafka实例连接。
甚至这个问题听起来多么愚蠢。但似乎没有任何文档可以使其与MongoDB
的本地运行的Replicaset
一起工作。
如果你有一个好的资源,请引导我走向它。
更新1--
已使用maven插件-https://search.maven.org/artifact/org.mongodb.kafka/mongo-kafka-connect
更新2--如何启用mongodb作为Kafka的源代码?
https://github.com/mongodb/mongo-kafka/blob/master/config/mongoSourceConnector.properties
要用作Kafka配置的文件
bin/kafka-server-start.sh config/server.properties --override config/MongoSourceConnector.properties
安装了汇流式和汇流式集线器,但仍不确定与Kafka一起工作的mongo-connector。
更新4-
Zookeeper,Kafka服务器,Kafka connect正在运行
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/connect-standalone.sh config/connect-standalone.properties config/MongoSourceConnector.properties
使用logstash的以下配置,我可以将数据推入elasticsearch-
input {
kafka {
bootstrap_servers => "localhost:9092"
topics => ["users","organisations","skills"]
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
}
stdout { codec => rubydebug }
}
因此,现在一个MongoSourceConnector.properties保持一个它读取的集合名称,我需要为每个集合运行不同的属性文件的kafka connect。
我的Logstash正在将新数据推入elasticsearch,而不是更新旧数据。此外,它不是根据集合的名称创建索引。想法是这应该能够与我的MongoDB数据库完美地同步。
最终更新-现在一切工作顺利,
input {
kafka {
bootstrap_servers => "localhost:9092"
decorate_events => true
topics => ["users","organisations","skills"]
}
}
filter {
json {
source => "message"
target => "json_payload"
}
json {
source => "[json_payload][payload]"
target => "payload"
}
mutate {
add_field => { "[es_index]" => "%{[@metadata][kafka][topic]}" }
rename => { "[payload][fullDocument][_id][$oid]" => "mongo_id"}
rename => { "[payload][fullDocument]" => "document"}
remove_field => ["message","json_payload","payload"]
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "%{es_index}"
action => "update"
doc_as_upsert => true
document_id => "%{mongo_id}"
}
stdout {
codec =>
rubydebug {
metadata => true
}
}
}
端口8083是Kafka Connect,您可以从connect-*.sh
脚本之一开始。
它是独立于代理的,属性不是从kafka-server-start
中设置的
我使用的是kafka connect支持的以下mongo源代码。我发现mongo源代码的一个配置(从这里)是tasks.max。 这意味着我可以提供连接器tasks.max这是 如果它将创建多个连接器来侦听mongoDb更改流,那么我将最终得到重复的消息。那么,mongo真的具有并行性并作为集群工作吗?如果它有超过1个tasks.max?
我使用kafka connect从mongo读取数据并将其写入kafka主题。 我正在使用 mongo kafka 源连接器。 我收到以下错误: 罐子里好像有一个小盒子。为了得到这个罐子,我使用了两种不同的方法,但是我得到了同样的错误。首先,我使用了下载的from:maven资源库,然后我从github repo中克隆了源代码,并自己构建了jar。我将jar推到plugins.path中,当我解压
我在Databricks上阅读下面的博客 https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html 在解释spark-kafka集成如何使用WAl接收器的过程时,它说 1.Kafka数据由在火花工作线程/执行程序中运行的Kafka接收器持续接收。这使用了Kafka
我读到elasticsearch Rivers/River插件不推荐使用。所以我们不能直接进行elasticsearch-kafka积分。如果我们想这样做,那么我们需要在两者之间有一个java(或任何语言)层,使用它的API将数据从kafka放到弹性搜索。 另一方面,如果我们有kafka-logstash-elasticsearch,那么我们可以去掉上面的中间层,并通过logstash来实现,只需
哪个图表:图表/比特纳米/Kafka/https://github.com/bitnami/charts/tree/master/bitnami/kafka 描述我正在遵循的教程bug使用Apache Kafka和MongoDB在库伯内特斯上构建可扩展的容错消息群集 为了解决外部署问题,我遵循了外部署不呈现留档#5649中的示例。问题已解决,我的工作配置如下: Dockerfile文件 然后我跑了
我使用Elasticsearch Connector作为Sink将数据插入到Elasticsearch中(参见:https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/elasticsearch.html)。 但是,我并没有找到任何从Elasticsearch获取数据作为源的连接器。 在Flink pip