我正在使用Kafka 0.10.2和zookeeper 3.4.9,我正在Kafka代理上存储我的偏移量。我在一个有5个分区的主题上运行了3个消费者,因此为了检查延迟和所有我运行的命令
。/Kafka-consumer-groups . sh-bootstrap-server localhost:9092-describe-group group 1
所以第一次的结果是
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
topic1 1 669 670 1 consumer-1-9417e05b-1cf9-4f0f-b6cd-61effdd09456 /127.0.0.1 consumer-1
topic1 4 616 617 1 consumer-3-9ba9b12e-d6c2-423f-93cd-313906a2559c /127.0.0.1 consumer-3
topic1 2 628 706 78 consumer-2-22263f3a-bb8a-4305-a1c8-fbae9f190dda /127.0.0.1 consumer-2
topic1 3 695 697 2 consumer-2-6abb5b02-3ee8-4fd2-ade3-e10e3ce7d06f /127.0.0.1 consumer-2
topic1 0 11433 39381 27948 consumer-1-30b79487-4de8-40a4-951c-02f25e8976fc /127.0.0.1 consumer-1
- - - - - consumer-5-034e5889-dcc4-4cd1-975b-0d40a88899cf /127.0.0.1 consumer-5
- - - - - consumer-3-c10d695e-e067-428f-b31a-3a5318d60ef3 /127.0.0.1 consumer-3
- - - - - consumer-4-090258c4-4d76-4cdd-8b62-f8fccf3ec097 /127.0.0.1 consumer-4
运行1分钟后,相同的命令显示此结果
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
topic1 1 345 678 333 consumer-1-9417e05b-1cf9-4f0f-b6cd-61effdd09456 /127.0.0.1 consumer-1
topic1 4 620 622 2 consumer-3-9ba9b12e-d6c2-423f-93cd-313906a2559c /127.0.0.1 consumer-3
topic1 2 708 708 0 consumer-2-22263f3a-bb8a-4305-a1c8-fbae9f190dda /127.0.0.1 consumer-2
topic1 3 545 701 156 consumer-2-6abb5b02-3ee8-4fd2-ade3-e10e3ce7d06f /127.0.0.1 consumer-2
topic1 0 11433 39385 27952 consumer-1-30b79487-4de8-40a4-951c-02f25e8976fc /127.0.0.1 consumer-1
- - - - - consumer-5-034e5889-dcc4-4cd1-975b-0d40a88899cf /127.0.0.1 consumer-5
- - - - - consumer-3-c10d695e-e067-428f-b31a-3a5318d60ef3 /127.0.0.1 consumer-3
- - - - - consumer-4-090258c4-4d76-4cdd-8b62-f8fccf3ec097 /127.0.0.1 consumer-4
这怎么可能,之前的当前偏移,滞后和总偏移(分区1)大于后者one.Please让我知道,如果我错过了什么
>
如果消费者使用seek(),
它可以向后移动,因此,电流偏移
可以变得更小。
不确定“总偏移量”是什么意思?如果引用日志端偏移量
,则当新数据写入主题分区时,此偏移量会增加(因此,它独立于实际的使用者组)。
对于滞后
:这只是对数端偏移和
当前偏移
之间的差异。因此,由于电流偏移量
比以前小,对数端偏移量
更大,因此它也更大。
我运行这个命令: kafka使用者组--引导服务器localhost:9092--组我的使用者组--重置偏移量--最早--执行--主题my-topic-1 它给出了错误: 命令的语法不正确。 根据此命令的帮助结果,我键入的内容似乎是正确的。 我在这里犯了什么错误?
我尝试了kafka-console-consumer.sh和kafka-console-producer.sh,它工作得很好。我能够看到生产者在消费者中发送的消息 1)我已经下载了s3连接器(https://docs.confluent.io/current/connect/kafka-connect-S3/index.html) 2)将文件解压缩到/home/ec2-user/plugins/
本文向大家介绍JavaScript 描述符和命名属性,包括了JavaScript 描述符和命名属性的使用技巧和注意事项,需要的朋友参考一下 示例 属性是对象的成员。每个命名属性都是一对(名称,描述符)。该名称是允许访问的字符串(使用点符号object.propertyName或方括号符号object['propertyName'])。描述符是对字段的记录,这些字段定义了访问属性时的行为(属性发生了
TableSpec describeTable(String tableName) 功能 描述表,得到表的结构和元数据信息 方法参数 1.tableName : String : required 方法返回值 tableSpec : TableSpec 包括TableSchema和TableMeta两部分 TableSchema 1.entityGroupSpec, 表的实体组键 2.primar
我是Kafka的新手,正在学习Kafka内部知识。请根据需要随时更正我的理解。。 这是我的实时场景..感谢所有的回复: 我有一个接收数据文件的实时FTP服务器…比如索赔文件。 我将把这些数据发布到一个主题中.让我们把这个主题称为claims_topic(2个分区). 我需要订阅这个claims_topic,阅读消息并将它们写入Oracle和Postgres表。让我们将oracle表称为Otable
在一个消费者群体中的所有消费者都失败后,kafka会将该消费者群体的补偿存储多长时间?是否有此配置变量?