当前位置: 首页 > 知识库问答 >
问题:

断开一个代理时测试kafka行为(sping-kafka)

终彬郁
2023-03-14

我从Kafka开始。

我有一个集群,有两个代理(id#2和#3),复制因子为2。如果我断开一个经纪人的连接(id#3),我想测试Kafka的行为。

#3关闭后,我的主题信息:

Topic: CUSTOMER PartitionCount: 5       ReplicationFactor: 2    Configs:
        Topic: CUSTOMER Partition: 0    Leader: 2       Replicas: 3,2   Isr: 2
        Topic: CUSTOMER Partition: 1    Leader: 2       Replicas: 2,3   Isr: 2
        Topic: CUSTOMER Partition: 2    Leader: 2       Replicas: 3,2   Isr: 2
        Topic: CUSTOMER Partition: 3    Leader: 2       Replicas: 2,3   Isr: 2
        Topic: CUSTOMER Partition: 4    Leader: 2       Replicas: 3,2   Isr: 2

每个分区都在每个代理上复制,现在#2代理是领导者,这没关系。

消息的发布是可以的,但我的消费者服务并没有使用它(我使用的是Spring kafka)。

在断开连接时,消费者日志为:

2020-04-01 14:51:42.736  INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] o.a.k.c.c.internals.AbstractCoordinator:677        [][][] : [Consumer clientId=consumer-6, groupId=NOTIF] Discovered group coordinator 10.0.0.0:9092 (id: 2147483644 rack: null)
2020-04-01 14:51:42.737  INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] o.a.k.c.c.internals.AbstractCoordinator:729        [][][] : [Consumer clientId=consumer-6, groupId=NOTIF] Group coordinator 10.0.0.0:9092 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery
2020-04-01 14:51:42.840  INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] o.a.k.c.c.internals.AbstractCoordinator:677        [][][] : [Consumer clientId=consumer-6, groupId=NOTIF] Discovered group coordinator 10.0.0.0:9092 (id: 2147483644 rack: null)
2020-04-01 14:51:42.841  WARN [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] org.apache.kafka.clients.NetworkClient:671         [][][] : [Consumer clientId=consumer-6, groupId=NOTIF] Connection to node 2147483644 could not be established. Broker may not be available.
2020-04-01 14:51:42.841  INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] o.a.k.c.c.internals.AbstractCoordinator:729        [][][] : [Consumer clientId=consumer-6, groupId=NOTIF] Group coordinator 10.0.0.0:9092 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery
2020-04-01 14:51:42.842  WARN [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] org.apache.kafka.clients.NetworkClient:671         [][][] : [Consumer clientId=consumer-6, groupId=NOTIF] Connection to node 3 could not be established. Broker may not be available.
2020-04-01 14:51:43.136  WARN [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] org.apache.kafka.clients.NetworkClient:671         [][][] : [Consumer clientId=consumer-5, groupId=NOTIF] Connection to node 3 could not be established. Broker may not be available.
2020-04-01 14:51:43.184  WARN [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] org.apache.kafka.clients.NetworkClient:671         [][][] : [Consumer clientId=consumer-3, groupId=NOTIF] Connection to node 3 could not be established. Broker may not be available.

然后什么都没有。动物园管理员日志里什么都没有。

当我启动我的代理时,所有的消息现在都被消耗掉了。

如果我错了,你能告诉我吗?在我的主题配置中,我假设断开一个代理应该是可能的,不会产生任何影响。

我的Kafka配置:

broker.id=2 (not the same value on the other broker)
delete.topic.enable=true
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/share/kafka/logs
num.partitions=2
num.recovery.threads.per.data.dir=1
log.retention.hours=48
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=serv2:2181,serv3:2181,serv5:2181
zookeeper.connection.timeout.ms=6000
default.replication.factor=1
offsets.topic.replication.factor=1

还有我的动物园管理员配置:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/share/zookeeper/data
server.2=serv2:2888:3888;2181
server.3=serv3:2888:3888;2181
server.5=serv5:2888:3888;2181

我用SpringKafka创作了这个主题:

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }
    @Bean
    public NewTopic notifTopic() {
        return new NewTopic(notifTopic, partitions, (short) bootstrapAddress.split(",").length);
    }

对于消费者:配置:

@EnableKafka
@Configuration
@Profile({ "!mockKafka & !test" })
public class KafkaConfiguration implements KafkaListenerConfigurer {

    @Autowired
    private LocalValidatorFactoryBean validator;

    @Value(value = "${kafka.servers}")
    private String bootstrapAddress;

    @Value(value = "${kafka.groups.notif.name}")
    private String notifGroup;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> containerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, Object>(getConsumerFactoryProperties()));
        factory.setConcurrency(5);
        return factory;
    }

    private Map<String, Object> getConsumerFactoryProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, notifGroup);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return props;
    }

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setValidator(validator);
    }

}

听众:

@Service
@Slf4j
@Profile({"!mockKafka & !test"})
@Transactional
@KafkaListener(containerFactory = "containerFactory", topics = { "${kafka.topics.notif.name}" })
public class NotificationListener { 

    @KafkaHandler
    public void email(@Payload @Valid EmailNotification record, @Header(ContextUtils.HEADER_ACCOUNT) String account,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.OFFSET) long offset,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) long partition) {
        log.info("Consuming message [EMAIL] from topic [{}], partition [{}], offset [{}]", topic, partition, offset);
        ...
    }

以及我的全球配置:

kafka:
    servers: serv2:9092,serv3:9092
    publish.timeout: 3000
    partitions: 5
    topics:
        customer:
            name: CUSTOMER
        notif:
            name: NOTIF
        health:
            name: HEALTH
    groups:
        customer:
            name: CUSTOMER
        notif:
            name: NOTIF

使用版本:Kafka:2.4.0动物园管理员:3.5.6SpringKafka:2.2.12

谢谢

共有1个答案

越嘉石
2023-03-14

我觉得这是

offsets.topic.replication.factor=1

由于偏移量没有被复制,消费者无法找到它的位置。

根据单据,默认为3(如果省略);但它被覆盖为1(至少在我的自制区)。

 类似资料:
  • 我正在尝试设置Kafka Connect,目的是运行Elasticsearch chSinkConntor。 Kafka安装程序,由3个使用Kerberos、SSL和ACL保护的代理程序组成。 到目前为止,我一直在尝试使用docker/docker-com的连接器运行连接框架和elasticserch-server本地化(使用Kafka 2.4连接到远程kafka安装(Kafka 2.0.1-实际

  • 我有两个Kafka代理:server1:9092和server2:9092我正在使用Java客户端向这个集群发送消息,代码如下: 当其中一个代理关闭时,Test在某些情况下会抛出此异常(在此异常示例中'server1'已关闭): 2015-11-02 17:59:29138警告[org.apache.kafka.common.network.Selector]服务器1/40.35.250.227

  • null 当MQTT代理变得不可用时,Paho MQTT客户机不能帮助我保证这些QoS2级别的消息将被重新传递,这是正确的说法吗? 因此,我如何区分以下情况,即Client.Publish导致了一个MqttException,其中Paho没有将消息持久化。 下面是它在飞行中坚持的地方 null 连接丢失(32109):PAHO保存消息 客户端当前正在断开连接(32102):PAHO丢失消息 等待服

  • 由于本机KafkaConsumer不是线程安全的,因此不鼓励从不同的线程而不是kafka使用者处理线程调用pause和resume方法。但正如spring kafka提供的另一层kafka信息容器,内部使用kafka consumer。所以我的问题是,我们可以使用KafkaListenerEndpointRegistry通过id获取侦听器容器,并从其他线程而不是消费者处理线程调用resume或pa

  • 目标: 我想在非常简单的场景中测试所有Nginx代理超时参数。我的第一种方法是创建非常简单的HTTP服务器,并设置一些超时: 在听和接受之间测试proxy_connect_timeout 在接受和读取之间测试proxy_send_timeout 在读取和发送之间测试proxy_read_timeout 测试: 1)服务器代码(python): 2)Nginx配置: 通过显式设置proxy\u co

  • 我通过Intellij Idea通过以下链接调试maven测试:http://www.grygoriy.com/2012/01/how-to-debug-tests-maven-test-via.html 当到达第三步并开始调试时,它已连接但很快断开连接,并且不会在断点处停止。我在Intellij有: 已连接到目标VM,地址:'localhost:5005',传输:'socket' 已断开与目标V