当前位置: 首页 > 知识库问答 >
问题:

Kafka流错误:SerializationException:LongDeserializer接收到的数据大小不是8

李胡媚
2023-03-14

我正在尝试Kafka Streams。编写一个简单的应用程序,我在其中计算重复的消息。

消息:

2019-02-27-11:16:56 :: session:prod-111656 :: Msg => Hello World: 2491
2019-02-27-11:16:56 :: session:prod-111656 :: Msg => Hello World: 2492

等。

我正在尝试通过会话拆分此类消息:prod xxxx。用它作为钥匙。然后会话:prod xxxx Hello World:xxxx将其用作值。然后按键分组,查看在每个会话中复制了哪些消息。

这是代码:

KStream<String, String> textLines = builder.stream("RegularProducer");
KTable<String, Long> ktable = textLines.map(
    (String key, String value) -> {
        try {
            String[] parts = value.split("::");
            String sessionId = parts[1];
            String message = ((parts[2]).split("=>"))[1];
            message = sessionId+":"+message;
            return new KeyValue<String,String>(sessionId.trim().toLowerCase(), message.trim().toLowerCase());
        } catch (Exception e) {
            return new KeyValue<String,String>("Invalid-Message".trim().toLowerCase(), "Invalid Message".trim().toLowerCase());
        }
    })
    .groupBy((key,value) -> value)
    .count().filter(
            (String key, Long value) -> {
                return value > 1;
            }
    );

ktable.toStream().to("RegularProducerDuplicates", 
Produced.with(Serdes.String(), Serdes.Long()));
Topology topology = builder.build();
topology.describe();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();

KTable 主题 常规生产者生成重复。但是,当我使用控制台使用者查看它时,它会崩溃并显示错误。然后,我在控制台使用者上使用 --跳过错误消息标志。现在我看到成千上万的这样的线条

session:prod-111656 : hello world: 994  [2019-02-28 16:25:18,081] ERROR Error processing message, skipping this message:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8

谁能帮帮我这里出了什么问题?

共有3个答案

贲言
2023-03-14

我也有同样的问题,发现如果我将filter移到toStream之后,就不会产生空值(墓碑)。

祝灼光
2023-03-14

用于值的反序列化程序可能不用于字符串,而将用于长整型。在cli中创建使用者时,请务必指定它。表示“出”

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic name \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --skip-message-on-error \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

在创建使用者时,请在此处检查最后 2 行,注意您的类型(键,值) 在我的情况下,两者都是字符串,如果值是长类型,请使用最后一行作为: --属性值.反序列化器=org.apache.kafka.common.序列化.长序列化器

淳于兴朝
2023-03-14

您的Kafka Streams应用程序可以正常工作。

该错误位于kafka-consolt-消费者kafka.tools.ConsoleConsumer是实现脚本逻辑的类)中。

在反序列化期间,它无法正确处理null。当它获取null作为消息的值或键时,它会设置默认值(表示nullString的字节数组)。如果检查源代码,您可以找到以下函数

def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte]) {
  val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes(StandardCharsets.UTF_8))
  val convertedBytes = deserializer.map(_.deserialize(null, nonNullBytes).toString.
    getBytes(StandardCharsets.UTF_8)).getOrElse(nonNullBytes)
  output.write(convertedBytes)
}

您如何查看它何时获取null的source ceBytes(source ceBytes==null)以进行反序列化,它为此设置了默认值:

val nonNullBytes=Option(source ceBytes). getOrElse("null". getBytes(StandardCharsets.UTF_8))

在您的示例中,它是“null”.getBytes(StandardCharsets.UTF_8)。然后,尝试使用org.apache.kafka.common.serialization进行反序列化。LongDeserializer(值反序列化器)LongDeserializer在开始时检查字节数组的大小。现在是4(null的字节表示形式),并引发异常。

例如,如果您使用StringDeserializer,它不会正确反序列化它,但至少它不会引发异常,因为它不检查字节数组的长度。

长话短说:ConsoleConsumer的格式化程序,即负责打印,对于漂亮的打印设置了一些默认值,这些默认值无法由某些反序列化程序(长序列化器,整数序列化程序)处理

关于,为什么应用程序为某些键生成null值:

KTable:filterKStream::filter具有不同的语义。根据KTable的javadoc:

对于每个被删除的记录(即不满足给定谓词),将转发一个逻辑删除记录。

对于您的< code >过滤器,当< code >计数

 类似资料:
  • 我有一个小程序可以用阿帕奇Kafka来计算颜色的数量 创建主题,生产者/消费者开始使用终端: 我在终端中提供了以下输入: 我在消费者终端收到错误: 这里有什么问题?我做了简短的研究,发现了其他人提出的类似问题,但这些解决方案似乎对我不起作用。

  • 我对Kafka connect很陌生。我想把我的信息从Kafka主题推到弹性搜索。在阅读了可用的文档之后,我从发行版tar下载并编译了弹性搜索接收器。拉链(https://github.com/confluentinc/kafka-connect-elasticsearch/releases) 我添加了弹性搜索属性文件,并将上述jar包含在类路径中。当我在独立模式下运行kafka connect时

  • 我正在使用Kafka源和接收器连接器创建一个数据管道。源连接器从SQL数据库消费并发布到主题,而接收器连接器订阅主题并放入其他SQL数据库。表有16 GB的数据。现在的问题是,数据不能从一个数据库传输到另一个数据库。但是,如果表的大小很小,比如1000行,那么数据可以成功传输。 源连接器配置: 源连接器日志: 有人能指导我如何调整Kafka源连接器以传输大数据吗?

  • 我正在尝试设置Kafka Connect接收器,以便使用Datastax连接器将主题中的数据收集到Cassandra表中:https://downloads.Datastax.com/#AKC 运行一个直接在代理上运行的独立worker,运行Kafka 0.10.2.2-1: 但我有以下错误: 卡桑德拉或Kafka方面没有额外的错误。我在cassandra节点上看到活动连接,但没有任何东西到达密钥

  • 我正在尝试用kafka绑定构建一个简单的云流应用程序。让我描述一下设置。1、我有一位制作人正在制作主题1 2。有一个流活页夹,经过一些处理后将主题1绑定到主题2。 我是这些技术的新手。无法弄清楚这里出了什么问题。有人能帮忙吗?

  • 问题内容: 我正在导入MySQL转储,并收到以下错误。 显然,数据库中存在附件,这使得插入量非常大。 这是在我的本地计算机上,一台从MySQL软件包安装了MySQL 5的Mac。 我在哪里更改才能导入转储? 还有什么我应该设置的吗? 只是运行会导致相同的错误。 问题答案: 您可能必须为客户端(您正在运行以执行导入)和正在运行并接受导入的守护程序mysqld进行更改。 对于客户端,可以在命令行上指定