当前位置: 首页 > 知识库问答 >
问题:

SpringKafka和Kafka集群

梁丘安晏
2023-03-14

我已经在集群中配置了3个kafka,我正在尝试与sping-kafka一起使用。

但是在我杀死kafka领导者后,我无法发送其他消息到队列。

我将Spring.kafka.bootstrap-servers属性设置为:“kafka-1:9092;kafka-2:9093,kafka-3:9094”以及我的主机文件中的所有名称。

Kafka0.10版

有人知道如何正确配置?

编辑

我测试过一个东西,发生了一个奇怪的行为。当我启动服务时,我向主题发送一条消息(强制创建)

代码:

@Bean
public KafkaSyncListener synchronousListener(MessageSender sender, KafkaProperties prop) {
    sender.send(prop.getSynchronousTopic(), "Message to force create the topic! Run, Forrest, Run!");
    return new KafkaSyncListener();
}

所以,在这一次我没有启动kafka-1服务器(只是其他的),它发生了异常:

org.springframework.kafka.core.KafkaProducerException:发送失败;嵌套异常org.apache.kafka.common.errors.TimeoutException:60000毫秒后更新元数据失败。

似乎spring-kafka只是尝试连接第一个引导服务器。我使用的是spring kafka 1.3.5.RELEASE和kafka 0.10.1.1

编辑2

我已经做了你做的测试。当我删除第一个docker容器(kafka-1)时,也会发生同样的情况,因为领导者已经改变了。所以,我的消费者(Spring服务)无法使用这些消息。但是当我再次启动kafkaListenerContainerFactory时,该服务会收到所有消息。

{
  key.deserializer=class
  org.apache.kafka.common.serialization.IntegerDeserializer,
  value.deserializer=class
  org.apache.kafka.common.serialization.StringDeserializer,
  max.poll.records=500,
  group.id=mongo-adapter-service,
  ssl.keystore.location=/certs/kafka.keystore.jks,
  bootstrap.servers=[kafka-2:9093, kafka-1:9092, kafka-3:9094],
auto.commit.interval.ms=100,
security.protocol=SSL,
max.request.size=5242880,
ssl.truststore.location=/certs/kafka.keystore.jks,
auto.offset.reset=earliest
}

共有1个答案

谷梁建中
2023-03-14

服务器地址之间需要逗号,而不是分号。

编辑

我刚刚运行了一个没有问题的测试:

spring.kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094

@SpringBootApplication
public class So50804678Application {

    public static void main(String[] args) {
        SpringApplication.run(So50804678Application.class, args);
    }

    @KafkaListener(id = "foo", topics = "so50804678")
    public void in(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so50804678", 1, (short) 3);
    }

}

$ kafka-topics --zookeeper localhost:2181 --describe --topic so50804678
Topic:so50804678    PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: so50804678   Partition: 0    Leader: 0   Replicas: 0,1,2 Isr: 0,1,2

杀了首领,然后

$ kafka-topics --zookeeper localhost:2181 --describe --topic so50804678
Topic:so50804678    PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: so50804678   Partition: 0    Leader: 1   Replicas: 0,1,2 Isr: 1,2

$ kafka-console-producer --broker-list localhost:9092,localhost:9093,localhost:9093 --topic so50804678

发送了一条消息,并被应用程序接收;日志中没有错误,除了警告:

[消费者clientId=消费者-1, groupId=foo]无法建立到节点0的连接。代理可能不可用。

然后我重新启动了死机的服务器;停止了我的应用程序;然后添加了此代码……

@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
    return args -> {
        while(true) {
            System.out.println(template.send("so50804678", "foo").get().getRecordMetadata());
            Thread.sleep(3_000);
        }
    };
}

同样,杀害现任领导人没有任何影响;一切恢复正常。

您可能需要调整服务器属性中的侦听器/播发.侦听器属性。由于我的经纪人都在本地主机上,所以我让他们默认。

 类似资料:
  • 我有一个Spring启动应用程序,它使用来自 Kafka 集群中某个主题(例如 topic1)的消息。这就是我的代码目前的样子。 现在我想从另一个Kafka集群中的不同主题开始消费。一种方法是为此创建另一个bean。但是有更好的方法吗?

  • 我在站点1(3个代理)有两个集群设置cluster-1,在站点2(3个代理)有两个集群设置cluster-2。使用spring kafka(1.3.6)消费者(一台机器)并通过@KafkaListener注释收听消息。我们如何为每个集群(c1和c2)实例化多个KafkaListenerContainerFactory,并同时监听来自这两个集群的数据。 我的侦听器应该同时使用来自这两个集群的消息。

  • 使用Kafka作为微服务体系结构中的消息传递系统,我想知道哪一个是首选,是Spring Kafka,还是Spring Integration Kafka,为什么?还有,我们根据什么因素来决定选择哪一个?

  • 我试图在Spring Integration中定义一个简单的消息流,它从一个通道读取消息,然后将消息转储到Kafka队列中。为此,我使用了spring集成kafka。问题是我得到了一个无法解读的错误。 以下是我的XML配置: 当我通过Spring启动运行我的应用程序时,我得到这个异常: 创建名称为org.springframework.integration.kafka.outbound.Kafk

  • 假设我有 3 台 Kafka 服务器。服务器 1 zoopkeeper1 服务器 2 zoopkeeper2 服务器 3 zoopkeeper3 在集群配置中,zoopkeepers 会发生什么?它们是为每个服务器单独维护的,还是会在群集配置中同步其数据?

  • 我尝试了kafka-console-consumer.sh和kafka-console-producer.sh,它工作得很好。我能够看到生产者在消费者中发送的消息 1)我已经下载了s3连接器(https://docs.confluent.io/current/connect/kafka-connect-S3/index.html) 2)将文件解压缩到/home/ec2-user/plugins/