当前位置: 首页 > 知识库问答 >
问题:

使用Kafka Streams和依赖Headers中模式引用的Serdes

钱繁
2023-03-14

我正在尝试使用 Kafka 流对 CDC 数据执行 KTable-KTable 外键连接。我将读取的数据是 Avro 格式,但它的序列化方式与其他行业序列化程序/反序列化程序(例如 Confluent 模式注册表)不兼容,因为模式标识符存储在标头中。

当我设置KTables的Serdes时,我的Kafka Streams应用程序最初运行,但最终失败,因为它在内部使用byte[]serialize(字符串主题,T数据)调用Serializer方法,而不是包装序列化程序ValueAndTimestampSerializer中带有头的方法(即byte[]serialize(String topic,headers headers,T data)。与我合作的Serdes无法处理此问题并引发异常。

第一个问题是,有没有人知道一种方法来恳求 Kafka Streams 在内部使用正确的方法签名调用该方法?

我正在探索解决这个问题的方法,包括编写新的Serdes,用消息本身的模式标识符重新序列化。这可能涉及到将数据重新复制到一个新的主题或使用拦截器。

然而,我知道ValueTransformer可以访问ProcessorContext中的头,我想知道是否有更快的方法使用transformValues()。其思想是首先将该值作为byte[]读取,然后将该值反序列化到转换器中的Avro类(参见下面的示例)。然而,当我这样做时,我得到了一个例外。

StreamsBuilder builder = new StreamsBuilder();
 final KTable<Long, MySpecificClass> myTable = builder.table(
      "my-topic",
       Consumed.with(Serdes.Long(), Serdes.ByteArray())
    )
    .transformValues(MyDeserializerTransformer::new);

 ...

 KTable<Long, JoinResult> joinResultTable = myTable.join(rightTable, MySpecificClass::getJoinKey, myValueJoiner);

 joinResultTable.toStream()...
public class MyDeserializerTransformer implements
    ValueTransformerWithKey<Long, byte[], MySpecificClass> {
  MyAvroDeserializer deserializer;
  ProcessorContext context;

  @Override
  public void init(ProcessorContext context) {
    deserializer = new MyAvroDeserializer();
    this.context = context;
  }

  @Override
  public MySpecificClass transform(Long key, byte[] value) {
    return deserializer.deserialize(context.topic(), context.headers(), value);
  }

  @Override
  public void close() {

  }
}

当我运行它时,我收到一个ClassCastException。我如何解决这个问题或找到解决方法?我需要使用二级状态存储吗?

"class": "org.apache.kafka.streams.errors.StreamsException",
    "msg": "ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: java.lang.Long, and value: org.apache.kafka.streams.kstream.internals.Change.\nNote that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.",
    "stack": [
      "org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:185)",
      "org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)",
      "org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)",
      "org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)",
      "org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)",
      "org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesProcessor.process(KTableTransformValues.java:117)",
      "org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesProcessor.process(KTableTransformValues.java:87)",
      "org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)",
      "org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)",
      "org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)",
      "org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)",
...
"cause": {
 "class": "java.lang.ClassCastException",
      "msg": "class com.package.MySpecificClass cannot be cast to class [B (com.package.MySpecificClass is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')",
      "stack": [
        "org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)",
        "org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:102)",
        "org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:72)",
        "org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)",
        "org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)",
        "org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)",
        "org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)",

共有1个答案

颜文康
2023-03-14

我能够解决这个问题,首先将输入主题作为KStream读取,然后将其转换为具有不同Serde的KTable。作为第二步,State Stores似乎遇到了不使用标头调用序列化器/反序列化器方法签名的问题。

 类似资料:
  • 我想下载使用ivy的工件:解决,但使用使用[conf]属性的模式。所以我在IvyS中定义了以下内容ettings.xml 注意这个模式 现在可以很好地解决依赖关系,但只为配置创建了一个文件夹:默认。没有为ConfGroup1创建文件夹 另外,我知道这可以通过使用ivy:retrieve实现,但我不想使用它,因为它将涉及将工件从ivy缓存复制到ivy:resolve之后的另一个地方,我有价值数十亿字

  • 我有一个maven项目a,它的类路径中有一个/xsd/a.xsd。我还有一个带有/xsd/B.xsd的项目B,它通过目录条目引用/xsd/a.xsd。目录条目如下所示: 注意url中的maven协议。该目录由组织使用。jvnet。jaxb2。maven2:maven-jaxb22-plugin,用于生成带有JAXB注释的Java类,其中包含来自依赖项的片段。 到目前为止一切都很好。 现在我想在Ec

  • 我尝试使ajv使用两个JSON-Schema,一个依赖于另一个。下面是我的模式的一个示例(简化): json 错误:没有带有键或引用“http://json-schema.org/draft-04/schema#”的模式 更新:如果我从types.json中删除“$schema…”,我得到的错误是: MissingReferror:无法从id#中解析引用types.json#/definition

  • 问题内容: Fendy和Glen Best的答案 同样 被我 接受 和尊重,但由于可以接受并给予赏金,因此我选择Fendy的答案。 如果我有 一些代码 有 被多次重复使用 在 许多类 (很少有轻微参数改变是显而易见的)和并发线程,哪种方法去? 必须重用的代码可以是任何理智的东西(在考虑了静态和非静态上下文以及方法制作技术的前提下)。它可以是一种算法,一种执行connect,operate,clos

  • 我无法使用ArchUnit制定以下测试: 我想确保某个包中的所有类只访问应用程序基包之外或某个子包内的类(“或”而不是“xor”)。 我得到的是: 问题是,或条件应该在onlyAccessClassesthat()中。如果一个类同时具有两种类型的访问权限,上述公式将失败,我希望这两种类型都有效。 我怎样才能实现我想要的?谢谢你在这方面的任何帮助...

  • 我试图在Firebase数据库上进行多个顺序和依赖的查询。这就是数据库的样子: 我想检索所有的图书,遍历它们,获取相关的作者名称,在每个图书对象中设置值,并返回一个包含图书对象的可观察数组。