目前我们正在使用:Kafka Streams API(版本1.1.0)来处理来自Kafka集群的消息(3个代理,每个主题3个分区,复制因子为2)。安装的Kafka版本为1.1.1。
最终用户向我们报告数据消失的问题。他们报告说,突然之间他们看不到任何数据(例如,昨天他们可以在UI中看到n条记录,而第二天的morning table是空的)。我们检查了这个特定用户的changelog主题,看起来很奇怪,看起来像是在几天不活动之后(给定的键值对可能几天不变)changelog主题中的聚合值丢失了。
KTable装配线:(消息按事件中的'username'分组)
@Bean
public KTable<UsernameVO, UserItems> itemsOfTheUser() {
return streamsBuilder.stream("application-user-UserItems", Consumed.with(Serdes.String(), serdes.forA(UserItems.class)))
.groupBy((key, event) -> event.getUsername(),
Serialized.with(serdes.forA(UsernameVO.class), serdes.forA(UserItems.class)))
.aggregate(
UserItems::none,
(key, event, userItems) ->
userItems.after(event),
Materialized
.<UsernameVO, UserItems> as(persistentKeyValueStore("application-user-UserItems"))
.withKeySerde(serdes.forA(UsernameVO.class))
.withValueSerde(serdes.forA(UserItems.class)));
}
public class UserItems {
private final Map<String, Item> items;
public static UserItems none() {
return new UserItems();
}
private UserItems() {
this(emptyMap());
}
@JsonCreator
private UserItems(Map<String, Item> userItems) {
this.userItems = userItems;
}
@JsonValue
@SuppressWarnings("unused")
Map<String, Item> getUserItems() {
return Collections.unmodifiableMap(items);
}
...
public UserItems after(ItemAddedEvent itemEvent) {
Item item = Item.from(itemEvent);
Map<String, Item> newItems = new HashMap<>(items);
newItems.put(itemEvent.getItemName(), item);
return new UserItems(newItems);
}
应用程序-用户-用户项
这个源话题没有问题。它将保留设置为最大值,所有消息始终存在。
application-user-UserItems-store-changelog(已压缩。具有默认配置-未更改保留,也未更改任何内容)
Offset | Partition | Key | Value
...........................................
...
320 0 "User1" : {"ItemName1":{"param":"foo"}}
325 0 "User1" : {"ItemName1":{"param":"foo"},"ItemName2":{"param":"bar"}}
1056 0 "User1" : {"ItemName3":{"param":"zyx"}}
...
Offset | Partition | Key | Value
..............................................
...
320 0 "User1" : {"ItemName1":{"param":"foo"}}
325 0 "User1" : {"ItemName1":{"param":"foo"},"ItemName2":{"param":"bar"}}
1054 0 "User1" : {"ItemName2":{"param":"bar"}}
1055 0 "User1" : {}
1056 0 "User1" : {"ItemName3":{"param":"zyx"}}
application-user-UserItems-store-changelog PartitionCount:3 ReplicationFactor:1 Configs:cleanup.policy=compact,max.message.bytes=104857600
Topic: application-user-UserItems-store-changelog Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: application-user-UserItems-store-changelog Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: application-user-UserItems-store-changelog Partition: 2 Leader: 1 Replicas: 1 Isr:
任何想法或暗示都将不胜感激。干杯
我遇到了与您描述的相同的问题,似乎该问题与您的kafka-streams配置有关。您已经提到您的“源”主题有以下配置:
3个代理,每个主题3个分区,复制因子为2
确保将kafka streams配置(Replication.Factor)中的以下属性至少设置为2(默认设置为1)
StreamsConfig.REPLICATION_FACTOR_CONFIG [replication.factor]
如何识别主题的KTable物化何时完成? 例如,假设KTable只有几百万行。下面的伪代码: 在某个时间点,我想安排一个线程来调用以下内容,该内容写入主题:kt.toStream().to(“output_topic_name”); 跟进问题: 约束 1)好的,我看到kstream和ktable在kafkastream启动后是无界/无限的。但是,ktable物化(压缩主题)不会在指定的时间段内为同
我的流服务执行的操作很少: 在进行测试时,我发现我的服务在调用函数后中断了,该函数将把我的数据写入由Kafka Streams将KTable转换为Kafka Streams创建的新主题。 我检查了KStreams创建的主题,主题就在那里: 我发现有三个输入,即,我不知道第三个输入是什么: 为了确保所有内容都被覆盖,这里是我的配置: 我的问题是,我们的部署正在工作,突然所有的东西都开始出现这个错误:
我想加入一个 kstream:从主题创建,该主题具有JSON值。我使用值中的两个属性来重新键控流。示例值(json的片段)。我创建了自定义pojo类并使用自定义SERDES。 键映射为: 我查看了KStream并打印了键和我使用的属性。看起来都很好。 null 现在,当我执行内部连接并对主题进行窥视或通过/时,我看到键和值不匹配。Join似乎不起作用, 我有完全相同的东西通过ksql工作,但想做我
我有一个用例,我的KTable是这样的。 KTable:orderTable 键:值 KTable:此表将位于groupBy值上,且计数列值将具有和 键:值
我有一个KTable,数据如下所示(key=>value),其中keys是客户ID,而value是包含一些客户数据的小型JSON对象: 我想对这个KTable做一些聚合,基本上保留每个的记录数。所需的KTable数据如下所示: 假设属于上面的组,她的生日使她进入了新的年龄组。支持第一个KTable的状态存储现在应该如下所示: 我希望得到的聚合KTable结果反映这一点。例如。 我可能过度概括了这里
我需要帮助理解在Kafka2.2中使用max.task.idle.ms时的Kafka流行为。 我有一个KStream-KTable联接,其中KStream已被重新键入: 所有主题都有10个分区,为了测试,我将max.task.idle.ms设置为2分钟。myTimeExtractor只有在消息被标记为“快照”时才更新消息的事件时间:stream1中的每个快照消息都将其事件时间设置为某个常数T,st