当前位置: 首页 > 知识库问答 >
问题:

Kafka流-如何扩展Kafka存储生成的changelog主题

鄂曦之
2023-03-14

我有多个冗余的应用程序实例,希望消费一个主题的所有事件,并存储它们独立的磁盘查找(通过一个rocksdb)。

为了便于讨论,让我们假设这些冗余消费者正在服务无状态http请求;因此,不使用kafka共享负载,而是使用kafka将数据从生产者复制到每个实例LocalStore中。

在查看生成的主题时,每个消费应用程序创建了3个额外的主题:

    null
    null

下面是创建存储区的代码

public class ProgramMappingEventStoreFactory {
  private static final Logger logger = Logger.getLogger(ProgramMappingEventStoreFactory.class.getName());
  private final static String STORE_NAME = "program-mapping-store";
  private final static String APPLICATION_NAME = "epg-mapping-catalog_program-mapping";

  public static ReadOnlyKeyValueStore<ProgramMappingEventKey, ProgramMappingEvent> newInstance(String kafkaBootstrapServerUrl,
                                                                                               String avroRegistryUrl,
                                                                                               String topic,
                                                                                               String storeDirectory)
  {
    Properties kafkaConfig = new KafkaConfigBuilder().withBootstrapServers(kafkaBootstrapServerUrl)
                                                     .withSchemaRegistryUrl(avroRegistryUrl)
                                                     .withApplicationId(createApplicationId(APPLICATION_NAME))
                                                     .withGroupId(UUID.randomUUID().toString())
                                                     .withClientId(UUID.randomUUID().toString())
                                                     .withDefaultKeySerdeClass(SpecificAvroSerde.class)
                                                     .withDefaultValueSerdeClass(SpecificAvroSerde.class)
                                                     .withStoreDirectory(storeDirectory)
                                                     .build();

    StreamsBuilder streamBuilder = new StreamsBuilder();
    bootstrapStore(streamBuilder, topic);
    KafkaStreams streams = new KafkaStreams(streamBuilder.build(), kafkaConfig);
    streams.start();
    try {
      return getStoreAndBlockUntilQueryable(STORE_NAME,
                                            QueryableStoreTypes.keyValueStore(),
                                            streams);
    } catch (InterruptedException e) {
      throw new IllegalStateException("Failed to create the LiveMediaPolicyIdStore", e);
    }
  }

  private static <T> T getStoreAndBlockUntilQueryable(String storeName,
                                                      QueryableStoreType<T> queryableStoreType,
                                                      KafkaStreams streams)
    throws InterruptedException
  {
    while (true) {
      try {
        return streams.store(storeName, queryableStoreType);
      } catch (InvalidStateStoreException ignored) {
        Thread.sleep(100);
      }
    }
  }

  private static void bootstrapStore(StreamsBuilder builder, String topic) {
    KTable<ProgramMappingEventKey, ProgramMappingEvent> table = builder.table(topic);

    table.groupBy((k, v) -> KeyValue.pair(k, v)).reduce((newValue, aggValue) -> newValue,
                                                        (newValue, aggValue) -> null,
                                                        Materialized.as(STORE_NAME));

  }

  private static String createApplicationId(String applicationName) {
    try {
      return String.format("%s-%s", applicationName, InetAddress.getLocalHost().getHostName());
    } catch (UnknownHostException e) {
      logger.warning(() -> "Failed to find the hostname, generating a uique applicationId");
      return String.format("%s-%s", applicationName, UUID.randomUUID());
    }
  }

}

共有1个答案

淳于坚壁
2023-03-14

如果要将相同的状态加载到多个实例中,应该在所有实例上使用globalktable和唯一的application.id(builder.globaltable())。

如果使用ktable,则会对数据进行分区,迫使您对每个实例使用不同的application.id。这可以被认为是一种反模式。

我也不确定为什么要groupby((k,v)->keyvalue.pair(k,v)).reduce()--这会导致不必要的重新分区主题。

 类似资料:
  • 我有一个kafka流应用程序,它在其中使用stateStore(由RocksDB支持)。 stream thread所做的只是从kafka主题获取数据并将数据放入State Store。(还有其他线程从statestore读取数据并进行业务逻辑处理)。 我观察到它创造了一个新的Kafka主题“变化日志”,因为Statestore。 但我没有明白“变化”Kafka话题有什么用处? 为什么需要它(更改

  • 我想在生成一条发送给Kafka主题的消息后,获取偏移量和分区信息。我通读了spring cloud stream kafka绑定文档,发现这可以通过fecting RECORD\u元数据kafka头来实现。 来自Spring文档:(https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.R

  • 我有一个要求加入3个Kafka主题。前两个主题A和B将使用inner join添加,因为消息键相同,并且生成一个POJO与B相同的新Kafka流。现在,使用这个累积的流,我需要加入另一个主题C,并且我需要根据C中存在的字段对输出进行分组。 到目前为止,我有以下方法: 前两个主题(A和B)的KStream-KStream inner join是否可以不发布任何主题的累积输出,并且仍然可以在下面使用它

  • 问题内容: 在轮询Kafka时,我已经使用该功能订阅了多个主题。现在,我想设置的偏离,我想从每个主题阅读,而无需每次重新订阅后,并从一个话题。 在轮询数据之前,是否可以迭代调用每个主题名称 来 达到结果?偏移量如何精确存储在Kafka中? 我每个主题有一个分区,并且只有一个使用者可以读取所有主题。 问题答案: Kafka如何存储每个主题的偏移量? 卡夫卡已将抵销存储从动物园管理员转移到卡夫卡经纪人

  • 我需要获取存储中的行数,存储在低级处理器API中维护。我看到,方法“近似数字条目()”可以在此存储中提供键值映射的近似计数。你能澄清一下准确度的%吗,这意味着如果商店里有100行,我们会得到95行作为近似计数吗?或者它有时会低于50行吗?只是想了解影响计数准确性的因素。 注意:假设流应用程序使用单个主题并在单个实例上运行。存储是通过低级处理器API访问的,不确定默认情况下是否应用了任何缓存。提交频

  • 我要用Kafka流计算平均值。所以我做了一个有状态的操作,聚合,需要创建一个状态存储,但是这种情况不会发生。 这里是平均值的函数: 以下是例外情况: 问题是基本目录不存在,但我希望kafka流在必要时创建目录。 编辑 ----- 我注意到,如果我有1个处理器,使一个变量的平均值没有问题,但如果我有2个处理器是。 1个处理器的配置文件: 2个处理器的配置文件: 现在我启动处理器: 类型元组包含配置文