我对Spring的Kafka还是有点陌生。我的问题很简单。我有一个仅限消费者使用的应用程序,它可以连续读取Kafka,处理消息,并使用Ack侦听器手动确认消息。我有一个上游生产者专用应用程序的依赖项,在该应用程序中,他们负责向Kafka主题发送消息,以便我使用。我们最近在生产者和消费者之间实现了事务,但我想了解更多关于故障点的信息,以及如何处理那些回滚的事务,以便它们不会丢失?我已经读到,最好使用AfterRollbackProcessor
而不是SeekTocurInterrorHandler
来处理Kafka容器工厂上的事务,同时将StatefulRetry
设置为true。我之所以使用事务,是为了在较新的版本中实现完全相同的Kafka语义,因为我们需要处理大量数据库持久性,并且由于数据库限制,无法承受重复的事务。我想知道我的@KafkaListener
是否必须用@Transactional
注释,因为我之前读过一篇帖子,说不应该是这样,但其他帖子说可能是这样,这就是我不确定的原因。我看到了许多关于生产者和消费者应用程序的问题,但我没有看到关于分别扮演这些角色的独立应用程序的问题(即使最后可能是同一件事)。简而言之,我只是想知道在与Kafka合并交易时,什么是最佳实践,以及在这种情况下如何处理失败。
Kafka事务对于仅限消费者的应用程序来说是不必要的开销。事务只有在生成记录时才有用。
我使用事务是为了在他们的新版本中实现精确一次的Kafka语义学,因为我们处理大量的数据库持久性,并且由于数据库限制而无法承受重复的事务。
当涉及其他技术时,不能保证“仅一次”。一次只适用于
read->process->write
读写是Kafka的场景。这是一个常见的误解。
此外,即使仅使用kafka读取/处理/写入,“恰好一次”语义学也仅适用于整个系统。也就是说,读取的偏移量仅在写入成功时提交。
进程
步骤将至少获得一次语义学,因此无论是否有Kafka写入步骤和(如果有Kafka写入),只要您在进程步骤的其他地方编写,就需要重复删除逻辑。
对于从Kafka读取数据并写入数据库而不写入Kafka的情况,侦听器上的@Transactional
是正确的方法(使用重复数据消除逻辑以避免重复)。
对于您希望只进行一次Kafka语义学(读取/处理/写入)但同时在处理步骤中写入DB的情况,您可以在侦听器容器中使用ChainedKafkaTransactionManager
,以便DB事务与Kafka事务同步(但对于DB提交成功但Kafka事务失败的情况,仍然有一个小窗口)。因此,即使如此,您仍然需要de-dup逻辑。在这种情况下,您不希望侦听器上有@Transactional
。
编辑
制片人只是有点不同;假设你想在一个事务中发布10条记录,你想让它们全部进入(提交)或退出(回滚)。那么你必须使用事务。
事务中产生的记录的使用者应该有isolation.level=read_committed
,这样他们就看不到未提交的写入(默认为read_uncommitted
)。
如果一次只发布一条记录,并且不涉及其他事务资源,那么如果只涉及Kafka,那么使用事务就没有什么意义。
但是,如果您正在从DB或JMS等读取并写入Kafka,您可能希望同步DB和Kafka事务,但重复的可能性仍然不是零;您如何处理这取决于您提交事务的顺序。
通常,重复数据消除取决于应用程序;通常会使用应用程序数据中的某个键,因此,例如,SQL INSERT语句以数据库中不存在的键为条件。
Kafka为每条记录提供了一个方便的唯一键,并结合了主题/分区/偏移量。您可以将它们与数据一起存储在数据库中以防止重复。
编辑2
SeekToMONtErrorHandler
(STCEH)通常在不使用事务时使用;当侦听器抛出异常时,错误处理程序会重置偏移量,以便在下一次轮询时重新绘制记录。经过多次尝试后,我们放弃并调用“恢复器”,例如DeadLetterPublishingRecoverer
将失败的记录写入另一个主题。
但是,它仍然可以用于事务。
错误处理程序在事务范围内(回滚之前)被调用,因此,如果它抛出异常(除非恢复程序“消耗”了故障,否则它会抛出异常),事务仍将回滚。如果恢复成功,事务将提交。
在将恢复功能添加到STCEH之前,添加了后回滚处理器(ARP)。它本质上与STCEH完全相同,但它在事务范围之外运行(在回滚之后)。
如果STCEH已经执行了搜索,那么配置这两个选项都不会有任何影响,因为ARP将无需执行任何操作。
我仍然更喜欢使用带有事务的ARP和不带事务的STCEH,哪怕只是为了获得日志消息的适当日志类别。可能还有其他我现在想不起来的原因。
请注意,现在STCEH和ARP都支持重试和退避,根本不需要配置侦听器级别的有状态重试。如果您想使用内存中的重试而不会导致往返代理以重新获取相同的记录,无状态重试可能仍然很有用。
但是,即使我将节点显式设置到属性中,它们也不会尝试重新连接到有效的kafka节点。 如何使我的使用者在他们连接的kafka节点失败后重新连接到有效的kafka节点?
在这种情况下,我是否需要求助于Kafka事务API来在消费者轮询循环中创建事务生产者,在该循环中,我在事务中执行:(1)处理消耗的记录和(2)在关闭事务之前提交它们的偏移量。在这种情况下,普通的commitsync/commitasync是否有效?
Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka
我试图找到一种在以下场景中使用ThreadPoolExecutor的方法: 我有一个单独的线程在线程池中生成和提交任务 为了提供更多的上下文,我目前只需一次提交所有任务,并取消ExecutorService返回的所有未来。在最长生成时间到期后提交。我忽略所有产生的取消异常,因为它们是预期的。问题是未来的行为。cancel(false)很奇怪,不适合我的用例: 它可以防止任何未启动的任务运行(良好)
问题内容: 我想向同一队列发送一批20k JMS消息。我使用10个线程将任务拆分,因此每个线程将处理2k条消息。我不需要交易。 我想知道是否建议建立一个连接,一个会话和10个生产者? 如果所有线程共享一个生产者,该怎么办?我的消息会损坏还是会同步发送(不会提高性能)? 如果我总是连接到同一队列,那么决定是创建新连接还是会话的一般指导方针是什么? 谢谢你,很抱歉一次问了很多。 问题答案: 如果某些消
本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要