当前位置: 首页 > 知识库问答 >
问题:

Kafka连接->S3拼花文件Bytearley

鲜于谦
2023-03-14

我正在尝试使用Kafka-connect来消耗Kafka的消息并将它们写入s3拼花文件。所以我写了一个简单的生产者,它用byte[]生成消息

Properties propertiesAWS = new Properties();
    propertiesAWS.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "myKafka:9092");
    propertiesAWS.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
    propertiesAWS.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());


    KafkaProducer<Long, byte[]> producer = new KafkaProducer<Long, byte[]>(propertiesAWS);
    Random rng = new Random();

    for (int i = 0; i < 100; i++) {
        try {
            Thread.sleep(1000);
            Headers headers = new RecordHeaders();
            headers.add(new RecordHeader("header1", "header1".getBytes()));
            headers.add(new RecordHeader("header2", "header2".getBytes()));
            ProducerRecord<Long, byte[]> recordOut = new ProducerRecord<Long, byte[]>
                    ("s3.test.topic", 1, rng.nextLong(), new byte[]{1, 2, 3}, headers);
            producer.send(recordOut);
        } catch (Exception e) {
            System.out.println(e);

        }
    }

我的Kafka连接配置是:

{
"name": "test_2_s3",
"config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "aws.access.key.id": "XXXXXXX",
    "aws.secret.access.key": "XXXXXXXX",
    "s3.region": "eu-central-1",
    "flush.size": "5",
    "rotate.schedule.interval.ms": "10000",
    "timezone": "UTC",
    "tasks.max": "1",
    "topics": "s3.test.topic",
    "parquet.codec": "gzip",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.bucket.name": "test-phase1",
    "key.converter": "org.apache.kafka.connect.converters.LongConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "behavior.on.null.values": "ignore",
    "store.kafka.headers": "true"
}

这是我得到的错误:

原因:java。lang.IllegalArgumentException:Avro架构必须是记录。在org。阿帕奇。拼花地板阿夫罗。AvroSchemaConverter。转换(AvroSchemaConverter.java:124)

我的错误在哪里?我是否需要使用Avro,即使我只是想写byteArr一些Kafka Headers?如何配置哪个kafka标头写入拼花地板?谢谢

共有1个答案

江宏伟
2023-03-14

ParquetFormat编写器需要具有类型模式的Avro数据:记录,是的。

不过,您可以使用这样的模式。

{
    "namespace": "com.stackoverflow.example",
    "name": "ByteWrapper",
    "type": "record",
    "fields": [{
        "name": "data",
        "type": "bytes"
    }]
}

配置要写入parquet的kafka标头

没有。Kafka标头只是字节,没有指定的序列化格式。

要使用ParquetFormat,您需要更改您的生产者以使用ConFluent的KafkaAvroSerializer,并在编写Parket之前使用Connect中的AvroConverter来反序列化该数据。

如果只想将字节写入S3,请使用ByteArrayFormat

 类似资料:
  • Kafka是否将S3支持从JSON连接到Parquet?感谢使用Kafka Connect S3提供的可用和替代建议

  • 我看到Kafka Connect可以以Avro或JSON格式写入S3。但是没有Parket支持。添加这个有多难?

  • 如果我错了,请纠正我。。拼花文件是自描述的,这意味着它包含正确的模式。 我想使用S3接收器融合连接器(特别是因为它正确处理了S3的精确一次语义)从我们的Kafka中读取JSON记录,然后在s3中创建拼花文件(按事件时间分区)。我们的JSON记录没有嵌入模式。 我知道它还不被支持,但我对拼花地板和AVRO也有一些问题。 由于我们的JSON记录中没有嵌入模式,这意味着连接器任务必须从它自己的JSON字

  • 我正在寻找Kafka连接连接器,将写从Kafka到本地文件系统的拼花文件格式。我不想使用hdfs或s3接收器连接器进行同样的操作。

  • 我有使用Protobuf制作的主题事件。我可以使用Parquet格式的S3 sink连接器将主题事件成功地汇到S3存储桶中。现在我的S3存储桶中有和。使用以下配置,所有这些都按预期工作: 现在,我想使用Protobuf将< code > my-bucket-123 (< code > parquet 格式)的键和值放回到Kafka主题中。为此,我使用以下配置通过汇合设置了一个新的S3源连接器(<

  • 可以看到AWS MSK的主题CDC偏移正在消耗。不会抛出任何错误。但是,在AWS S3中,没有为新数据创建文件夹结构,也没有存储JSON数据。 问题 连接器是否应该在看到主题的第一个JSON数据包时动态创建文件夹结构? 除了配置awscli凭据、connect.properties和s3-sink.properties之外,是否还需要设置其他设置才能正确连接到S3存储桶? 关于安装文档的建议比Co