当前位置: 首页 > 知识库问答 >
问题:

kafka s3连接器到minio,采用镶木地板格式

景元徽
2023-03-14

我使用 docker compose 来启动 3 个服务:zookeeper、kafka broker 和 minio-connector

当我在 minio-connector 中使用以下配置从 kafka 消费并将 JSON 格式的记录转储到 minio 时,这三个服务可以成功启动和连接:

  1. 启动命令:

<代码>root@e1d1294c6fe6:/opt/bitnami/kafka/bin#./connect单机版。sh/plugins/连接器。properties/plugins/s3 sink.properties

    bootstrap.servers=kafka:9092
    plugin.path=/plugins
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    offset.storage.file.filename=/tmp/connect.offsets 
    name=s3-sink 
    connector.class=io.confluent.connect.s3.S3SinkConnector    
    tasks.max=1 
    topics=202208.minio.connector.test 
    s3.region=us-east-1   
    s3.bucket.name=minioUsr 
    s3.part.size=5242880 
    flush.size=1    
    store.url=https://minio.kube.url    
    storage.class=io.confluent.connect.s3.storage.S3Storage    
    format.class=io.confluent.connect.s3.format.json.JsonFormat   
    schema.compatibility=NONE 
    schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator    
    partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner 

现在,我希望连接器使用记录并以镶木地板格式转储到 minio。Kafka和Zookeeper服务保持不变。我修改了连接器属性和 s3-sink.properties,但连接器无法启动。

    bootstrap.servers=kafka:9092
    plugin.path=/plugins
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    offset.storage.file.filename=/tmp/connect.offsets
    key.converter=io.confluent.connect.avro.AvroConverter
    value.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=true
    key.converter.schema.registry.url=https://registry...:1443
    value.converter.schema.registry.url=https://registry...:1443
    name=s3-sink
    connector.class=io.confluent.connect.s3.S3SinkConnector
    tasks.max=1
    topics=202208.minio.connector.test
    s3.region=us-east-1
    s3.bucket.name=minioUsr
    s3.part.size=5242880
    flush.size=1
    store.url=https://minio.kube.url
    storage.class=io.confluent.connect.s3.storage.S3Storage
    schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
    partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
    schema.compatibility=NONE
    format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
    enhanced.avro.schema.support=true

我的问题是:以上配置,连接器异常启动失败

    [2022-08-29 15:21:10,610] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectStandalone:130)
    org.apache.kafka.common.config.ConfigException: Invalid value io.confluent.connect.avro.AvroConverter for configuration value.converter: Class
    io.confluent.connect.avro.AvroConverter could not be found.
            at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:728)
            at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474)
            at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467)
            at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
            at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129)
            at org.apache.kafka.connect.runtime.WorkerConfig.<init>(WorkerConfig.java:385)
            at org.apache.kafka.connect.runtime.standalone.StandaloneConfig.<init>(StandaloneConfig.java:42)
            at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:81)

我通过下载 confluentinc-kafka-connect-s3-10.1.0 zip 文件并手动解压缩并将所有 jar 复制到 /plugins/lib 来安装连接器。我发现了以下与镶木地板相关的罐子:

    root@e1d1294c6fe6:/opt/bitnami/kafka/bin# ls -1 /plugins/lib/*parquet*
    /plugins/lib/parquet-avro-1.11.1.jar
    /plugins/lib/parquet-column-1.11.1.jar
    /plugins/lib/parquet-common-1.11.1.jar
    /plugins/lib/parquet-encoding-1.11.1.jar
    /plugins/lib/parquet-format-structures-1.11.1.jar
    /plugins/lib/parquet-hadoop-1.11.1.jar

安装中缺少什么?配置是否需要进一步更改?

共有1个答案

纪俊良
2023-03-14

我必须手动安装avro转换器使用conFluent-Hub

>

  • 下载合流集线器客户端:https://docs.confluent.io/5.5.1/connect/managing/confluent-hub/client.html

    使用汇流集线器安装avro转换器,如下所示:Kafka Connect汇流S3接收器连接器:io.confluent.Connect.avro类。找不到AvroConverter

  •  类似资料:
    • 我有使用Protobuf制作的主题事件。我可以使用Parquet格式的S3 sink连接器将主题事件成功地汇到S3存储桶中。现在我的S3存储桶中有和。使用以下配置,所有这些都按预期工作: 现在,我想使用Protobuf将< code > my-bucket-123 (< code > parquet 格式)的键和值放回到Kafka主题中。为此,我使用以下配置通过汇合设置了一个新的S3源连接器(<

    • 我的 hdfs 中有 Parquet 文件。我想将这些镶木地板文件转换为csv格式

    • 我有一个avro格式的数据流(json编码),需要存储为镶木地板文件。我只能这样做, 把df写成拼花地板。 这里的模式是从json中推断出来的。但是我已经有了avsc文件,我不希望spark从json中推断出模式。 以上述方式,parquet文件将模式信息存储为StructType,而不是avro.record.type。是否也有存储avro模式信息的方法。 火花 - 1.4.1

    • 我有一个数据帧,它是由运行特定日期的每日批处理创建的,然后保存在HDFS(Azure Data Lake Gen 2)中。 它是用这样的东西保存的 如您所见,我没有对数据帧进行分区,因为它只包含一个日期。 例如,第一天的第一个文件将存储在文件夹中 交易/2019/08/25 然后第二天,它就会在文件夹里 贸易/2019/08/26 问题是,当所有数据都放好后,日期上的过滤器谓词是否仍会被按下,HD

    • 有没有一种方法可以直接从基于avro模式的parquet文件在Amazon Athena中创建表?模式被编码到文件中,所以我需要自己实际创建DDL看起来很愚蠢。 我看到了这个,还有另一个复制品 但它们与Hive直接相关,这对雅典娜不起作用。理想情况下,我正在寻找一种以编程方式执行此操作的方法,而无需在控制台上定义它。

    • 问题内容: 有没有办法从Java创建镶木地板文件? 我的内存中有数据(java类),我想将其写入一个Parquet文件中,以便以后从apache-drill中读取它。 有没有简单的方法可以做到这一点,例如将数据插入sql表? 得到它了 谢谢您的帮助。 结合答案和此链接,我能够创建一个实木复合地板文件并用钻头将其读回。 问题答案: 不建议使用ParquetWriter的构造函数(1.8.1),但不建