当前位置: 首页 > 知识库问答 >
问题:

Kafka-Connect s3源连接器配置问题

东方权
2023-03-14

我已经使用 kafka-connect s3 接收器连接器将一些来自主题(例如 my.topic)的 avro 消息上传到 Amazon s3 存储桶,例如 s3-bucket。接收器连接器的配置如下所示:

{
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "key.converter": "org.apache.kafka.connect.converters.LongConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schemaregistry:8099",
        "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
        "tasks.max": "1",
        "topics": "my.topic",
        "s3.region": "eu-west-2",
        "s3.bucket.name": "s3-bucket",
        "flush.size": "5",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
        "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
        "schema.compatibility": "NONE",
        "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner"
}

这按预期工作,所有消息都是相同的记录,具有相同的架构版本,我将5写入主题并在我的存储桶中看到一个带有路径的s3对象

/topics/my.topic/partition=0/my.topic+0+0000000000.avro

现在我想将这些存储的消息放到另一个空主题上。我使用以下配置启动 s3 源连接器:

{
        "confluent.topic.bootstrap.servers": "kafka:9092",
        "confluent.topic.replication.factor": 1,
        "connector.class": "io.confluent.connect.s3.source.S3SourceConnector",
        "s3.region": "eu-west-2",
        "s3.bucket.name": "s3-bucket",
        "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
        "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
        "transforms": "AddPrefix",
        "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.AddPrefix.regex": ".*",
        "transforms.AddPrefix.replacement": "recovery_$0"
}

当我查看kafka-connect(在docker容器中运行)生成的日志时,它看起来很高兴,没有错误,并且它正确地标识了我的存储桶以及其中的目录路径

/topics/my.topic/partition=0/

但是,它不会检测内部文件,也不会向预期的< code>recovery_my.topic主题写入任何内容。它重复记录

kafka-connect         | [2020-07-05 15:31:46,311] INFO PartitionCheckingTask - Checking if Partitions have changed. (io.confluent.connect.cloud.storage.source.util.PartitionCheckingTask)
kafka-connect         | [2020-07-05 15:31:47,963] INFO WorkerSourceTask{id=tx-s3-restore-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
kafka-connect         | [2020-07-05 15:31:47,964] INFO WorkerSourceTask{id=tx-s3-restore-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
kafka-connect         | [2020-07-05 15:31:50,483] INFO AvroDataConfig values: 
kafka-connect         |     schemas.cache.config = 50
kafka-connect         |     enhanced.avro.schema.support = false
kafka-connect         |     connect.meta.data = true
kafka-connect         |  (io.confluent.connect.avro.AvroDataConfig)
kafka-connect         | [2020-07-05 15:31:50,483] INFO AvroDataConfig values: 
kafka-connect         |     schemas.cache.config = 50
kafka-connect         |     enhanced.avro.schema.support = false
kafka-connect         |     connect.meta.data = true
kafka-connect         |  (io.confluent.connect.avro.AvroDataConfig)
kafka-connect         | [2020-07-05 15:31:50,537] INFO AvroDataConfig values: 
kafka-connect         |     schemas.cache.config = 50
kafka-connect         |     enhanced.avro.schema.support = false
kafka-connect         |     connect.meta.data = true
kafka-connect         |  (io.confluent.connect.avro.AvroDataConfig)
kafka-connect         | [2020-07-05 15:31:50,589] INFO No new files ready after scan task assigned folders (io.confluent.connect.cloud.storage.source.StorageSourceTask)

这向我表明它出于某种原因忽略了该文件?这是从日志中提取的完整 s3 源连接器配置

kafka-connect         | [2020-07-05 15:10:49,427] INFO S3SourceConnectorConfig values: 
kafka-connect         |     behavior.on.error = fail
kafka-connect         |     confluent.license = 
kafka-connect         |     confluent.topic = _confluent-command
kafka-connect         |     confluent.topic.bootstrap.servers = [kafka:9092]
kafka-connect         |     confluent.topic.replication.factor = 1
kafka-connect         |     directory.delim = /
kafka-connect         |     filename.regex = (.+)\+(\d+)\+.+$
kafka-connect         |     folders = [topics/my.topic/partition=0/]
kafka-connect         |     format.bytearray.extension = .bin
kafka-connect         |     format.bytearray.separator = 
kafka-connect         |     format.class = class io.confluent.connect.s3.format.avro.AvroFormat
kafka-connect         |     partition.field.name = []
kafka-connect         |     partitioner.class = class io.confluent.connect.storage.partitioner.DefaultPartitioner
kafka-connect         |     path.format = 
kafka-connect         |     record.batch.max.size = 200
kafka-connect         |     s3.bucket.name = s3-bucket
kafka-connect         |     s3.credentials.provider.class = class com.amazonaws.auth.DefaultAWSCredentialsProviderChain
kafka-connect         |     s3.http.send.expect.continue = true
kafka-connect         |     s3.part.retries = 3
kafka-connect         |     s3.poll.interval.ms = 60000
kafka-connect         |     s3.proxy.password = [hidden]
kafka-connect         |     s3.proxy.url = 
kafka-connect         |     s3.proxy.user = null
kafka-connect         |     s3.region = eu-west-2
kafka-connect         |     s3.retry.backoff.ms = 200
kafka-connect         |     s3.sse.customer.key = [hidden]
kafka-connect         |     s3.ssea.name = 
kafka-connect         |     s3.wan.mode = false
kafka-connect         |     schema.cache.size = 50
kafka-connect         |     store.url = null
kafka-connect         |     topics.dir = topics
kafka-connect         |  (io.confluent.connect.s3.source.S3SourceConnectorConfig)
kafka-connect         | [2020-07-05 15:10:49,428] INFO [Producer clientId=connector-producer-tx-s3-restore-0] Cluster ID: nlQYzBVYRbWozKk54-Qx_A (org.apache.kafka.clients.Metadata)
kafka-connect         | [2020-07-05 15:10:49,432] INFO AvroDataConfig values: 
kafka-connect         |     schemas.cache.config = 50
kafka-connect         |     enhanced.avro.schema.support = false
kafka-connect         |     connect.meta.data = true
kafka-connect         |  (io.confluent.connect.avro.AvroDataConfig)
kafka-connect         | [2020-07-05 15:10:49,434] INFO Starting source connector task with assigned folders [topics/my.topic/partition=0/] using partitioner io.confluent.connect.storage.partitioner.DefaultPartitioner (io.confluent.connect.cloud.storage.source.StorageSourceTask)

如果有人知道为什么我的文件被忽略,我将不胜感激。

共有1个答案

越琦
2023-03-14

由于汇流s3源连接器不是开源的,它需要许可证,因此您需要在源连接器配置中添加汇流许可证,试用期为30天:

"confluent.license": ""

我尝试了我的用例,它正在工作。

 类似资料:
  • 我已经使用Kafka的汇流本地集群为Kaffa和m安装了Aerospike所需的所有配置,并已安装https://www.confluent.io/hub/aerospike/kafka-connect-aerospike-source并已开始汇流群集,但连接器仍未启动 我还发现合流的共享文件夹中没有jar,它还在开发中吗?

  • 我有一个需求,即我们应用程序之外的源将在S3存储桶中放置一个文件,我们必须在kafka主题中加载该文件。我正在查看ConFluent的S3 Source连接器,目前正在努力定义在我们的环境中设置连接器的配置。但是有几篇文章指出,只有在您使用S3 Sink连接器将文件放在S3中时,才能使用S3 Source连接器。 以上是真的吗?在配置中,我在哪里/使用什么属性来定义输出主题?当阅读S3的文章并把它

  • 我一直在测试kafka连接。但是对于每个连接器,我都必须去阅读连接器留档以了解连接器所需的配置。就我阅读kafka连接API留档而言,我已经看到API以获取连接器相关数据。 -返回Kafka Connect集群中安装的连接器插件列表。请注意,API仅检查处理请求的工作人员上的连接器,这意味着您可能会看到不一致的结果,尤其是在滚动升级期间,如果您添加了新的连接器罐。 根据配置定义验证提供的配置值。此

  • 我在 AWS S3 中备份了以下文件,这些文件由 Kafka 连接接收器连接器备份: 当使用Kafka connect S3源恢复主题时,密钥文件被忽略,我在日志中看到以下调试消息: 我的源配置如下所示: 我应该做什么改变才能让密钥和消息一起存储在Kafka中?

  • 我使用kafka connect从mongo读取数据并将其写入kafka主题。 我正在使用 mongo kafka 源连接器。 我收到以下错误: 罐子里好像有一个小盒子。为了得到这个罐子,我使用了两种不同的方法,但是我得到了同样的错误。首先,我使用了下载的from:maven资源库,然后我从github repo中克隆了源代码,并自己构建了jar。我将jar推到plugins.path中,当我解压

  • 我让RabbitMQ在CloudFoundry中运行,并尝试从本地运行的配置服务器进行连接,下面是在应用程序中配置的内容。yml文件 抛出以下启动异常 下面是pom.xml的依赖关系 我可以使用应用程序中提供的信息连接到控制台。yml但不确定为什么会抛出TimeoutException,任何输入都会非常有用,