当前位置: 首页 > 知识库问答 >
问题:

在Kafka流API中获取类强制转换异常

太叔京
2023-03-14

我以json字符串的形式生成输入数据。

对于主题-myinput

{"time":"2017-11-28T09:42:26.776Z","name":"Lane1","oclass"
     :"myClass","id":112553,"Scope":"198S"}

我的班级是这样的:

public class App {
    static public class CountryMessage {

        public String time;
        public String Scope;
        public String id;
        public String oclass;
        public String name; 
    }

    private static final String APP_ID = "countries-streaming-analysis-app";

    public static void main(String[] args) {
        System.out.println("Kafka Streams Demonstration");


        StreamsConfig config = new StreamsConfig(getProperties());
        final Serde < String > stringSerde = Serdes.String();
        final Serde < Long > longSerde = Serdes.Long();

        Map < String, Object > serdeProps = new HashMap < > ();
        final Serializer < CountryMessage > countryMessageSerializer = new JsonPOJOSerializer < > ();
        serdeProps.put("JsonPOJOClass", CountryMessage.class);
        countryMessageSerializer.configure(serdeProps, false);

        final Deserializer < CountryMessage > countryMessageDeserializer = new JsonPOJODeserializer < > ();
        serdeProps.put("JsonPOJOClass", CountryMessage.class);
        countryMessageDeserializer.configure(serdeProps, false);
        final Serde < CountryMessage > countryMessageSerde = Serdes.serdeFrom(countryMessageSerializer,countryMessageDeserializer);

        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        KStream<String, CountryMessage> countriesStream = kStreamBuilder.stream(stringSerde, countryMessageSerde, "vanitopic");

        KGroupedStream<String, CountryMessage> countries = countriesStream.selectKey((k, traffic) -> traffic.Scope).groupByKey();

        KTable<Windowed<String>, Long> aggregatedStream = countries.count(TimeWindows.of(60 * 1000L), "UserCountStore");

        System.out.println("Starting Kafka Streams Countries Example");
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
        kafkaStreams.start();
        System.out.println("Now started CountriesStreams Example");
    }

    private static Properties getProperties() {
        Properties settings = new Properties();

        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.106.9.235:9092,10.106.9.235:9093,10.106.9.235:9094");
        settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "10.106.9.235:2181");
        //settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        //settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        return settings;
    }
}

我得到下面类铸造异常:

线程“countries-streaming-analysis-app-f7f95119-4401-4a6e-8060-5A138FFADB2-StreamThread-1”组织中的异常。阿帕奇。Kafka。溪流。错误。异常:流程中捕获到异常。taskId=0_0,processor=KSTREAM-SOURCE-0000000000,topic=vanitopic,partition=0,offset=2036 at org。阿帕奇。Kafka。溪流。加工机内部。简化任务。流程(StreamTask.java:203)位于org。阿帕奇。Kafka。溪流。加工机内部。流线型。org上的processand标点符号(StreamThread.java:679)。阿帕奇。Kafka。溪流。加工机内部。流线型。org上的runLoop(StreamThread.java:557)。阿帕奇。Kafka。溪流。加工机内部。流线型。运行(StreamThread.java:527)由:org引起。阿帕奇。Kafka。溪流。错误。StreamsException:序列化程序(key:org.apache.kafka.common.serialization.ByteArraySerializer/value:org.apache.kafka.common.serialization.ByteArraySerializer)与实际的键或值类型(key-type:java.lang.String/value-type:com.cisco.streams.countries.App$CountryMessage)不兼容。更改StreamConfig中的默认Serdes或通过方法参数提供正确的Serdes。在org。阿帕奇。Kafka。溪流。加工机内部。新节点。在org上处理(SinkNode.java:91)。阿帕奇。Kafka。溪流。加工机内部。ProcessorContextImpl。在org上转发(ProcessorContextImpl.java:82)。阿帕奇。Kafka。溪流。kstream。内部。KStreamFilter$KStreamFilterProcessor。进程(KStreamFilter.java:43)位于org。阿帕奇。Kafka。溪流。加工机内部。ProcessorNode$1。在org上运行(ProcessorNode.java:47)。阿帕奇。Kafka。溪流。加工机内部。streamsmetricsiml。org上的measureLatencyNs(streamsmetricsiml.java:187)。阿帕奇。Kafka。溪流。加工机内部。处理器节点。org上的html" target="_blank">进程(ProcessorNode.java:133)。阿帕奇。Kafka。溪流。加工机内部。ProcessorContextImpl。在org上转发(ProcessorContextImpl.java:82)。阿帕奇。Kafka。溪流。kstream。内部。KStreamMap$kstreamapprocessor。在org上处理(KStreamMap.java:42)。阿帕奇。Kafka。溪流。加工机内部。ProcessorNode$1。在org上运行(ProcessorNode.java:47)。阿帕奇。Kafka。溪流。加工机内部。streamsmetricsiml。org上的measureLatencyNs(streamsmetricsiml.java:187)。阿帕奇。Kafka。溪流。加工机内部。处理器节点。org上的进程(ProcessorNode.java:133)。阿帕奇。Kafka。溪流。加工机内部。ProcessorContextImpl。在org上转发(ProcessorContextImpl.java:82)。阿帕奇。Kafka。溪流。加工机内部。SourceNode。在org上处理(SourceNode.java:80)。阿帕奇。Kafka。溪流。加工机内部。简化任务。流程(StreamTask.java:189)。。。另外3个原因是:java。ClassCastException:java。lang.String不能强制转换为org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:88)org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)上的[B]在org。阿帕奇。Kafka。溪流。加工机内部。新节点。进程(SinkNode.java:87)。。。还有16个

我需要帮助了解如何以及在何处应用我创建的自定义SERDE

共有2个答案

狄冠宇
2023-03-14

将序列化程序添加到groupByKey

KGroupedStream<String, CountryMessage> countries = countriesStream.selectKey((k, traffic) -> traffic.Scope).groupByKey(Grouped.with(Serdes.String(), new ObjectMapperSerde<>(CountryMessage.class)));
禹智渊
2023-03-14

在代码中,

KGroupedStream<String, CountryMessage> countries = countriesStream.selectKey((k, traffic) -> traffic.Scope).groupByKey();

groupByKey()需要同时设置两个序列化程序,因为这将触发数据重新分区。或者将StringCountryMessage类型的默认插入设置为。

正如我在评论中提到的,每个不使用StreamsConfig中的默认Serdes的操作员都需要设置正确的Serdes。

因此,还需要在count()操作中指定相应的StringLongSerdes:

countries.count(TimeWindows.of(60 * 1000L), "UserCountStore");

所有可能需要Serdes的运算符都有适当的重载。只需检查出您正在使用的所有运算符的所有过载。

有关更多详细信息,请查看文档:https://docs.confluent.io/current/streams/developer-guide/dsl-api.html

 类似资料:
  • 例外情况: java.lang.ClassCastException:com.interconnect.library.gcm.util.checkplayServices(util.java:96),com.interconnect.library.gcm.regiseter.handleRegister(regiseter.java:53),com.interconnect.library.g

  • 我模拟了Jsch()类,并在下面的方法中获得了类强制转换异常。 原始方法。 联机获取Mockito异常。 例外情况: java.lang.ClassCastException:com.jcraft.jsch。频道$MockitoMock$1983492043不能转换为com.jcraft.jsch.ChannelSftp 测试用例调用方法。

  • 我正试图通过1将一些遗留代码迁移到log4j2.6。x= 在旧代码中,我有一个扩展org.apache.log4j的类。记录仪。当添加2.6 core/api jar以及桥api jar到我的类路径时,我有一个单元测试由于类转换异常而失败: MyLogger的类签名是 它只适用于log4j1.2,旧的类(org.apache.log4j.Logger)在bridge jar中,由于MyLogger

  • 问题内容: 尝试将结果集强制转换为映射类时,我收到了hibernate类的类强制转换异常…我能够查看返回的结果集中的数据…但是它以Object []的形式返回我可以将Object []设置为List …我可以正确地进行hibernate映射吗?我从查询中获取了正确的数据,但映射不正确… 映射 映射类 参加班 主要 问题答案: 对于测试,我建议您在产生类强制转换异常的语句周围放置一个try-catc

  • 调用AffineTransform: 它驻留在自定义形状类(YingYang)中。 当我进行调用时,当我试图从绘图面板或在类本身(如果我将返回类型更改为YingYang)中将它转换回一个YingYang时,我会得到一个类转换异常。 java.lang.ClassCastException:java.awt.Geom.Path2D$Double不能强制转换为Animation.Yingyang 任何

  • 我在我的项目中使用了几个不同的模式。它们每个都被编译到一个单独的jar中,每个都使用一个单独的包,使用xmlbean ant任务。我似乎只能成功地解析类路径中第一个模式jar的xml(使用.方法),否则我会得到一个,如此错误中所述。如果我更改jar顺序,不同的模式将能够成功解析,并且将被抛出用于不同的类。 我已经做了一些调试,我得出的结论是包的结构可能是负责任的。由于我的架构没有命名空间,因此我构