当前位置: 首页 > 知识库问答 >
问题:

“WindowedBy Count KStream”抛出KStream异常

叶谦
2023-03-14

我尝试将KStream中的事件计入时间段:

    KStream<String, VehicleEventTO> stream = builder.stream("vehicle", Consumed.with(Serdes.String(), new JsonSerde<>(VehicleEventTO.class)));

    KStream<String, VehicleEventTO> streamWithKey = stream.selectKey((key, value) -> value.getId_vehicle().toString());

    KStream<String, Long> streamValueKey = streamWithKey.map((key, value) -> KeyValue.pair(key, value.getId_vehicle()));

    streamValueKey.groupByKey()
                  .windowedBy(TimeWindows.of(Duration.ofMinutes(10).toMillis()))
                  .count(Materialized.with(Serdes.String(), new JsonSerde<>(Long.class)));

我有这个例外:

线程“test-app-87ce 164d-c427-4d cf-aa76-aeeb 6 f 8 fc 943-stream thread-1”org . Apache . Kafka . streams . errors . streams Exception中出现异常:在进程中捕获到异常。taskId=0_0,processor = KSTREAM-SOURCE-000000000,topic=vehicle,partition=0,offset=160385位于org . Apache . Kafka . streams . processor . internals . stream task . process(stream task . Java:318)位于org . Apache . Kafka . streams . processor . internals . assignedstreamstasks . process(assignedstreamstasks . Java:94)位于org . Apache . Kafka . streams . processor . internals . task manager . process(task manager更改StreamConfig中的默认Serdes,或者通过方法参数提供正确的Serdes。

共有1个答案

宦兴朝
2023-03-14

groupByKey()使用默认序列化器:

组按键()

将记录按其当前键分组到KGrouedStream中,同时保留原始值以及默认序列化器和反序列化器。

您必须使用组ByKey(序列化

以下内容应该可以解决问题:

streamValueKey.groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
              .windowedBy(TimeWindows.of(Duration.ofMinutes(10).toMillis()))
              .count(Materialized.with(Serdes.String(), new JsonSerde<>(Long.class)));

 类似资料:
  • 抛出异常的行为是否可能抛出不同的异常? 为了抛出异常,必须(可选地)分配新对象,并调用其构造函数(隐式调用fillinstacktrace)。在某些情况下,听起来像addSupressed也被称为。那么如果没有足够的内存会发生什么呢?JVM是否需要预分配内置异常?例如,(1/0)会抛出OutOfMemoryError而不是ArithmeticException吗? 此外,构造函数是一个方法调用,因

  • 问题内容: 考虑以下代码: 无需添加方法签名即可编译该代码。(它与同样表现到位,太)。 我理解为什么 可以 安全地运行它,因为实际上不能将其引发在块中,因此不能引发已检查的异常。我有兴趣知道在何处指定此行为。 并非永远都不会达到目标:以下代码也会编译: 但是,如果抛出一个检查的异常,它不会像我期望的那样编译: 在JLS Sec 11.2.2中 ,它说: 一,其抛出的表达式语句(§14.18)具有静

  • 问题内容: 我试图在Netbeans中重构一个大型程序,但我有点迷茫。我从来没有非常模块化,但是现在通过实际学习如何做到这一点来尝试纠正这种情况,并在将来纠正这种情况。不幸的是,我在将某些教程翻译成我的程序时遇到了麻烦。所以我希望这里有人可以帮忙。目前,我正在尝试分解一部分采用特定格式的文件并制成表格的代码。我知道我需要创建一个类并使用它来创建表对象,但是我不确定如何做。我有一个主文件,用于获取文

  • 问题内容: 我目前正在使用play2框架。 我有几个正在抛出的类,但是play2s全局处理程序使用throwable而不是异常。 例如我的一门课是抛出一个。我是否可以检查可抛物体(如果是)? 问题答案: 您可以使用它来检查它是否存在。 例: 假设是参考。

  • throw 关键字表示发生了异常,称为抛出异常。throw 通常指定一个操作数(我们将介绍不指定操作数的特殊情况)。throw 的操作数可以是任何类型,如果操作数是个对象,则称为异常对象。也可以抛出条件表达式而不是抛出对象,可以抛出不用于错误处理的对象。 抛出异常时,指定相应类型的最近一个异常处理器(对抛出该异常的try块)捕获这个异常。try块的异常处理紧接在try块后面。 抛出异常时,生成和初

  • 我们的应用程序在streams代码中间歇性地遇到无序异常。这会导致流线程停止。 实现很简单,2个KStreams连接并输出到另一个主题。 在寻找这个无序顺序异常的解决方案时,我在Confluent上找到了以下文档 https://docs.confluent.io/current/streams/concepts.html#out-of-order-handling 但找不到这里提到的设置、配置或