当前位置: 首页 > 知识库问答 >
问题:

Kafka流:爪哇。lang.IllegalArgumentException:对于VoidDeserializer,数据应该为空

古彦
2023-03-14

我正在研究我的第一个Kafka流样本:

package com.example;

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

class DslExample {

  public static void main(String[] args) {
    // the builder is used to construct the topology
    StreamsBuilder builder = new StreamsBuilder();

    // read from the source topic, "users"
    KStream<Void, String> stream = builder.stream("users");

    // for each record that appears in the source topic,
    // print the value
    stream.foreach(
        (key, value) -> {
          System.out.println("(DSL) Hello, " + value);
        });

    // you can also print using the `print` operator
    // stream.print(Printed.<String, String>toSysOut().withLabel("source"));

    // set the required properties for running Kafka Streams
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "dev1");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.0.24:29092");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Void().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    // build the topology and start streaming
    KafkaStreams streams = new KafkaStreams(builder.build(), config);
    streams.start();

    // close Kafka Streams when the JVM shuts down (e.g. SIGTERM)
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }
}

当我试图运行它时,会出现以下错误:

Caused by: java.lang.IllegalArgumentException: Data should be null for a VoidDeserializer.

这是来自“用户”主题的示例消息:

价值:

{
  "registertime": 1517518703752,
  "userid": "User_8",
  "regionid": "Region_7",
  "gender": "OTHER"
}

标题:

[
  {
    "key": "task.generation",
    "stringValue": "0"
  },
  {
    "key": "task.id",
    "stringValue": "0"
  },
  {
    "key": "current.iteration",
    "stringValue": "86144"
  }
]

关键:

User_8

我应该怎么做才能避免这个问题?

共有1个答案

楮乐邦
2023-03-14

如果密钥实际上包含数据,则不应使用Serdes。Void()KStream

 类似资料:
  • 我试图用我的gradle项目运行junit测试,但不管是什么测试,它们都会抛出以下堆栈跟踪 我已经看到了一些建议的解决方案,但似乎没有一个奏效。当我在ant中运行它们时,我的测试运行良好。我已经确保xerces使用的是最新版本,我还添加了建议的版本 对于JVM,但似乎什么都不起作用。 任何建议都将不胜感激。我可以根据要求提供更多信息。 **更新** 做了以下更改- 但现在的结果如图所示 **更新2

  • 阅读了一下Java8,我读到了这篇博文,解释了一些关于流和它们的减少,以及什么时候可以短路减少。在底部,它说: 请注意,在个或我们只需要与谓词匹配的第一个值(尽管值不能保证返回第一个值)。但是,如果流没有排序,那么我们希望的行为类似于。所有、和 操作可能根本不会使流短路,因为可能需要计算所有值来确定运算符是还是。因此,使用这些的无限流可能不会终止。 我知道或可能会使还原短路,因为一旦您找到一个元素

  • 问题内容: 我有这个Java代码段。我是Java的菜鸟。 错误: 码: 问题答案: 是的,这是问题所在: 在课程的最高级别,您只能拥有: 实例初始化程序块() 静态初始值设定块() 变量声明 构造函数声明 方法声明 嵌套类型声明 终结器声明 这些都不是。如果你 的意思 来声明一个变量,你应该这样做: 如果这 不是 您想要的,则应解释您的意图。 编辑:修复此问题后,此编译器错误似乎很明显: Conf

  • 我已经红色了多篇文章和讨论,但我仍然有一些不确定性:我不确定是否应该使用或任何其他类型来存储预订-在“在线预订”的意义上(因此来自不同国家/时区的参与者需要在时间线上的同一时刻会面)。我倾向于使用,因为DB和Backend设置为UTC,并且由于传入的“创建预订”json消息包含ISO 8601(带偏移量)start DateTime 让我们采用以下设置:1.数据库(UTC、Oracle、MSSQL

  • 我的问题是关于cucumber特性文件的并行执行。在Selenium Java中,可以通过一个runner类并行运行多个cucumber特性文件吗? 我尝试过不同的方法,但都没有成功。

  • 我如何解决这个问题?我正在从Excel文件中读取数据,输出为字符串。 DataProvider的方法“getData”需要Object[][]返回类型。 错误:(17,16)Java:不兼容类型:java.lang.String无法转换为java.lang.Object[]