当前位置: 首页 > 知识库问答 >
问题:

如何运行mongo-kafka连接器作为kafka的源并将其与logstash输入集成以使用elasticsearch作为接收器?

刘辰钊
2023-03-14

我创建了https://github.com/mongodb/mongo-kafka的构建

但是这个运行如何与我正在运行的kafka实例连接。

甚至这个问题听起来多么愚蠢。但似乎没有任何文档可以使其与MongoDB的本地运行的Replicaset一起工作。

如果你有一个好的资源,请引导我走向它。

更新1--

已使用maven插件-https://search.maven.org/artifact/org.mongodb.kafka/mongo-kafka-connect

更新2--如何启用mongodb作为Kafka的源代码?

https://github.com/mongodb/mongo-kafka/blob/master/config/mongoSourceConnector.properties

要用作Kafka配置的文件

bin/kafka-server-start.sh config/server.properties --override config/MongoSourceConnector.properties

安装了汇流式和汇流式集线器,但仍不确定与Kafka一起工作的mongo-connector。

更新4-

Zookeeper,Kafka服务器,Kafka connect正在运行

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/connect-standalone.sh config/connect-standalone.properties config/MongoSourceConnector.properties

使用logstash的以下配置,我可以将数据推入elasticsearch-

input {
  kafka {
        bootstrap_servers => "localhost:9092"
        topics => ["users","organisations","skills"]
  }
}
output {
  elasticsearch {
        hosts => ["localhost:9200"]
  }
  stdout { codec => rubydebug }
}

因此,现在一个MongoSourceConnector.properties保持一个它读取的集合名称,我需要为每个集合运行不同的属性文件的kafka connect。

我的Logstash正在将新数据推入elasticsearch,而不是更新旧数据。此外,它不是根据集合的名称创建索引。想法是这应该能够与我的MongoDB数据库完美地同步。

最终更新-现在一切工作顺利,

  • 为kafka Connect创建了多个属性文件
  • 最新的日志存储实际上根据主题名称创建索引,并相应地更新索引
input {
    kafka {
        bootstrap_servers => "localhost:9092"
        decorate_events => true
        topics => ["users","organisations","skills"]
    }
}
filter {
    json {
        source => "message"
        target => "json_payload"
    }

    json {
        source => "[json_payload][payload]"
        target => "payload"
    }

    mutate {
        add_field => { "[es_index]" => "%{[@metadata][kafka][topic]}" }
        rename => { "[payload][fullDocument][_id][$oid]" => "mongo_id"}
        rename => { "[payload][fullDocument]" => "document"}
        remove_field => ["message","json_payload","payload"]
    }
}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "%{es_index}"
        action => "update"
        doc_as_upsert => true
        document_id => "%{mongo_id}"
    }
    stdout {
        codec =>
        rubydebug {
            metadata => true
        }
    }
}

共有1个答案

邹锦
2023-03-14

端口8083是Kafka Connect,您可以从connect-*.sh脚本之一开始。

它是独立于代理的,属性不是从kafka-server-start中设置的

 类似资料:
  • 我使用的是kafka connect支持的以下mongo源代码。我发现mongo源代码的一个配置(从这里)是tasks.max。 这意味着我可以提供连接器tasks.max这是 如果它将创建多个连接器来侦听mongoDb更改流,那么我将最终得到重复的消息。那么,mongo真的具有并行性并作为集群工作吗?如果它有超过1个tasks.max?

  • 我使用kafka connect从mongo读取数据并将其写入kafka主题。 我正在使用 mongo kafka 源连接器。 我收到以下错误: 罐子里好像有一个小盒子。为了得到这个罐子,我使用了两种不同的方法,但是我得到了同样的错误。首先,我使用了下载的from:maven资源库,然后我从github repo中克隆了源代码,并自己构建了jar。我将jar推到plugins.path中,当我解压

  • 我在Databricks上阅读下面的博客 https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html 在解释spark-kafka集成如何使用WAl接收器的过程时,它说 1.Kafka数据由在火花工作线程/执行程序中运行的Kafka接收器持续接收。这使用了Kafka

  • 我读到elasticsearch Rivers/River插件不推荐使用。所以我们不能直接进行elasticsearch-kafka积分。如果我们想这样做,那么我们需要在两者之间有一个java(或任何语言)层,使用它的API将数据从kafka放到弹性搜索。 另一方面,如果我们有kafka-logstash-elasticsearch,那么我们可以去掉上面的中间层,并通过logstash来实现,只需

  • 哪个图表:图表/比特纳米/Kafka/https://github.com/bitnami/charts/tree/master/bitnami/kafka 描述我正在遵循的教程bug使用Apache Kafka和MongoDB在库伯内特斯上构建可扩展的容错消息群集 为了解决外部署问题,我遵循了外部署不呈现留档#5649中的示例。问题已解决,我的工作配置如下: Dockerfile文件 然后我跑了

  • 我使用Elasticsearch Connector作为Sink将数据插入到Elasticsearch中(参见:https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/elasticsearch.html)。 但是,我并没有找到任何从Elasticsearch获取数据作为源的连接器。 在Flink pip