当前位置: 首页 > 面试题库 >

Kafka流:从应用程序的每个实例的所有分区中读取

杜良骏
2023-03-14
问题内容

使用KTable时,当实例/使用者数等于分区数时,Kafka流不允许实例从特定主题的多个分区中读取。我尝试使用GlobalKTable实现此目的,但问题是数据将被覆盖,也无法对其应用聚合。

假设我有一个名为“
data_in”的主题,具有3个分区(P1,P2,P3)。当我运行Kafka流应用程序的3个实例(I1,I2,I3)时,我希望每个实例都从“
data_in”的所有分区中读取数据。我的意思是I1可以从P1,P2和P3中读取,I2可以从P1,P2和P3,I2中读取以及不断地读取。

编辑:请记住,生产者可以将两个相似的ID发布到“ data_in”中的两个不同分区中。
因此,当运行两个不同的实例时,GlobalKtable将被覆盖。

拜托,如何实现呢?这是我代码的一部分

private KTable<String, theDataList> globalStream() {

    // KStream of records from data-in topic using String and theDataSerde deserializers
    KStream<String, Data> trashStream = getBuilder().stream("data_in",Consumed.with(Serdes.String(), SerDes.theDataSerde));

    // Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)
    KGroupedStream<String, Data> KGS = trashStream.groupByKey();

    Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
    materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);

    // Return a KTable
    return KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
        if (!value.getValideData())
            aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
        else
            aggregate.getList().add(value);
        return aggregate;
    }, materialized);
}

问题答案:

将输入主题“ data_in”的分区数更改为1个分区,或使用a
GlobalKtable从主题中所有分区获取数据,然后可以将其与流一起加入。这样一来,您的应用实例将不再需要位于不同的使用者组中。

该代码将如下所示:

private GlobalKTable<String, theDataList> globalStream() {

   // KStream of records from data-in topic using String and theDataSerde deserializers
  KStream<String, Data> trashStream = getBuilder().stream("data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));

  thrashStream.to("new_data_in"); // by sending to an other topic you're forcing a repartition on that topic

  KStream<String, Data> newTrashStream = getBuilder().stream("new_data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));

  // Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)
  KGroupedStream<String, Data> KGS = newTrashStream.groupByKey();

  Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
  materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);

// Return a KTable
  KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
      if (!value.getValideData())
          aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
      else
        aggregate.getList().add(value);
      return aggregate;
  }, materialized)
  .to("agg_data_in");

  return getBuilder().globalTable("agg_data_in");
}

编辑:我编辑了上面的代码,以强制对名为“ new_data_in”的主题进行重新分区。



 类似资料:
  • 我对Kafka和Kafka流很陌生,所以请容忍我。我想知道我是否在正确的轨道上。 我正在给一个Kafka主题写信,试图通过rest服务访问数据。在访问原始数据之前,需要对其进行转换。 到目前为止,我拥有的是一个将原始数据写入主题的制作人。 1)现在我想要streams应用程序(应该是一个在容器中运行的jar),它可以将数据转换为我想要的形状。遵循这里的物化视图范式。 1的过度简化版本。) 2)和另

  • 我有一个带有2个分区的源主题,我正在用同一个应用程序启动2个kafka streams应用程序。id,但不同的接收器主题。 1) 这两个应用程序实例是否会从不同的分区接收数据? 2)如果其中一个应用程序被杀死,另一个实例会自动从两个实例中消耗吗? 3) 我如何证明上述情况?

  • 我在数据流中使用“keyby”。我要flink发现每个键的所有kafka分区。我有30个分区

  • 我有一个Kafka流媒体应用程序,它订阅了许多主题,每个主题都有许多分区。当我创建应用程序拓扑并启动它时,我是否知道哪些主题的哪些分区分配给我的应用程序的当前实例?我想知道这个独立于任何记录是否由这个实例处理。 我知道当我得到一条记录时,我可以做和获取正在处理的当前记录的分区/主题信息。但我不是在找那个。 我正在寻找一个等效的在kafka流侧。 我也尝试了以下代码,但我得到大小s为0。

  • 对于我的应用程序,我使用KTable-Ktable连接,这样无论何时在主数据流或子数据流上接收数据,它都可以为所有三个表设置带有setters和getters的复合对象。这三个传入流具有不同的键,但是在创建KTable时,我为所有三个KTable设置相同的键。 我有一个分区的所有主题。当我在单个实例上运行应用程序时,一切都运行良好。我可以看到compositeObject填充了所有三个表中的数据。

  • 如果我是正确的,默认情况下,spark streaming 1.6.1使用单线程从每个Kafka分区读取数据,假设我的Kafka主题分区是50,这意味着每50个分区中的消息将按顺序读取,或者可能以循环方式读取。 案例1: -如果是,那么我如何在分区级别并行化读取操作?创建多个< code > kafkautils . createdirectstream 是唯一的解决方案吗? 案例2: -如果我的