我的Kafka Streams应用程序正在使用以下键值布局的kafka主题:String.class-
打印当前主题时,可以确认:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flow-event-stream-file-service-test-instance --property print.key=true --property key.separator=" -- " --from-beginning
flow1 -- SUCCESS #C:\Daten\file-service\in\crypto.p12
“flow1”是
字符串
键,--
后面的部分是序列化值。
我的流程设置如下:
KStream<String, HistoryEvent> eventStream = builder.stream(applicationTopicName, Consumed.with(Serdes.String(),
historyEventSerde));
eventStream.selectKey((key, value) -> new HistoryEventKey(key, value.getIdentifier()))
.groupByKey()
.reduce((e1, e2) -> e2,
Materialized.<HistoryEventKey, HistoryEvent, KeyValueStore<Bytes, byte[]>>as(streamByKeyStoreName)
.withKeySerde(new HistoryEventKeySerde()));
因此,据我所知,我告诉它使用
String
和历史事件
序列来消费主题,因为这就是主题中的内容。然后,我'rekey'它使用一个组合的关键字,应该存储在本地使用提供的Serde为历史事件ey.class
。据我所知,这将导致一个额外的主题被创建(可以在Kafka容器中的主题列表中看到)与新的关键字。这很好。
现在的问题是,即使在主题中只有一个文档的干净环境中,应用程序也无法启动:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=flow-event-stream-file-service-test-instance, partition=0, offset=0
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: HistoryEventSerializer) is not compatible to the actual key or value type (key type: HistoryEventKey / value type: HistoryEvent). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
从信息中很难判断问题的确切位置。它在我的基本主题中指出,但这是不可能的,因为那里的键不是类型
HistoryEventKey
。由于我在reduce
中为HistoryEventKey
提供了serde,因此它也不能与本地存储一起使用。
对我来说唯一有意义的是,它与导致重新排列和新主题的
selectKey
操作有关。但是,我无法确定如何为该操作提供serde。我不想将其设置为默认值,因为它不是默认的密钥serde。
我遇到了一条非常类似的错误消息,但我没有groupbys,而是joins。我在这里为谷歌搜索的下一个人发帖。
org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to topic my-processor-KSTREAM-MAP-0000000023-repartition. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.mycorp.mySession). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
很明显,和原来的问题一样,我不想更改默认的serdes。
所以在我的例子中,解决方案是在连接中传递一个连接实例,这将允许在SERDE中传递。请注意,错误消息指向重新分区映射-
这有点像转移视线,因为修复程序会转移到其他地方。
我如何修复它(一个加入的例子)
//...omitted ...
KStream<String,MySession> mySessions = myStream
.map((k,v) ->{
MySession s = new MySession(v);
k = s.makeKey();
return new KeyValue<>(k, s);
});
// ^ the mapping causes the repartition, you can not however specify a serde in there.
// but in the join right below, we can pass a JOINED instance and fix it.
return enrichedSessions
.leftJoin(
myTable,
(session, info) -> {
session.infos = info;
return session; },
Joined.as("my_enriched_session")
.keySerde(Serdes.String())
.valueSerde(MySessionSerde())
);
在对执行进行更多调试后,我能够发现新主题是在group pByKey
步骤中创建的。您可以提供一个分组实例,它可以指定用于键和值的Serde:
eventStream.selectKey((key, value) -> new HistoryEventKey(key, value.getIdentifier()))
.groupByKey(Grouped.<HistoryEventKey, HistoryEvent>as(null)
.withKeySerde(new HistoryEventKeySerde())
.withValueSerde(new HistoryEventSerde())
)
.reduce((e1, e2) -> e2,
Materialized.<HistoryEventKey, HistoryEvent, KeyValueStore<Bytes, byte[]>>as(streamByKeyStoreName)
.withKeySerde(new HistoryEventKeySerde()));
我正在使用Kafka Producer和RoundRobin分区器来处理一个有12个分区的主题。 代码可在此处找到https://github.com/apache/kafka/blob/2.8/clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java 我面临的问题是,这个分区程序让分区正确
当我们基于某个键在流上应用组 by 函数时,kafka 如何计算这一点,因为相同的键可能存在于不同的分区中?我看到了()函数,它基本上对数据进行了重新分区,但我不明白它是什么意思。它会将具有相同键的所有消息移动到单个分区中吗?另外,我们可以通过()方法调用的频率如何?如果有要求,我们可以在收到每条消息后调用它吗?请建议。谢谢
我创建了一个包含3个分区的主题 我使用Java制作人同步写入主题 我有一个Java的用户订阅并阅读它 我的键总是一组固定的3个不同的字符串(k1、k2、k3)。但是我的消息总是去分区1或分区2——k1和k2去分区1,k3去分区2。 为什么分区0未使用?
根据Spark 1.6.3的文档,应该保留结果数据表中的分区数: 返回由给定分区表达式分区的新DataFrame,保留现有的分区数 Edit:这个问题并不涉及在Apache Spark中删除空DataFrame分区的问题(例如,如何在不产生空分区的情况下沿列重新分区),而是为什么文档所说的内容与我在示例中观察到的内容不同
spring cloud stream如何将多个Kafka分区分配给属于同一消费者组的反应流? 我注意到,如果我使用普通的非反应流侦听器,每个线程将被分配到一个分区,这取决于使用者并发配置。然而,在流(流量输入)的情况下,我没有注意到任何这样的并行行为。似乎只定义了一个流来处理来自所有分区的消息。 我的期望是每个Kafka主题分区都有独立的流,即使在由不同线程备份的同一节点上也是如此。
我发现分区“Tracking-3”上的消息没有被消耗!! 问题每次都会重现,在新分配的分区中有一些消息丢失,你能有什么建议吗?请帮帮我,谢谢