当前位置: 首页 > 知识库问答 >
问题:

Kafka流-使用Protobuf serde获取问题

钱澄邈
2023-03-14

我正在创建一个Kafka Streams应用程序,我的主题数据来自Protobuf。我们可以为此创建Java代码绑定。然而,我正在努力使用正确的serde来使用来自主题的数据。有人能告诉我我做错了什么吗。

以下是我使用的属性定义:

Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-id-config");
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "my-broker:my-port");
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, KafkaProtobufSerde.class);

    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

我的Serde课

public class AppSerdes extends Serdes {

    public static KafkaProtobufSerde<ProtobufClass1> createConfiguredSerde1() {
        KafkaProtobufSerde<ProtobufClass1> serde = new KafkaProtobufSerde<ProtobufClass1>();
        Map<String, Object> serdeConfig = getSerdeConfig();
        serde.configure(serdeConfig, false);
        return serde;
    }

    public static KafkaProtobufSerde<ProtobufClass2> createConfiguredSerde2() {
        KafkaProtobufSerde<ProtobufClass2> serde = new KafkaProtobufSerde<ProtobufClass2>();
        Map<String, Object> serdeConfig = getSerdeConfig();
        serde.configure(serdeConfig, false);
        return serde;
    }

    private static Map<String, Object> getSerdeConfig() {
        Map<String, Object> serdeConfig = new HashMap<>();
        serdeConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

        return serdeConfig;
    }
}

这就是我创建KStream和KTable实例的方式:

StreamsBuilder streamBuilder = new StreamsBuilder();
    KTable<String, ProtobufClass1> table = streamBuilder.table("topic1",
            Consumed.with(AppSerdes.String(), AppSerdes.createConfiguredSerde1()));
    KStream<String, ProtobufClass2> stream = streamBuilder.stream("topic2".
            Consumed.with(AppSerdes.String(), AppSerdes.createConfiguredSerde2()));

然而,我得到以下错误:

组织。阿帕奇。Kafka。溪流。错误。StreamsException:调用处理器的ClassCastException。处理器的输入类型是否与反序列化类型匹配?检查Serde设置并更改StreamConfig中的默认Serde,或通过方法参数提供正确的Serde。确保处理器可以接受key:java类型的反序列化输入。字符串和值:com.google.protobuf。动态消息。请注意,尽管错误的SERDE是导致错误的常见原因,但强制转换异常可能还有其他原因(例如,在用户代码中)。例如,如果处理器在存储中连接,但不正确地强制转换泛型,则在处理过程中可能会引发类强制转换异常,但原因不会是错误的Serdes。在org。阿帕奇。Kafka。溪流。加工机内部。处理器节点。进程(ProcessorNode.java:185)位于org。阿帕奇。Kafka。溪流。加工机内部。ProcessorContextImpl。org上的forwardInternal(ProcessorContextImpl.java:273)。阿帕奇。Kafka。溪流。加工机内部。ProcessorContextImpl。在org上转发(ProcessorContextImpl.java:252)。阿帕奇。Kafka。溪流。加工机内部。ProcessorContextImpl。在org上转发(ProcessorContextImpl.java:219)。阿帕奇。Kafka。溪流。加工机内部。SourceNode。在org上处理(SourceNode.java:86)。阿帕奇。Kafka。溪流。加工机内部。简化任务。组织上的lambda$process$1(StreamTask.java:703)。阿帕奇。Kafka。溪流。加工机内部。韵律学。streamsmetricsiml。可以在org上获得测量相关性(streamsmetricsiml.java:883)。阿帕奇。Kafka。溪流。加工机内部。简化任务。流程(StreamTask.java:703)位于组织。阿帕奇。Kafka。溪流。加工机内部。任务经理。org上的进程(TaskManager.java:1105)。阿帕奇。Kafka。溪流。加工机内部。流线型。runOnce(StreamThread.java:647)位于org。阿帕奇。Kafka。溪流。加工机内部。流线型。org上的runLoop(StreamThread.java:553)。阿帕奇。Kafka。溪流。加工机内部。流线型。运行(StreamThread.java:512)的原因是:java。lang.ClassCastException:com.google.protobuf。无法将DynamicMessage强制转换为iit。数据中心。政党系统与客户关系管理。v1。CustomerAddressBase$CustomerAddressBaseEntity位于组织。阿帕奇。Kafka。溪流。kstream。内部。KStreamImpl。org上的lambda$internalSelectKey$0(KStreamImpl.java:234)。阿帕奇。Kafka。溪流。kstream。内部。KStreamMap$kstreamapprocessor。在org上处理(KStreamMap.java:41)。阿帕奇。Kafka。溪流。加工机内部。处理器适配器。进程(ProcessorAdapter.java:71)位于org。阿帕奇。Kafka。溪流。加工机内部。处理器节点。org上的lambda$process$2(ProcessorNode.java:181)。阿帕奇。Kafka。溪流。加工机内部。韵律学。streamsmetricsiml。可以在org上获得测量相关性(streamsmetricsiml.java:883)。阿帕奇。Kafka。溪流。加工机内部。处理器节点。进程(ProcessorNode.java:181)。。。还有11个

共有1个答案

费学
2023-03-14

我可以通过将此更改为

Map<String, Object> serdeConfig = getSerdeConfig();

Map<String, String> serdeConfig = getSerdeConfig();

作为关键

 类似资料:
  • 我的场景是我使用make很多共享前缀(例如house.door,house.room)的Kafka主题,并使用Kafka stream regex主题模式API消费所有主题。一切看起来都很好,我得到了数据的密钥和信息。 为了处理数据,我需要主题名,这样我就可以根据主题名进行连接,但我不知道如何在Kafka stream DSL中获得主题名。

  • 我对使用kafka streams和spring cloud stream相对较新,在使用窗口聚合功能时遇到了困难。 我想做的是 获取我的初始UserInteractionEvents流,并按userProjectId(字符串)对它们进行分组 创建这些事件的窗口会话,15分钟不活动 将这些窗口会话聚合到自定义会话对象中 然后将这些会话对象转换为另一个自定义UserSession对象 我的代码是:

  • 如何从动物园管理员那里获得最后一次偏移时间?当使用Storm喷口阅读来自Kafka的消息时。上下文:Kafka 不断获取消息,使用者读取一段时间,然后由于任何原因关闭,然后使用者仅读取最新消息,但不读取上次偏移量读取

  • 问题内容: 我正在尝试使用PySpark 2.4.0从Kafka读取avro消息。 spark-avro外部模块可以为读取avro文件提供以下解决方案: 但是,我需要阅读流式Avro消息。库文档建议使用 from_avro() 函数,该函数仅适用于Scala和Java。 是否有其他模块支持读取从Kafka流式传输的Avro消息? 问题答案: 您可以包括spark-avro软件包,例如使用(调整版本

  • 大家好,我有一个关于提取器和Kafka流的问题。。。。 在我们的应用程序中,有可能接收到无序事件,因此我喜欢根据负载中的业务日期来排序事件,而不是根据它们放置在主题中的时间点。 为此,我编程了一个定制的时间戳提取器,以便能够从有效负载中提取时间戳。我在这里所说的一切都非常有效,但当我构建这个主题的KTable时,我发现我收到的无序事件(从业务角度来看,它不是最后一个事件,而是在最后收到的)显示为对

  • 我在Scala中设置了Spark Kafka Consumer,它接收来自多个主题的消息: 我需要为每个主题的消息(将采用JSON格式)开发相应的操作代码。 我提到了以下问题,但其中的答案对我没有帮助: 从spark中的Kafka消息获取主题 那么,在接收到的DStream上是否有任何方法可用于获取主题名称以及消息以确定应该采取什么行动? 对此任何帮助都将不胜感激。谢谢你。