当前位置: 首页 > 知识库问答 >
问题:

Spark使用模式注册表(avro)读取kafka流的最佳实践?

贝滨海
2023-03-14

Spark是否有任何最佳实践来处理在Avro中使用模式注册表序列化的kafka流?尤其是对于Spark结构化流?

我在https://github.com/ScalaConsultants/spark-kafka-avro/blob/master/src/main/scala/io/scalac/spark/AvroConsumer.scala找到了一个例子。但是我无法加载AvroConverter类。我在mvnrepository.com.中找不到名为io.confluent: kafka-avro-seralizer的工件

共有1个答案

糜帅
2023-03-14

您需要在< code>build.sbt中添加汇合回购:

val repositories = Seq(
  "confluent" at "http://packages.confluent.io/maven/",
  Resolver.sonatypeRepo("public")
)

见:https://github.com/ScalaConsultants/spark-kafka-avro/blob/master/build.sbt

 类似资料:
  • 如何使用Spring Kafka通过合流模式注册表读取AVRO消息?有样品吗?我在官方参考文件中找不到它。

  • 我有一个Kafka消费者配置了主题中的模式轮询,我想做的是在当前模式的基础上创建另一个Avro模式,并使用它水合数据,基本上我不需要50%的信息,需要编写一些逻辑来更改几个字段。这只是一个例子 从stream返回的事件相当复杂,所以我将一个较小的CustomObj建模为. avsc文件,并将其编译成java。当尝试使用CustomObj运行代码时,我想做的就是使用一个事件,然后将其反序列化为一个更

  • 我试图用函数编程(和spring cloud stream)转换来自输入主题的输入AVRO消息,并在输出主题上发布新消息。下面是我的转换函数: 我的spring boot应用程序是以这种方式声明的,并激活了模式注册表客户机: 谢谢你能给我带来的任何帮助。 视CG

  • 我使用schema registry为所有带有Kafka Streams的应用程序创建模式注册表。我们的一个流进行聚合,我想对聚合对象使用schema registry,如下所示: 但在向schema注册表添加schema时,我们需要按主题名定义schema。在流上聚合的情况下,这是不可能的,因为主题名称是由流拓扑生成的。 问题是是否有某种方法可以为任何主题创建模式,以便任何流或任何其他解决方案都

  • 我正在尝试使用Confluent schema registry,下面是我在Github中找到的一些示例(https://github.com/gAmUssA/springboot-kafka-avro). 当消费者和生产者与模型共享相同的命名空间而不是其工作时。 当使用者位于具有不同名称空间但具有相同类(名称和属性方面)的不同项目中时,它不工作。 合流Avro反序列化程序可以使用正确的值反序列化

  • 我使用的是Azure HDInsight的托管Apache Kafka解决方案,因为不幸的是Azure上没有托管汇流Kafka解决方案。是否可以运行汇合模式注册表并将其连接到HDInsight Apache Kafka集群的代理? 我希望只在单个VM上安装模式注册表,然后使用schema-registry.properties文件中的这一行,将其指向HDInsight集群的代理列表: kafkas