我正在使用Docker启动一个kafka代理集群(例如,5个代理,每个容器一个代理)。Kafka版本2.12-0.11.0.0,动物园管理员3.4.10。
场景:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/data
clientPort=2181
maxClientCnxns=10
minSessionTimeout=4000
maxSessionTimeout=1000000
server.1=0.0.0.0:2888:3888
broker.id=1
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://broker1_IP:broker1_PORT
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=127.0.0.1:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=1000000
group.initial.rebalance.delay.ms=0
bootstrap.servers=localhost:9092
compression.type=none
zookeeper.connect=127.0.0.1:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=1000000
group.id=test-consumer-group
>
在独立模式下启动Zookeeper,然后启动kafka
创建主题
/opt/kafka/bin/kafka-topics.sh--create--zookeeper localhost:2181--replication-factor1--partitions1--topic my-test-topic1
/opt/kafka/bin/kafka-console-consumer.sh--bootstrap-server localhost:9092--from-bartin--topic my-test-topic1--max-messages1
消息被累犯
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/data
clientPort=2181
maxClientCnxns=10
minSessionTimeout=4000
maxSessionTimeout=1000000
server.1=0.0.0.0:2888:3888
server.2=broker2_IP:broker2_2888:broker2_3888
server.3=broker3_IP:broker3_2888:broker3_3888
server.4=broker4_IP:broker4_2888:broker4_3888
server.5=broker5_IP:broker5_2888:broker5_3888
server.properties(broker.id唯一,broker_ip:broker_port对于ech broker不同)
broker.id=N
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://broker_IP:broker_PORT
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=127.0.0.1:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=1000000
group.initial.rebalance.delay.ms=0
producer.properties在每个代理上从第1到第5
bootstrap.servers=localhost:9092
compression.type=none
Consumer.properties在第1到第5个代理上
zookeeper.connect=127.0.0.1:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=1000000
group.id=test-consumer-group
/opt/kafka/bin/kafka-topics.sh--描述--Zookeeper本地主机:2181--主题my-test-topic1主题:my-test-topic1分区计数:1复制因子:1配置:主题:my-test-topic1分区:0领导:5复制:5 ISR:5
是正常行为吗?还是应该留在经纪人1上?
你试过把售票时间提高到6000点吗?基于Hadoop的设置,他们默认使用这个,声明2000毫秒的设置太低了。我想这同样适用于这里。我现在正在研究一个非常类似Kafka的问题。
我使用Zookeeper和Kafka作为使用Java的消息传递用例。我以为当您重新启动Zookeeper和Kafka服务器时,消费者组详细信息会被删除。但他们没有。zookeeper会将消费者组详细信息保存在某种文件中吗? 如果我想重置消费者组,我应该手动删除消费者组详细信息吗? 任何人都可以向我澄清这一点吗?
本文向大家介绍ZooKeeper 集群 ?相关面试题,主要包含被问及ZooKeeper 集群 ?时的应答技巧和注意事项,需要的朋友参考一下 为了保证高可用,最好是以集群形态来部署 ZooKeeper,这样只要集群中大部分机器是可用的(能够容忍一定的机器故障),那么 ZooKeeper 本身仍然是可用的。通常 3 台服务器就可以构成一个 ZooKeeper 集群了。ZooKeeper 官方提供的架构
所以我和我的Kafka消费者之间有了一些恼人的矛盾。我使用“Kafka节点”为我的项目。我创造了一个话题。在一个使用者组中通过2台服务器创建了2个使用者。自动提交设置为false。对于我的消费者获得的每一个mesaage,他们会启动一个异步进程,该进程可能需要1~20秒,当进程完成时,消费者会提交偏移量。我的问题是:在一个senarios中,消费者1得到一个消息,需要20秒来处理。在过程中间,他得
Kafka客户:0.11.0.0-cp1Kafka经纪人: 在Kafka broker滚动重启时,我们的应用程序在发送到broker时丢失了一些消息。我相信滚动重启不应该丢失任何信息。以下是我们正在使用的生产者(将生产者与异步发送()一起使用,而不使用回调/未来等)设置: 我在日志中看到了这些例外 但日志显示重试尝试离开了,我很好奇为什么它没有重试呢?如果有人有任何想法,请告诉我?
我将sping-boot(2.1.6.RELEASE)与sping-kafka(2.2.7.RELEASE)一起使用,并且我使用KafkaTemplate向我的kafka集群发送消息。但是有时(通常是当我重新启动kafka代理或进行重新平衡时),我在发送消息时会看到这样的错误: 由于默认的Kafka生产者配置,我期望发送失败重试,但他们没有。默认Kafka生成器配置: 我的配置是这样的: 我发出这
我正在尝试从 kafka 主题中获取消息,并看到如果我将 auto.commit.reset 策略设置为“最早”,则所有消息都会得到正确处理。但是,如果设置为“最新”,则第一条消息将丢失,其余消息将得到正确处理。如果我在这里错过了什么,任何人都可以帮忙吗?