当前位置: 首页 > 知识库问答 >
问题:

Kafka Streams:SerializationException:LongDeserializer接收的数据大小不是8

辛承志
2023-03-14

我有一个小程序可以用阿帕奇Kafka来计算颜色的数量

public class FavouriteColor {


    private static final String INPUT_TOPIC_NAME = "favourite-colour-input";
    private static final String OUTPUT_TOPIC_NAME = "favourite-colour-output";
    private static final String INTERMEDIATE_TOPIC_NAME = "favourite-colour-output";

    private static final String APPLICATION_ID = "favourite-colour-java";


    public static void main(String[] args) {

        Properties config = new Properties();

        config.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> textLines = builder.stream(INPUT_TOPIC_NAME);

        KStream<String, String> usersAndColours = textLines
            .filter((key, value) -> value.contains(","))
            .selectKey((key, value) -> value.split(",")[0].toLowerCase())
            .mapValues(value -> value.split(",")[1].toLowerCase())
            .filter((user, colour) -> Arrays.asList("green", "blue", "red").contains(colour));

        usersAndColours.to(INTERMEDIATE_TOPIC_NAME);
        KTable<String, String> usersAndColoursTable = builder.table(INTERMEDIATE_TOPIC_NAME);

        KTable<String, Long> favouriteColours = usersAndColoursTable
            .groupBy((user, colour) -> new KeyValue<>(colour, colour))
            .count(Named.as("CountsByColours"));

        favouriteColours.toStream().to(OUTPUT_TOPIC_NAME, Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);

        streams.cleanUp();
        streams.start();

        System.out.println(streams);

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

创建主题,生产者/消费者开始使用终端:

kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic favourite-colour-input

kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic user-keys-and-colours --config cleanup.policy=compact

kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic favourite-colour-output --config cleanup.policy=compact



kafka-console-consumer --bootstrap-server localhost:9092 \
    --topic favourite-colour-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer



kafka-console-producer --bootstrap-server localhost:9092 --topic favourite-colour-input

我在终端中提供了以下输入:

stephane,blue
john,green
stephane,red
alice,red

我在消费者终端收到错误:

stephane    Processed a total of 1 messages
[2021-11-27 21:31:58,155] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
    at org.apache.kafka.common.serialization.LongDeserializer.deserialize(LongDeserializer.java:26)
    at org.apache.kafka.common.serialization.LongDeserializer.deserialize(LongDeserializer.java:21)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    at kafka.tools.DefaultMessageFormatter.$anonfun$writeTo$2(ConsoleConsumer.scala:519)
    at scala.Option.map(Option.scala:242)
    at kafka.tools.DefaultMessageFormatter.deserialize$1(ConsoleConsumer.scala:519)
    at kafka.tools.DefaultMessageFormatter.writeTo(ConsoleConsumer.scala:568)
    at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:115)
    at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
    at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:52)
    at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)

这里有什么问题?我做了简短的研究,发现了其他人提出的类似问题,但这些解决方案似乎对我不起作用。

共有1个答案

乐正浩博
2023-03-14

您将值反序列化器定义为Long的值,但看起来您的数据是一个字符串

 类似资料:
  • 我正在尝试Kafka Streams。编写一个简单的应用程序,我在其中计算重复的消息。 消息: 等。 我正在尝试通过。用它作为钥匙。然后将其用作值。然后按键分组,查看在每个会话中复制了哪些消息。 这是代码: KTable 主题 常规生产者生成重复。但是,当我使用控制台使用者查看它时,它会崩溃并显示错误。然后,我在控制台使用者上使用 --跳过错误消息标志。现在我看到成千上万的这样的线条 谁能帮帮我这

  • 问题内容: 我正在将SocketServer模块用于TCP服务器。我在此函数遇到一些问题,因为传入的数据包总是具有不同的大小,所以如果我指定(我尝试使用更大的值,并且更小),则它在2或3个请求后卡住,因为数据包长度会变小(我认为),然后服务器卡住直到超时。 如果客户端通过同一源端口发送多个请求,但服务器卡住,则非常感谢您的帮助,谢谢! 问题答案: 网络 总是 不可预测的。TCP使许多这种随机行为为

  • 当我尝试接收大量数据时,它会被切断,我必须按enter键才能获取其余数据。起初,我可以增加一点,但它仍然不会收到所有的。正如您所看到的,我增加了conn.recv()上的缓冲区,但它仍然无法获取所有数据。它会在某一点切断它。我必须在原始输入上按enter键才能接收其余数据。我是否可以一次获取所有数据?这是密码。

  • 我正在使用Kafka源和接收器连接器创建一个数据管道。源连接器从SQL数据库消费并发布到主题,而接收器连接器订阅主题并放入其他SQL数据库。表有16 GB的数据。现在的问题是,数据不能从一个数据库传输到另一个数据库。但是,如果表的大小很小,比如1000行,那么数据可以成功传输。 源连接器配置: 源连接器日志: 有人能指导我如何调整Kafka源连接器以传输大数据吗?

  • 我使用Android应用程序通过TCP套接字与同一局域网上的PC java应用程序进行通信、发送和接收消息。下面是我在android中使用的Asynctask的代码,用于发送消息并从PC接收回复: } 我在onPostExcecute中的祝酒词中显示PC的回复。 Android通过BufferedWriter发送消息,而PC上的java应用程序在BufferedReader中接收消息。 PC在收到

  • 数据应该传递给我的portlet类,但它不会。这是我的类代码: 这个类旨在将数据传递回JSP。方法工作正常,就像在jsp中一样,我可以使用但是,它总是返回“no-param”,这意味着返回null。 因此,似乎正确调用了我的方法,但它没有接收表单数据。错误在哪里,我错了什么? 我将portlet下载到我的本地机器上,部署到本地演示Liferay安装中,它工作了!所以代码应该是可以的,这一定是一些服