当前位置: 首页 > 知识库问答 >
问题:

S3拼花地板格式的KafkaS3源连接器

麻宾白
2023-03-14

我有使用Protobuf制作的主题事件。我可以使用Parquet格式的S3 sink连接器将主题事件成功地汇到S3存储桶中。现在我的S3存储桶中有类型的对象。镶木地板.key.parket。使用以下配置,所有这些都按预期工作:

{
    "name": "s3-sink",
    "config": {
      "name": "s3-sink",
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
      "keys.format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
      "value.converter.schema.registry.url": "https://my-schema-registry",
      "value.converter.basic.auth.credentials.source": "USER_INFO",
      "value.converter.basic.auth.user.info": "MY_SR_API_KEY:MY_SR_API_SECRET",
      "store.kafka.keys": true,
      "parquet.codec": "none",
      "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
      "locale": "en-US",
      "s3.bucket.name": "my-bucket-123",
      "s3.region": "eu-west-1",
      "time.interval": "HOURLY",
      "flush.size": 1000,
      "tasks.max": 1,
      "topics.regex": "test-topic.*",
      "confluent.license": "",
      "confluent.topic.bootstrap.servers": "my-bootstrap-server",
      "confluent.topic.replication.factor": 3,
      "confluent.license.topic.replication.factor": 1,
      "confluent.topic.security.protocol": "SASL_SSL",
      "confluent.topic.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"MY_API_KEY\" password=\"MY_API_SECRET\";",
      "confluent.topic.sasl.mechanism": "PLAIN",
      "confluent.topic.ssl.endpoint.identification.algorithm": "https"
    }
  }
}

现在,我想使用Protobuf将< code > my-bucket-123 (< code > parquet 格式)的键和值放回到Kafka主题中。为此,我使用以下配置通过汇合设置了一个新的S3源连接器(< code > Confluent Inc/Kafka-connect-S3-Source:1 . 4 . 5 ):

{
    "name": "s3-source",
    "config": {
      "name": "s3-source",
      "dest.kafka.bootstrap.servers": "my-bootstrap-server",
      "dest.topic.replication.factor": 1,
      "dest.kafka.security.protocol": "SASL_SSL",
      "dest.kafka.sasl.mechanism": "PLAIN",
      "dest.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"MY_API_KEY\" password=\"MY_API_SECRET\";",
      "tasks.max": 1,
      "connector.class": "io.confluent.connect.s3.source.S3SourceConnector",
      "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
      "confluent.license": "",
      "confluent.topic.bootstrap.servers": "my-bootstrap-server",
      "confluent.topic.replication.factor": 3,
      "confluent.license.topic.replication.factor": 1,
      "confluent.topic.security.protocol": "SASL_SSL",
      "confluent.topic.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"MY_API_KEY\" password=\"MY_API_SECRET\";",
      "confluent.topic.sasl.mechanism": "PLAIN",
      "confluent.topic.ssl.endpoint.identification.algorithm": "https",
      "transforms": "AddPrefix",
      "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
      "transforms.AddPrefix.regex": ".*",
      "transforms.AddPrefix.replacement": "copy_of_$0",
      "s3.region": "eu-west-1",
      "s3.bucket.name": "my-bucket-123"
    }
}

通过使用上述配置,我无法启动S3源连接器。如果我使用上述配置和命令验证配置:

curl -X PUT -d @config.json --header "content-Type:application/json" http://localhost:8083/connector-plugins/S3SourceConnector/config/validate

我在format.class属性中得到以下错误:

"errors":[
               "Invalid value io.confluent.connect.s3.format.parquet.ParquetFormat for configuration format.class: Class io.confluent.connect.s3.format.parquet.ParquetFormat could not be found.",
               "Invalid value null for configuration format.class: Class must extend: io.confluent.connect.cloud.storage.source.StorageObjectFormat"
            ]

我开始认为这个S3源代码连接器不支持<code>Parquet</code>格式。我试着用JSON、AVRO和BYTE格式对其进行验证,所有这些都没有问题。

深入研究S3 Source连接器jar文件(1.4.5),我没有找到一个用于Par

Jar文件中的格式

有人知道这里发生了什么吗?有没有其他方法可以将数据从 S3 - Parquet 格式放回我的 Kafka 集群中?

谢谢!

共有1个答案

公冶桐
2023-03-14

从ConFluent的S3源连接器留档:

开箱即用,连接器支持从S3读取Avro和JSON格式的数据。除了带模式的记录,连接器还支持在文本文件中导入不带模式的普通JSON记录,每行一条记录。通常,连接器可以接受提供格式接口实现的任何格式。

因此,这意味着您应该能够添加/plugin实现< code>parquet格式,但它不是内置的

格式的源代码:

https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/parquet/ParquetFormat.java

 类似资料:
  • Kafka是否将S3支持从JSON连接到Parquet?感谢使用Kafka Connect S3提供的可用和替代建议

  • 我正在尝试使用Kafka-connect来消耗Kafka的消息并将它们写入s3拼花文件。所以我写了一个简单的生产者,它用byte[]生成消息 我的Kafka连接配置是: 这是我得到的错误: 原因:java。lang.IllegalArgumentException:Avro架构必须是记录。在org。阿帕奇。拼花地板阿夫罗。AvroSchemaConverter。转换(AvroSchemaConve

  • 我看到Kafka Connect可以以Avro或JSON格式写入S3。但是没有Parket支持。添加这个有多难?

  • 我有以Avro格式存储的Kafka主题。我想使用整个主题(在收到时不会更改任何消息)并将其转换为Parket,直接保存在S3上。 我目前正在这样做,但它要求我每次消费一条来自Kafka的消息,并在本地机器上处理,将其转换为拼花文件,一旦整个主题被消费,拼花文件完全写入,关闭写入过程,然后启动S3多部分文件上传。或《Kafka》中的阿夫罗- 我想做的是《Kafka》中的阿夫罗- 注意事项之一是Kaf

  • 我使用 docker compose 来启动 3 个服务:zookeeper、kafka broker 和 minio-connector 当我在 minio-connector 中使用以下配置从 kafka 消费并将 JSON 格式的记录转储到 minio 时,这三个服务可以成功启动和连接: 启动命令: <代码>root@e1d1294c6fe6:/opt/bitnami/kafka/bin#.