当前位置: 首页 > 知识库问答 >
问题:

在一个实例发生故障转移时,驻留在kafka流实例中的本地状态存储未同步

王建华
2023-03-14

我在本地机器中运行多个kafka流消费者实例(2个实例),每个实例都有自己的自定义本地存储,每个实例的名称不同。

根据文档,如果其中一个实例发生故障,则kafka必须将死实例的存储同步到活实例的存储(如果我错了,请更正我)。

我用相同的应用程序id配置了两个实例,让kafka知道这些实例属于同一个组。

当其中一个实例被杀死时,另一个(活动)实例的存储未与死实例的存储同步。我在两个商店都启用了更改日志主题。

然而,当我在两个实例中都有相同的商店名称时,商店会按预期同步,不确定这些实例是否指向一个商店。我有不同的StreamsConfig。这两个实例的STATE\u DIR\u CONFIG位置。

请让我知道,如果我遗漏了什么,存储名称可以在不同的应用程序实例上不同吗?kafka是否会自动在新实例存储上重播更改日志主题?

//下面是我的流配置

@Bean
public KafkaStreams kafkaStreams(KafkaProperties properties,
                                 @Value("${spring.application.name}") String appName) {
    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
    props.put(StreamsConfig.CLIENT_ID_CONFIG, "client2");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
    //props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams1");
    props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
    props.put(StreamsConfig.consumerPrefix(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
            new RoundRobinAssignor().getClass().getName());
    props.put("auto.offset.reset", "earliest");
    final KafkaStreams kafkaStreams = new KafkaStreams(kafkaStreamTopology(), props);
    System.out.println("Invoked kafkaStreams");
    //kafkaStreams.cleanUp();
    kafkaStreams.start();
    return kafkaStreams;
}

共有1个答案

常宸
2023-03-14

我在本地机器中运行多个kafka流消费者实例(2个实例),每个实例都有自己的自定义本地存储,每个实例的名称不同。

这听起来不正确。如果您使用相同的application.id(即group.id)运行多个实例,则所有实例都必须执行相同的代码。(我想知道为什么您的应用程序一开始就没有崩溃。)

我不能100%确定你想实现什么。如果你能分享你的拓扑代码,这可能会有所帮助?

请注意,KafkaStreams基于进审量主题分区(参见https://docs.confluent.io/current/streams/architecture.html)对逻辑存储进行分片。也许您混淆了分片和逻辑存储?

如果您想拥有两个逻辑存储,每个存储有一个分片,您仍然可以运行多个实例,并且存储将在不同的实例上执行(故障转移也可以工作)。但是,您仍然需要在启动时在两个实例上“包含”两个存储。

 类似资料:
  • 我有5台不同的机器,每个机器都有使用kafka-streams应用程序的缩放的5个Spring Boot实例。我正在使用50个分区压缩主题与不同的2-3个主题,我的每个实例有10个并发。我正在使用docker swarm和docker Volume。使用这些主题KTable或KStream对我的kafka streams应用程序执行一些flatMap、map和join操作。 如果一切正常,在我的应

  • 我们希望侦听特定的Kafka主题,并构建它的“历史”--所以对于指定的键,提取一些数据,将其添加到该键的现有列表中(如果不存在,则创建一个新的列表),并将其放到另一个主题中,该主题只有一个分区,并且高度压缩。另一个应用程序可以只听这个主题并更新它的历史列表。 我在想它如何适合Kafka流库。我们当然可以使用聚合: 它创建一个由Kafka支持的本地存储,并将其用作历史表。 问题是,如果我只是为每个正

  • 我正在用Kafka和Kafka溪流作为Spring-Cloud-Stream流的一部分。在我的Kafka Streams应用程序中流动的数据在特定的时间窗口内被聚合和物化: 按照设计,正在具体化的信息也由changelog主题支持。 用解决方案更新Kafka Streams 2.0.1版不包含Materialized.WithRetention方法。对于这个特定的版本,我可以使用以下代码设置状态存

  • 我用状态存储构建了一个kafka流媒体应用程序。现在我正在尝试扩展这个应用程序。当在三个不同的服务器上运行应用程序时,Kafka会随机拆分分区和状态存储。 例如: Instance1获取:分区-0,分区-1 Instance2获取:partition-2,stateStore-repartition-0 Instance3获取:stateStore-重新分区-1,stateStore-重新分区-2

  • 我已经设置了ActiveMQ多个实例,以便在窗口中的主从模式下实现故障转移。在设置相同的内容时,我刚刚在bin文件夹下创建了3个实例,而没有更改任何端口,并逐个启动了所有3个实例。第一个实例成为主实例,其余实例处于从属模式,直到我停止主实例。 现在我正试图在Linux环境中实现同样的目标。第一个实例成功启动,但当我在另一个窗口中启动第二个实例时,它会抛出以下错误: 错误|无法启动Apache Ac

  • 我正在使用 kafka模板向 kafka 主题发送消息。我遇到了一个要求,如果将消息发送到 kafka 主题时出现故障,那么我应该重试在具有相同偏移量的同一分区上发送消息。请帮助如何使用Kafka模板实现这一点?