当前位置: 首页 > 知识库问答 >
问题:

在Kafka Streams中使用至少一次交付时,流处理是原子的还是事务性的?

段阳夏
2023-03-14

让我们假设这样一个简单的例子:

ORDER_TOPIC ----> KSTREAM ----> VALIDATED_ORDER_TOPIC
                     |
          ROCKSDB LOCAL STATE STORE

KStream使用带有转换器的转换操作来消除ORDER_主题中的重复消息,该转换器通过密钥/id将消息存储在持久本地状态存储中。这样,如果相同的顺序到达两次,它将被忽略。

现在一个新订单到达,它不是重复的,所以它存储在本地存储中,但在将其发送到VALIDATED_ORDER_TOPIC应用程序崩溃之前。

我想知道KStream中的事务保证是什么:记录是否已存储并提交到本地状态存储或回滚?

你能指出一些关于Kafka流的事务性保证的文档吗?至少有一次语义?

共有1个答案

吴丁雷
2023-03-14

如果您至少使用一次语义运行,那么就没有事务保证。在这种情况下,如果首先将ID添加到存储中,但在将记录写入输出主题之前崩溃,则在从输入主题重新处理该记录时,可能会丢失该记录。

如果要去重复,需要启用processing.guarantees=exactly_once。对于这种情况,如果您崩溃,存储将“回滚”到一致状态。即,崩溃后,只有在写入输出主题成功时,它才会包含ID。

 类似资料:
  • 在MongoDB中,写操作的原子性是在document级别上的,即使修改的是文档中的内嵌部分,写锁的级别也是document上。 当一个写操作要修改多个文档,每个文档的修改是原子性的。整个的写操作并不是原子性的,它可能和其他写操作产生交织。然而你可以使用$isolated隔离操作符来限制写操作,让它不与其他写操作交织。 不隔离性能更高,但是会产生数据的不确定性,隔离写操作,事务性更好。MongoD

  • 因此,我通过JDBC连接到GridGain集群,并且我只在由SQL语句创建的缓存上通过JDBC连接使用INSERT/UPDATE/DELETE语句。我的缓存是事务性的 我的JDBC用法是这样的。在这里,我禁用自动提交并运行一些语句,然后手动提交。 我对集群中的两个数据节点执行了一些测试,没有发现问题。因此,根据定义,JDBC连接必须使用悲观锁,就像键值API一样。我的假设正确吗?我的JDBC使用是

  • Vue.js新手,尝试在表示表单的组件中处理提交事件,该表单将检查子组件的有效性,并在一切正常的情况下将处理传递给父组件中的事件处理程序。 我得到这个错误。。。[Vue warn]:v-on处理程序中的错误:“TypeError:undefined不是在中找到的对象(计算“$event.preventDefault”)”--- 子组件是MyForm。。。 父组件(应用程序)。。。 或者,将应用程序

  • 对终端操作的任何调用都会关闭流,使其无法使用。这个‘特性’带走了很多权力。 我想这不是技术上的原因。这个奇怪的限制背后的设计考虑是什么? 编辑:为了演示我所讲的内容,请考虑以下C#中快速排序的实现:

  • 问题内容: 参考以下链接:http : //docs.python.org/faq/library.html#what- kinds-of-global-value-mutation-are-thread- safe 我想知道以下情况: 在cPython中将保证是原子的。(x和y都是python变量) 问题答案: 让我们来看看: 它不会出现,他们是原子:x的和y的值可以被另一个线程之间改变字节码,

  • 下面是从kafka主题(8分区)接收消息并对其进行处理的消费者代码。 如果处理逻辑中没有返回错误,则所有工作都按预期进行。 但是,如果我抛出一个错误来模拟特定消息的处理逻辑中的异常行为,那么我就没有处理导致异常的消息。流将移动到下一条消息。 我想要实现的是,处理当前消息并提交偏移量,如果它成功,然后移动到下一个记录。 问候, 维诺特