我有一个处理器,它从主题中获取json字符串,类型为GenericRecord。现在我把这条河分成两条支流。我采用第一个分支,并将(key,value)映射为2个字符串,其中包含一个特定的json字段和该字段的值,然后按key分组。到目前为止,一切都很好。现在,我必须用用户定义的新类型聚合流,并收到一个异常。
这里是代码:
新类型:
private class Tuple {
public int occ;
public int sum;
public Tuple (int occ, int sum) {
this.occ = occ;
this.sum = sum;
}
public void sum (int toAdd) {
this.sum += toAdd;
this.occ ++;
}
public int getAverage () {
return this.sum / this.occ;
}
public String toString() {
return occ + "-> " + sum + ": " + getAverage();
}
好流:
StreamsBuilder builder = new StreamsBuilder();
KStream<GenericRecord, GenericRecord> source =
builder.stream(topic);
KStream<GenericRecord, GenericRecord>[] branches = source.branch(
(key,value) -> partition(value.toString()),
(key, value) -> true
);
KGroupedStream <String, String> groupedStream = branches[0]
.mapValues(value -> createJson(value.toString()))
.map((key, value) -> KeyValue.pair(new String("T_DUR_CICLO"), value.getNumberValue("payload", "T_DUR_CICLO")))
.peek((key, value) -> System.out.println("key=" + key + ", value=" + value))
.groupByKey();
问题是:
KTable<String, Tuple> aggregatedStream = groupedStream.aggregate(
() -> new Tuple (0,0), // initializer
(aggKey, newValue, aggValue) -> new Tuple (aggValue.occ + 1, aggValue.sum + Integer.parseInt(newValue)));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
这是例外:
Exception in thread "streamtest-6173d6a2-4a3a-4d76-b793-774719f8b1f5-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_0, processor=KSTREAM-SOURCE-0000000011, topic=streamtest-KSTREAM-AGGREGATE-STATE-STORE-0000000007-repartition, partition=0, offset=0
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:318)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:964)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (value: io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer) is not compatible to the actual value type (value type: com.mycompany.maventest.Streamer$Tuple). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:195)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:66)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:57)
at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:206)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:302)
... 6 more
Caused by: java.lang.ClassCastException: com.mycompany.maventest.Streamer$Tuple cannot be cast to org.apache.avro.generic.GenericRecord
at io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer.serialize(GenericAvroSerializer.java:39)
at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:191)
... 19 more
我如何解决这个问题?
更新 ------
生产商用Avro生产,所以我有这样的配置属性:
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
如果我指定自定义serde,结果如下:
KTable<String, Tuple> aggregatedStream = groupedStream.aggregate(
() -> new Tuple(0, 0), // initializer
(aggKey, newValue, aggValue) -> new Tuple (aggValue.occ + 1, aggValue.sum + Integer.parseInt(newValue)),
Materialized.with(Serdes.String(), new MySerde()));
例外情况:
Exception in thread "streamtest-17deb5c8-ed07-4fcf-bd59-37b75e44b83f-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:97)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:677)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:943)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:831)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
---已解决----我还为groupBy中的类型更改添加了新的serde
KGroupedStream <String, String> groupedStream = branches[0]
.mapValues(value -> createJson(value.toString()))
.map((key, value) -> KeyValue.pair(new String("T_DUR_CICLO"), value.getNumberValue("payload", "T_DUR_CICLO")))
.peek((key, value) -> System.out.println("key=" + key + ", value=" + value))
.groupByKey( Serialized.with(
Serdes.String(), /* key (note: type was modified) */
Serdes.String())); /* value */
除非在操作中明确指定,否则Kafka流将使用默认Serde。
在aggregate()方法中,您将valueType定义为Tuple
,而默认的serde是GenericRecord
的,因此会引发异常。您需要按如下方式指定serde:
KTable<String, Tuple> aggregatedStream = groupedStream.aggregate(
() -> new Tuple (0,0), // initializer
(aggKey, newValue, aggValue) ->
new Tuple (aggValue.occ + 1, aggValue.sum + Integer.parseInt(newValue))
,Materialized.with(keySerde, tupleSerde));
它将使用元组Serde进行此操作。您可以在此处找到示例:https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#aggregating
问题内容: 昨天我从假期回来工作,在我们的日常工作中,我的队友提到他们正在重构我们Java代码中的所有模型对象,以删除所有的getter和setter并使模型字段成为所有公共对象,从而调用了Demeter之所以这样做是因为 为了促进我们遵守Demeter定律:一个模块不应该知道其操纵的“物体”的内在特性。由于数据结构不包含任何行为,因此它们自然会公开其内部结构。因此,在这种情况下,Demeter不
4.4.5 数据类型的自定义 在有了一些数据类型后,程序员还可定义这些数据类型的别名或指针类型。表达这种定义的伪指令是TYPEDEF,其定义形式如下: 新数据类型名 TYPEDEF [位距] [PTR] 数据类型 其中:“位距”是NEAR、FAR或PROC等。 例如: CHAR TYPEDEF BYTE ;给BYTE定义另一个别名CHAR PCHAR TYPEDEF PTR CHAR ;定义一个字
对于Cassandra中的用户定义聚合函数,什么可以作为INITCOND?我只见过具有简单类型(例如元组)的示例。 我为聚合函数中的状态对象提供了以下类型: 当我省略INITCOND时,我得到一个JavaNullPointerException。
问题内容: 我对SQLITE相当陌生,我注意到只有4种数据类型,但是我在网上看到的例子中人们在放置自己的数据类型。我对此不太了解,想知道是否有人可以向我解释。例如,我看到一列将保存日期,并且给定的数据类型是不存在的时间戳。默认是什么?自己制作时默认为文本吗? 问题答案: 使用动态类型系统。只有五个存储类:NULL,整数,实数,文本和blob。(来源:SQLite版本3中的数据类型。) 并且,引用该
我正在为一个项目试验Apache Flink。我正在使用 Flink 来聚合一系列传感器捕获的环境数据。为了计算空气质量指数,我正在尝试实现一个自定义聚合函数,以便在带有窗口的分组选择中使用,但我对类型提示有问题。下面是带有 DataTypeHint 注释的函数代码: 但我有以下例外: 我做错了什么?
问题内容: 我正在创建一个numpy数组,该数组将填充我制作的特定类的对象。我想初始化数组,使其仅包含该类的对象。例如,这是我想做的事,如果我做这件事会发生什么。 我可以做这个: 然后将的每个元素分配为一个对象(或任何其他类型的对象)。从编程的角度(类型检查)和数学的角度(对函数集进行操作)的角度来看,如果我能够拥有一个s数组,那将是如此的巧妙。 我可以使用任意类指定numpy数组的数据类型吗?