当前位置: 首页 > 知识库问答 >
问题:

如何在面向请求的应用程序中管理Kafka事务性生产者对象

满元凯
2023-03-14

当配置为事务生产者时,在面向请求(例如http或RPC服务器)的应用程序中管理Kafka生产者对象的最佳实践是什么?具体来说,如何在服务线程之间共享生产者对象,以及如何定义事务。这些对象的id配置值?

在非事务性使用中,生产者对象是线程安全的,并且在所有请求服务线程之间共享一个对象是很常见的。设置kafka消费者线程使用的事务性生产者对象也很简单,只需为每个消费者线程实例化一个对象即可工作得很好。

将事务生产者与面向请求的应用程序相结合似乎更为复杂,因为服务线程的生命周期通常由线程池动态控制。我可以想出几个选择,但都有缺点:

  1. 共享单个对象,通过某种互斥来防止并发。负载下的争用可能是一个严重的问题
  2. 为每个传入的请求实例化生产者对象。KafkaProducer对象初始化缓慢,因为它们维护网络连接、线程和其他重量级对象;为每个请求支付此费用似乎不切实际
  3. 维护生产者对象池,并为每个请求租用一个。我能看到的主要缺点是所需的机器数量。还不清楚如何配置事务。对于这些对象,因为它们的生命周期没有像文档中所说的那样清晰地映射到分区、有状态的应用程序中的碎片标识符

还有其他选择吗?有最优方法吗?

共有1个答案

詹正浩
2023-03-14

事务性id用于防止由读取-处理-写入模式中的僵尸进程引起的重复,您从该模式中读取并生成kafka主题。对于面向请求的应用程序,例如由传入的超文本传输协议请求生成的消息,事务性id不会带来任何好处(当然,如果您想使用事务,您仍然需要分配一个,并且不应该在相同进程或集群中不同进程的生产者之间重复)

正如文档所说,事务生产者不是线程安全的

正如示例中所暗示的,每个生产者只能有一个未结交易。在beginTransaction()和commitTransaction()调用之间发送的所有消息都将是单个事务的一部分

因此,正如您正确解释的那样,不能并发访问生产者,因此我们必须从您描述的三个选项中选择一个。

对于这个答案,我将假设面向请求的应用程序对应于超文本传输协议请求,因为该机制正在触发与事务一起生成的消息(实际上,不止一条消息,否则对于幂等生产者来说就足够了,不需要事务)

就正确性而言,所有这些都是可以的,因为选项1可以工作,但根据您的应用程序吞吐量,它可能有很高的争用,选项2也可以工作,但您将付出更高延迟的代价,并且效率不高。恕我直言,我认为选项3可能是最好的,因为它是前两个选项之间的折衷方案,尽管当然需要更仔细的实施,而不是每次都打开一个新的生产者。

剩下的问题是如何将事务id分配给生产者,特别是在最后一种情况下(尽管选项1和3都有相同的问题,因为在这两种情况下,我们都在重用具有相同事务id的生产者来处理不同的请求)。

要回答这个问题,我们首先需要了解事务处理的目标。id是为了保护我们免受僵尸进程(一个挂起一段时间的进程,例如长gc暂停的bc,被视为已死亡,但一段时间后返回并继续)导致的重复消息的影响,这被称为僵尸Geofence。

理解僵尸Geofence需求的一个重要细节是理解它可能发生在哪个用例中,这是一种读取-处理-写入模式,在该模式中,您从主题中读取、处理元素并写入输出主题和偏移主题,这为我们提供了原子性和精确一次语义(如果您没有对流程步骤产生任何副作用)。幂等生产者防止了由于生产者重试(其中消息由代理持久化,但生产者未接收到ack)和kafka内的两阶段提交(其中我们不仅写入输出,而且还将消息标记为通过也生成到偏移主题来消费)而导致的重复防止了由于消费消息而导致的重复多次(如果进程在生成输出主题后但在提交偏移量之前崩溃)。还有一种微妙的情况是,可以引入副本,它是一个僵尸生产者,它通过每次生产者调用initTransactions时单调增加一个历元来进行隔离,该事务将与生产者发送的每条消息一起发送。因此,对于要隔离的制作人,另一个制作人应该使用相同的事务id启动,这里的关键由Jason Gustafson在本次演讲中解释

“我们正在寻找的是保证每个输入分区只有一个写入负责读取该数据并写入输出”

这意味着交易。id是根据“读-处理-写”模式中正在使用的分区分配的。因此,如果分配了主题a的分区0的流程被认为是死的,则重新平衡将开始,分配的新流程应创建具有相同事务的生产者。id,这就是为什么它应该是这样的

回到您最初的问题,http请求不会遵循僵尸可以引入重复项的读取过程写入模式,因为每个http请求都是唯一的,即使您引入了唯一标识符,从事务生产者的角度来看,它也将是不同的消息。在这种情况下,我认为,如果您希望向两个不同主题写入数据的原子性,那么使用事务生产者仍然有价值,但您可以为选项2选择一个随机事务id,或者将其重新用于选项1和3。

我的答案已经过时了,因为它基于Kafka的旧版本。KIP-447解决了前面描述的每个分区有一个生产者的开销问题

随着输入分区数量的增加,这种架构的扩展性不好。每个生产者都有单独的内存缓冲区、单独的线程和单独的网络连接。这限制了生产者的性能,因为我们无法有效地使用多个任务的输出来改进批处理。由于存在更多并发事务和更多冗余元数据管理,这也会给代理带来不必要的负载

这是本文中解释的主要区别

在使用者组重新平衡后完成分区分配时,使用者的第一步是始终获取下一个偏移量以开始获取数据。通过这一观察,OffsetFetch协议的保护得到了增强,这样当使用者组具有与一个分区相关联的挂起事务偏移时,OffsetFetch调用可以被阻止,直到相关事务完成。以前,将返回“过时”的偏移数据,并允许应用程序立即继续。

有了这个新功能,我不再清楚transactional.id的使用。

尽管目前还不清楚为什么如果有待处理的交易,Geofence需要阻止投票,而在我看来,发送消费者组元数据就足够了(我假设僵尸生产者将通过promise旧generation.id来Geofencegroup.id,generation.id随着每次重新平衡而受到冲击),但transactional.id似乎不再扮演重要角色

模式V1中,如果另一个实例具有相同的事务性,则生产者被“隔离”。id已启动。Spring通过为每个组使用一个制作人来管理这一点。id/主题/分区;当发生重新平衡时,新实例将使用相同的事务。id和老制片人被隔离<对于模式V2,每个组不需要一个生产者。id/topic/partition,因为消费者元数据与偏移量一起发送到事务,代理可以使用该信息确定生产者是否被隔离。

 类似资料:
  • 我正在使用一个非事务性生产者,并试图理解如何处理成功/失败场景的回调。 对于一个成功的发送,我看到回调由kafka-producer-network-thread线程执行(“send ok”消息)。 发送消息成功-kafka-producer-network-thread 00:59:17.522

  • 我从源主题收到一条消息。然后我将消息分成3个部分,并将每个部分发送到3个不同的主题。现在有2条消息成功传递到第2个主题。但是在发送第3条消息时,我们会收到异常(例如ProducerFencedException|OutOfOrderSequenceException|AuthorizationException|RecordLengthException) 如何回滚/还原前2个主题中的其他2条消息

  • 我正在使用分离的存储库(后端)进行MERN堆栈项目 但是,当我切换到生产环境并将代理值替换为部署的链接时,“代理”不再受支持。我做了一个关于它的搜索,我发现它只是为了开发环境,我尝试了在互联网上找到的几个解决方案,但没有运气! 顺便说一下,我用Heroku部署后端,用Netlify部署前端。现在,它们都部署好了,没有任何错误,但是后端和前端之间没有连接。

  • 假设我有两个经纪人。 我读到Kafka制作人创建的制作人线程等于经纪人的数量。在这种情况下,我将有两个内部线程。 假设我有5个主题,每秒只收到200条消息。Kafka如何进行批处理? 一批大小=30条消息。[topic1=5,topic2=10,topic3=3,topic4=10,topic5=2消息]这些是最重要的消息和相应的主题。 Kafka是如何执行批处理的?

  • 在我们的SpringBoot项目(使用SpringMVC、SpringData等)中,我们使用HibernateEnvers来审核数据库记录。有几件事我不确定。 1-性能-线程: 假设我有一个被审计的个人实体。当我在相关表中插入/更新新的个人记录时,这会如何影响我的应用程序的性能?客户端是否必须等待所有envers审核完成?Envers会在一个单独的线程中处理这个问题吗?一旦插入成功,那么客户就可

  • 我对Spring的Kafka还是有点陌生。我的问题很简单。我有一个仅限消费者使用的应用程序,它可以连续读取Kafka,处理消息,并使用Ack侦听器手动确认消息。我有一个上游生产者专用应用程序的依赖项,在该应用程序中,他们负责向Kafka主题发送消息,以便我使用。我们最近在生产者和消费者之间实现了事务,但我想了解更多关于故障点的信息,以及如何处理那些回滚的事务,以便它们不会丢失?我已经读到,最好使用