在使用Kafka Streams的处理器API时,我使用了如下内容:
context.forward(key,value)
context.commit()
实际上,我在这里做的是每分钟从状态存储发送一个状态到下沉(在init()方法中使用context.schedule())。我不明白的是:
我向前发送的[Key,Value]对,然后执行commit(),从状态存储中获取。它是根据我的特定逻辑从许多非连续输入[键,值]对中聚合的。每个这样的输出[键,值]对都是来自输入的几个非有序[键,值]对的聚合(Kafka主题)。因此,我不理解Kafka cluster和Kafka Streams lib如何知道原始输入[键,值]对和正在发送的最终输出[键,值]之间的相关性。如果Kafka不知道输入对和输出对之间的连接,它如何被事务包装(故障安全)。当我做上下文时,实际上是在做什么。提交()?
谢谢
详细解释这一切超出了我在这里可以写的答案。
基本上,如果提交了事务,则当前输入主题偏移量和对Kafka主题的所有写入都是原子化的。这意味着,所有挂起的写操作都会在提交完成之前刷新。
事务不需要了解您的实际业务逻辑。它们只是将输入主题上的进度跟踪与写入输出主题“同步”。
我建议阅读相应的博客帖子并观看一次关于Kafka的演讲,以了解更多细节:
顺便说一句:这是一个关于Streams API中手动提交的问题。您应该考虑以下问题:如何使用Kafka流手动提交?
本文向大家介绍详解javascript中的事件处理,包括了详解javascript中的事件处理的使用技巧和注意事项,需要的朋友参考一下 一.事件传播机制 客户端JavaScript程序(就是浏览器啦)采用了异步事件驱动编程模型。当文档、浏览器、元素或与之相关的对象发生某些有趣的事情时,Web浏览器就会产生事件(event)。如果JavaScript应用程序关注特定类型的事件,那么它可以注册当这
本文向大家介绍C#中事务处理和非事务处理方法实例分析,包括了C#中事务处理和非事务处理方法实例分析的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了C#中事务处理和非事务处理方法。分享给大家供大家参考。具体如下: C#代码如下: StringUtil.cs如下: DbUtils.cs如下: 希望本文所述对大家的C#程序设计有所帮助。
lab1中对中断的处理实现 (1) 外设基本初始化设置 Lab1实现了中断初始化和对键盘、串口、时钟外设进行中断处理。串口的初始化函数serial_init(位于/kern/driver/console.c)中涉及中断初始化工作的很简单: ...... // 使能串口1接收字符后产生中断 outb(COM1 + COM_IER, COM_IER_RDI); ...... // 通过中断控制
本文向大家介绍php实现mysql事务处理的方法,包括了php实现mysql事务处理的方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了php实现mysql事务处理的方法。分享给大家供大家参考。具体分析如下: 要实现本功能的条件是环境 mysql 5.2 /php 5 支持事务的table 类型,需要InnoDB,有了这些条件你就可以做上面的实现了,这个事物回滚操作是大项目经常用到的,像
3.3 实现调试事件处理 为了让我们的调试器能够针对特定的事件采取相应的行动,我们必须给所有调试器能够 捕捉到的调试事件,编写处理函数。回去看看 WaitForDebugEvent() 函数,每当它捕捉到一 个调试事件的时候,就返回一个填充好了的 DEBUG_EVENT 结构。之前我们都忽略掉这个 结构,直接让进程继续执行下去,现在我们要用存储在结构里的信息决定如何处理调试事件。 DEBUG_EV
本文向大家介绍Mysql事务处理详解,包括了Mysql事务处理详解的使用技巧和注意事项,需要的朋友参考一下 一、Mysql事务概念 MySQL 事务主要用于处理操作量大,复杂度高的数据。由一步或几步数据库操作序列组成逻辑执行单元,这系列操作要么全部执行,要么全部放弃执行。在 MySQL 中只有使用了 Innodb 数据库引擎的数据库或表才支持事务。事务用来管理 insert,update,del