当前位置: 首页 > 知识库问答 >
问题:

Kafka连接S3源连接器忽略键

巫马浩言
2023-03-14

我在 AWS S3 中备份了以下文件,这些文件由 Kafka 连接接收器连接器备份:

test_yordan_kafka_connect+0+0000023000.json
test_yordan_kafka_connect+0+0000023000.keys.json

当使用Kafka connect S3源恢复主题时,密钥文件被忽略,我在日志中看到以下调试消息:

DEBUG [source-s3|task-0] Removing test-bucket/topics/test_yordan_kafka_connect/partition=1/test_yordan_kafka_connect+1+0000000000.keys.json from common prefixes. (io.confluent.connect.s3.source.S3Storage:333)

我的源配置如下所示:

"connector.class":"io.confluent.connect.s3.source.S3SourceConnector",
"tasks.max":"1",
"s3.region":"eu-central-1",
"s3.bucket.name":"test-bucket",
"topics.dir":"test-bucket/topics",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class":"io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.compatibility":"NONE",
"confluent.topic.bootstrap.servers": "blockchain-kafka-kafka-0.blockchain-kafka-kafka-headless.default.svc.cluster.local:9092",
"transforms":"AddPrefix",
"transforms.AddPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex":".*",
"transforms.AddPrefix.replacement":"$0_copy

我应该做什么改变才能让密钥和消息一起存储在Kafka中?

共有1个答案

包沈义
2023-03-14

我忽略了如何确保在从S3读取数据时考虑到.keys.json来构造Kafka键。

不过,我用来实现同样的事情的一个技巧是简单地依赖ValueToKeySMT转换器:如果键值也是值有效负载的一部分,那么将其注入键值就像在源连接器配置中添加org.apache.kafka.connect.transforms.ValueToKey转换器的实例一样简单。

下面是相关的文档:https://docs . confluent . io/platform/current/connect/transforms/value tokey . html

这里有一篇关于变压器的很棒的博客文章(12篇同样很棒的文章的一部分)https://rmoff.net/2020/12/09/twelve-days-of-smt-day-2-valuetokey-and-extractfield/

在我的具体情况下,ValueToKey的内置行为并不完全符合我的要求,所以我最终编写了我自己的CUomKeyToValue转换器,由我的接收器连接器使用,以确保密钥存储在S3中,然后在源连接器上编写CUomValueToKey以将内容重建到Kafka中。

编写这样的自定义转换器实际上非常简单,它基本上只是在单个 java 文件中编写 20 行左右的代码,然后您可以使用 kafka-connect-maven-plugin 打包。例如,以下是可以作为灵感的内置 ValueToKey 的代码:

https://github . com/Apache/Kafka/blob/28f 013708 FFE 8 e 48 e 46 f 408 c 7 f 570 BF 2c D5 c 54 b 2/connect/transforms/src/main/Java/org/Apache/Kafka/connect/transforms/value tokey . Java/Java/org/Apache/Kafka/connect/transforms/value tokey . Java

我们也可以应用相同的原理来保存/检索Kafka标题。

 类似资料:
  • 我有一个需求,即我们应用程序之外的源将在S3存储桶中放置一个文件,我们必须在kafka主题中加载该文件。我正在查看ConFluent的S3 Source连接器,目前正在努力定义在我们的环境中设置连接器的配置。但是有几篇文章指出,只有在您使用S3 Sink连接器将文件放在S3中时,才能使用S3 Source连接器。 以上是真的吗?在配置中,我在哪里/使用什么属性来定义输出主题?当阅读S3的文章并把它

  • 我尝试使用最新的kafka (confluent-platform-2.11)连接将Json放到s3上。我在quickstart-s3.properties文件中设置format . class = io . confluent . connect . S3 . format . JSON . JSON format 和负载连接器: 然后我给Kafka发了一行: ~$ Kafka-控制台-生产者

  • 我已经使用 kafka-connect s3 接收器连接器将一些来自主题(例如 )的 avro 消息上传到 Amazon s3 存储桶,例如 。接收器连接器的配置如下所示: 这按预期工作,所有消息都是相同的记录,具有相同的架构版本,我将5写入主题并在我的存储桶中看到一个带有路径的s3对象 现在我想将这些存储的消息放到另一个空主题上。我使用以下配置启动 s3 源连接器: 当我查看kafka-conn

  • 我已经使用Kafka的汇流本地集群为Kaffa和m安装了Aerospike所需的所有配置,并已安装https://www.confluent.io/hub/aerospike/kafka-connect-aerospike-source并已开始汇流群集,但连接器仍未启动 我还发现合流的共享文件夹中没有jar,它还在开发中吗?

  • 我们使用S3接收器连接器从MSK自动气象站的S3桶中接收数据。 我们已经在AWS EKS(Kubernetes)上部署了KafkaS3水槽连接器 当我们启动连接器时,当 S3 存储桶上发生分段上传时出现以下错误。 我们对S3存储桶有策略限制,因为启用了服务器端加密(AWS-KMS),即如果没有KMS密钥,我们无法上传。 下面是我们用于连接器的配置,下面是错误详细信息,供您参考。 好心帮忙 {"na

  • 有没有办法通过Kafka Connect S3接收器连接器标记写入S3存储桶的对象。我正在读取来自Kafka的消息,并使用S3接收器连接器将avro文件写入S3存储桶。当文件写入S3存储桶时,我需要标记文件。