当前位置: 首页 > 知识库问答 >
问题:

zookeeper重启后Kafka集群丢失消息

鲜于凯歌
2023-03-14

我正在使用Docker启动一个kafka代理集群(例如,5个代理,每个容器一个代理)。Kafka版本2.12-0.11.0.0,动物园管理员3.4.10。

场景:

    null
tickTime=2000
initLimit=10
syncLimit=5

dataDir=/opt/zookeeper/data

clientPort=2181
maxClientCnxns=10
minSessionTimeout=4000
maxSessionTimeout=1000000
server.1=0.0.0.0:2888:3888
broker.id=1
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://broker1_IP:broker1_PORT
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=127.0.0.1:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=1000000
group.initial.rebalance.delay.ms=0
bootstrap.servers=localhost:9092
compression.type=none
zookeeper.connect=127.0.0.1:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=1000000
group.id=test-consumer-group

>

  • 在独立模式下启动Zookeeper,然后启动kafka

    创建主题

    /opt/kafka/bin/kafka-topics.sh--create--zookeeper localhost:2181--replication-factor1--partitions1--topic my-test-topic1

      null
    • 检查邮件

    /opt/kafka/bin/kafka-console-consumer.sh--bootstrap-server localhost:9092--from-bartin--topic my-test-topic1--max-messages1

    消息被累犯

      null
      null
    tickTime=2000
    initLimit=10
    syncLimit=5
    
    dataDir=/opt/zookeeper/data
    
    clientPort=2181
    maxClientCnxns=10
    minSessionTimeout=4000
    maxSessionTimeout=1000000
    server.1=0.0.0.0:2888:3888
    server.2=broker2_IP:broker2_2888:broker2_3888
    server.3=broker3_IP:broker3_2888:broker3_3888
    server.4=broker4_IP:broker4_2888:broker4_3888
    server.5=broker5_IP:broker5_2888:broker5_3888
    

    server.properties(broker.id唯一,broker_ip:broker_port对于ech broker不同)

    broker.id=N
    listeners=PLAINTEXT://:9092
    advertised.listeners=PLAINTEXT://broker_IP:broker_PORT
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/tmp/kafka-logs
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=127.0.0.1:2181
    zookeeper.session.timeout.ms=6000
    zookeeper.connection.timeout.ms=1000000
    group.initial.rebalance.delay.ms=0
    

    producer.properties在每个代理上从第1到第5

    bootstrap.servers=localhost:9092
    compression.type=none
    

    Consumer.properties在第1到第5个代理上

    zookeeper.connect=127.0.0.1:2181
    zookeeper.session.timeout.ms=6000
    zookeeper.connection.timeout.ms=1000000
    group.id=test-consumer-group
    

    /opt/kafka/bin/kafka-topics.sh--描述--Zookeeper本地主机:2181--主题my-test-topic1主题:my-test-topic1分区计数:1复制因子:1配置:主题:my-test-topic1分区:0领导:5复制:5 ISR:5

    是正常行为吗?还是应该留在经纪人1上?

    • 检查每个代理上的消息
  • 共有1个答案

    容修贤
    2023-03-14

    你试过把售票时间提高到6000点吗?基于Hadoop的设置,他们默认使用这个,声明2000毫秒的设置太低了。我想这同样适用于这里。我现在正在研究一个非常类似Kafka的问题。

     类似资料:
    • 我使用Zookeeper和Kafka作为使用Java的消息传递用例。我以为当您重新启动Zookeeper和Kafka服务器时,消费者组详细信息会被删除。但他们没有。zookeeper会将消费者组详细信息保存在某种文件中吗? 如果我想重置消费者组,我应该手动删除消费者组详细信息吗? 任何人都可以向我澄清这一点吗?

    • 所以我和我的Kafka消费者之间有了一些恼人的矛盾。我使用“Kafka节点”为我的项目。我创造了一个话题。在一个使用者组中通过2台服务器创建了2个使用者。自动提交设置为false。对于我的消费者获得的每一个mesaage,他们会启动一个异步进程,该进程可能需要1~20秒,当进程完成时,消费者会提交偏移量。我的问题是:在一个senarios中,消费者1得到一个消息,需要20秒来处理。在过程中间,他得

    • 本文向大家介绍ZooKeeper 集群 ?相关面试题,主要包含被问及ZooKeeper 集群 ?时的应答技巧和注意事项,需要的朋友参考一下 为了保证高可用,最好是以集群形态来部署 ZooKeeper,这样只要集群中大部分机器是可用的(能够容忍一定的机器故障),那么 ZooKeeper 本身仍然是可用的。通常 3 台服务器就可以构成一个 ZooKeeper 集群了。ZooKeeper 官方提供的架构

    • Kafka客户:0.11.0.0-cp1Kafka经纪人: 在Kafka broker滚动重启时,我们的应用程序在发送到broker时丢失了一些消息。我相信滚动重启不应该丢失任何信息。以下是我们正在使用的生产者(将生产者与异步发送()一起使用,而不使用回调/未来等)设置: 我在日志中看到了这些例外 但日志显示重试尝试离开了,我很好奇为什么它没有重试呢?如果有人有任何想法,请告诉我?

    • 我正在尝试从 kafka 主题中获取消息,并看到如果我将 auto.commit.reset 策略设置为“最早”,则所有消息都会得到正确处理。但是,如果设置为“最新”,则第一条消息将丢失,其余消息将得到正确处理。如果我在这里错过了什么,任何人都可以帮忙吗?

    • 我将sping-boot(2.1.6.RELEASE)与sping-kafka(2.2.7.RELEASE)一起使用,并且我使用KafkaTemplate向我的kafka集群发送消息。但是有时(通常是当我重新启动kafka代理或进行重新平衡时),我在发送消息时会看到这样的错误: 由于默认的Kafka生产者配置,我期望发送失败重试,但他们没有。默认Kafka生成器配置: 我的配置是这样的: 我发出这