当前位置: 首页 > 知识库问答 >
问题:

KafkaAvroDeserializer的Kafka流“consumed.with()”

漆雕嘉平
2023-03-14

我需要从KafkaAvroDeserializer而不是标准的kafka反序列化器消耗的主题创建一个流。这是因为它将被发送到汇流JDBC接收器连接器(不支持标准序列化器/反序列化器)中使用的主题。在创建主题时,我对key和value都使用了KafkaAvroSerializer。

我的原始代码(在更改为密钥使用Kafka Avro序列化器之前)是:

final KStream<String, DocumentUpload> uploadStream = builder.stream(UPLOADS_TOPIC, Consumed.with(Serdes.String(), uploadSerde));

注意:上面的Serdes.String不会正确反序列化,因为密钥是使用KafKaavroSerializer序列化的。那么,也许有另一种形式的代码,它允许我构建一个流,而不需要设置关键字serdes(因此它默认为配置中的内容),我可以只设置值serde(uploadSerde)?

如果不是,有人能告诉我如何将“serdes.string()”标准deserizlaizer更改为kafkaavrodeserializer吗?例如。

final KStream<String, DocumentUpload> uploadStream = builder.stream(UPLOADS_TOPIC, Consumed.with(<What can I insert here for the KafkaAvroDeserializer.String???>, uploadSerde));

在我的使用者中,我正在设置正确的默认反序列化器:

streamsConfiguration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
streamsConfiguration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);

如果使用表单(并允许在我的消费者中指定的默认值,即KafkaAvro):

final KStream<String, DocumentUpload> uploadStream = builder.stream(UPLOADS_TOPIC);

我得到以下信息:

2018-04-08 00:24:53,433] ERROR [fc-demo-client-StreamThread-1] stream-thread [fc-demo-client-StreamThread-1] Failed to process stream task 0_0 due to the following error:    (org.apache.kafka.streams.processor.internals.AssignedTasks)
java.lang.ClassCastException: [B cannot be cast to java.lang.String
at     org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
at    org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
....

多谢了。

共有1个答案

容俊豪
2023-03-14

键和值的逻辑是相同的。因此,您可以以相同的方式处理这两种情况。

您的困惑是在配置中设置使用者反序列化器。请注意,这些配置被忽略(出于内部原因)。不能直接配置使用者的反序列化器。你总是需要用Serdes。因此,如果要为使用者设置默认的反序列化程序,则需要在配置中指定默认的Serde。

因此,我创建了一个围绕KafkaAvroSerializer和KafkaAvroDeserializer的包装器,该包装器实例化了它们,然后将包装器用于consumed.with中的键参数

完全正确。也可以在配置中将此Serde设置为默认值。

我本以为使用KafKaavroSerialize的字符串键从主题创建流是一种常见的用例

对此不确定。如果是纯字符串,我假设人们可能直接使用StringDeserializer,而不是将字符串包装为Avro(不确定)。还要注意,在处理Avro时,建议使用模式注册表。Confluent的模式注册表附带相应的Avro Serdes:https://github.com/confluentinc/schema-registry/(免责声明:我是Confluent的雇员。)

 类似资料:
  • 在Spring Boot应用程序中,我试图配置Kafka流。用简单的Kafka主题,一切都很好,但我无法得到工作SpringKafka流。 这是我的配置: 我想创建一个基于主题的流。应用一个简单的转换并将此流中的消息发送到test主题。 我向发送以下消息,其中是我自己的复杂类型,但是我现在不知道如何将它转换为中的,以便能够在中使用它。 请建议如何使其工作。

  • 本文向大家介绍Kafka流的特点?相关面试题,主要包含被问及Kafka流的特点?时的应答技巧和注意事项,需要的朋友参考一下 答:Kafka流的一些最佳功能是 Kafka Streams具有高度可扩展性和容错性。 Kafka部署到容器,VM,裸机,云。 我们可以说,Kafka流对于小型,中型和大型用例同样可行。 此外,它完全与Kafka安全集成。 编写标准Java应用程序。 完全一次处理语义。 而且

  • 我正在尝试用《Spring的云流》和《Kafka》。下面是示例代码。但它似乎没有任何作用。它总是创建一个名为“输出”的主题。但这些价值观尚未公布。 应用亚马尔 我的目标就是创造价值。 依赖性-2.2.6。释放

  • 我期待关于Kakfa主题的消息,一旦我收到消息,我就会发出关于新主题的消息。我使用streams API来实现这一点,它很简单。但是,由于系统不可靠,我可能永远不会接收到,但如果已经接收到消息的(例如),并且在秒内没有记录新消息,我仍然希望发出消息。这在Kafka streams中是可能的吗?还是我需要为它写一个consumer? 如果Kafka Streams有类似于Rx(http://reac

  • 我有一个关于kafka流应用程序中的控制流的基本问题。如果有两个源主题 我做了一个非常初步的测试,当记录被消费时,我偷看了一下,然后用一个简单的速溶软件打印了它们被处理的瞬间。现在 这些是主题中记录的开始和结束时间戳 主题B记录在主题A之前提取。Sysout显示主题B中的所有记录。有人能帮助理解这一点吗?我希望在编写具有多个输入源的流式应用程序时使用这种理解。 提前感谢

  • 我想连接两个主题流(左连接),并在连接的流上进行基于窗口的聚合。然而,聚合将某些消息计数两倍,因为在连接期间,根据正确主题中的延迟,某些消息将发出两倍。以下是POC的代码。 它是否可以修复以避免因连接而重复?