当前位置: 首页 > 知识库问答 >
问题:

如何访问合流kafka rest代理中主题的最新偏移量以计算延迟

苗学民
2023-03-14

在 confluent kafka rest 代理中,我们可以获取特定消费者组的最后一个提交偏移量,但是我们如何获取主题的最新偏移量来计算滞后。

共有1个答案

温星华
2023-03-14

您可以使用Kafka REST代理获取为特定分区提交的最新偏移量。根据汇合的文件,

GET /consumers/(string: group_name)/instances/(string: instance)/offsets

获取给定分区的上次提交的偏移量(无论提交是通过此过程还是其他进程发生的)。

请注意,必须向保存使用者实例的特定 REST 代理实例发出此请求。

参数:

> < li>

组名(字符串)-使用者组的名称

实例(字符串)-- 使用者实例请求 JSON 的 ID

对象阵列:

  • partitions—查找
  • 最后提交的偏移量的分区列表
  • 分区[i]。topic(string)--主题的名称
  • 分区[i]。partition(int)--分区ID

响应JSON对象数组:

  • 偏移量 -- 已提交的偏移量列表
  • offsets[i].topic (string) -- 为其提交偏移量的主题的名称
  • offsets[i].partition (int) -- 为其提交偏移量的分区 ID
  • 偏移量[i].offset (int) -- 已提交的偏移量
  • 偏移量[i].元数据(字符串)-- 已提交偏移量的元数据

状态代码:

  • 404 未找到 --
  • 错误代码 40402 -- 找不到分区
  • 错误代码 40403 -- 找不到使用者实例

请求示例:

GET /consumers/testgroup/instances/my_consumer/offsets HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json

{
  "partitions": [
    {
      "topic": "test",
      "partition": 0
    },
    {
      "topic": "test",
      "partition": 1
    }

  ]
}

示例响应:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{"offsets":
 [
  {
    "topic": "test",
    "partition": 0,
    "offset": 21,
    "metadata":""
  },
  {
    "topic": "test",
    "partition": 1,
    "offset": 31,
    "metadata":""
  }
 ]
}
 类似资料:
  • 假设我有一个结构并将偏移提取到成员: 给定一个指向 上的算术不需要产生正确的结果(例如,我记得有些CPU具有处理非字节对齐地址的能力,这意味着数组中的每个的 8步递增)。 看起来标准中有一些东西被遗忘了(或者我错过了什么)。

  • 背景:我写了一个简单的spark结构化蒸app,把数据从Kafka搬到S3。我发现,为了支持一次准确的保证,spark创建了_spark_metadata文件夹,但该文件夹最终变得太大,当流应用程序运行很长时间时,元数据文件夹变得太大,以至于我们开始出现OOM错误。我想摆脱Spark结构化流的元数据和检查点文件夹,自己管理偏移量。 我们如何管理Spark Streaming中的偏移量:我使用了va

  • 有一种情况,当消费者1阅读来自Kafka主题的消息时。当使用相同的groupId连接第二个用户2时,需要重新平衡分区。有没有可能以某种方式重置偏移,以便在重新平衡过程之后,两个消费者都从头开始阅读主题?

  • 问题内容: 我正在使用Java 编写使用者。我想保持消息的实时性,因此,如果有太多消息在等待使用,例如1000条或更多,我应该放弃未使用的消息,并从最后一个偏移量开始使用。 对于此问题,我尝试比较主题的最后提交的偏移量和主题的结束偏移量(仅1个分区),如果这两个偏移量之间的差大于某个值,则将主题的最后提交的偏移量设置为下一个偏移量,这样我就可以放弃那些多余的消息。 现在我的问题是如何获得主题的最终

  • 现在我的问题是如何得到一个主题的结束偏移量,有人说我可以用老消费者,但是太复杂了,新消费者有这个功能吗?

  • 问题内容: 在轮询Kafka时,我已经使用该功能订阅了多个主题。现在,我想设置的偏离,我想从每个主题阅读,而无需每次重新订阅后,并从一个话题。 在轮询数据之前,是否可以迭代调用每个主题名称 来 达到结果?偏移量如何精确存储在Kafka中? 我每个主题有一个分区,并且只有一个使用者可以读取所有主题。 问题答案: Kafka如何存储每个主题的偏移量? 卡夫卡已将抵销存储从动物园管理员转移到卡夫卡经纪人