我有使用Protobuf制作的主题事件。我可以使用Parquet格式的S3 sink连接器将主题事件成功地汇到S3存储桶中。现在我的S3存储桶中有类型的对象。镶木地板
和.key.parket
。使用以下配置,所有这些都按预期工作:
{
"name": "s3-sink",
"config": {
"name": "s3-sink",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"keys.format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"value.converter.schema.registry.url": "https://my-schema-registry",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.basic.auth.user.info": "MY_SR_API_KEY:MY_SR_API_SECRET",
"store.kafka.keys": true,
"parquet.codec": "none",
"partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
"locale": "en-US",
"s3.bucket.name": "my-bucket-123",
"s3.region": "eu-west-1",
"time.interval": "HOURLY",
"flush.size": 1000,
"tasks.max": 1,
"topics.regex": "test-topic.*",
"confluent.license": "",
"confluent.topic.bootstrap.servers": "my-bootstrap-server",
"confluent.topic.replication.factor": 3,
"confluent.license.topic.replication.factor": 1,
"confluent.topic.security.protocol": "SASL_SSL",
"confluent.topic.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"MY_API_KEY\" password=\"MY_API_SECRET\";",
"confluent.topic.sasl.mechanism": "PLAIN",
"confluent.topic.ssl.endpoint.identification.algorithm": "https"
}
}
}
现在,我想使用Protobuf将< code > my-bucket-123 (< code > parquet 格式)的键和值放回到Kafka主题中。为此,我使用以下配置通过汇合设置了一个新的S3源连接器(< code > Confluent Inc/Kafka-connect-S3-Source:1 . 4 . 5 ):
{
"name": "s3-source",
"config": {
"name": "s3-source",
"dest.kafka.bootstrap.servers": "my-bootstrap-server",
"dest.topic.replication.factor": 1,
"dest.kafka.security.protocol": "SASL_SSL",
"dest.kafka.sasl.mechanism": "PLAIN",
"dest.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"MY_API_KEY\" password=\"MY_API_SECRET\";",
"tasks.max": 1,
"connector.class": "io.confluent.connect.s3.source.S3SourceConnector",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"confluent.license": "",
"confluent.topic.bootstrap.servers": "my-bootstrap-server",
"confluent.topic.replication.factor": 3,
"confluent.license.topic.replication.factor": 1,
"confluent.topic.security.protocol": "SASL_SSL",
"confluent.topic.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"MY_API_KEY\" password=\"MY_API_SECRET\";",
"confluent.topic.sasl.mechanism": "PLAIN",
"confluent.topic.ssl.endpoint.identification.algorithm": "https",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": ".*",
"transforms.AddPrefix.replacement": "copy_of_$0",
"s3.region": "eu-west-1",
"s3.bucket.name": "my-bucket-123"
}
}
通过使用上述配置,我无法启动S3源连接器。如果我使用上述配置和命令验证配置:
curl -X PUT -d @config.json --header "content-Type:application/json" http://localhost:8083/connector-plugins/S3SourceConnector/config/validate
我在format.class
属性中得到以下错误:
"errors":[
"Invalid value io.confluent.connect.s3.format.parquet.ParquetFormat for configuration format.class: Class io.confluent.connect.s3.format.parquet.ParquetFormat could not be found.",
"Invalid value null for configuration format.class: Class must extend: io.confluent.connect.cloud.storage.source.StorageObjectFormat"
]
我开始认为这个S3源代码连接器不支持<code>Parquet</code>格式。我试着用JSON、AVRO和BYTE格式对其进行验证,所有这些都没有问题。
深入研究S3 Source连接器jar文件(1.4.5
),我没有找到一个用于Par
Jar文件中的格式
有人知道这里发生了什么吗?有没有其他方法可以将数据从 S3 - Parquet 格式放回我的 Kafka 集群中?
谢谢!
从ConFluent的S3源连接器留档:
开箱即用,连接器支持从S3读取Avro和JSON格式的数据。除了带模式的记录,连接器还支持在文本文件中导入不带模式的普通JSON记录,每行一条记录。通常,连接器可以接受提供格式接口实现的任何格式。
因此,这意味着您应该能够添加/plugin实现< code>parquet格式,但它不是内置的
格式的源代码:
https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/parquet/ParquetFormat.java
Kafka是否将S3支持从JSON连接到Parquet?感谢使用Kafka Connect S3提供的可用和替代建议
我正在尝试使用Kafka-connect来消耗Kafka的消息并将它们写入s3拼花文件。所以我写了一个简单的生产者,它用byte[]生成消息 我的Kafka连接配置是: 这是我得到的错误: 原因:java。lang.IllegalArgumentException:Avro架构必须是记录。在org。阿帕奇。拼花地板阿夫罗。AvroSchemaConverter。转换(AvroSchemaConve
我看到Kafka Connect可以以Avro或JSON格式写入S3。但是没有Parket支持。添加这个有多难?
我有以Avro格式存储的Kafka主题。我想使用整个主题(在收到时不会更改任何消息)并将其转换为Parket,直接保存在S3上。 我目前正在这样做,但它要求我每次消费一条来自Kafka的消息,并在本地机器上处理,将其转换为拼花文件,一旦整个主题被消费,拼花文件完全写入,关闭写入过程,然后启动S3多部分文件上传。或《Kafka》中的阿夫罗- 我想做的是《Kafka》中的阿夫罗- 注意事项之一是Kaf
我使用 docker compose 来启动 3 个服务:zookeeper、kafka broker 和 minio-connector 当我在 minio-connector 中使用以下配置从 kafka 消费并将 JSON 格式的记录转储到 minio 时,这三个服务可以成功启动和连接: 启动命令: <代码>root@e1d1294c6fe6:/opt/bitnami/kafka/bin#.