我有一个Kafka流应用程序,有两个数据源:事件和用户。
我有4个主题:事件,用户,用户2,用户事件
Users2与Users相同,用于演示GlobalKTable。
Events主题使用时间戳模式,因此当到达时间戳字段日期时,KStream将接收事件记录。
此时,我想为用户KTable中的每个用户ID以及新的事件ID创建一个用户事件记录;但我不知道如何遍历GlobalKTable或KTable来实现这一点。
public Topology createTopology() {
final Serde<String> serde = Serdes.String();
final StreamsBuilder builder = new StreamsBuilder();
final GlobalKTable<String, String> gktUsers =
builder.globalTable("Users",
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("user-store")
.withKeySerde(Serdes.String()).withValueSerde(serde));
final KTable<String, String> ktUsers = builder.table("Users2");
builder.stream("Events", Consumed.with(Serdes.String(), serde))
.peek((k, v) -> {
// This is called when a new Event record becomes current.
// How do I iterate through gktUsers at this point
// and output a User-ID and an Event-ID to the User-Events topic?
// This type of iteration doesn't work either.
ktUsers.toStream().foreach(new ForeachAction<String, String>() {
@Override
public void apply(String s, String s2) {
log.info("{} {}", s, s2);
}
});
})
builder.build();
}
你需要让状态存储迭代它
ReadOnlyKeyValueStore<String, String> keyValueStore = streams.store("user-store", QueryableStoreTypes.keyValueStore()));
String value = store.get("key");
KeyValueIterator<String, String> range = keyValueStore.all();
https://docs.confluent.io/platform/current/streams/developer-guide/interactive-queries.html
或者,也许您应该在用户和事件之间进行流表连接
我有一个真正的env与3个kafka机器集群,它正在接收大量数据。对于每个主题,有 25 个分区,复制因子设置为 2。 我的应用程序(基于kafka流的应用程序)从这个kafka集群获取数据的应用程序停机了一个多月。现在,每个分区都有大量滞后;高达90000000。 我知道以下参数: 我有两个消费者节点(使用kafka集群数据的相同组id)。 然而,它并没有赶上滞后,而是保持不变。有人能建议如何改
在Java8中,类没有任何方法来包装。 相反,我将从获取,然后从获取一个,如下所示:
我想用kafka流实现请求-响应模式,我使用spring boot kafka,其中添加了一些数据作为报头,命名为关联id,但是当kafka流API处理请求消息时,报头数据会丢失,无法发送到响应主题!我该怎么解决,还是用另一种方法??
我想知道用Kafka Streams执行这种操作的最佳方法是什么。 我有一个 Kafka 流和一个 KGlobal 表,让我们说产品 (1.000.000 消息) 和类别逻辑表 (10 msg)。每当新消息到达主题类别LogicBlobTable时,我需要重新处理所有将新到达的消息应用于产品的产品,并且输出将转到第三个主题。 我在考虑使用Kafka . tools . streams reset逻
我得到了同样的错误。这有可能吗?我真的更希望不执行stream.foreach()并传递一个消费者。 编辑:对我来说,不要复制元素是很重要的。
其中一个Kafka流应用程序在Kafka代理和消费者端产生了大量未知生产者ID错误。 流配置如下: 消费者方面的错误: 这背后的原因是什么?