当前位置: 首页 > 知识库问答 >
问题:

AWS MSK Kafka Connect S3接收器连接器

戚均
2023-03-14

我们使用S3接收器连接器从MSK自动气象站的S3桶中接收数据。

我们已经在AWS EKS(Kubernetes)上部署了KafkaS3水槽连接器

当我们启动连接器时,当 S3 存储桶上发生分段上传时出现以下错误。

我们对S3存储桶有策略限制,因为启用了服务器端加密(AWS-KMS),即如果没有KMS密钥,我们无法上传。

下面是我们用于连接器的配置,下面是错误详细信息,供您参考。

好心帮忙

{"name":"kc-s3-nuoAcCountIDs-Sink","connector.class":"io.confluent.connect.s3. S3Sink连接器","主题":"测试主题","tasks.max":"1","s3.bucket.name":"s3-But-name","value.converter.schemas.enable":"false","storage.class":"io.confluent.connect.s3.storage.S3存储","format.class":"io.confluent.connect.s3.format.avro.AvroFormat","partitioner.class":"io.confluent.connect.storage.partitioner.默认分区器","behavior.on.null.values":"忽略","schema.compatibility":"NONE","partition.duration.ms": 3600000,"path.format":"'年'=YYYY/'月'=MM/'日'=dd/'小时'=HH","语言环境":"en-US","时区":"UTC","partition.field.name":"帐户平台","s3.region":"eu-West-2","flush.size": 100000}

ktask . execute(worker sink task . Java:201)\ n \ tat org . Apache . Kafka . connect . runtime . worker task . dorun(worker task . Java:185)\ n \ tat org . Apache . Kafka . connect . runtime . worker task . run(worker task . Java:235)\ n \ tat Java . base/Java . util . concurrent . executors$runnable adapter . call(executors . Java:515)\ \ tat Java . base/Java .

共有1个答案

仲元凯
2023-03-14

在我们的例子中,我们需要传递 S3 连接器的 KMS 密钥设置。

根据官方文档以及上述设置,我们在 S3 连接器配置中使用了以下两个附加设置:

"s3.sse.kms.key.id": "<kms-key-id-here>",
"s3.ssea.name": "aws:kms"

我们现在能够在 s3 存储桶中获取数据。

 类似资料:
  • 我正在尝试将来自主题的数据(json数据)写入MySql数据库。我想我需要一个JDBC接收器连接器。 我如何配置连接器以将主题中的json数据映射到如何将数据插入数据库。 我能找到的文件只有这个。 “接收器连接器需要了解架构,因此您应该使用合适的转换器,例如架构注册表附带的Avro转换器,或启用了架构的JSON转换器。如果存在Kafka记录键,则可以是基元类型或连接结构,记录值必须是连接结构。从连

  • 我有一个Kafka连接接收器记录从Kafka主题到S3。它在工作,但太慢了。Kafka主题每秒接收约30000条消息。连接接收器无法跟上。我已经尝试增加Kafka连接器的任务。最大值从1到3,这会创建更多任务,但这似乎无助于提高消息/秒的速度。我试着增加Kafka连接工人的CPU分配,这似乎也没有帮助。 我还能试什么?哪些指标有助于监控以进一步识别瓶颈? 更新:Kafka主题有5个分区。Kafka

  • 我有一个kafka主题,有200万条消息,我的刷新大小是100000,默认分区为分布式模式,有4个工作者,我可以看到数据在几秒钟内立即写入HDFS(10到15秒)。 我看到创建了一个+tmp目录和文件夹,并且每次触发一个新连接器时都会创建主题。 kafka connect的行为是每次都写得这么快,还是已经将数据存储在HDFS中,并根据连接器属性将其移动到主题目录? 我需要清楚这是怎么发生的。如果我

  • 我正在尝试使用Kafka连接接收器将文件从Kafka写入HDFS。 我的属性看起来像: 有什么建议吗?

  • 我有Kafka主题,有多种类型的消息流入并使用Kafka Connect写入弹性搜索。流看起来不错,直到我不得不将唯一的消息集分离到唯一的索引中。也就是说,我必须根据字段(JSON消息)为新的数据集获取新的索引。 我如何配置/定制Kafka connect以实现同样的功能?每个消息都包含一个表示消息类型和时间戳的字段。 示例 Json 如下所示: Sample1: {“log”:{“data”:“

  • 我正在尝试从kafka中的主题将数据插入postgres数据库。我正在使用以下命令加载 sink-quick start-MySQL . properties如下 我得到的错误是 Postgres jar文件已经在文件夹中。有人能提出建议吗?