当前位置: 首页 > 知识库问答 >
问题:

具有外部https连接的kafka架构

董意蕴
2023-03-14

我开始对kafka绝望了。对于一个私人项目,一家公司向我发送了一个kafka流。经过长时间的尝试,我终于设法连接到引导服务器并接收到第一条消息。但没有反序列化。目前数据格式如下:4868fa8

该公司以avro格式发送密钥和值,我也获得了几个模式URL。但是我不能正确地使用它们,这样我就能得到可读的数据。不管我怎么输入,它总是出错。只有当我在没有任何模式的情况下进行检索时,我才能得到如上所述的消息。

模式url是https和外部的,因此我已经尝试为url创建信任库。有人能给我一个提示吗?我还能尝试什么?

bin/kafka-avro-console-consumer --bootstrap-server kafka1.some.url:9093,kafka2.some.url:9093,kafka3.some.url:9093 --topic myTopic --consumer-property security.protocol=SSL --consumer-property ssl.protocol=TLSv1.2 --consumer-property ssl.truststore.location=ssl/myTruststore.jks --consumer-property ssl.truststore.password=xxx --consumer-property  ssl.keystore.location=ssl/keystore.jks --consumer-property ssl.keystore.password=xxx --consumer-property ssl.key.password=xxx --consumer-property schema.registry=https://schema-reg.some.url  --from-beginning

这里出现的第一个问题是,我被告知模式有3个URL。我在程序调用中指定了一个基本url,还有两个,一个用于键,另一个用于值。两者都具有以下格式:

https://schema-reg.some.url/subjects/Key/versions/latest/schema

https://schema-reg.some.url/subjects/Value/versions/latest/schema

看起来像(对于键):

{“type”:“record”,“name”:“WoKey”,“namespace”:“somenameSpace”,“fields”:[{“name”:“someId”,“type”:“string”,“aliases”:[“userId”]},{“name”:“nextId”,“type”:[“null”,“string”],“default”:null}]}

不幸的是,我完全被难住了。我也收到了一个typescipt,但是我想直接在kafka中使用整个东西,然后通过kafka-connect sink将其写入MySQL DB。

如果我用上述设置启动Kafka,我总是会得到一个错误:

错误 运行时的未知错误: (kafka.tools.ConsoleConsumer$) org.apache.kafka.common.errors.SerializationException: Error retrieve Avro unknown schema for id 423 at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:333) at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializer.deserializer.java:114) atio.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88) at io.confluent.kafka.formatter.AvroMessageFormatter$AvroMessageDeserializer.deserialize(AvroMessageFormatter.java:133) at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:92) at io.confluent.kafka.formatter.SchemaMessageFormatter.writeTo(SchemaMessageFormatter.java:181) at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:115) at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75) at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:52) at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala) Caused by: java.net.ConnectException: Connection denyed (Connection deny)

共有1个答案

穆远
2023-03-14

由于错误显示“连接被拒绝”,我认为问题与--consumer-property schema.registry=有关。

正确的选项是——property schema.registry.url=https://...

关于提到的URL,默认主题将在主题名称之后具有-value-key

否则,建议对文件使用< code> - consumer-config,而不需要键入长命令选项。

 类似资料:
  • 使用最新的 kafka 和 confluent jdbc 接收器连接器。发送一个非常简单的 Json 消息: 但是出现错误: Jsonlint说Json是有效的。我在 kafka 配置中保留了 json 。有什么指示吗?

  • 我想把9093暴露在码头集装箱外面。当我如下设置暴露于9093的kafka-0端口和KAFKA_ADVERTISED_LISTENERS时,我无法连接到localhost:9093,如以下docker-compose文件所示。 但是,当我换成 和 我可以连接到Kafka经纪人localhost:9092。 如何将外部端口更改为9093以使应用程序连接?我想设置多个经纪商。

  • 我开始学习JPA,并基于我在SQL Server中测试的以下本机SQL实现了一个使用JPA查询的示例: 根据上面的SQL,我构造了以下JPQL查询: 正如您所看到的,我仍然缺少原始查询中的条件。我的问题是,我怎样才能把它放入我的JPQL中?

  • 我正在尝试调用一个出站的外部网络服务,即https。它在我当地的MuleStudion环境中工作正常。但是当我部署进行测试时。我收到异常 异常堆栈为: 信任Anchors参数必须为非空(java.security.Invalid算法参数异常)java.security.cert.PKIXP参数:200(null) 意外错误:java.security.Invalid算法参数异常:信任锚参数必须为非

  • 我需要从中选择所有行,如果选择位置子句匹配,则从中选择匹配这是我的外部与子查询,但它失败了。有人可以帮忙吗?

  • 我开始玩Quarkus和它的REST客户端。根据文档,应该创建一个Jax-RS带注释的接口,并用@RegisterRestClient进一步注释。 我的问题是,在服务器提供的一个工件中,我已经有了我需要连接的服务的JaxRS接口,我可以直接导入它。有没有办法使用已经创建的外部Jax-RS接口来创建服务?复制粘贴代码来获得一个完美的界面似乎是错误的,因为它已经很好地为我服务了。