我使用 docker compose 来启动 3 个服务:zookeeper、kafka broker 和 minio-connector
当我在 minio-connector 中使用以下配置从 kafka 消费并将 JSON 格式的记录转储到 minio 时,这三个服务可以成功启动和连接:
<代码>root@e1d1294c6fe6:/opt/bitnami/kafka/bin#./connect单机版。sh/plugins/连接器。properties/plugins/s3 sink.properties
bootstrap.servers=kafka:9092
plugin.path=/plugins
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=202208.minio.connector.test
s3.region=us-east-1
s3.bucket.name=minioUsr
s3.part.size=5242880
flush.size=1
store.url=https://minio.kube.url
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
schema.compatibility=NONE
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
现在,我希望连接器使用记录并以镶木地板格式转储到 minio。Kafka和Zookeeper服务保持不变。我修改了连接器属性和 s3-sink.properties,但连接器无法启动。
bootstrap.servers=kafka:9092
plugin.path=/plugins
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=true
key.converter.schema.registry.url=https://registry...:1443
value.converter.schema.registry.url=https://registry...:1443
name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=202208.minio.connector.test
s3.region=us-east-1
s3.bucket.name=minioUsr
s3.part.size=5242880
flush.size=1
store.url=https://minio.kube.url
storage.class=io.confluent.connect.s3.storage.S3Storage
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.compatibility=NONE
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
enhanced.avro.schema.support=true
我的问题是:以上配置,连接器异常启动失败
[2022-08-29 15:21:10,610] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectStandalone:130)
org.apache.kafka.common.config.ConfigException: Invalid value io.confluent.connect.avro.AvroConverter for configuration value.converter: Class
io.confluent.connect.avro.AvroConverter could not be found.
at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:728)
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129)
at org.apache.kafka.connect.runtime.WorkerConfig.<init>(WorkerConfig.java:385)
at org.apache.kafka.connect.runtime.standalone.StandaloneConfig.<init>(StandaloneConfig.java:42)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:81)
我通过下载 confluentinc-kafka-connect-s3-10.1.0 zip 文件并手动解压缩并将所有 jar 复制到 /plugins/lib 来安装连接器。我发现了以下与镶木地板相关的罐子:
root@e1d1294c6fe6:/opt/bitnami/kafka/bin# ls -1 /plugins/lib/*parquet*
/plugins/lib/parquet-avro-1.11.1.jar
/plugins/lib/parquet-column-1.11.1.jar
/plugins/lib/parquet-common-1.11.1.jar
/plugins/lib/parquet-encoding-1.11.1.jar
/plugins/lib/parquet-format-structures-1.11.1.jar
/plugins/lib/parquet-hadoop-1.11.1.jar
安装中缺少什么?配置是否需要进一步更改?
我必须手动安装avro转换器使用conFluent-Hub
>
下载合流集线器客户端:https://docs.confluent.io/5.5.1/connect/managing/confluent-hub/client.html
使用汇流集线器安装avro转换器,如下所示:Kafka Connect汇流S3接收器连接器:io.confluent.Connect.avro类。找不到AvroConverter
我有使用Protobuf制作的主题事件。我可以使用Parquet格式的S3 sink连接器将主题事件成功地汇到S3存储桶中。现在我的S3存储桶中有和。使用以下配置,所有这些都按预期工作: 现在,我想使用Protobuf将< code > my-bucket-123 (< code > parquet 格式)的键和值放回到Kafka主题中。为此,我使用以下配置通过汇合设置了一个新的S3源连接器(<
我的 hdfs 中有 Parquet 文件。我想将这些镶木地板文件转换为csv格式
我有一个avro格式的数据流(json编码),需要存储为镶木地板文件。我只能这样做, 把df写成拼花地板。 这里的模式是从json中推断出来的。但是我已经有了avsc文件,我不希望spark从json中推断出模式。 以上述方式,parquet文件将模式信息存储为StructType,而不是avro.record.type。是否也有存储avro模式信息的方法。 火花 - 1.4.1
我有一个数据帧,它是由运行特定日期的每日批处理创建的,然后保存在HDFS(Azure Data Lake Gen 2)中。 它是用这样的东西保存的 如您所见,我没有对数据帧进行分区,因为它只包含一个日期。 例如,第一天的第一个文件将存储在文件夹中 交易/2019/08/25 然后第二天,它就会在文件夹里 贸易/2019/08/26 问题是,当所有数据都放好后,日期上的过滤器谓词是否仍会被按下,HD
有没有一种方法可以直接从基于avro模式的parquet文件在Amazon Athena中创建表?模式被编码到文件中,所以我需要自己实际创建DDL看起来很愚蠢。 我看到了这个,还有另一个复制品 但它们与Hive直接相关,这对雅典娜不起作用。理想情况下,我正在寻找一种以编程方式执行此操作的方法,而无需在控制台上定义它。
问题内容: 有没有办法从Java创建镶木地板文件? 我的内存中有数据(java类),我想将其写入一个Parquet文件中,以便以后从apache-drill中读取它。 有没有简单的方法可以做到这一点,例如将数据插入sql表? 得到它了 谢谢您的帮助。 结合答案和此链接,我能够创建一个实木复合地板文件并用钻头将其读回。 问题答案: 不建议使用ParquetWriter的构造函数(1.8.1),但不建