当前位置: 首页 > 知识库问答 >
问题:

使用Spring Cloud Stream Kafka Streams和Avro输入/输出,nativeEncoding/de解码=false

靳高明
2023-03-14

我们正在使用Avro输入/输出记录通过Spring Cloud Stream功能支持测试Kafka Streams的使用,但设置nativeEncoding=falsenativeDecoding=false,以便在执行Avro转换的地方使用自定义的MessageConverter

默认的 serdes 是键的 StringSerde 和值的 ByteArraySerde

当我们只使用KStream到KStream函数时,一切都没问题,例如:

    @Bean
    public Function<KStream<String, DataRecordAvro>, KStream<String, DataRecordAvro>> wordsCount() {
      return input -> input
          .flatMapValues(value -> Arrays.asList(value.getName().toString().toLowerCase().split("\\W+")))
          .map((key, value) -> new KeyValue<>(value, value))
          .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
          .windowedBy(TimeWindows.of(Duration.ofSeconds(5)).grace(Duration.ofMillis(0)))
          .count()
          .toStream()
          .map((key, value) -> new KeyValue<>(key.key(), new DataRecordAvro(key.key(), value)));
    }

但是当我们尝试一个稍微复杂一点的例子,涉及一个像这样的输入KTable:

    @Bean
    public BiFunction<KStream<String, DataRecordAvro>, KTable<String, DataRecordAvro>, KStream<String, DataRecordAvro>> userClicksRegionKTableAvro() {
      return (userClicksStream, usersRegionKTable) -> userClicksStream
          .leftJoin(usersRegionKTable,
              (clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region.getName().toString(), clicks.getCount()))
          .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks()))
          .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
          .reduce(Long::sum)
          .mapValues((key, value) -> new DataRecordAvro(key, value))
          .toStream();
    }

DataRecordAvro 类只有两个成员:CharSequence name;长计数;

收到第一条记录时,将引发此异常:

ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: java.lang.String, and value: com.xxxx.kstreams.fixtures.avro.DataRecordAvro.
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.

引发异常的处理器似乎是:

KSTREAM-LEFTJOIN-0000000011:
    states:     [user-regions-avro-STATE-STORE-0000000008]

我们不知道为什么它在这种情况下不起作用。也许 leftJoin 操作将信息保存到内部主题,并且没有考虑 useNativeEncoding/Decoding=false?但为什么 kstream-

这是另一个工作正常的例子(没有输入 Avro 记录,将消费者 useNativeDecoding 保留为默认值 true):

    @Bean
    public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, DataRecordAvro>> userClicksRegionKTable() {
      return (userClicksStream, usersRegionKTable) -> userClicksStream
          .leftJoin(usersRegionKTable,
              (clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks))
          .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks()))
          .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
          .reduce(Long::sum)
          .mapValues((key, value) -> new DataRecordAvro(key, value))
          .toStream();
    }

请帮忙!

共有1个答案

仲绍晖
2023-03-14

对于Spring Cloud Stream中的Kafka Streams binder,我们建议使用带有< code>Serde的原生解码/编码,除非您有充分的理由依赖于消息转换方法。迫使您使用消息转换器的用例是什么?在实践中,在Spring Cloud Stream中的Kafka Streams应用程序中使用消息转换器进行序列化会在您的拓扑结构中添加一个额外的层,并使其更深,因此建议使用原生解码/编码。

正如您所注意到的,对于Ktable,绑定器始终使用本机解码-目前,在那里不可能使用消息转换器。当您关闭Ktable绑定上的use NativeDecoding时,绑定器会忽略它并简单地使用默认字节序列。我建议在Ktable绑定上使用默认值,然后在您的应用程序配置中添加以下bean。

@Bean
public Serde< DataRecordAvro> dataRecordAvroSerde() {
   // return Serde
}

这样,绑定器将检测这个bean,并意识到< code>Serde类型与来自函数签名的类型相匹配,然后在这些输入上使用它。

如果您对此应用程序有进一步的问题,请随时分享MCRE。我们可以进一步观察。

 类似资料:
  • 我是Logstash和Avro的初学者。我们正在建立一个系统,logstash作为Kafka队列的制作人。然而,我们遇到了这样一个问题:由Logstash生成的avro序列化事件无法由apache提供的avro工具jar(版本1.8.2)解码。此外,我们注意到Logstash和avro工具的序列化输出有所不同。 我们有以下设置: logstash 5.5版 logstash avro编解码器版本3

  • 输出 用print加上字符串,就可以向屏幕上输出指定的文字。比如输出'hello, world',用代码实现如下: >>> print 'hello, world' print语句也可以跟上多个字符串,用逗号“,”隔开,就可以连成一串输出: >>> print 'The quick brown fox', 'jumps over', 'the lazy dog' The quick brown

  • 输出 用print()在括号中加上字符串,就可以向屏幕上输出指定的文字。比如输出'hello, world',用代码实现如下: >>> print('hello, world') print()函数也可以接受多个字符串,用逗号“,”隔开,就可以连成一串输出: >>> print('The quick brown fox', 'jumps over', 'the lazy dog') The qu

  • 到目前为止,我们讨论的所有示例本质上都是静态的。在本章中,我们将学习如何Haskell与用户动态交互。学习Haskell中使用的不同输入和输出技术。 1. 文件和流 到目前为止,我们已经对程序本身中的所有输入进行了硬编码,在前面学习的内容中都是从静态变量获取输入。本小节中,我们学习如何从外部文件读取和写入。 创建一个文件并命名为abc.txt。接下来,在此文本文件中输入以下一行: 接下来,我们将编

  • Scipy.io包提供了多种功能来解决不同格式的文件(输入和输出)。 其中一些格式是 - Matlab IDL Matrix Market Wave Arff Netcdf等 这里讨论最常用的文件格式 - MATLAB 以下是用于加载和保存文件的函数。 编号 函数 描述 1 加载一个MATLAB文件 2 保存为一个MATLAB文件 3 列出MATLAB文件中的变量 让我们来看看下面的例子。 上述程

  • 我们经常需要与用户交互,要么获取数据,要么提供某种结果。现今的大多数程序都使用一个对话框来让用户提供某种类型的输入。虽然 Python 确实有创建对话框的方法,但是我们可以使用一个简单得多的函数。Python 为我们提供了一个功能,允许我们要求用户输入一些数据,并以字符串的形式返回数据的引用。这个函数叫做 input。 Python 的输入函数接受单个参数,即字符串。这个字符串通常被称为提示符,因