当前位置: 首页 > 知识库问答 >
问题:

Kafka连接S3动态S3文件夹结构创建?

皇甫飞飙
2023-03-14
./kafka_2.11-2.1.0/bin/connect-standalone.sh connect.properties s3-sink.properties

可以看到AWS MSK的主题CDC偏移正在消耗。不会抛出任何错误。但是,在AWS S3中,没有为新数据创建文件夹结构,也没有存储JSON数据。

问题

  1. 连接器是否应该在看到主题的第一个JSON数据包时动态创建文件夹结构?
  2. 除了配置awscli凭据、connect.properties和s3-sink.properties之外,是否还需要设置其他设置才能正确连接到S3存储桶?
  3. 关于安装文档的建议比Confluent网站上的独立文档更全面?(以上链接)

S3-sink.properties

name=s3-sink connector.class=io.confluent.connect.s3.s3sinkconnector tasks.max=1 topics=database_schema_topic1,database_schema_topic2,database_schema_topic3 s3.region=us-east-2 s3.bucket.name=databasekafka s3.part.size=5242880 flush.size=1 storage.class=io.confluent.connect.s3.storage.s3 storage format.class=io.confluent.connect.s3.format.json.jsonformat class=io.confluent.connect.storage.partitioner.defaultpartitioner schema.compatibility=none

共有1个答案

乜嘉悦
2023-03-14

连接器是否应该在看到主题的第一个JSON数据包时动态创建文件夹结构?是的,甚至您可以使用参数“topics.dir”和“path.format”来控制此路径(目录结构)

除了配置awscli凭据、connect.properties和s3-sink.properties之外,是否还需要设置其他设置才能正确连接到S3存储桶?默认情况下,S3连接器将通过环境变量或凭据文件使用Aws凭据(访问id和密钥)。您可以通过修改参数“S3.Credentials.Provider.Class”进行更改。参数默认值为“DefaultAwScredEntialsProviderChain”

关于安装比合流网站上的独立文档更全面的文档的建议?(如上链接)我建议您使用分布式模式,因为它为您的连接集群和在其上运行的连接器提供了高可用性。您可以通过下面的文档来配置分布式模式下的连接群集。https://docs.confluent.io/current/connect/userguide.html#connect-userguide-dist-worker-config

 类似资料:
  • 我正在尝试使用Kafka-connect来消耗Kafka的消息并将它们写入s3拼花文件。所以我写了一个简单的生产者,它用byte[]生成消息 我的Kafka连接配置是: 这是我得到的错误: 原因:java。lang.IllegalArgumentException:Avro架构必须是记录。在org。阿帕奇。拼花地板阿夫罗。AvroSchemaConverter。转换(AvroSchemaConve

  • 我尝试使用最新的kafka (confluent-platform-2.11)连接将Json放到s3上。我在quickstart-s3.properties文件中设置format . class = io . confluent . connect . S3 . format . JSON . JSON format 和负载连接器: 然后我给Kafka发了一行: ~$ Kafka-控制台-生产者

  • 我有一个需求,即我们应用程序之外的源将在S3存储桶中放置一个文件,我们必须在kafka主题中加载该文件。我正在查看ConFluent的S3 Source连接器,目前正在努力定义在我们的环境中设置连接器的配置。但是有几篇文章指出,只有在您使用S3 Sink连接器将文件放在S3中时,才能使用S3 Source连接器。 以上是真的吗?在配置中,我在哪里/使用什么属性来定义输出主题?当阅读S3的文章并把它

  • 我正在将数据写入s3 bucket,并使用pyspark创建parquet文件。我的桶结构如下所示: 子文件夹和表这两个文件夹应该在运行时创建,如果文件夹不存在,如果文件夹存在,则应该在文件夹表中创建拼接文件。 当我在本地计算机上运行pyspark程序时,它会用_$folder$(like)创建额外的文件夹,但是如果在emr上运行相同的程序,它会用_success创建。 是否有办法只在s3中创建文

  • 问题内容: 如何使用botoAmazon s3的库在存储桶下创建文件夹? 我按照手册进行操作,并使用许可权,元数据等创建了密钥,但boto文档中没有任何地方描述如何在存储桶下创建文件夹,或在存储桶下的文件夹下创建文件夹。 问题答案: S3中没有文件夹或目录的概念。您可以创建像这样的文件名,许多S3访问工具都可以像目录结构那样显示文件名,但实际上它只是存储桶中的单个文件。

  • Kafka是否将S3支持从JSON连接到Parquet?感谢使用Kafka Connect S3提供的可用和替代建议