当前位置: 首页 > 知识库问答 >
问题:

为什么会出现这样的编译错误:“找不到kstream.consumered的隐式值”,我该如何修复它?

阙繁
2023-03-14

我们有这些依赖性:

libraryDependencies += "org.apache.kafka"       %% "kafka-streams-scala"         % kafkaVersion
libraryDependencies += "io.confluent"           % "kafka-streams-avro-serde"     % confluentVersion
libraryDependencies += "io.confluent"           % "kafka-schema-registry-client" % confluentVersion
libraryDependencies += "ch.qos.logback"         % "logback-classic"              % "1.2.3"
libraryDependencies += "com.typesafe"           % "config"                       % "1.4.0"
libraryDependencies += "com.sksamuel.avro4s"    %% "avro4s-core"                 % "3.0.4"

我们使用代码生成器从AVRO模式文件生成Scala案例类。一个这样生成的case类有一个值,作为其字段之一。在AVRO模式中,这是用type=[t1,t2]表示的,因此生成似乎是合理的,即sum类型:可以是t1类型,也可以是t2类型。

问题在于,从主题到案例类(二进制)的反序列化路径缺少什么-

基本上我得到这个错误目前:

could not find implicit value for parameter consumed: org.apache.kafka.streams.scala.kstream.Consumed[String, custom.UserEvent]
[error]       .stream[String, UserEvent]("schma.avsc")

第一个想法是kafka-stream-avro-serde,但可能这个库只确保AVRO Map的Serde[GenericRecer],而不是case类。因此,其他依赖项之一是帮助AVRO GenericRecer将case类映射和返回。我们还有一些手写的代码,可以从模式中生成case类,这似乎可以直接用于喷涂json。

我在想(二进制)

我现在正在尝试创建一个Serde[用户事件]实例。因此,在我的理解中,这将涉及在用户事件和AVRO通用记录之间转换,类似于地图,然后在AVRO记录和二进制之间转换——这可能包括在kafka-stream-avro-serde依赖关系中,就像应该有一个Serde[通用记录]或类似的东西一样。

导入明智的,我们有这个导入含义:


import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.Consumed

共有3个答案

乜明朗
2023-03-14

对我来说,我必须更好地遵循指示,并添加一个隐式的serde实现。他们在链接中的示例如下所示:

// An implicit Serde implementation for the values we want to
// serialize as avro
implicit val userClicksSerde: Serde[UserClicks] = new AvroSerde

有关更完整的示例,请参见其avro库的scala测试:

    // Make an implicit serde available for GenericRecord, which is required for operations such as `to()` below.
    implicit val genericAvroSerde: Serde[GenericRecord] = {
      val gas = new GenericAvroSerde
      val isKeySerde: Boolean = false
      gas.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, cluster.schemaRegistryUrl), isKeySerde)
      gas
    }
暴辰龙
2023-03-14

你导入了相应的软件包吗?

import org.apache.kafka.streams.scala.ImplicitConversions._

查阅https://kafka.apache.org/24/documentation/streams/developer-guide/dsl-api.html#scala-数字用户线

严宇
2023-03-14

事实上,缺少一个导入。现在它可以编译了。下面是导入:

import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
 类似资料:
  • 我们有以下依赖关系: 我们使用代码生成器从AVRO模式文件生成Scala case类。一个这样生成的case类的字段之一是一个任一值。在AVRO模式中,这是用type=[t1,t2]表示的,因此生成似乎是体面的,即是一个和类型:可以是t1类型或t2类型。

  • 我需要将12小时的时间转换为24小时的格式。 我现在已经把12小时的时间硬编码了,以使事情更简单。 我的逻辑:输入sting 07:05:45PM提取最后2个字符。如果AM check为前两个字符,则为12。。如果是,则将其设置为00,否则按原样输出,如果PM检查前两位数字是否为12。。如果是,请保持原样,如果不是,则在前2位加上12 总线错误:10是我运行代码得到的

  • 这也是基于我的最后一个问题。 按照本教程,我将项目克隆到我的机器中,并试图让项目正确构建。 在修复我在上一个问题中得到的错误的过程中,我遇到了一个新的错误。 这是我正在尝试修复/编辑的构建脚本的部分 按照如何检查Gradle版本的说明,我检查了我的版本,发现我正在运行Gradle版本2.2.1。基于此,我改变了 到 然而,在更改构建脚本代码并尝试重建我的项目后,我得到了上面提到的错误(下面是完整的

  • 我已经花了一个多小时试图解决这个问题,但我什么都没做。 当我试图使用Maven编译我的项目时,我得到了以下错误: [错误]无法执行目标组织。阿帕奇。专家插件:maven assembly插件:2.2-beta-5:single(默认cli)在GankALane项目上:无法解析mojo org的配置。阿帕奇。专家插件:maven assembly插件:2.2-beta-5:单参数存档:在组织中找不到

  • 为什么会引发主线程上的网络异常?its在异步任务上 } 编辑: 完整代码: logcat:

  • 我正在尝试用Java制作一个与OpenGL(使用LWJGL 2)的窗口。当我尝试运行时,Eclipse BuiltInclassLoader出现了ClassNotFoundException错误。 我期望输出显示一个窗口,这是真正的输出: 线程“main”java.lang.noClassDeffounder中的异常错误:org/lwjgl/lwjglexception在enginetester.