我的Kafka集群有下一个配置。
我使用的是Kafka-Consumer-Perf,测试用的是下面使用ConsumerPerformance的decro。
我定期运行下面的命令
./kafka-run-class.sh kafka.admin.consumergroupcommand--bootstrap-server 192.168.10.10:9093--new-consumer--description--group testgroup1--command-config../config/consumer.properties
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
fara 4 1056241 10000000 8943759 consumer-1-f9fd5d69-6ba1-4da1-9eb9-29a64a83d1fa /192.168.10.145 consumer-1
fara 3 1075308 10000000 8924692 consumer-1-f9fd5d69-6ba1-4da1-9eb9-29a64a83d1fa /192.168.10.145 consumer-1
fara 1 1056241 10000000 8943759 consumer-1-f9fd5d69-6ba1-4da1-9eb9-29a64a83d1fa /192.168.10.145 consumer-1
fara 2 2129355 10000000 7870645 consumer-1-f9fd5d69-6ba1-4da1-9eb9-29a64a83d1fa /192.168.10.145 consumer-1
fara 0 1075308 10000000 8924692 consumer-1-f9fd5d69-6ba1-4da1-9eb9-29a64a83d1fa /192.168.10.145 consumer-1
fara 4 2302837 10000000 7697163 consumer-1-f9fd5d69-6ba1-4da1-9eb9-29a64a83d1fa /192.168.10.145 consumer-1
fara 3 2377136 10000000 7622864 consumer-1-f9fd5d69-6ba1-4da1-9eb9-29a64a83d1fa /192.168.10.145 consumer-1
fara 1 2302837 10000000 7697163 consumer-1-f9fd5d69-6ba1-4da1-9eb9-29a64a83d1fa /192.168.10.145 consumer-1
fara 2 4641785 10000000 5358215 consumer-1-f9fd5d69-6ba1-4da1-9eb9-29a64a83d1fa /192.168.10.145 consumer-1
fara 0 2377197 10000000 7622803 consumer-1-f9fd5d69-6ba1-4da1-9eb9-29a64a83d1fa /192.168.10.145 consumer-1
fara 4 5576341 10000000 4423659 consumer-1-f9fd5d69-6ba1-4da1-9eb9-29a64a83d1fa /192.168.10.145 consumer-1
fara 3 5738479 10000000 4261521 consumer-1-f9fd5d69-6ba1-4da1-9eb9-29a64a83d1fa /192.168.10.145 consumer-1
fara 1 5576341 10000000 4423659 consumer-1-f9fd5d69-6ba1-4da1-9eb9-29a64a83d1fa /192.168.10.145 consumer-1
fara 2 10000000 10000000 0 consumer-1-f9fd5d69-6ba1-4da1-9eb9-29a64a83d1fa /192.168.10.145 consumer-1
fara 0 5739456 10000000 4260544 consumer-1-f9fd5d69-6ba1-4da1-9eb9-29a64a83d1fa /192.168.10.145 consumer-1
fara 4 8735653 10000000 1264347 consumer-1-f9fd5d69-6ba1-4da1-9eb9-29a64a83d1fa /192.168.10.145 consumer-1
fara 3 8825872 10000000 1174128 consumer-1-f9fd5d69-6ba1-4da1-9eb9-29a64a83d1fa /192.168.10.145 consumer-1
fara 1 8735653 10000000 1264347 consumer-1-f9fd5d69-6ba1-4da1-9eb9-29a64a83d1fa /192.168.10.145 consumer-1
fara 2 10000000 10000000 0 consumer-1-f9fd5d69-6ba1-4da1-9eb9-29a64a83d1fa /192.168.10.145 consumer-1
fara 0 8827764 10000000 1172236 consumer-1-f9fd5d69-6ba1-4da1-9eb9-29a64a83d1fa /192.168.10.145 consumer-1
fara 4 10000000 10000000 0 - - -
fara 3 10000000 10000000 0 - - -
fara 1 10000000 10000000 0 - - -
fara 2 10000000 10000000 0 - - -
fara 0 10000000 10000000 0 - - -
所有的消息都被消耗掉了。
$kafka_home/bin/kafka-consumer-perf-test.sh\--new-consumer\--broker-list$broker_list\--消息10000000\--消息大小100\--topic fara\--consumer.config$kafka_home/config/consumer.ssl.properties\--num-fetch-threads1\--show-detailt-stats\--report-interval$report_interval\--组testgroup2
我定期运行下面的命令
./kafka-run-class.sh kafka.admin.consumergroupcommand--bootstrap-server 192.168.10.10:9093--new-consumer--description--group testgroup2--command-config../config/consumer.properties
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
fara 1 4129909 10000000 5870091 consumer-1-2e20de15-9396-4ebb-a77d-c16ef7c0cb03 /192.168.10.145 consumer-1
fara 4 4520065 10000000 5479935 consumer-1-ed2e3f63-23e9-489a-a9bc-422e7830fc9c /192.168.10.139 consumer-1
fara 2 4310725 10000000 5689275 consumer-1-5dbc6ec0-7bc6-4d71-8b17-1b5af261479d /192.168.10.134 consumer-1
fara 3 4038958 10000000 5961042 consumer-1-646b2ebd-3a36-4e09-a7d4-658225492a03 /192.168.10.145 consumer-1
fara 0 4524641 10000000 5475359 consumer-1-2a7186c9-c4a0-41d6-ba5e-809964b7b709 /192.168.10.134 consumer-1
fara 1 6071173 10000000 3928827 consumer-1-2e20de15-9396-4ebb-a77d-c16ef7c0cb03 /192.168.10.145 consumer-1
fara 4 6689713 10000000 3310287 consumer-1-ed2e3f63-23e9-489a-a9bc-422e7830fc9c /192.168.10.139 consumer-1
fara 2 6437488 10000000 3562512 consumer-1-5dbc6ec0-7bc6-4d71-8b17-1b5af261479d /192.168.10.134 consumer-1
fara 3 6194150 10000000 3805850 consumer-1-646b2ebd-3a36-4e09-a7d4-658225492a03 /192.168.10.145 consumer-1
fara 0 6612859 10000000 3387141 consumer-1-2a7186c9-c4a0-41d6-ba5e-809964b7b709 /192.168.10.134 consumer-1
fara 1 7489057 10000000 2510943 consumer-1-2e20de15-9396-4ebb-a77d-c16ef7c0cb03 /192.168.10.145 consumer-1
fara 4 8212273 10000000 1787727 consumer-1-ed2e3f63-23e9-489a-a9bc-422e7830fc9c /192.168.10.139 consumer-1
fara 2 7883485 10000000 2116515 consumer-1-5dbc6ec0-7bc6-4d71-8b17-1b5af261479d /192.168.10.134 consumer-1
fara 3 7457277 10000000 2542723 consumer-1-646b2ebd-3a36-4e09-a7d4-658225492a03 /192.168.10.145 consumer-1
fara 0 8039217 10000000 1960783 consumer-1-2a7186c9-c4a0-41d6-ba5e-809964b7b709 /192.168.10.134 consumer-1
fara 2 8535701 10000000 1464299 consumer-1-5dbc6ec0-7bc6-4d71-8b17-1b5af261479d /192.168.10.134 consumer-1
fara 3 8113576 10000000 1886424 consumer-1-5dbc6ec0-7bc6-4d71-8b17-1b5af261479d /192.168.10.134 consumer-1
fara 0 8580780 10000000 1419220 consumer-1-2a7186c9-c4a0-41d6-ba5e-809964b7b709 /192.168.10.134 consumer-1
fara 1 7926793 10000000 2073207 consumer-1-2a7186c9-c4a0-41d6-ba5e-809964b7b709 /192.168.10.134 consumer-1
fara 4 8897425 10000000 1102575 consumer-1-646b2ebd-3a36-4e09-a7d4-658225492a03 /192.168.10.145 consumer-1
fara 4 9884954 10000000 115046 consumer-1-646b2ebd-3a36-4e09-a7d4-658225492a03 /192.168.10.145 consumer-1
fara 3 8693017 10000000 1306983 consumer-1-646b2ebd-3a36-4e09-a7d4-658225492a03 /192.168.10.145 consumer-1
fara 1 8564365 10000000 1435635 consumer-1-646b2ebd-3a36-4e09-a7d4-658225492a03 /192.168.10.145 consumer-1
fara 2 9197795 10000000 802205 consumer-1-646b2ebd-3a36-4e09-a7d4-658225492a03 /192.168.10.145 consumer-1
fara 0 9217803 10000000 782197 consumer-1-646b2ebd-3a36-4e09-a7d4-658225492a03 /192.168.10.145 consumer-1
fara 4 9942050 10000000 57950 - - -
fara 3 8749827 10000000 1250173 - - -
fara 1 8621461 10000000 1378539 - - -
fara 2 9311260 10000000 688740 - - -
fara 0 9274899 10000000 725101 - - -
我不明白为什么所有的记录都没有被消耗掉。有什么需要帮忙的吗?
在这两种情况下,我已经解决了重新实现1.1.0版本中的ConsumerPerformance.Scala类的问题。
https://github.com/apache/kafka/blob/1.1.0/core/src/main/scala/kafka/tools/consumerperformance.scala#l161
我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者
我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认
Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka
我是Kafka的新手,正在学习Kafka内部知识。请根据需要随时更正我的理解。。 这是我的实时场景..感谢所有的回复: 我有一个接收数据文件的实时FTP服务器…比如索赔文件。 我将把这些数据发布到一个主题中.让我们把这个主题称为claims_topic(2个分区). 我需要订阅这个claims_topic,阅读消息并将它们写入Oracle和Postgres表。让我们将oracle表称为Otable
有什么方法可以阻止Kafka的消费者在一段时间内消费信息吗?我希望消费者停止一段时间,然后开始消费最后一条未消费的消息。
我在使用Kafka时遇到了一些问题。非常感谢任何帮助!我在docker swell中分别有zookeeper和kafka集群3个节点。您可以在下面看到Kafka代理配置。 我的情况: < li > 20x位制片人不断向Kafka主题传达信息 < li>1x消费者读取和记录消息 < li >终止kafka节点(docker容器停止),因此现在群集有2个Kafka代理节点(第3个节点将自动启动并加入群