使用KTable时,当实例/使用者数等于分区数时,Kafka流不允许实例从特定主题的多个分区中读取。我尝试使用GlobalKTable实现此目的,但问题是数据将被覆盖,也无法对其应用聚合。
假设我有一个名为“
data_in”的主题,具有3个分区(P1,P2,P3)。当我运行Kafka流应用程序的3个实例(I1,I2,I3)时,我希望每个实例都从“
data_in”的所有分区中读取数据。我的意思是I1可以从P1,P2和P3中读取,I2可以从P1,P2和P3,I2中读取以及不断地读取。
编辑:请记住,生产者可以将两个相似的ID发布到“ data_in”中的两个不同分区中。
因此,当运行两个不同的实例时,GlobalKtable将被覆盖。
拜托,如何实现呢?这是我代码的一部分
private KTable<String, theDataList> globalStream() {
// KStream of records from data-in topic using String and theDataSerde deserializers
KStream<String, Data> trashStream = getBuilder().stream("data_in",Consumed.with(Serdes.String(), SerDes.theDataSerde));
// Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)
KGroupedStream<String, Data> KGS = trashStream.groupByKey();
Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);
// Return a KTable
return KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
if (!value.getValideData())
aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
else
aggregate.getList().add(value);
return aggregate;
}, materialized);
}
将输入主题“ data_in”的分区数更改为1个分区,或使用a
GlobalKtable
从主题中所有分区获取数据,然后可以将其与流一起加入。这样一来,您的应用实例将不再需要位于不同的使用者组中。
该代码将如下所示:
private GlobalKTable<String, theDataList> globalStream() {
// KStream of records from data-in topic using String and theDataSerde deserializers
KStream<String, Data> trashStream = getBuilder().stream("data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));
thrashStream.to("new_data_in"); // by sending to an other topic you're forcing a repartition on that topic
KStream<String, Data> newTrashStream = getBuilder().stream("new_data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));
// Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)
KGroupedStream<String, Data> KGS = newTrashStream.groupByKey();
Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);
// Return a KTable
KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
if (!value.getValideData())
aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
else
aggregate.getList().add(value);
return aggregate;
}, materialized)
.to("agg_data_in");
return getBuilder().globalTable("agg_data_in");
}
编辑:我编辑了上面的代码,以强制对名为“ new_data_in”的主题进行重新分区。
我对Kafka和Kafka流很陌生,所以请容忍我。我想知道我是否在正确的轨道上。 我正在给一个Kafka主题写信,试图通过rest服务访问数据。在访问原始数据之前,需要对其进行转换。 到目前为止,我拥有的是一个将原始数据写入主题的制作人。 1)现在我想要streams应用程序(应该是一个在容器中运行的jar),它可以将数据转换为我想要的形状。遵循这里的物化视图范式。 1的过度简化版本。) 2)和另
我有一个带有2个分区的源主题,我正在用同一个应用程序启动2个kafka streams应用程序。id,但不同的接收器主题。 1) 这两个应用程序实例是否会从不同的分区接收数据? 2)如果其中一个应用程序被杀死,另一个实例会自动从两个实例中消耗吗? 3) 我如何证明上述情况?
我在数据流中使用“keyby”。我要flink发现每个键的所有kafka分区。我有30个分区
我有一个Kafka流媒体应用程序,它订阅了许多主题,每个主题都有许多分区。当我创建应用程序拓扑并启动它时,我是否知道哪些主题的哪些分区分配给我的应用程序的当前实例?我想知道这个独立于任何记录是否由这个实例处理。 我知道当我得到一条记录时,我可以做和获取正在处理的当前记录的分区/主题信息。但我不是在找那个。 我正在寻找一个等效的在kafka流侧。 我也尝试了以下代码,但我得到大小s为0。
对于我的应用程序,我使用KTable-Ktable连接,这样无论何时在主数据流或子数据流上接收数据,它都可以为所有三个表设置带有setters和getters的复合对象。这三个传入流具有不同的键,但是在创建KTable时,我为所有三个KTable设置相同的键。 我有一个分区的所有主题。当我在单个实例上运行应用程序时,一切都运行良好。我可以看到compositeObject填充了所有三个表中的数据。
如果我是正确的,默认情况下,spark streaming 1.6.1使用单线程从每个Kafka分区读取数据,假设我的Kafka主题分区是50,这意味着每50个分区中的消息将按顺序读取,或者可能以循环方式读取。 案例1: -如果是,那么我如何在分区级别并行化读取操作?创建多个< code > kafkautils . createdirectstream 是唯一的解决方案吗? 案例2: -如果我的