我有一个简单的案例类:
case class User(id: String, login: String, key: String)
我正在添加字段“name”
case class User(id: String, login: String, name: String, key: String)
{
"namespace": "test",
"type": "record",
"name": "User",
"fields": [
{ "name": "id", "type": "string" },
{ "name": "login", "type": "string" },
{ "name": "name", "type": "string" },
{ "name": "key", "type": "string" }
]
}
case class AuthRequest(user: User, session: String)
{
"namespace": "test",
"type": "record",
"name": "AuthRequest",
"fields": [
{ "name": "user", "type": "User" },
{ "name": "session", "type": "string" }
]
}
Consumer.committableSource(consumerSettings, Subscriptions.topics("token_service_auth_request"))
.map { msg =>
Try {
val in: ByteArrayInputStream = new ByteArrayInputStream(msg.record.value())
val input: AvroBinaryInputStream[AuthRequest] = AvroInputStream.binary[AuthRequest](in)
val result: AuthRequest = input.iterator.toSeq.head !!!! here is exception
msg.committableOffset.commitScaladsl()
(msg.record.value(), result, msg.record.key())
} match {
case Success((a: Array[Byte], value: AuthRequest, key: String)) =>
log.info(s"listener got $msg -> $a -> $value")
context.parent ! value
case Failure(e) => e.printStackTrace()
}
}
.runWith(Sink.ignore)
java.util.NoSuchelementException:scala.collection.immutable.stream$empt$.head(stream.scala:1104)在scala.collection.immutable.stream$empt$.head(stream.scala:1102)在test.consumers.authRequestListener.$anonfun$new$2(AuthRequestListener.scala:39)在scala.util.try$.apply(Try.scala:209)在ka.stream.impl.fusing.map$$Anon$9.在Akka.stream.impl.fusing.graphinterpush(ops.scala:51)在Akka.stream.impl.fusing.graphinterpush(graphinterparter.scala:519)在Akka.stream.impl.fusing.graphinterpush(graphinterparter.scala:482),在Akka.stream.impl.fusing.graphinterpash事件(graphinterparter.scala:488),在在Akka.stream.impl.fusing.actorGraphGraphGraphGraphGraphGraphGraphGraphGrapperter.Akka$stream$impl$fusing$ActorGraphGrapperInterper$$ProcessEvent(ActorGraphGrapperInterper.scala:563)在Akka.stream.impl.fusing.Acka$stream$impl$fusing$ActorGraphGraphInterperer$$processEvent在Akka.actor.actorcell.receiveMessage(actorcell.scala:588)在Akka.actor.actorcell.invoke(actorcell.scala:557)在Akka.dispatch.mailbox.processMailbox(mailbox.scala:258)在Akka.dispatch.mailbox.run(mailbox.scala:225)在Akka.dispatch.forkjoinask.doexec(mailbox.scala:235)在java:1339)位于akka.dispatch.forkjoin.forkjoinpool.runworker(forkjoinpool.java:1979)位于Akka.dispatch.forkjoin.forkjoinworkerthread.run(forkjoinworkerthread.java:107)
我试图清理构建和无效缓存-我似乎像某种缓存以前版本的模式在一些有帮助请!
您需要使更改向后兼容,使新字段为空,并向其添加默认值。
{
"namespace": "test",
"type": "record",
"name": "User",
"fields": [
{ "name": "id", "type": "string" },
{ "name": "login", "type": "string" },
{ "name": "name", "type": ["null", "string"], "default": null },
{ "name": "key", "type": "string" }
]
}
如果我使用模式版本1序列化一个对象,然后将模式更新为版本2(比如添加一个字段),那么在以后反序列化该对象时是否需要使用模式版本2?理想情况下,我只希望使用模式版本2,并使反序列化对象具有在对象最初序列化后添加到模式中的字段的默认值。 也许一些代码会更好地解释... 架构 1: 方案2: 使用通用非代码生成方法: 导致EOFException。使用会导致AvroTypeException。 我知道如
我们正在使用Flink表API在Flink应用程序中使用一个Kafka主题。 当我们第一次提交应用程序时,我们首先从我们的自定义注册表中读取最新的模式。然后使用Avro模式创建一个Kafka数据流和表。我的数据序列化器的实现与汇合模式注册表的工作方式类似,方法是检查模式ID,然后使用注册表。因此我们可以在运行时应用正确的模式。
如果Kafka主题的Avro模式被用作另一个模式的参考,那么更新该模式的正确方法是什么? 例如,假设我们有两个Kafka主题:一个使用Avro模式用户,另一个使用UserAction。 然后我想给用户添加一个额外的字段——一个“姓氏”,所以它看起来像这样:,空以使此更改兼容。要做到这一点,我可以更改Avro模式文件,使用Maven模式插件重新生成POJO,然后如果我将使用KafkaTemplate
使用以下定义的Avro模式和测试代码,在考虑Avro模式演变以及如何存储Avro数据的第一个版本并随后使用模式的第二个版本检索时,我有几个问题。在我的示例中,表示第一个版本,表示第二个版本,其中我们添加了属性。 < li >有没有办法在Java中将Avro模式和二进制编码数据存储为字节数组?我们希望将Avro对象存储到DynamoDB中,并且希望将Avro数据存储为一个blob,模式存储在它旁边(
我有两个问题: > 我曾尝试使用模式V1编写记录,并使用模式V2读取记录,但出现以下错误: org.apache.avro。AvroTypeException:找到foo,应为foo 我使用avro-1.7.3和: 以下是这两种模式的示例(我也尝试过添加命名空间,但没有成功)。 架构V1: 架构V2: 提前谢谢。
根据Avro模式规范(适用于接头):https://avro.apache.org/docs/current/spec.html 如上所述,Unions使用JSON数组表示。例如,["null","string"]声明一个模式,该模式可以是null或string。 ( 请注意,当为类型为联合的记录字段指定默认值时,默认值的类型必须与联合的第一个元素匹配。 因此,对于包含“null”的联合,通常首先