Question:

Unable to convert Kafka topic data into structured JSON with Confluent's Elasticsearch sink connector

洪梓
2023-03-14

I am building a data pipeline with Kafka. The data flow is as follows: capture data changes in MongoDB and send them to Elasticsearch.

MongoDB

  • Version 3.6
  • Sharded cluster

Since I am still testing, all the Kafka-related systems are running on a single server.


  • Start ZooKeeper

    $ bin/zookeeper-server-start etc/kafka/zookeeper.properties
    

    Start the Kafka broker (bootstrap server)

    $ bin/kafka-server-start etc/kafka/server.properties
    
    $ bin/schema-registry-start etc/schema-registry/schema-registry.properties
    
    $ bin/connect-standalone \ 
      etc/schema-registry/connect-avro-standalone.properties \ 
      etc/kafka/connect-mongo-source.properties
    
    $ cat etc/kafka/connect-mongo-source.properties
    >>> 
    name=mongodb-source-connector
    connector.class=io.debezium.connector.mongodb.MongoDbConnector
    mongodb.hosts=''
    initial.sync.max.threads=1
    tasks.max=1
    mongodb.name=higee
    
    $ cat etc/schema-registry/connect-avro-standalone.properties
    >>>
    bootstrap.servers=localhost:9092
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    rest.port=8083
    
    $ bin/connect-standalone \ 
      etc/schema-registry/connect-avro-standalone2.properties  \ 
      etc/kafka-connect-elasticsearch/elasticsearch.properties
    
    $ cat etc/kafka-connect-elasticsearch/elasticsearch.properties
    >>>
    name=elasticsearch-sink
    connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    tasks.max=1
    topics=higee.higee.higee
    key.ignore=true
    connection.url=''
    type.name=kafka-connect
    
    $ cat etc/schema-registry/connect-avro-standalone2.properties
    >>>
    bootstrap.servers=localhost:9092
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    rest.port=8084
    
        $ bin/kafka-avro-console-consumer \
        --bootstrap-server localhost:9092 \
        --topic higee.higee.higee --from-beginning | jq
    

    Then I get the following result:

        "after": null,
          "patch": {
            "string": "{\"_id\" : {\"$oid\" : \"5ad97f982a0f383bb638ecac\"},\"name\" : \"higee\",\"salary\" : 100,\"origin\" : \"South Korea\"}"
          },
          "source": {
            "version": {
              "string": "0.7.5"
            },
            "name": "higee",
            "rs": "172.31.50.13",
            "ns": "higee",
            "sec": 1524214412,
            "ord": 1,
            "h": {
              "long": -2379508538412995600
            },
            "initsync": {
              "boolean": false
            }
          },
          "op": {
            "string": "u"
          },
          "ts_ms": {
            "long": 1524214412159
          }
        }
    

    Then, if I query Elasticsearch, I get the following result:

        {
            "_index": "higee.higee.higee",
            "_type": "kafka-connect",
            "_id": "higee.higee.higee+0+3",
            "_score": 1,
            "_source": {
              "after": null,
              "patch": """{"_id" : {"$oid" : "5ad97f982a0f383bb638ecac"}, 
                           "name" : "higee", 
                           "salary" : 100,
                           "origin" : "South Korea"}""",
              "source": {
                "version": "0.7.5",
                "name": "higee",
                "rs": "172.31.50.13",
                "ns": "higee",
                "sec": 1524214412,
                "ord": 1,
                "h": -2379508538412995600,
                "initsync": false
              },
              "op": "u",
              "ts_ms": 1524214412159
            }
          }
    

    What I want to achieve is the following:

        {
            "_index": "higee.higee.higee",
            "_type": "kafka-connect",
            "_id": "higee.higee.higee+0+3",
            "_score": 1,
            "_source": {
              "oid" : "5ad97f982a0f383bb638ecac",
              "name" : "higee", 
              "salary" : 100,
              "origin" : "South Korea"
             }"
         }
    
    Case 1

  • logstash.conf

    input {
      kafka {
        bootstrap_servers => ["localhost:9092"]
        topics => ["higee.higee.higee"]
        auto_offset_reset => "earliest"
        codec => json {
          charset => "UTF-8"
        }
      }
    }
    
    filter {
      json {
        source => "message"
      }
     }
    
    output {
      stdout {
        codec => rubydebug
      }
    }
    
    {
    "message" => "H\u0002�\u0001{\"_id\" : \
        {\"$oid\" : \"5adafc0e2a0f383bb63910a6\"}, \
         \"name\" : \"higee\", \
         \"salary\" : 101, \
         \"origin\" : \"South Korea\"} \
         \u0002\n0.7.5\nhigee \ 
         \u0018172.31.50.13\u001Ahigee.higee2 \ 
         ��ح\v\u0002\u0002��̗���� \u0002\u0002u\u0002�����X",
    "tags" => [[0] "_jsonparsefailure"]
    }
    

    Case 2


  • logstash.conf

    input {
      kafka {
        bootstrap_servers => ["localhost:9092"]
        topics => ["higee.higee.higee"]
        auto_offset_reset => "earliest"
        codec => avro {
          schema_uri => "./test.avsc"
        }
      }
    }
    
    filter {
      json {
        source => "message"
      }
    }
    
    output {
      stdout {
        codec => rubydebug
      }
    }
    

    test.avsc

    {
        "namespace": "example",
        "type": "record",
        "name": "Higee",
        "fields": [
          {"name": "_id", "type": "string"},
          {"name": "name", "type": "string"},
          {"name": "salary",  "type": "int"},
          {"name": "origin", "type": "string"}
        ]
     }
    
    An unexpected error occurred! {:error=>#<NoMethodError: undefined method `type_sym' for nil:NilClass>, :backtrace=>[
      "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:224:in `match_schemas'",
      "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:280:in `read_data'",
      "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:376:in `read_union'",
      "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:309:in `read_data'",
      "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:384:in `block in read_record'",
      "org/jruby/RubyArray.java:1734:in `each'",
      "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:382:in `read_record'",
      "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:310:in `read_data'",
      "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:275:in `read'",
      "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/logstash-codec-avro-3.2.3-java/lib/logstash/codecs/avro.rb:77:in `decode'",
      "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.2/lib/logstash/inputs/kafka.rb:254:in `block in thread_runner'",
      "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.2/lib/logstash/inputs/kafka.rb:253:in `block in thread_runner'"]}
    

    Python client


  • Consume the topic and, after some data manipulation, produce to a topic with a different name, so that the Elasticsearch sink connector can consume only well-formatted messages from the Python-manipulated topic
  • kafka-python library: cannot decode the message

    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer(
                 'higee.higee.higee',
                 auto_offset_reset='earliest'
               )
    
    for message in consumer:
        message.value.decode('utf-8')
    
    >>> 'utf-8' codec can't decode byte 0xe4 in position 6: 
        invalid continuation byte
    

    confluent_kafka is not compatible with Python 3

    Links I have referred to:

    • MongoDB Debezium
    • MongoDB event flattening
    • Avro converter
    • Serializing Debezium events
    • Debezium tutorial

    Thanks in advance.

    Some attempts

    1) Added the Debezium UnwrapFromMongoDbEnvelope transform to the MongoDB source connector:

        $ cat etc/kafka/connect-mongo-source.properties
        >>> 
        name=mongodb-source-connector
        connector.class=io.debezium.connector.mongodb.MongoDbConnector
        mongodb.hosts=''
        initial.sync.max.threads=1
        tasks.max=1
        mongodb.name=higee
        transforms=unwrap     
        transforms.unwrap.type=io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope
    
    ERROR WorkerSourceTask{id=mongodb-source-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
    org.bson.json.JsonParseException: JSON reader expected a string but found '0'.
        at org.bson.json.JsonReader.visitBinDataExtendedJson(JsonReader.java:904)
        at org.bson.json.JsonReader.visitExtendedJSON(JsonReader.java:570)
        at org.bson.json.JsonReader.readBsonType(JsonReader.java:145)
        at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:82)
        at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:41)
        at org.bson.codecs.BsonDocumentCodec.readValue(BsonDocumentCodec.java:101)
        at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:84)
        at org.bson.BsonDocument.parse(BsonDocument.java:62)
        at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:45)
        at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:218)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:194)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    
    2) Moved the transform to the Elasticsearch sink connector instead:

    $ cat connect-mongo-source.properties
    
        name=mongodb-source-connector
        connector.class=io.debezium.connector.mongodb.MongoDbConnector
        mongodb.hosts=''
        initial.sync.max.threads=1
        tasks.max=1
        mongodb.name=higee
    
    $ cat elasticsearch.properties
    
        name=elasticsearch-sink
        connector.class = io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
        tasks.max=1
        topics=higee.higee.higee
        key.ignore=true
        connection.url=''
        type.name=kafka-connect
        transforms=unwrap
        transforms.unwrap.type = io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope
    
    ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
    org.bson.BsonInvalidOperationException: Document does not contain key $set
        at org.bson.BsonDocument.throwIfKeyAbsent(BsonDocument.java:844)
        at org.bson.BsonDocument.getDocument(BsonDocument.java:135)
        at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:53)
        at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:480)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    

    3) Changed test.avsc and ran logstash. I did not get any error message, but the result was not what I expected: the origin, salary, and name fields were all empty even though they had been given non-null values. I was even able to read the data correctly through the console consumer.

    $ cat test.avsc
    >>>
        {
          "type" : "record",
          "name" : "MongoEvent",
          "namespace" : "higee.higee",
          "fields" : [ {
            "name" : "_id",
            "type" : {
              "type" : "record",
              "name" : "HigeeEvent",
              "fields" : [ {
                "name" : "$oid",
                "type" : "string"
              }, {
                "name" : "salary",
                "type" : "long"
              }, {
                "name" : "origin",
                "type" : "string"
              }, {
                "name" : "name",
                "type" : "string"
              } ]
            }
          } ]
        }
    
    $ cat logstash3.conf
    >>>
        input {
          kafka {
            bootstrap_servers => ["localhost:9092"]
            topics => ["higee.higee.higee"]
            auto_offset_reset => "earliest"
            codec => avro {
              schema_uri => "./test.avsc"
            }
          }
        }
    
        output {
          stdout {
           codec => rubydebug
          }
        }
    
    $ bin/logstash -f logstash3.conf
    >>>
        {
        "@version" => "1",
        "_id" => {
          "salary" => 0,
          "origin" => "",
          "$oid" => "",
          "name" => ""
        },
        "@timestamp" => 2018-04-25T09:39:07.962Z
        }
    
  • 1 answer

    微生耘豪
    2023-03-14

    You must use the Avro consumer, otherwise you will get 'utf-8' codec can't decode byte.

    Even that example won't work, because you still need the Schema Registry to look up the schema.

    The prerequisites for Confluent's Python client say that it works with Python 3.x.
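
    For what it's worth, here is a minimal, untested sketch of the consume-and-republish idea using confluent_kafka's AvroConsumer, which fetches the writer schema from the Schema Registry for you. The registry URL matches your converter config; the group id, the output topic name higee.flat, and the field handling are assumptions that simply follow the event shape you posted:

    import json

    from confluent_kafka import Producer
    from confluent_kafka.avro import AvroConsumer

    consumer = AvroConsumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'mongo-flattener',            # made-up group id
        'schema.registry.url': 'http://localhost:8081',
        'auto.offset.reset': 'earliest',
    })
    consumer.subscribe(['higee.higee.higee'])

    producer = Producer({'bootstrap.servers': 'localhost:9092'})

    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue

        # msg.value() is already a dict; Avro unions come back as plain values here,
        # unlike the {"string": ...} wrappers the console consumer prints.
        envelope = msg.value()
        doc_json = envelope.get('patch') or envelope.get('after')
        if doc_json is None:
            continue

        doc = json.loads(doc_json)                # patch/after hold the document as a JSON string
        flattened = {
            'oid': doc['_id']['$oid'],
            'name': doc.get('name'),
            'salary': doc.get('salary'),
            'origin': doc.get('origin'),
        }
        producer.produce('higee.flat', json.dumps(flattened).encode('utf-8'))
        producer.poll(0)

    If you then point the Elasticsearch sink at that new topic, keep in mind the worker reading it would need JsonConverter with schemas disabled, since those messages are plain JSON rather than Confluent-encoded Avro.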

    Regarding Logstash:

    1. The JSON codec cannot decode Avro data, and I don't think a json filter following an avro input codec will work either
    2. Your Avro schema is wrong - you are missing the $oid in place of _id
    3. There is a difference between "raw Avro" (which carries the schema inside the message itself) and Confluent's encoded version (which only carries the ID of a schema stored in the registry). In other words, Logstash does not integrate with the Schema Registry... at least not without a plugin (a decoding sketch follows this list)
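
    To make that difference concrete, here is a rough, untested sketch of decoding one Confluent-framed record by hand (it assumes the avro and requests packages and a registry at http://localhost:8081; the function name is only illustrative). The framing is a zero magic byte, a 4-byte big-endian schema ID, and then the plain Avro binary body - which is why a schema-file-only avro codec, or a plain UTF-8 decode, chokes on these messages:

    import io
    import struct

    import avro.io
    import avro.schema
    import requests

    def decode_confluent_avro(raw_bytes, registry_url='http://localhost:8081'):
        # Confluent wire format: 0x00 magic byte, 4-byte big-endian schema ID, Avro binary payload
        magic, schema_id = struct.unpack('>bI', raw_bytes[:5])
        if magic != 0:
            raise ValueError('not Confluent-framed Avro')
        schema_json = requests.get('{}/schemas/ids/{}'.format(registry_url, schema_id)).json()['schema']
        writer_schema = avro.schema.parse(schema_json)
        reader = avro.io.DatumReader(writer_schema)
        return reader.read(avro.io.BinaryDecoder(io.BytesIO(raw_bytes[5:])))

    That framing plus Avro's binary encoding is also what the \u0002 noise in your rubydebug output and the 'utf-8' codec can't decode byte error from kafka-python are showing you - the payload was never UTF-8 text.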

    Your AVSC should actually look like this:

    {
      "type" : "record",
      "name" : "MongoEvent",
      "namespace" : "higee.higee",
      "fields" : [ {
        "name" : "_id",
        "type" : {
          "type" : "record",
          "name" : "HigeeEvent",
          "fields" : [ {
            "name" : "$oid",
            "type" : "string"
          }, {
            "name" : "salary",
            "type" : "long"
          }, {
            "name" : "origin",
            "type" : "string"
          }, {
            "name" : "name",
            "type" : "string"
          } ]
        }
      } ]
    }
    

    However, Avro does not allow any name that starts with anything other than the regex [A-Za-z_], so $oid would be a problem.
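
    A quick way to see which of those names pass is to check them against the naming rule directly (a toy check using only the regex, not the Avro library):

    import re

    # Avro name rule: first character must be a letter or underscore,
    # the rest letters, digits, or underscores.
    AVRO_NAME = re.compile(r'^[A-Za-z_][A-Za-z0-9_]*$')

    for field in ['_id', 'name', 'salary', 'origin', '$oid']:
        print(field, '->', 'ok' if AVRO_NAME.match(field) else 'invalid Avro name')

    Only $oid fails, which is why the AVSC above cannot be used as-is.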

    While I wouldn't recommend it (nor have I actually tried it), one possible way to get JSON-encoded Avro data from the Avro console consumer into Logstash would be to use the Pipe input plugin:

    input {
      pipe {
        codec => json
        command => "/path/to/confluent/bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic higee.higee.higee --from-beginning" 
      }
    }
    

    http://debezium.io/docs/connectors/mongodb/

    I assume this also applies to the patch value, but I don't really know Debezium.

    Kafka will not parse the JSON in flight unless you use a Simple Message Transform (SMT). Having read the documentation linked above, you should probably add these to your Connect source properties:

    transforms=unwrap
    transforms.unwrap.type=io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope
    

    If I remember correctly, Connect will only apply an Elasticsearch mapping to top-level Avro fields, not nested ones.

    In other words, the generated mapping follows this pattern:

    "patch": {
        "string": "...some JSON object string here..."
      },
    

    whereas what you actually need is something like this - possibly by defining your ES index mapping manually:

    "patch": {
       "properties": {
          "_id": {
            "properties" {
              "$oid" :  { "type": "text" }, 
              "name" :  { "type": "text" },
              "salary":  { "type": "int"  }, 
              "origin": { "type": "text" }
          },
    