我试图编写一个kafka streams应用程序,它进行一些转换,并将数据放入物化键值存储,然后从存储中读取它(我需要查询压缩数据)。一件重要的事情是,我不想启动streams应用程序并公开restendpoint,而是希望:启动应用程序->处理从主题到数据存储的所有数据->查询存储->退出应用程序(所有都以顺序方式)
。
编辑:
我已经有多个Kafka主题的数据(并用于不同的场景),我需要每天两次获得所有键的最新值。我可以用典型的交互式查询场景来实现:保持kafka streams应用程序连续运行,以保持商店更新,每天只查询两次--这很有效。但对于我的需求(相对较小的流量)来说,在有限的时间内运行应用程序(只是为了更新存储,同时更新主题,运行查询并退出)会便宜得多。正如Hamed所指出的,流是没有结束的,所以我想要实现的是,在查询存储之前,确保存储已经用所有事件更新到指定的时间戳(或指定的偏移量)。
实际上,Apache Kafka是一个开源的分布式事件流平台。
因此,您必须对应用程序进行建模,以处理流中的事件。溪流没有尽头。如果你不能这样建模和思考,你就用错了工具。
...除了这不存在。但这是关于我在这里想做什么。 或相反的: ...它获取每个GameCharacter的最早版本。为此,我已经尝试了,但显然Javers没有从最后开始计算版本。 这方面的Gradle依赖关系是:
我试图实现Kafka流,这将把单一主题流作为全局数据库与互动查询可能。所以我想拥有: > 记录的全局存储区(GlobalKTable、KeyValueStore)
我们正在使用Apache Phoenix访问HBase数据存储。作为某些需求的一部分,我们需要记录从任何Phoenix客户端发出的每个更新操作,例如写和删除表命令。Phoenix的日志记录是否已经以可解析的格式捕获了这些命令?如果没有,我如何捕捉这些信息?
null 谢谢你的澄清。
我在一个场景中工作,重复的消息可能会到达消费者(KStream应用程序)。为了使用典型的情况,让我们假设它是一个OrderCreatedEvent,KStream有一个处理订单的逻辑。该事件有一个订单id,可以帮助我识别重复的消息。 我想做的是: 1)将每个订单添加到持久状态存储中 2)当处理KStream中的消息时,查询状态存储以检查消息是否已经被接收,在这种情况下不做任何事情。 在位中,我想查
这里已经提出并回答了类似的问题。解决方案是将日志记录级别从组织:: 我的情况的不同之处在于,我使用的是被动支持,上面的坏男孩不起作用。我还尝试将 中的所有内容都设置为 DEBUG,但仍然无法在日志中看到任何查询。 我想反应式存储库有一些特别的地方,我没有提到。任何想法都非常欢迎!