当前位置: 首页 > 知识库问答 >
问题:

将胶水模式注册表与 MSK 连接器结合使用

蓝恩
2023-03-14

我一直在尝试创建一个MSK连接器,并使用Glue schema注册表。配置如下。

connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=eu-west-1
topics.dir=topics/dir
flush.size=200
tasks.max=2
s3.part.size=5242880
timezone=GMT
# value.converter.schema.registry.url=http://someIP:8081
key.converter.schemaName=my-topic-schema
locale=US
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
value.converter.schemaName=my-topic-schema
value.converter=io.confluent.connect.avro.StringConverter
s3.bucket.name=my-bucket
key.converter=io.confluent.connect.avro.StringConverter
# key.converter.schema.registry.url==http://someIP:8081
partition.duration.ms=3600000
schema.compatibility=BACKWARD
topics=osb
value.converter.registry.name=Glue-Schema-Registry
key.converter.registry.name=Glue-Schema-Registry
key.converter.schemas.enable=true
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
value.converter.schemas.enable=true
storage.class=io.confluent.connect.s3.storage.S3Storage
rotate.schedule.interval.ms=0
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
timestamp.extractor=RecordField
timestamp.field=timestamp

首先,我使用运行在EC2上的合流模式注册中心,我在“key/value . converter . schema . registry . URL”字段中添加了EC2的ip,它工作得很好。现在我尝试使用Glue Schema Registry。但是我不知道如何用Glue Schema Registry连接连接器。

共有1个答案

孔睿
2023-03-14

这些类不存在

key.converter=io.confluent.connect.avro.StringConverter
value.converter=io.confluent.connect.avro.StringConverter

StringConverterclassname以org.apache.kafka开头

同样,看起来您已经添加了一堆随机转换器属性,除了 url 之外,这些属性对于字符串或融合 Avro 转换器无效

要使用Glue,您需要使用<code>AWSKafkaAvroConverter</code>,它是这个repo的一部分,

https://github . com/aw slabs/AWS-glue-schema-registry/tree/master/avro-kafkaconnect-converter

并且记录在这里

https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-apache kafka connect的注册表集成

 类似资料:
  • 有可能将融合模式注册与AWS MSK集成吗?如果你以前这样做过,你能提供一些你实现它的方法/博客吗?

  • 我无法将模式注册表连接到MSK。我在一个单独的实例中运行Docker,并启用了到MSK服务的连接。 尝试了confluent docker.io/confluentinc/cp模式注册表中的docker映像:最新。 使用低于以下SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL连接时遇到的错误 还尝试用SCHEMA _ REGISTRY _ KAFKASTORE _

  • 我们计划使用AWS MSK服务来管理Kafka和Schema注册表和Confluent的Kafka Connect服务来运行我们的连接器(Elasticsearch Sink Connector)。我们计划在EC2中运行模式、注册表和连接器。 根据Confluent团队的说法,如果我们对Kafka使用MSK,他们无法正式支持Confluent模式注册表和Kafka Connect。 那么,任何人都

  • 我现在一直在查看Spring Cloud模式注册表和汇合模式注册表。我可以看到一些区别,例如Spring Cloud模式注册表将模式保存在普通数据库中,默认情况下保存在h2中,而汇合模式注册表保存在kafka主题中。 spring云模式注册表的这种方法是否会对性能产生任何影响。据我所知,即使数据保留在主题上,以防汇合,查询它时仍然会有延迟。但会有重大影响吗? 我还可以看到,spring云模式注册表

  • 我试图使用Confluent_Kafka的AvroProducer类生成Avro格式的消息。Kafka和Schema-Registry在同一个网络中作为3个节点的集群运行。 我得到的是 我没有使用Docker容器。集群由3个独立的VM组成,其中安装和运行Kafka和Registry Schema,所以它也不是独立的。Python代码从具有网络访问和防火墙异常的第四个VM执行。事实上,我可以在没有a

  • 我正在了解Confluent的模式注册表,以满足所有模式管理需求。 我不太理解他们的版本控制方法...有一个的概念,我将其视为一个名称空间。据我所知,subject在模式注册表中必须是唯一。 然后是模式id,或者只是,它也是唯一的。 最后,还有一个。 以下是文档中的片段: :此主题的架构版本,每个主题从1开始 :全局唯一的架构版本id,在所有主题中的所有架构中都是唯一的 因此,一旦我想修改特定主题