我有一点困惑,我想澄清一下。我有东西在做。我想有一个Kafka Streams拓扑,将有五个独立的KStreams从他们各自的主题读取数据,并将这些数据转储到一个大的整体主题。接下来,我将有一个GlobalKTable,它将从这个整体主题中读取并实现一个全局存储,比方说叫做LookupStore
。我想把这个实体化的全局存储作为其他Kafka Streams应用程序的“查找”表。我已经阅读了一些关于使用带有application.server
配置的RPC层公开这一点的内容,该配置将以某种唯一的host:port
的形式出现。
现在,我希望有许多单独的微服务,每个都是Kafka Streams应用程序,它们将执行处理来自KStream的事件,然后通过交互式查询在LookupStore
上进行查找。例如,.filter()
操作基于对该lookupstore
的查找是否返回值。所以我的困惑是...让我们假设我在host:port
上硬编码公开的RPC层,我如何查询lookupstore
来查询它。如果这是在相同的拓扑/本地实例中,您只需执行类似lookupstore.get(“key”)
······但是如何在远程Kafka Streams实例中执行此操作呢?
还是连接到那个RPC层将状态存储暴露给远程应用程序,以便它“知道”它,您可以像查询本地实例一样查询lookupstore
?这是可行的还是我走错了路?
如果您的微服务(即流应用程序)与主流应用程序(生成GlobalKTable)共享相同的Kafka集群,那么它们可以访问对应于相同应用程序的Table主题,并执行KTable join或LookupStore.get(“key”)。另外,由于延迟,不建议在流应用程序中执行远程API调用来进行查找。如果这两个Kafak集群不同,那么您可以使用类似mirror Maker的东西来复制主题(GlobalKTable和State Store change log主题)。
我正在使用java servlet编写一个简单的服务器。为了从数据库中获取数据,我编写了一个特殊的Dao层类,我想知道我应该在哪里存储SQL查询? 我需要创建一个特殊的类,将查询存储为最终字符串,还是有更有效的方法来实现?
在Kafka Streams Spring Boot应用程序中,配置(提供主机和端口信息)和访问交互式查询的“惯用”方式是什么? 访问KafkaStreams实例以访问状态存储的正确方法是什么? 我知道spring cloud stream中的InteractiveQueriesService,但我并不是只在spring Boot中使用spring kafka库。 谢谢你
我试图实现Kafka流,这将把单一主题流作为全局数据库与互动查询可能。所以我想拥有: > 记录的全局存储区(GlobalKTable、KeyValueStore)
例如,我有两个系列: 第一个被命名为“Songs ”,该集合中的文档存储关于宋立科< code >标题 、< code >持续时间 、< code >发行日期和其他的所有元数据。除此之外,还有< code>artist和< code>producer字段,这些字段存储来自“Stars”集合的相关文档的id。 “歌曲”收藏中的文档示例: 你猜的第二个是“星星”集合,这个集合中的文档存储了关于星星的信
问题内容: 我一直在寻找通过Linux中的命令行查找具有相同名称的正在运行的进程数的最佳方法。例如,如果我想查找正在运行的bash进程数并获得“ 5”。目前,我有一个脚本执行’pidof’,然后对标记化字符串进行计数。这工作正常,但我想知道是否有更好的方法可以完全通过命令行来完成。在此先感谢您的帮助。 问题答案: 在可用的系统上,该选项返回与给定名称匹配的进程数的计数 请注意,这是-style匹配
是否可以在Spring Cloud中使用@EnableBinding注释的类流或在方法中使用@StreamListener使用交互式查询(InteractiveQueryService)?我尝试在提供的KStreamMusicSampleApplication类和process方法中实例化ReadOnlyKeyValueStore,但它始终为空。 我的@StreamListener方法正在监听一组