当前位置: 首页 > 知识库问答 >
问题:

Kafka Rest代理消费者创建

冯星剑
2023-03-14

假设我有一个服务,它通过kafka-rest-proxy来消费消息,并且总是在同一个消费者组上。我们还可以说,它正在消耗一个有一个分区的主题。当服务启动时,它在kafka-rest-proxy中创建一个新的使用者,并使用生成的使用者url,直到服务关闭。当服务重新启动时,它将在kafka-rest-proxy中创建一个新的消费者,并使用新的url(和新的消费者)进行消费。

>

  • 因为kafka每个分区最多只能有一个使用者。在kafka和kafka-rest-proxy中会发生什么,当消费者被重新启动时?即在kafka-rest-proxy中创建了一个新的消费者,但旧的消费者没有机会被破坏。所以现在在kafka-rest-proxy中重新启动我的服务后,有'n'个消费者,但其中只有一个正在被积极消费。我甚至能够在我的新消费者上消费消息吗?因为消费者比分区多?

    让我们把这个变得更复杂,说我在同一个消费者组上有我的服务的5个实例,在主题中有5个分区。在'n'重新启动我的服务的所有5个实例后,我甚至会被保证消费所有的消息,而不确保正确地销毁现有的消费者。即,当消费者对分区进行编号时,在消费者创建过程中,Kafka和kafka-rest-proxy做了什么?

    什么被认为是Kafka-Rest-代理的最佳实践,以确保陈腐的消费者总是被清理?您建议保留使用者URL吗?我是否应该强制kafka-rest-proxy重新启动,以确保在启动我的服务之前已销毁现有的消费者?

    *编辑*我相信我的问题有一部分是用这个配置回答的,但不是全部。

    consumer.instance.timeout.ms-使用者实例自动销毁之前的空闲时间。类型:int默认值:300000重要性:低

  • 共有1个答案

    红弘盛
    2023-03-14

    >

  • 如果您不能完全关闭使用者,它将在向其发出上次请求后的一段时间内保持活动状态。在这种情况下,代理将垃圾收集过时的使用者--如果它没有完全关闭,使用者将无限期地保留某些分区。通过自动垃圾收集使用者,您不需要一些单独的持久存储来跟踪您的使用者实例。正如您所发现的,您可以通过configconsumer.instance.timeout.ms来控制这个超时。

    由于实例将被垃圾收集,因此可以保证您最终使用所有消息。但在超时期间,某些分区可能仍被分配给旧的使用者集,您将不会在这些分区上取得任何进展。

    理想情况下,不干净的关闭你的应用程序是很少的,所以最好的做法是清理消费者时,你的应用程序是关闭。即使在例外情况下,也可以使用try/catch/finallyfinally块来破坏使用者。如果一个还活着,它最终会恢复的。除此之外,如果应用程序能够容忍consumer.instance.timeout.ms设置,请考虑将其调整为更低。它只需要大于使用使用者的调用之间的最长周期(并且您应该记住可能的错误情况,例如,如果处理消息需要与另一个系统交互,而该系统可能变得缓慢/不可访问,那么您应该在设置此配置时考虑到这一点)。

    您可以持久化URL,但即使这样也会有丢失使用者的风险,因为您不能原子式地创建使用者并将其URL保存到其他持久存储中。此外,由于完全不受控制的故障在您没有机会进行清理的情况下不应该是常见的情况,所以这样做通常不会给您带来太多好处。如果您需要从故障中快速恢复,那么应用程序的使用者实例超时可能会大大减少。

    RE:强制重新启动代理,这将是相当少见的,因为REST代理通常是一个共享服务,这样做会影响所有其他正在使用它的应用程序。

  •  类似资料:
    • 启动使用者接收消息 根据我的理解,consumer直接使用来自broker的消息,但在上面的consumer命令中,我们没有提到broker,而只提到zookeeper。消费者是否会连接到zookeeper(而不是broker)来消费消息?

    • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

    • 我花了几个小时想弄清楚发生了什么,但没能找到解决办法。 这是我在一台机器上的设置: 1名zookeeper跑步 我正在使用kafka控制台生成器插入消息。如果我检查复制偏移量(

    • 当所有的经纪人都起来的时候,一切都是好的。但是,如果我先杀死(按开始顺序),代理消息会被发送到代理,但使用者不能接收到任何消息,消息不会丢失。启动该代理后,使用者立即接收消息。 关闭broker实例后的使用者日志: 再次启动丢失的代理后的使用者日志: 谢谢

    • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

    • 我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者