当前位置: 首页 > 知识库问答 >
问题:

合流模式注册表-AvroProducer-连接错误

李良策
2023-03-14

我试图使用Confluent_Kafka的AvroProducer类生成Avro格式的消息。Kafka和Schema-Registry在同一个网络中作为3个节点的集群运行。

value_schema = loads("""{"doc": "Messages to be written.",
        "namespace": "schemas.avro",
        "type": "record",
        "name": "kafkagwo",
        "fields": [
            {"name": "timestamp", "type": "string"},
            {"name": "message", "type": "string"}
        ]}""")
key_schema = loads('{"type": "string"}')
p = AvroProducer({'bootstrap.servers': 'broker1,broker2,broker3',
                 'schema.registry.url': 'http://broker1:8081'})
value = {'timestamp': timestamp, 'message': message}
p.produce(topic = 'topic-1', partition=0,
          key=str('key_0'),
          value=value, callback=delivery_report, 
          value_schema = value_schema, key_schema = key_schema)

我得到的是

ConnectionError: HTTPConnectionPool(host='broker1', port=8081): Max retries exceeded with url: /subjects/topic-1-value/versions (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x00000165C1522D88>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it'))

我没有使用Docker容器。集群由3个独立的VM组成,其中安装和运行Kafka和Registry Schema,所以它也不是独立的。Python代码从具有网络访问和防火墙异常的第四个VM执行。事实上,我可以在没有avro和注册表模式的情况下生成和使用消息,所以我不认为这个问题与网络有关,但我愿意接受各种想法。当我尝试使用带有注册表模式的Avro时,我得到了上面的错误。此外,错误指出注册表模式端口8081,所以问题应该与它有关,但我不知道该进一步寻找什么。

共有1个答案

邵伟
2023-03-14

问题是由于schema-registry.properties文件配置造成的。我已经设置了listeners=http://localhost:8081,但是它对应于127.0.0.1,并且不能从另一台机器访问。

我将它更改为listeners=http://0.0.0.0:8081,它就起作用了。

 类似资料:
  • 我正在了解Confluent的模式注册表,以满足所有模式管理需求。 我不太理解他们的版本控制方法...有一个的概念,我将其视为一个名称空间。据我所知,subject在模式注册表中必须是唯一。 然后是模式id,或者只是,它也是唯一的。 最后,还有一个。 以下是文档中的片段: :此主题的架构版本,每个主题从1开始 :全局唯一的架构版本id,在所有主题中的所有架构中都是唯一的 因此,一旦我想修改特定主题

  • 我希望即使服务器重新启动,也能保持一个具有固定id的模式。 是否可以在模式注册表中保存模式,以便在服务器崩溃后使用相同的id? 否则,是否有可能在模式注册表服务器启动时用固定的id硬编码一个模式?

  • 我现在一直在查看Spring Cloud模式注册表和汇合模式注册表。我可以看到一些区别,例如Spring Cloud模式注册表将模式保存在普通数据库中,默认情况下保存在h2中,而汇合模式注册表保存在kafka主题中。 spring云模式注册表的这种方法是否会对性能产生任何影响。据我所知,即使数据保留在主题上,以防汇合,查询它时仍然会有延迟。但会有重大影响吗? 我还可以看到,spring云模式注册表

  • 我们计划使用AWS MSK服务来管理Kafka和Schema注册表和Confluent的Kafka Connect服务来运行我们的连接器(Elasticsearch Sink Connector)。我们计划在EC2中运行模式、注册表和连接器。 根据Confluent团队的说法,如果我们对Kafka使用MSK,他们无法正式支持Confluent模式注册表和Kafka Connect。 那么,任何人都

  • 我正在使用Confluent managed Kafka cluster、模式注册表服务,并试图在Flink作业中处理Debezium消息。作业已配置为使用表 表连接器配置 错误消息 我通过以下方式成功测试了与架构注册表的连接: 错误消息“io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:Una

  • 我使用来自Confluent的Kafka Connect来使用Kafka流并以拼花格式写入HDFS。我正在1个节点中使用架构注册表服务,它运行良好。现在我想将模式注册表分发到集群模式以处理故障转移。关于如何实现这一点的任何链接或片段都将非常有用。