在 confluent kafka rest 代理中,我们可以获取特定消费者组的最后一个提交偏移量,但是我们如何获取主题的最新偏移量来计算滞后。
您可以使用Kafka REST代理获取为特定分区提交的最新偏移量。根据汇合的文件,
GET /consumers/(string: group_name)/instances/(string: instance)/offsets
获取给定分区的上次提交的偏移量(无论提交是通过此过程还是其他进程发生的)。
请注意,必须向保存使用者实例的特定 REST 代理实例发出此请求。
参数:
> < li>
组名(字符串)-使用者组的名称
实例(字符串)-- 使用者实例请求 JSON 的 ID
对象阵列:
响应JSON对象数组:
状态代码:
请求示例:
GET /consumers/testgroup/instances/my_consumer/offsets HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json
{
"partitions": [
{
"topic": "test",
"partition": 0
},
{
"topic": "test",
"partition": 1
}
]
}
示例响应:
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json
{"offsets":
[
{
"topic": "test",
"partition": 0,
"offset": 21,
"metadata":""
},
{
"topic": "test",
"partition": 1,
"offset": 31,
"metadata":""
}
]
}
假设我有一个结构并将偏移提取到成员: 给定一个指向 上的算术不需要产生正确的结果(例如,我记得有些CPU具有处理非字节对齐地址的能力,这意味着数组中的每个的 8步递增)。 看起来标准中有一些东西被遗忘了(或者我错过了什么)。
背景:我写了一个简单的spark结构化蒸app,把数据从Kafka搬到S3。我发现,为了支持一次准确的保证,spark创建了_spark_metadata文件夹,但该文件夹最终变得太大,当流应用程序运行很长时间时,元数据文件夹变得太大,以至于我们开始出现OOM错误。我想摆脱Spark结构化流的元数据和检查点文件夹,自己管理偏移量。 我们如何管理Spark Streaming中的偏移量:我使用了va
有一种情况,当消费者1阅读来自Kafka主题的消息时。当使用相同的groupId连接第二个用户2时,需要重新平衡分区。有没有可能以某种方式重置偏移,以便在重新平衡过程之后,两个消费者都从头开始阅读主题?
问题内容: 我正在使用Java 编写使用者。我想保持消息的实时性,因此,如果有太多消息在等待使用,例如1000条或更多,我应该放弃未使用的消息,并从最后一个偏移量开始使用。 对于此问题,我尝试比较主题的最后提交的偏移量和主题的结束偏移量(仅1个分区),如果这两个偏移量之间的差大于某个值,则将主题的最后提交的偏移量设置为下一个偏移量,这样我就可以放弃那些多余的消息。 现在我的问题是如何获得主题的最终
现在我的问题是如何得到一个主题的结束偏移量,有人说我可以用老消费者,但是太复杂了,新消费者有这个功能吗?
问题内容: 在轮询Kafka时,我已经使用该功能订阅了多个主题。现在,我想设置的偏离,我想从每个主题阅读,而无需每次重新订阅后,并从一个话题。 在轮询数据之前,是否可以迭代调用每个主题名称 来 达到结果?偏移量如何精确存储在Kafka中? 我每个主题有一个分区,并且只有一个使用者可以读取所有主题。 问题答案: Kafka如何存储每个主题的偏移量? 卡夫卡已将抵销存储从动物园管理员转移到卡夫卡经纪人